You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/11/12 18:19:59 UTC

svn commit: r1408361 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/s...

Author: bobby
Date: Mon Nov 12 17:19:58 2012
New Revision: 1408361

URL: http://svn.apache.org/viewvc?rev=1408361&view=rev
Log:
svn merge -c 1408360 FIXES: MAPREDUCE-4425. Speculation + Fetch failures can lead to a hung job (jlowe via bobby)

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1408361&r1=1408360&r2=1408361&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Mon Nov 12 17:19:58 2012
@@ -507,6 +507,9 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
 
     MAPREDUCE-4787. TestJobMonitorAndPrint is broken (Rob Parker via bobby)
+
+    MAPREDUCE-4425. Speculation + Fetch failures can lead to a hung job (jlowe
+    via bobby)
  
 Release 0.23.4 - UNRELEASED
 

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1408361&r1=1408360&r2=1408361&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Mon Nov 12 17:19:58 2012
@@ -217,13 +217,15 @@ public abstract class TaskImpl implement
     .addTransition(TaskStateInternal.SUCCEEDED,
         EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
         TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition())
+    .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
+        TaskEventType.T_ATTEMPT_SUCCEEDED,
+        new AttemptSucceededAtSucceededTransition())
     // Ignore-able transitions.
     .addTransition(
         TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
         EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
             TaskEventType.T_ATTEMPT_COMMIT_PENDING,
             TaskEventType.T_ATTEMPT_LAUNCHED,
-            TaskEventType.T_ATTEMPT_SUCCEEDED,
             TaskEventType.T_KILL))
 
     // Transitions from FAILED state        
@@ -971,6 +973,8 @@ public abstract class TaskImpl implement
           !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
         // don't allow a different task attempt to override a previous
         // succeeded state
+        task.finishedAttempts.add(castEvent.getTaskAttemptID());
+        task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
         return TaskStateInternal.SUCCEEDED;
       }
 
@@ -1013,6 +1017,8 @@ public abstract class TaskImpl implement
             !attemptId.equals(task.successfulAttempt)) {
           // don't allow a different task attempt to override a previous
           // succeeded state
+          task.finishedAttempts.add(castEvent.getTaskAttemptID());
+          task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
           return TaskStateInternal.SUCCEEDED;
         }
       }
@@ -1043,6 +1049,16 @@ public abstract class TaskImpl implement
     }
   }
 
+  private static class AttemptSucceededAtSucceededTransition
+    implements SingleArcTransition<TaskImpl, TaskEvent> {
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+      task.finishedAttempts.add(castEvent.getTaskAttemptID());
+      task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
+    }
+  }
+
   private static class KillNewTransition 
     implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1408361&r1=1408360&r2=1408361&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Mon Nov 12 17:19:58 2012
@@ -141,7 +141,6 @@ public class TestTaskImpl {
 
     private float progress = 0;
     private TaskAttemptState state = TaskAttemptState.NEW;
-    private TaskAttemptId attemptId;
     private TaskType taskType;
 
     public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
@@ -152,14 +151,11 @@ public class TestTaskImpl {
         AppContext appContext, TaskType taskType) {
       super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
           dataLocations, committer, jobToken, credentials, clock, appContext);
-      attemptId = Records.newRecord(TaskAttemptId.class);
-      attemptId.setId(id);
-      attemptId.setTaskId(taskId);
       this.taskType = taskType;
     }
 
     public TaskAttemptId getAttemptId() {
-      return attemptId;
+      return getID();
     }
     
     @Override
@@ -561,4 +557,49 @@ public class TestTaskImpl {
     mockTask = createMockTask(TaskType.REDUCE);
     runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING);
   }
+
+  @Test
+  public void testSpeculativeMapFetchFailure() {
+    // Setup a scenario where speculative task wins, first attempt killed
+    mockTask = createMockTask(TaskType.MAP);
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
+    assertEquals(2, taskAttempts.size());
+
+    // speculative attempt retroactively fails from fetch failures
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+
+    assertTaskScheduledState();
+    assertEquals(3, taskAttempts.size());
+  }
+
+  @Test
+  public void testSpeculativeMapMultipleSucceedFetchFailure() {
+    // Setup a scenario where speculative task wins, first attempt succeeds
+    mockTask = createMockTask(TaskType.MAP);
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED);
+    assertEquals(2, taskAttempts.size());
+
+    // speculative attempt retroactively fails from fetch failures
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+
+    assertTaskScheduledState();
+    assertEquals(3, taskAttempts.size());
+  }
+
+  @Test
+  public void testSpeculativeMapFailedFetchFailure() {
+    // Setup a scenario where speculative task wins, first attempt fails
+    mockTask = createMockTask(TaskType.MAP);
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
+    assertEquals(2, taskAttempts.size());
+
+    // speculative attempt retroactively fails from fetch failures
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+
+    assertTaskScheduledState();
+    assertEquals(3, taskAttempts.size());
+  }
 }