You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this text copy.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/07/10 18:12:17 UTC

svn commit: r1359750 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/

Author: bobby
Date: Tue Jul 10 16:12:17 2012
New Revision: 1359750

URL: http://svn.apache.org/viewvc?rev=1359750&view=rev
Log:
svn merge -c 1359747. FIXES: MAPREDUCE-4252. MR2 job never completes with 1 pending task (Tom White via bobby)

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1359750&r1=1359749&r2=1359750&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Jul 10 16:12:17 2012
@@ -310,6 +310,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-3728. ShuffleHandler can't access results when configured in a
     secure mode (ahmed via tucu)
 
+    MAPREDUCE-4252. MR2 job never completes with 1 pending task (Tom White via
+    bobby)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1359750&r1=1359749&r2=1359750&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Jul 10 16:12:17 2012
@@ -189,7 +189,7 @@ public abstract class TaskImpl implement
 
     // Transitions from SUCCEEDED state
     .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
-        EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED),
+        EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED, TaskState.FAILED),
         TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
     // Ignore-able transitions.
     .addTransition(
@@ -617,7 +617,7 @@ public abstract class TaskImpl implement
     }
   }
 
-  private void internalError(TaskEventType type) {
+  protected void internalError(TaskEventType type) {
     LOG.error("Invalid event " + type + " on Task " + this.taskId);
     eventHandler.handle(new JobDiagnosticsUpdateEvent(
         this.taskId.getJobId(), "Invalid event " + type + 
@@ -893,6 +893,16 @@ public abstract class TaskImpl implement
 
     @Override
     public TaskState transition(TaskImpl task, TaskEvent event) {
+      if (event instanceof TaskTAttemptEvent) {
+        TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+        if (task.getState() == TaskState.SUCCEEDED &&
+            !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
+          // don't allow a different task attempt to override a previous
+          // succeeded state
+          return TaskState.SUCCEEDED;
+        }
+      }
+      
       //verify that this occurs only for map task
       //TODO: consider moving it to MapTaskImpl
       if (!TaskType.MAP.equals(task.getType())) {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1359750&r1=1359749&r2=1359750&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Tue Jul 10 16:12:17 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -127,8 +128,13 @@ public class TestTaskImpl {
     @Override
     protected int getMaxAttempts() {
       return 100;
-    }    
-    
+    }
+
+    @Override
+    protected void internalError(TaskEventType type) {
+      super.internalError(type);
+      fail("Internal error: " + type);
+    }
   }
   
   private class MockTaskAttemptImpl extends TaskAttemptImpl {
@@ -462,5 +468,32 @@ public class TestTaskImpl {
 
     assertTaskSucceededState();
   }
+  
+  @Test
+  public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.RUNNING);
+
+    // Add a speculative task attempt that succeeds
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), 
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), 
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    
+    // The task should now have succeeded
+    assertTaskSucceededState();
+    
+    // Now fail the first task attempt, after the second has succeeded
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), 
+        TaskEventType.T_ATTEMPT_FAILED));
+    
+    // The task should still be in the succeeded state
+    assertTaskSucceededState();
+    
+  }
 
 }