You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2015/09/17 23:38:21 UTC

hadoop git commit: MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can disappear. Contributed by Chang Li

Repository: hadoop
Updated Branches:
  refs/heads/trunk 9eee97508 -> ee4ee6af6


MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can disappear. Contributed by Chang Li


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ee4ee6af
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ee4ee6af
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ee4ee6af

Branch: refs/heads/trunk
Commit: ee4ee6af6a5a6299d27462adb6944206039bbbae
Parents: 9eee975
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Sep 17 21:37:39 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Sep 17 21:37:39 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../v2/app/job/impl/TaskAttemptImpl.java        |  92 +++++------
 .../apache/hadoop/mapreduce/v2/app/MRApp.java   |  11 +-
 .../v2/app/job/impl/TestTaskAttempt.java        | 154 +++++++++++++++++++
 4 files changed, 213 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee4ee6af/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 6cf7abb..cd84a34 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -573,6 +573,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-5002. AM could potentially allocate a reduce container to a map
     attempt (Chang Li via jlowe)
 
+    MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can
+    disappear (Chang Li via jlowe)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee4ee6af/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 77a7555..a7becdb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -1484,6 +1484,19 @@ public abstract class TaskAttemptImpl implements
     return tauce;
   }
 
+  private static void
+      sendJHStartEventForAssignedFailTask(TaskAttemptImpl taskAttempt) {
+    TaskAttemptContainerLaunchedEvent event;
+    taskAttempt.launchTime = taskAttempt.clock.getTime();
+
+    InetSocketAddress nodeHttpInetAddr =
+        NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
+    taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
+    taskAttempt.httpPort = nodeHttpInetAddr.getPort();
+    taskAttempt.sendLaunchedEvents();
+  }
+
+
   @SuppressWarnings("unchecked")
   private void sendLaunchedEvents() {
     JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId()
@@ -1681,6 +1694,9 @@ public abstract class TaskAttemptImpl implements
     @Override
     public void transition(TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
+      if (taskAttempt.getLaunchTime() == 0) {
+        sendJHStartEventForAssignedFailTask(taskAttempt);
+      }
       //set the finish time
       taskAttempt.setFinishTime();
 
@@ -1715,23 +1731,19 @@ public abstract class TaskAttemptImpl implements
         default:
           LOG.error("Task final state is not FAILED or KILLED: " + finalState);
       }
-      if (taskAttempt.getLaunchTime() != 0) {
-        TaskAttemptUnsuccessfulCompletionEvent tauce =
-            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                finalState);
-        if(finalState == TaskAttemptStateInternal.FAILED) {
-          taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
-        } else if(finalState == TaskAttemptStateInternal.KILLED) {
-          taskAttempt.eventHandler
-          .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
-        }
-        taskAttempt.eventHandler.handle(new JobHistoryEvent(
-            taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-      } else {
-        LOG.debug("Not generating HistoryFinish event since start event not " +
-            "generated for taskAttempt: " + taskAttempt.getID());
+
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+              finalState);
+      if(finalState == TaskAttemptStateInternal.FAILED) {
+        taskAttempt.eventHandler
+          .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
+      } else if(finalState == TaskAttemptStateInternal.KILLED) {
+        taskAttempt.eventHandler
+        .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
       }
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(
+          taskAttempt.attemptId.getTaskId().getJobId(), tauce));
     }
   }
 
@@ -2023,27 +2035,25 @@ public abstract class TaskAttemptImpl implements
     @Override
     public void transition(TaskAttemptImpl taskAttempt,
         TaskAttemptEvent event) {
+      if (taskAttempt.getLaunchTime() == 0) {
+        sendJHStartEventForAssignedFailTask(taskAttempt);
+      }
       //set the finish time
       taskAttempt.setFinishTime();
-      if (taskAttempt.getLaunchTime() != 0) {
-        taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
-        TaskAttemptUnsuccessfulCompletionEvent tauce =
-            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                TaskAttemptStateInternal.KILLED);
-        taskAttempt.eventHandler.handle(new JobHistoryEvent(
-            taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-      }else {
-        LOG.debug("Not generating HistoryFinish event since start event not " +
-            "generated for taskAttempt: " + taskAttempt.getID());
-      }
+
+      taskAttempt.eventHandler
+          .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+              TaskAttemptStateInternal.KILLED);
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(
+          taskAttempt.attemptId.getTaskId().getJobId(), tauce));
 
       if (event instanceof TaskAttemptKillEvent) {
         taskAttempt.addDiagnosticInfo(
             ((TaskAttemptKillEvent) event).getMessage());
       }
 
-//      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
           TaskEventType.T_ATTEMPT_KILLED));
@@ -2178,23 +2188,19 @@ public abstract class TaskAttemptImpl implements
 
   @SuppressWarnings("unchecked")
   private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) {
+    if (taskAttempt.getLaunchTime() == 0) {
+      sendJHStartEventForAssignedFailTask(taskAttempt);
+    }
     // set the finish time
     taskAttempt.setFinishTime();
+    taskAttempt.eventHandler
+        .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
+    TaskAttemptUnsuccessfulCompletionEvent tauce =
+        createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+            TaskAttemptStateInternal.FAILED);
+    taskAttempt.eventHandler.handle(new JobHistoryEvent(
+        taskAttempt.attemptId.getTaskId().getJobId(), tauce));
 
