You are viewing a plain-text version of this content. (The canonical link to the original page was a hyperlink that did not survive the plain-text conversion.)
Posted to mapreduce-commits@hadoop.apache.org by sa...@apache.org on 2013/12/20 20:57:17 UTC

svn commit: r1552799 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-jobcl...

Author: sandy
Date: Fri Dec 20 19:57:16 2013
New Revision: 1552799

URL: http://svn.apache.org/r1552799
Log:
MAPREDUCE-5692. Add explicit diagnostics when a task attempt is killed due to speculative execution (Gera Shegalov via Sandy Ryza)

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1552799&r1=1552798&r2=1552799&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Dec 20 19:57:16 2013
@@ -44,6 +44,9 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5052. Job History UI and web services confusing job start time and
     job submit time (Chen He via jeagles)
 
+    MAPREDUCE-5692. Add explicit diagnostics when a task attempt is killed due
+    to speculative execution (Gera Shegalov via Sandy Ryza)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1552799&r1=1552798&r2=1552799&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Dec 20 19:57:16 2013
@@ -1552,6 +1552,12 @@ public abstract class TaskAttemptImpl im
         TaskAttemptEvent event) {
       //set the finish time
       taskAttempt.setFinishTime();
+
+      if (event instanceof TaskAttemptKillEvent) {
+        taskAttempt.addDiagnosticInfo(
+            ((TaskAttemptKillEvent) event).getMessage());
+      }
+
       //send the deallocate event to ContainerAllocator
       taskAttempt.eventHandler.handle(
           new ContainerAllocatorEvent(taskAttempt.attemptId,
@@ -1855,6 +1861,12 @@ public abstract class TaskAttemptImpl im
         LOG.debug("Not generating HistoryFinish event since start event not " +
             "generated for taskAttempt: " + taskAttempt.getID());
       }
+
+      if (event instanceof TaskAttemptKillEvent) {
+        taskAttempt.addDiagnosticInfo(
+            ((TaskAttemptKillEvent) event).getMessage());
+      }
+
 //      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
@@ -1872,6 +1884,12 @@ public abstract class TaskAttemptImpl im
       // for it
       taskAttempt.taskAttemptListener.unregister(
           taskAttempt.attemptId, taskAttempt.jvmID);
+
+      if (event instanceof TaskAttemptKillEvent) {
+        taskAttempt.addDiagnosticInfo(
+            ((TaskAttemptKillEvent) event).getMessage());
+      }
+
       taskAttempt.reportedStatus.progress = 1.0f;
       taskAttempt.updateProgressSplits();
       //send the cleanup event to containerLauncher

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1552799&r1=1552798&r2=1552799&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Fri Dec 20 19:57:16 2013
@@ -69,6 +69,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
@@ -100,6 +101,7 @@ import com.google.common.annotations.Vis
 public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private static final Log LOG = LogFactory.getLog(TaskImpl.class);
+  private static final String SPECULATION = "Speculation: ";
 
   protected final JobConf conf;
   protected final Path jobFile;
@@ -906,8 +908,8 @@ public abstract class TaskImpl implement
         LOG.info(task.commitAttempt
             + " already given a go for committing the task output, so killing "
             + attemptID);
-        task.eventHandler.handle(new TaskAttemptEvent(
-            attemptID, TaskAttemptEventType.TA_KILL));
+        task.eventHandler.handle(new TaskAttemptKillEvent(attemptID,
+            SPECULATION + task.commitAttempt + " committed first!"));
       }
     }
   }
@@ -932,9 +934,8 @@ public abstract class TaskImpl implement
             //  other reasons.
             !attempt.isFinished()) {
           LOG.info("Issuing kill to other attempt " + attempt.getID());
-          task.eventHandler.handle(
-              new TaskAttemptEvent(attempt.getID(), 
-                  TaskAttemptEventType.TA_KILL));
+          task.eventHandler.handle(new TaskAttemptKillEvent(attempt.getID(),
+              SPECULATION + task.successfulAttempt + " succeeded first!"));
         }
       }
       task.finished(TaskStateInternal.SUCCEEDED);
@@ -1199,8 +1200,7 @@ public abstract class TaskImpl implement
   private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
     if (attempt != null && !attempt.isFinished()) {
       eventHandler.handle(
-          new TaskAttemptEvent(attempt.getID(),
-              TaskAttemptEventType.TA_KILL));
+          new TaskAttemptKillEvent(attempt.getID(), logMsg));
     }
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java?rev=1552799&r1=1552798&r2=1552799&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java Fri Dec 20 19:57:16 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
@@ -106,17 +107,21 @@ public class TestSpeculativeExecutionWit
 
     int maxTimeWait = 10;
     boolean successfullySpeculated = false;
+    TaskAttempt[] ta = null;
     while (maxTimeWait > 0 && !successfullySpeculated) {
       if (taskToBeSpeculated.getAttempts().size() != 2) {
         Thread.sleep(1000);
         clock.setTime(System.currentTimeMillis() + 20000);
       } else {
         successfullySpeculated = true;
+        // finish 1st TA, 2nd will be killed
+        ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
       }
       maxTimeWait--;
     }
     Assert
       .assertTrue("Couldn't speculate successfully", successfullySpeculated);
+    verifySpeculationMessage(app, ta);
   }
 
   @Test(timeout = 60000)
@@ -197,16 +202,47 @@ public class TestSpeculativeExecutionWit
 
     int maxTimeWait = 5;
     boolean successfullySpeculated = false;
+    TaskAttempt[] ta = null;
     while (maxTimeWait > 0 && !successfullySpeculated) {
       if (speculatedTask.getAttempts().size() != 2) {
         Thread.sleep(1000);
       } else {
         successfullySpeculated = true;
+        ta = makeFirstAttemptWin(appEventHandler, speculatedTask);
       }
       maxTimeWait--;
     }
     Assert
       .assertTrue("Couldn't speculate successfully", successfullySpeculated);
+    verifySpeculationMessage(app, ta);
+  }
+
+  private static TaskAttempt[] makeFirstAttemptWin(
+      EventHandler appEventHandler, Task speculatedTask) {
+
+    // finish 1st TA, 2nd will be killed
+    Collection<TaskAttempt> attempts = speculatedTask.getAttempts().values();
+    TaskAttempt[] ta = new TaskAttempt[attempts.size()];
+    attempts.toArray(ta);
+    appEventHandler.handle(
+        new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE));
+    appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    return ta;
+  }
+
+  private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta)
+      throws Exception {
+    app.waitForState(ta[0], TaskAttemptState.SUCCEEDED);
+    app.waitForState(ta[1], TaskAttemptState.KILLED);
+    boolean foundSpecMsg = false;
+    for (String msg : ta[1].getDiagnostics()) {
+      if (msg.contains("Speculation")) {
+        foundSpecMsg = true;
+        break;
+      }
+    }
+    Assert.assertTrue("No speculation diagnostics!", foundSpecMsg);
   }
 
   private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,