You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2012/02/14 23:55:01 UTC

svn commit: r1244256 - in /hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-...

Author: mahadev
Date: Tue Feb 14 22:55:00 2012
New Revision: 1244256

URL: http://svn.apache.org/viewvc?rev=1244256&view=rev
Log:
MAPREDUCE-3858. Task attempt failure during commit results in task never completing. (Tom White via mahadev) - Merging r1244254 from trunk.

Modified:
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt?rev=1244256&r1=1244255&r2=1244256&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/CHANGES.txt Tue Feb 14 22:55:00 2012
@@ -747,6 +747,9 @@ Release 0.23.1 - 2012-02-08 
     MAPREDUCE-3802. Added test to validate that AM can crash multiple times and
     still can recover successfully after MAPREDUCE-3846. (vinodkv)
 
+    MAPREDUCE-3858. Task attempt failure during commit results in task never completing.
+    (Tom White via mahadev)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1244256&r1=1244255&r2=1244256&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Feb 14 22:55:00 2012
@@ -832,6 +832,9 @@ public abstract class TaskImpl implement
     public TaskState transition(TaskImpl task, TaskEvent event) {
       task.failedAttempts++;
       TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+      if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
+        task.commitAttempt = null;
+      }
       TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
       if (attempt.getAssignedContainerMgrAddress() != null) {
         //container was assigned
@@ -877,6 +880,7 @@ public abstract class TaskImpl implement
 
     protected void unSucceed(TaskImpl task) {
       ++task.numberUncompletedAttempts;
+      task.commitAttempt = null; // also forget which attempt was committing (MAPREDUCE-3858)
       task.successfulAttempt = null;
     }
   }

Modified: hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1244256&r1=1244255&r2=1244256&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Tue Feb 14 22:55:00 2012
@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -261,6 +263,12 @@ public class TestTaskImpl {
     assertTaskRunningState();    
   }
   
+  private void commitTaskAttempt(TaskAttemptId attemptId) {
+    mockTask.handle(new TaskTAttemptEvent(attemptId, 
+        TaskEventType.T_ATTEMPT_COMMIT_PENDING));
+    assertTaskRunningState(); // requesting commit alone leaves the task RUNNING
+  }
+  
   private MockTaskAttemptImpl getLastAttempt() {
     return taskAttempts.get(taskAttempts.size()-1);
   }
@@ -279,32 +287,45 @@ public class TestTaskImpl {
     assertTaskRunningState();  
   }
   
+  private void failRunningTaskAttempt(TaskAttemptId attemptId) {
+    mockTask.handle(new TaskTAttemptEvent(attemptId, 
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertTaskRunningState(); // a single failed attempt leaves the task RUNNING
+  }
+  
   /**
    * {@link TaskState#NEW}
    */
   private void assertTaskNewState() {
-    assertEquals(mockTask.getState(), TaskState.NEW);
+    assertEquals(TaskState.NEW, mockTask.getState()); // JUnit order is (expected, actual)
   }
   
   /**
    * {@link TaskState#SCHEDULED}
    */
   private void assertTaskScheduledState() {
-    assertEquals(mockTask.getState(), TaskState.SCHEDULED);
+    assertEquals(TaskState.SCHEDULED, mockTask.getState());
   }
 
   /**
    * {@link TaskState#RUNNING}
    */
   private void assertTaskRunningState() {
-    assertEquals(mockTask.getState(), TaskState.RUNNING);        
+    assertEquals(TaskState.RUNNING, mockTask.getState());
   }
     
   /**
    * {@link TaskState#KILL_WAIT}
    */
   private void assertTaskKillWaitState() {
-    assertEquals(mockTask.getState(), TaskState.KILL_WAIT);
+    assertEquals(TaskState.KILL_WAIT, mockTask.getState());
+  }
+  
+  /**
+   * {@link TaskState#SUCCEEDED}
+   */
+  private void assertTaskSucceededState() {
+    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
   }
   
   @Test
@@ -409,5 +430,32 @@ public class TestTaskImpl {
     assert(mockTask.getProgress() == progress);
         
   }
+  
+  @Test
+  public void testFailureDuringTaskAttemptCommit() { // regression test for MAPREDUCE-3858
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+
+    // The first attempt fails during its commit (e.g. the commit throws an
+    // exception); the task must stop treating it as the committing attempt.
+    updateLastAttemptState(TaskAttemptState.FAILED);
+    failRunningTaskAttempt(getLastAttempt().getAttemptId());
+    // A second attempt should now exist; it requests commit and succeeds.
+    assertEquals(2, taskAttempts.size());
+    updateLastAttemptState(TaskAttemptState.SUCCEEDED);
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), 
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    // Only the second attempt may commit, and the task as a whole succeeds.
+    assertFalse("First attempt should not commit",
+        mockTask.canCommit(taskAttempts.get(0).getAttemptId()));
+    assertTrue("Second attempt should commit",
+        mockTask.canCommit(getLastAttempt().getAttemptId()));
+
+    assertTaskSucceededState();
+  }
 
 }