-    if (taskAttempt.getLaunchTime() != 0) {
-      taskAttempt.eventHandler
-          .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
-      TaskAttemptUnsuccessfulCompletionEvent tauce =
-          createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-              TaskAttemptStateInternal.FAILED);
-      taskAttempt.eventHandler.handle(new JobHistoryEvent(
-          taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-      // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
-      // handling failed map/reduce events.
-    }else {
-      LOG.debug("Not generating HistoryFinish event since start event not " +
-          "generated for taskAttempt: " + taskAttempt.getID());
-    }
     taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
         taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee4ee6af/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index b51adf2..f0c10d3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -544,10 +544,7 @@ public class MRApp extends MRAppMaster {
     public void handle(ContainerLauncherEvent event) {
       switch (event.getType()) {
       case CONTAINER_REMOTE_LAUNCH:
-        getContext().getEventHandler().handle(
-            new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
-                shufflePort));
-        
+        containerLaunched(event.getTaskAttemptID(), shufflePort);
         attemptLaunched(event.getTaskAttemptID());
         break;
       case CONTAINER_REMOTE_CLEANUP:
@@ -561,6 +558,12 @@ public class MRApp extends MRAppMaster {
     }
   }
 
+  protected void containerLaunched(TaskAttemptId attemptID, int shufflePort) {
+    getContext().getEventHandler().handle(
+      new TaskAttemptContainerLaunchedEvent(attemptID,
+          shufflePort));
+  }
+
   protected void attemptLaunched(TaskAttemptId attemptID) {
     if (autoComplete) {
       // send the done event

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee4ee6af/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index a88a935..6b4656a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -115,6 +115,69 @@ public class TestTaskAttempt{
   }
 
   @Test
+  public void testMRAppHistoryForTAFailedInAssigned() throws Exception {
+    // test TA_CONTAINER_LAUNCH_FAILED for map
+    FailingAttemptsDuringAssignedMRApp app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_CONTAINER_LAUNCH_FAILED for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_CONTAINER_COMPLETED for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_CONTAINER_COMPLETED for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_FAILMSG for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_FAILMSG);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_FAILMSG for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_FAILMSG);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_FAILMSG_BY_CLIENT for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_FAILMSG_BY_CLIENT);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_FAILMSG_BY_CLIENT for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_FAILMSG_BY_CLIENT);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_KILL for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_KILL);
+    testTaskAttemptAssignedKilledHistory(app);
+
+    // test TA_KILL for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_KILL);
+    testTaskAttemptAssignedKilledHistory(app);
+  }
+
+  @Test
   public void testSingleRackRequest() throws Exception {
     TaskAttemptImpl.RequestContainerTransition rct =
         new TaskAttemptImpl.RequestContainerTransition(false);
@@ -301,6 +364,31 @@ public class TestTaskAttempt{
         report.getTaskAttemptState());
   }
 
+  private void testTaskAttemptAssignedFailHistory
+      (FailingAttemptsDuringAssignedMRApp app) throws Exception {
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+    Map<TaskId, Task> tasks = job.getTasks();
+    Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
+    Assert.assertTrue("No Ta Failed JH Event", app.getTaFailedJHEvent());
+  }
+
+  private void testTaskAttemptAssignedKilledHistory
+      (FailingAttemptsDuringAssignedMRApp app) throws Exception {
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Map<TaskId, Task> tasks = job.getTasks();
+    Task task = tasks.values().iterator().next();
+    app.waitForState(task, TaskState.SCHEDULED);
+    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
+    TaskAttempt attempt = attempts.values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.KILLED);
+    Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
+    Assert.assertTrue("No Ta Killed JH Event", app.getTaKilledJHEvent());
+  }
+
   static class FailingAttemptsMRApp extends MRApp {
     FailingAttemptsMRApp(int maps, int reduces) {
       super(maps, reduces, true, "FailingAttemptsMRApp", true);
@@ -331,6 +419,72 @@ public class TestTaskAttempt{
     }
   }
 
+  static class FailingAttemptsDuringAssignedMRApp extends MRApp {
+    FailingAttemptsDuringAssignedMRApp(int maps, int reduces,
+        TaskAttemptEventType event) {
+      super(maps, reduces, true, "FailingAttemptsMRApp", true);
+      sendFailEvent = event;
+    }
+
+   TaskAttemptEventType sendFailEvent;
+
+   @Override
+    protected void containerLaunched(TaskAttemptId attemptID,
+        int shufflePort) {
+      // Do nothing: deliberately skip sending the TA_CONTAINER_LAUNCHED event.
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      getContext().getEventHandler().handle(
+          new TaskAttemptEvent(attemptID, sendFailEvent));
+    }
+
+    private boolean receiveTaStartJHEvent = false;
+    private boolean receiveTaFailedJHEvent = false;
+    private boolean receiveTaKilledJHEvent = false;
+
+    public boolean getTaStartJHEvent(){
+      return receiveTaStartJHEvent;
+    }
+
+    public boolean getTaFailedJHEvent(){
+      return receiveTaFailedJHEvent;
+    }
+
+    public boolean getTaKilledJHEvent(){
+        return receiveTaKilledJHEvent;
+    }
+
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      return new EventHandler<JobHistoryEvent>() {
+        @Override
+        public void handle(JobHistoryEvent event) {
+          if (event.getType() == org.apache.hadoop.mapreduce.jobhistory.
+              EventType.MAP_ATTEMPT_FAILED) {
+            receiveTaFailedJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.MAP_ATTEMPT_KILLED) {
+            receiveTaKilledJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.MAP_ATTEMPT_STARTED) {
+            receiveTaStartJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.REDUCE_ATTEMPT_FAILED) {
+            receiveTaFailedJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+                  jobhistory.EventType.REDUCE_ATTEMPT_KILLED) {
+            receiveTaKilledJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.REDUCE_ATTEMPT_STARTED) {
+            receiveTaStartJHEvent = true;
+          }
+        }
+      };
+    }
+  }
+
   @Test
   public void testLaunchFailedWhileKilling() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);