You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sa...@apache.org on 2013/12/20 20:57:17 UTC
svn commit: r1552799 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/
Author: sandy
Date: Fri Dec 20 19:57:16 2013
New Revision: 1552799
URL: http://svn.apache.org/r1552799
Log:
MAPREDUCE-5692. Add explicit diagnostics when a task attempt is killed due to speculative execution (Gera Shegalov via Sandy Ryza)
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1552799&r1=1552798&r2=1552799&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Dec 20 19:57:16 2013
@@ -44,6 +44,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5052. Job History UI and web services confusing job start time and
job submit time (Chen He via jeagles)
+ MAPREDUCE-5692. Add explicit diagnostics when a task attempt is killed due
+ to speculative execution (Gera Shegalov via Sandy Ryza)
+
OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1552799&r1=1552798&r2=1552799&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Dec 20 19:57:16 2013
@@ -1552,6 +1552,12 @@ public abstract class TaskAttemptImpl im
TaskAttemptEvent event) {
//set the finish time
taskAttempt.setFinishTime();
+
+ if (event instanceof TaskAttemptKillEvent) {
+ taskAttempt.addDiagnosticInfo(
+ ((TaskAttemptKillEvent) event).getMessage());
+ }
+
//send the deallocate event to ContainerAllocator
taskAttempt.eventHandler.handle(
new ContainerAllocatorEvent(taskAttempt.attemptId,
@@ -1855,6 +1861,12 @@ public abstract class TaskAttemptImpl im
LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID());
}
+
+ if (event instanceof TaskAttemptKillEvent) {
+ taskAttempt.addDiagnosticInfo(
+ ((TaskAttemptKillEvent) event).getMessage());
+ }
+
// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
@@ -1872,6 +1884,12 @@ public abstract class TaskAttemptImpl im
// for it
taskAttempt.taskAttemptListener.unregister(
taskAttempt.attemptId, taskAttempt.jvmID);
+
+ if (event instanceof TaskAttemptKillEvent) {
+ taskAttempt.addDiagnosticInfo(
+ ((TaskAttemptKillEvent) event).getMessage());
+ }
+
taskAttempt.reportedStatus.progress = 1.0f;
taskAttempt.updateProgressSplits();
//send the cleanup event to containerLauncher
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1552799&r1=1552798&r2=1552799&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Fri Dec 20 19:57:16 2013
@@ -69,6 +69,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
@@ -100,6 +101,7 @@ import com.google.common.annotations.Vis
public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
private static final Log LOG = LogFactory.getLog(TaskImpl.class);
+ private static final String SPECULATION = "Speculation: ";
protected final JobConf conf;
protected final Path jobFile;
@@ -906,8 +908,8 @@ public abstract class TaskImpl implement
LOG.info(task.commitAttempt
+ " already given a go for committing the task output, so killing "
+ attemptID);
- task.eventHandler.handle(new TaskAttemptEvent(
- attemptID, TaskAttemptEventType.TA_KILL));
+ task.eventHandler.handle(new TaskAttemptKillEvent(attemptID,
+ SPECULATION + task.commitAttempt + " committed first!"));
}
}
}
@@ -932,9 +934,8 @@ public abstract class TaskImpl implement
// other reasons.
!attempt.isFinished()) {
LOG.info("Issuing kill to other attempt " + attempt.getID());
- task.eventHandler.handle(
- new TaskAttemptEvent(attempt.getID(),
- TaskAttemptEventType.TA_KILL));
+ task.eventHandler.handle(new TaskAttemptKillEvent(attempt.getID(),
+ SPECULATION + task.successfulAttempt + " succeeded first!"));
}
}
task.finished(TaskStateInternal.SUCCEEDED);
@@ -1199,8 +1200,7 @@ public abstract class TaskImpl implement
private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
if (attempt != null && !attempt.isFinished()) {
eventHandler.handle(
- new TaskAttemptEvent(attempt.getID(),
- TaskAttemptEventType.TA_KILL));
+ new TaskAttemptKillEvent(attempt.getID(), logMsg));
}
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java?rev=1552799&r1=1552798&r2=1552799&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java Fri Dec 20 19:57:16 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2;
+import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
@@ -106,17 +107,21 @@ public class TestSpeculativeExecutionWit
int maxTimeWait = 10;
boolean successfullySpeculated = false;
+ TaskAttempt[] ta = null;
while (maxTimeWait > 0 && !successfullySpeculated) {
if (taskToBeSpeculated.getAttempts().size() != 2) {
Thread.sleep(1000);
clock.setTime(System.currentTimeMillis() + 20000);
} else {
successfullySpeculated = true;
+ // finish 1st TA, 2nd will be killed
+ ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
}
maxTimeWait--;
}
Assert
.assertTrue("Couldn't speculate successfully", successfullySpeculated);
+ verifySpeculationMessage(app, ta);
}
@Test(timeout = 60000)
@@ -197,16 +202,47 @@ public class TestSpeculativeExecutionWit
int maxTimeWait = 5;
boolean successfullySpeculated = false;
+ TaskAttempt[] ta = null;
while (maxTimeWait > 0 && !successfullySpeculated) {
if (speculatedTask.getAttempts().size() != 2) {
Thread.sleep(1000);
} else {
successfullySpeculated = true;
+ ta = makeFirstAttemptWin(appEventHandler, speculatedTask);
}
maxTimeWait--;
}
Assert
.assertTrue("Couldn't speculate successfully", successfullySpeculated);
+ verifySpeculationMessage(app, ta);
+ }
+
+ /**
+ * Makes the first attempt of {@code speculatedTask} finish so that the
+ * other attempt is expected to be killed by the task.
+ *
+ * Dispatches TA_DONE followed by TA_CONTAINER_CLEANED for the first
+ * attempt to {@code appEventHandler}.
+ * NOTE(review): "first" means first in getAttempts().values() iteration
+ * order; whether that is the original or the speculative attempt depends
+ * on the map implementation behind getAttempts() -- confirm ordering.
+ *
+ * @param appEventHandler handler the attempt events are dispatched to
+ * @param speculatedTask task expected to have two attempts (callers only
+ *     invoke this once getAttempts().size() == 2)
+ * @return the task's attempts, in getAttempts().values() order
+ */
+ private static TaskAttempt[] makeFirstAttemptWin(
+ EventHandler appEventHandler, Task speculatedTask) {
+
+ // finish 1st TA, 2nd will be killed
+ Collection<TaskAttempt> attempts = speculatedTask.getAttempts().values();
+ TaskAttempt[] ta = new TaskAttempt[attempts.size()];
+ attempts.toArray(ta);
+ appEventHandler.handle(
+ new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE));
+ // presumably needed so the winning attempt can move past container
+ // cleanup and reach SUCCEEDED -- confirm against TaskAttemptImpl
+ appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(),
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ return ta;
+ }
+
+ /**
+ * Asserts that the winning attempt succeeded and that the losing attempt
+ * was killed with a diagnostic that mentions speculation.
+ *
+ * Waits for ta[0] to reach SUCCEEDED and ta[1] to reach KILLED, then
+ * checks that at least one of ta[1]'s diagnostics contains "Speculation"
+ * (matching the "Speculation: " prefix TaskImpl puts on speculation kills).
+ *
+ * @param app the MRApp running the job
+ * @param ta attempts as returned by makeFirstAttemptWin; must be non-null
+ *     with at least two elements (callers assert successfullySpeculated,
+ *     which is set in the same branch that assigns ta, before calling)
+ * @throws Exception if waiting for an attempt state fails (presumably
+ *     propagated from MRApp.waitForState)
+ */
+ private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta)
+ throws Exception {
+ app.waitForState(ta[0], TaskAttemptState.SUCCEEDED);
+ app.waitForState(ta[1], TaskAttemptState.KILLED);
+ // Any diagnostic line carrying the speculation marker is sufficient.
+ boolean foundSpecMsg = false;
+ for (String msg : ta[1].getDiagnostics()) {
+ if (msg.contains("Speculation")) {
+ foundSpecMsg = true;
+ break;
+ }
+ }
+ Assert.assertTrue("No speculation diagnostics!", foundSpecMsg);
}
private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,