Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/10/25 02:41:52 UTC

svn commit: r1188467 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-ap...

Author: acmurthy
Date: Tue Oct 25 00:41:52 2011
New Revision: 1188467

URL: http://svn.apache.org/viewvc?rev=1188467&view=rev
Log:
Merge -c 1188388 from trunk to branch-0.23 to complete fix for MAPREDUCE-3249.

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Oct 25 00:41:52 2011
@@ -1691,6 +1691,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3028. Added job-end notification support. (Ravi Prakash via
     acmurthy) 
 
+    MAPREDUCE-3249. Ensure shuffle-port is correctly used during MR AM recovery.
+    (vinodkv via acmurthy) 
+
     MAPREDUCE-3252. Fix map tasks to not rewrite data an extra time when
     map output fits in spill buffer. (todd)
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Oct 25 00:41:52 2011
@@ -568,7 +568,8 @@ public abstract class TaskImpl implement
     //raise the completion event only if the container is assigned
     // to nextAttemptNumber
     if (attempt.getNodeHttpAddress() != null) {
-      TaskAttemptCompletionEvent tce = recordFactory.newRecordInstance(TaskAttemptCompletionEvent.class);
+      TaskAttemptCompletionEvent tce = recordFactory
+          .newRecordInstance(TaskAttemptCompletionEvent.class);
       tce.setEventId(-1);
       tce.setMapOutputServerAddress("http://"
           + attempt.getNodeHttpAddress().split(":")[0] + ":"

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue Oct 25 00:41:52 2011
@@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 /*
  * Recovers the completed tasks from the previous life of Application Master.
@@ -313,8 +314,8 @@ public class RecoveryService extends Com
         TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
             .getTaskAttemptID();
         TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
-        //TODO need to get the real port number MAPREDUCE-2666
-        actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId, -1));
+        actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId,
+            attInfo.getShufflePort()));
         // send the status update event
         sendStatusUpdateEvent(aId, attInfo);
 
@@ -392,16 +393,15 @@ public class RecoveryService extends Com
         TaskAttemptInfo attemptInfo) {
       LOG.info("Sending assigned event to " + yarnAttemptID);
       ContainerId cId = attemptInfo.getContainerId();
-      Container container = recordFactory
-          .newRecordInstance(Container.class);
-      container.setId(cId);
-      container.setNodeId(recordFactory
-          .newRecordInstance(NodeId.class));
-      // NodeId can be obtained from TaskAttemptInfo.hostname - but this will
-      // eventually contain rack info.
-      container.setContainerToken(null);
-      container.setNodeHttpAddress(attemptInfo.getTrackerName() + ":" + 
-          attemptInfo.getHttpPort());
+      String[] splits = attemptInfo.getHostname().split(":");
+      NodeId nodeId = BuilderUtils.newNodeId(splits[0], Integer
+          .parseInt(splits[1]));
+      // Resource/Priority/ApplicationACLs are only needed while launching the
+      // container on an NM, these are already completed tasks, so setting them
+      // to null
+      Container container = BuilderUtils.newContainer(cId, nodeId,
+          attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
+          null, null, null);
       actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
           container, null));
     }
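
[Editorial note] For readers following the fix: below is a minimal sketch, restating the RecoveryService hunks above in one place, of how the recovery path now propagates the real shuffle port and node identity instead of the old placeholders. It is an excerpt-style fragment, not compilable on its own; the variable and field names (aId, attInfo, attemptInfo, cId, yarnAttemptID, actualHandler) are taken directly from the hunks above.

    // During recovery of a CONTAINER_REMOTE_LAUNCH event, reuse the shuffle
    // port recorded in the job history instead of the old -1 placeholder.
    TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
    actualHandler.handle(
        new TaskAttemptContainerLaunchedEvent(aId, attInfo.getShufflePort()));

    // The assigned event rebuilds the container from history data: host and
    // port parsed from TaskAttemptInfo.getHostname(), the HTTP address from
    // trackerName:httpPort. The remaining Container fields are left null
    // because these tasks already completed and are never re-launched on a
    // NodeManager.
    String[] splits = attemptInfo.getHostname().split(":");
    NodeId nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1]));
    Container container = BuilderUtils.newContainer(cId, nodeId,
        attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
        null, null, null);
    actualHandler.handle(
        new TaskAttemptContainerAssignedEvent(yarnAttemptID, container, null));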

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Oct 25 00:41:52 2011
@@ -315,15 +315,17 @@ public class MRApp extends MRAppMaster {
   }
 
   class MockContainerLauncher implements ContainerLauncher {
+
+    //We are running locally so set the shuffle port to -1 
+    int shufflePort = -1;
+
     @Override
     public void handle(ContainerLauncherEvent event) {
       switch (event.getType()) {
       case CONTAINER_REMOTE_LAUNCH:
-        //We are running locally so set the shuffle port to -1 
         getContext().getEventHandler().handle(
             new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
-                -1)
-            );
+                shufflePort));
         
         attemptLaunched(event.getTaskAttemptID());
         break;
@@ -355,13 +357,9 @@ public class MRApp extends MRAppMaster {
         ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
         cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
         cId.setId(containerCount++);
-        Container container = recordFactory.newRecordInstance(Container.class);
-        container.setId(cId);
-        container.setNodeId(recordFactory.newRecordInstance(NodeId.class));
-        container.getNodeId().setHost("dummy");
-        container.getNodeId().setPort(1234);
-        container.setContainerToken(null);
-        container.setNodeHttpAddress("localhost:9999");
+        NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
+        Container container = BuilderUtils.newContainer(cId, nodeId,
+            "localhost:9999", null, null, null);
         getContext().getEventHandler().handle(
             new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
                 container, null));
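
[Editorial note] The shufflePort field added to MockContainerLauncher above is the hook the recovery test uses: an MRApp subclass hands out a fixed port and later asserts that recovered attempts still report it. A minimal sketch of that usage follows, mirroring the TestRecovery change shown below (the 5467 value comes from that test):

    // Test-side override: make the mock launcher report a known shuffle port.
    @Override
    protected ContainerLauncher createContainerLauncher(AppContext context) {
      MockContainerLauncher launcher = new MockContainerLauncher();
      launcher.shufflePort = 5467;
      return launcher;
    }

    // After the map completes, and again after the AM restart triggers
    // recovery, the test asserts that the port survived:
    Assert.assertEquals(5467, task1Attempt1.getShufflePort());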

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue Oct 25 00:41:52 2011
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
@@ -269,6 +270,9 @@ public class TestRecovery {
     
     //wait for map task to complete
     app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
     
     app.waitForState(reduceTask1, TaskState.RUNNING);
     TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
@@ -290,7 +294,8 @@ public class TestRecovery {
 
     //rerun
     //in rerun the map will be recovered from previous run
-    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, ++runCount);
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
     conf = new Configuration();
     conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
     conf.setBoolean("mapred.mapper.new-api", true);
@@ -308,6 +313,10 @@ public class TestRecovery {
     
     // map will be recovered, no need to send done
     app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port after recovery
+    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
     
     // first reduce will be recovered, no need to send done
     app.waitForState(reduceTask1, TaskState.SUCCEEDED); 
@@ -398,6 +407,13 @@ public class TestRecovery {
     }
 
     @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
+      MockContainerLauncher launcher = new MockContainerLauncher();
+      launcher.shufflePort = 5467;
+      return launcher;
+    }
+
+    @Override
     protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
         AppContext context) {
       JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,