You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/10/25 02:41:52 UTC
svn commit: r1188467 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-ap...
Author: acmurthy
Date: Tue Oct 25 00:41:52 2011
New Revision: 1188467
URL: http://svn.apache.org/viewvc?rev=1188467&view=rev
Log:
Merge -c 1188388 from trunk to branch-0.23 to complete fix for MAPREDUCE-3252.
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Oct 25 00:41:52 2011
@@ -1691,6 +1691,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3028. Added job-end notification support. (Ravi Prakash via
acmurthy)
+ MAPREDUCE-3249. Ensure shuffle-port is correctly used during MR AM recovery.
+ (vinodkv via acmurthy)
+
MAPREDUCE-3252. Fix map tasks to not rewrite data an extra time when
map output fits in spill buffer. (todd)
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Oct 25 00:41:52 2011
@@ -568,7 +568,8 @@ public abstract class TaskImpl implement
//raise the completion event only if the container is assigned
// to nextAttemptNumber
if (attempt.getNodeHttpAddress() != null) {
- TaskAttemptCompletionEvent tce = recordFactory.newRecordInstance(TaskAttemptCompletionEvent.class);
+ TaskAttemptCompletionEvent tce = recordFactory
+ .newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(-1);
tce.setMapOutputServerAddress("http://"
+ attempt.getNodeHttpAddress().split(":")[0] + ":"
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue Oct 25 00:41:52 2011
@@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.BuilderUtils;
/*
* Recovers the completed tasks from the previous life of Application Master.
@@ -313,8 +314,8 @@ public class RecoveryService extends Com
TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
.getTaskAttemptID();
TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
- //TODO need to get the real port number MAPREDUCE-2666
- actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId, -1));
+ actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId,
+ attInfo.getShufflePort()));
// send the status update event
sendStatusUpdateEvent(aId, attInfo);
@@ -392,16 +393,15 @@ public class RecoveryService extends Com
TaskAttemptInfo attemptInfo) {
LOG.info("Sending assigned event to " + yarnAttemptID);
ContainerId cId = attemptInfo.getContainerId();
- Container container = recordFactory
- .newRecordInstance(Container.class);
- container.setId(cId);
- container.setNodeId(recordFactory
- .newRecordInstance(NodeId.class));
- // NodeId can be obtained from TaskAttemptInfo.hostname - but this will
- // eventually contain rack info.
- container.setContainerToken(null);
- container.setNodeHttpAddress(attemptInfo.getTrackerName() + ":" +
- attemptInfo.getHttpPort());
+ String[] splits = attemptInfo.getHostname().split(":");
+ NodeId nodeId = BuilderUtils.newNodeId(splits[0], Integer
+ .parseInt(splits[1]));
+ // Resource/Priority/ApplicationACLs are only needed while launching the
+ // container on an NM, these are already completed tasks, so setting them
+ // to null
+ Container container = BuilderUtils.newContainer(cId, nodeId,
+ attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
+ null, null, null);
actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
container, null));
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Oct 25 00:41:52 2011
@@ -315,15 +315,17 @@ public class MRApp extends MRAppMaster {
}
class MockContainerLauncher implements ContainerLauncher {
+
+ //We are running locally so set the shuffle port to -1
+ int shufflePort = -1;
+
@Override
public void handle(ContainerLauncherEvent event) {
switch (event.getType()) {
case CONTAINER_REMOTE_LAUNCH:
- //We are running locally so set the shuffle port to -1
getContext().getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
- -1)
- );
+ shufflePort));
attemptLaunched(event.getTaskAttemptID());
break;
@@ -355,13 +357,9 @@ public class MRApp extends MRAppMaster {
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
cId.setId(containerCount++);
- Container container = recordFactory.newRecordInstance(Container.class);
- container.setId(cId);
- container.setNodeId(recordFactory.newRecordInstance(NodeId.class));
- container.getNodeId().setHost("dummy");
- container.getNodeId().setPort(1234);
- container.setContainerToken(null);
- container.setNodeHttpAddress("localhost:9999");
+ NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
+ Container container = BuilderUtils.newContainer(cId, nodeId,
+ "localhost:9999", null, null, null);
getContext().getEventHandler().handle(
new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
container, null));
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue Oct 25 00:41:52 2011
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
@@ -269,6 +270,9 @@ public class TestRecovery {
//wait for map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ // Verify the shuffle-port
+ Assert.assertEquals(5467, task1Attempt1.getShufflePort());
app.waitForState(reduceTask1, TaskState.RUNNING);
TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
@@ -290,7 +294,8 @@ public class TestRecovery {
//rerun
//in rerun the map will be recovered from previous run
- app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, ++runCount);
+ app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+ ++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
@@ -308,6 +313,10 @@ public class TestRecovery {
// map will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ // Verify the shuffle-port after recovery
+ task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+ Assert.assertEquals(5467, task1Attempt1.getShufflePort());
// first reduce will be recovered, no need to send done
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
@@ -398,6 +407,13 @@ public class TestRecovery {
}
@Override
+ protected ContainerLauncher createContainerLauncher(AppContext context) {
+ MockContainerLauncher launcher = new MockContainerLauncher();
+ launcher.shufflePort = 5467;
+ return launcher;
+ }
+
+ @Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,