You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2012/04/11 20:20:19 UTC

svn commit: r1324904 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-ap...

Author: sseth
Date: Wed Apr 11 18:20:19 2012
New Revision: 1324904

URL: http://svn.apache.org/viewvc?rev=1324904&view=rev
Log:
MAPREDUCE-3932. Fix the TaskAttempt state machine to handle CONTAINER_LAUNCHED and CONTAINER_LAUNCH_FAILED events in additional states. (Contributed by Robert Joseph Evans)

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1324904&r1=1324903&r2=1324904&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Apr 11 18:20:19 2012
@@ -80,6 +80,10 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4099 amendment. ApplicationMaster will remove staging directory
     after the history service is stopped. (Jason Lowe via sseth)
 
+    MAPREDUCE-3932. Fix the TaskAttempt state machine to handle
+    CONTAINER_LAUNCHED and CONTAINER_LAUNCH_FAILED events in additional
+    states. (Robert Joseph Evans via sseth)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1324904&r1=1324903&r2=1324904&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Wed Apr 11 18:20:19 2012
@@ -347,6 +347,8 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_DIAGNOSTIC_UPDATE,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                   JobEventType.INTERNAL_ERROR))
+          .addTransition(JobState.ERROR, JobState.ERROR,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
           // create the topology tables
           .installTopology();
  

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1324904&r1=1324903&r2=1324904&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Wed Apr 11 18:20:19 2012
@@ -316,7 +316,9 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
              TaskAttemptEventType.TA_UPDATE,
              TaskAttemptEventType.TA_COMMIT_PENDING,
+             // Container launch events can arrive late
              TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
+             TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
              TaskAttemptEventType.TA_TIMED_OUT))
@@ -338,6 +340,7 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_UPDATE,
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
+             TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
              TaskAttemptEventType.TA_TIMED_OUT))
@@ -359,7 +362,10 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_UPDATE,
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_DONE,
-             TaskAttemptEventType.TA_FAILMSG))
+             TaskAttemptEventType.TA_FAILMSG,
+             // Container launch events can arrive late
+             TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
+             TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
 
      // Transitions from KILL_TASK_CLEANUP
      .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
@@ -377,7 +383,10 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_UPDATE,
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_DONE,
-             TaskAttemptEventType.TA_FAILMSG))
+             TaskAttemptEventType.TA_FAILMSG,
+             // Container launch events can arrive late
+             TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
+             TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
 
       // Transitions from SUCCEEDED
      .addTransition(TaskAttemptState.SUCCEEDED, //only possible for map attempts
@@ -405,7 +414,9 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_ASSIGNED,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
              TaskAttemptEventType.TA_UPDATE,
+             // Container launch events can arrive late
              TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
+             TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG))
@@ -420,7 +431,9 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_ASSIGNED,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
              TaskAttemptEventType.TA_UPDATE,
+             // Container launch events can arrive late
              TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
+             TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG))

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1324904&r1=1324903&r2=1324904&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Wed Apr 11 18:20:19 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -68,6 +69,9 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@@ -81,9 +85,12 @@ import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -91,12 +98,16 @@ import org.apache.hadoop.yarn.util.Build
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-@SuppressWarnings("unchecked")
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class TestTaskAttempt{
-
-  @SuppressWarnings("rawtypes")
   @Test
   public void testAttemptContainerRequest() throws Exception {
+    //WARNING: This test must run first.  This is because there is an 
+    // optimization where the credentials passed in are cached statically so 
+    // they do not need to be recomputed when creating a new 
+    // ContainerLaunchContext. If other tests run first, this code will cache
+    // their credentials and this test will fail when it looks for the
+    // credentials it inserted.
     final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
     final byte[] SECRET_KEY = ("secretkey").getBytes();
     Map<ApplicationAccessType, String> acls =
@@ -125,7 +136,7 @@ public class TestTaskAttempt{
     Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
         ("tokenid").getBytes(), ("tokenpw").getBytes(),
         new Text("tokenkind"), new Text("tokenservice"));
-
+    
     TaskAttemptImpl taImpl =
         new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
             mock(TaskSplitMetaInfo.class), jobConf, taListener,
@@ -134,7 +145,7 @@ public class TestTaskAttempt{
 
     jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
     ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
-
+    
     ContainerLaunchContext launchCtx =
         TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
             jobConf, jobToken, taImpl.createRemoteTask(),
@@ -185,7 +196,6 @@ public class TestTaskAttempt{
     testMRAppHistory(app);
   }
 
-  @SuppressWarnings("rawtypes")
   @Test
   public void testSingleRackRequest() throws Exception {
     TaskAttemptImpl.RequestContainerTransition rct =
@@ -213,11 +223,10 @@ public class TestTaskAttempt{
     ContainerRequestEvent cre =
         (ContainerRequestEvent) arg.getAllValues().get(1);
     String[] requestedRacks = cre.getRacks();
-    //Only a single occurance of /DefaultRack
+    //Only a single occurrence of /DefaultRack
     assertEquals(1, requestedRacks.length);
   }
  
-  @SuppressWarnings("rawtypes")
   @Test
   public void testHostResolveAttempt() throws Exception {
     TaskAttemptImpl.RequestContainerTransition rct =
@@ -316,14 +325,12 @@ public class TestTaskAttempt{
             .getValue());
   }
   
-  @SuppressWarnings("rawtypes")
   private TaskAttemptImpl createMapTaskAttemptImplForTest(
       EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
     Clock clock = new SystemClock();
     return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
   }
   
-  @SuppressWarnings("rawtypes")
   private TaskAttemptImpl createMapTaskAttemptImplForTest(
       EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
     ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
@@ -394,4 +401,67 @@ public class TestTaskAttempt{
       };
     }
   }
+  
+  @Test
+  public void testLaunchFailedWhileKilling() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = 
+      BuilderUtils.newApplicationAttemptId(appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+    
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+    
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+    
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+    
+    TaskAttemptImpl taImpl =
+      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+          splits, jobConf, taListener,
+          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          new SystemClock(), null);
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+        container, mock(Map.class)));
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_KILL));
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
+    assertFalse(eventHandler.internalError);
+  }
+  
+  public static class MockEventHandler implements EventHandler {
+    public boolean internalError;
+    
+    @Override
+    public void handle(Event event) {
+      if (event instanceof JobEvent) {
+        JobEvent je = ((JobEvent) event);
+        if (JobEventType.INTERNAL_ERROR == je.getType()) {
+          internalError = true;
+        }
+      }
+    }
+    
+  };
 }