You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2013/04/19 04:42:20 UTC
svn commit: r1469669 - in
/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-clien...
Author: szetszwo
Date: Fri Apr 19 02:42:16 2013
New Revision: 1469669
URL: http://svn.apache.org/r1469669
Log:
Merge r1469042 through r1469643 from trunk.
Added:
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputFormat.java
- copied unchanged from r1469643, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputFormat.java
Modified:
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/ (props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (contents, props changed)
hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1469042-1469643
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1469669&r1=1469668&r2=1469669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Fri Apr 19 02:42:16 2013
@@ -198,6 +198,10 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-4985. Add compression option to TestDFSIO usage.
(Plamen Jeliazkov via shv)
+ MAPREDUCE-5152. Make MR App to simply pass through the container from RM
+ instead of extracting and populating information itself to start any
+ container. (vinodkv)
+
OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
@@ -305,6 +309,17 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5151. Update MR AM to use standard exit codes from the API after
YARN-444. (Sandy Ryza via vinodkv)
+ MAPREDUCE-5140. MR part of YARN-514 (Zhijie Shen via bikas)
+
+ MAPREDUCE-5128. mapred-default.xml is missing a bunch of history server
+ configs. (sandyr via tucu)
+
+ MAPREDUCE-4898. FileOutputFormat.checkOutputSpecs and
+ FileOutputFormat.setOutputPath incompatible with MR1. (rkanter via tucu)
+
+ MAPREDUCE-4932. mapreduce.job#getTaskCompletionEvents incompatible with
+ Hadoop 1. (rkanter via tucu)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1469042-1469643
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1469042-1469643
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1469669&r1=1469668&r2=1469669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Apr 19 02:42:16 2013
@@ -117,7 +117,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -490,14 +489,10 @@ public abstract class TaskAttemptImpl im
<TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
stateMachine;
- private ContainerId containerID;
- private NodeId containerNodeId;
- private String containerMgrAddress;
- private String nodeHttpAddress;
+ @VisibleForTesting
+ public Container container;
private String nodeRackName;
private WrappedJvmID jvmID;
- private ContainerToken containerToken;
- private Resource assignedCapability;
//this takes good amount of memory ~ 30KB. Instantiate it lazily
//and make it null once task is launched.
@@ -825,7 +820,7 @@ public abstract class TaskAttemptImpl im
public ContainerId getAssignedContainerID() {
readLock.lock();
try {
- return containerID;
+ return container == null ? null : container.getId();
} finally {
readLock.unlock();
}
@@ -835,7 +830,8 @@ public abstract class TaskAttemptImpl im
public String getAssignedContainerMgrAddress() {
readLock.lock();
try {
- return containerMgrAddress;
+ return container == null ? null : StringInterner.weakIntern(container
+ .getNodeId().toString());
} finally {
readLock.unlock();
}
@@ -895,7 +891,7 @@ public abstract class TaskAttemptImpl im
public NodeId getNodeId() {
readLock.lock();
try {
- return containerNodeId;
+ return container == null ? null : container.getNodeId();
} finally {
readLock.unlock();
}
@@ -907,7 +903,7 @@ public abstract class TaskAttemptImpl im
public String getNodeHttpAddress() {
readLock.lock();
try {
- return nodeHttpAddress;
+ return container == null ? null : container.getNodeHttpAddress();
} finally {
readLock.unlock();
}
@@ -967,8 +963,8 @@ public abstract class TaskAttemptImpl im
result.setContainerId(this.getAssignedContainerID());
result.setNodeManagerHost(trackerName);
result.setNodeManagerHttpPort(httpPort);
- if (this.containerNodeId != null) {
- result.setNodeManagerPort(this.containerNodeId.getPort());
+ if (this.container != null) {
+ result.setNodeManagerPort(this.container.getNodeId().getPort());
}
return result;
} finally {
@@ -1093,13 +1089,17 @@ public abstract class TaskAttemptImpl im
@SuppressWarnings("unchecked")
public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
OutputCommitter committer, boolean recoverOutput) {
- containerID = taInfo.getContainerId();
- containerNodeId = ConverterUtils.toNodeId(taInfo.getHostname() + ":"
+ ContainerId containerId = taInfo.getContainerId();
+ NodeId containerNodeId = ConverterUtils.toNodeId(taInfo.getHostname() + ":"
+ taInfo.getPort());
- containerMgrAddress = StringInterner.weakIntern(
- containerNodeId.toString());
- nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
+ String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
+ taInfo.getHttpPort());
+ // Resource/Priority/Tokens are only needed while launching the
+ // container on an NM, these are already completed tasks, so setting them to
+ // null
+ container =
+ BuilderUtils.newContainer(containerId, containerNodeId,
+ nodeHttpAddress, null, null, null);
computeRackAndLocality();
launchTime = taInfo.getStartTime();
finishTime = (taInfo.getFinishTime() != -1) ?
@@ -1227,6 +1227,7 @@ public abstract class TaskAttemptImpl im
}
private void computeRackAndLocality() {
+ NodeId containerNodeId = container.getNodeId();
nodeRackName = RackResolver.resolve(
containerNodeId.getHost()).getNetworkLocation();
@@ -1331,10 +1332,10 @@ public abstract class TaskAttemptImpl im
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
.getTaskType()), attemptState.toString(),
taskAttempt.finishTime,
- taskAttempt.containerNodeId == null ? "UNKNOWN"
- : taskAttempt.containerNodeId.getHost(),
- taskAttempt.containerNodeId == null ? -1
- : taskAttempt.containerNodeId.getPort(),
+ taskAttempt.container == null ? "UNKNOWN"
+ : taskAttempt.container.getNodeId().getHost(),
+ taskAttempt.container == null ? -1
+ : taskAttempt.container.getNodeId().getPort(),
taskAttempt.nodeRackName == null ? "UNKNOWN"
: taskAttempt.nodeRackName,
StringUtils.join(
@@ -1353,12 +1354,12 @@ public abstract class TaskAttemptImpl im
eventHandler.handle(jce);
LOG.info("TaskAttempt: [" + attemptId
- + "] using containerId: [" + containerID + " on NM: ["
- + containerMgrAddress + "]");
+ + "] using containerId: [" + container.getId() + " on NM: ["
+ + StringInterner.weakIntern(container.getNodeId().toString()) + "]");
TaskAttemptStartedEvent tase =
new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId),
TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
- launchTime, trackerName, httpPort, shufflePort, containerID,
+ launchTime, trackerName, httpPort, shufflePort, container.getId(),
locality.toString(), avataar.toString());
eventHandler.handle(
new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase));
@@ -1490,19 +1491,14 @@ public abstract class TaskAttemptImpl im
TaskAttemptEvent event) {
final TaskAttemptContainerAssignedEvent cEvent =
(TaskAttemptContainerAssignedEvent) event;
- taskAttempt.containerID = cEvent.getContainer().getId();
- taskAttempt.containerNodeId = cEvent.getContainer().getNodeId();
- taskAttempt.containerMgrAddress = StringInterner.weakIntern(
- taskAttempt.containerNodeId.toString());
- taskAttempt.nodeHttpAddress = StringInterner.weakIntern(
- cEvent.getContainer().getNodeHttpAddress());
- taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
- taskAttempt.assignedCapability = cEvent.getContainer().getResource();
+ Container container = cEvent.getContainer();
+ taskAttempt.container = container;
// this is a _real_ Task (classic Hadoop mapred flavor):
taskAttempt.remoteTask = taskAttempt.createRemoteTask();
- taskAttempt.jvmID = new WrappedJvmID(
- taskAttempt.remoteTask.getTaskID().getJobID(),
- taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
+ taskAttempt.jvmID =
+ new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(),
+ taskAttempt.remoteTask.isMapTask(), taskAttempt.container.getId()
+ .getId());
taskAttempt.taskAttemptListener.registerPendingTask(
taskAttempt.remoteTask, taskAttempt.jvmID);
@@ -1514,10 +1510,9 @@ public abstract class TaskAttemptImpl im
cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
taskAttempt.taskAttemptListener, taskAttempt.credentials);
- taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent(
- taskAttempt.attemptId, taskAttempt.containerID,
- taskAttempt.containerMgrAddress, taskAttempt.containerToken,
- launchContext, taskAttempt.assignedCapability, taskAttempt.remoteTask));
+ taskAttempt.eventHandler
+ .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
+ launchContext, container, taskAttempt.remoteTask));
// send event to speculator that our container needs are satisfied
taskAttempt.eventHandler.handle
@@ -1604,9 +1599,8 @@ public abstract class TaskAttemptImpl im
taskAttempt.taskAttemptListener
.registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
//TODO Resolve to host / IP in case of a local address.
- InetSocketAddress nodeHttpInetAddr =
- NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
- // Costly?
+ InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
+ NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
taskAttempt.httpPort = nodeHttpInetAddr.getPort();
taskAttempt.sendLaunchedEvents();
@@ -1713,6 +1707,10 @@ public abstract class TaskAttemptImpl im
private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
//Log finished events only if an attempt started.
if (getLaunchTime() == 0) return;
+ String containerHostName = this.container == null ? "UNKNOWN"
+ : this.container.getNodeId().getHost();
+ int containerNodePort =
+ this.container == null ? -1 : this.container.getNodeId().getPort();
if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
MapAttemptFinishedEvent mfe =
new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
@@ -1720,9 +1718,8 @@ public abstract class TaskAttemptImpl im
state.toString(),
this.reportedStatus.mapFinishTime,
finishTime,
- this.containerNodeId == null ? "UNKNOWN"
- : this.containerNodeId.getHost(),
- this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
+ containerHostName,
+ containerNodePort,
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
getCounters(),
@@ -1737,9 +1734,8 @@ public abstract class TaskAttemptImpl im
this.reportedStatus.shuffleFinishTime,
this.reportedStatus.sortFinishTime,
finishTime,
- this.containerNodeId == null ? "UNKNOWN"
- : this.containerNodeId.getHost(),
- this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
+ containerHostName,
+ containerNodePort,
this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
this.reportedStatus.stateString,
getCounters(),
@@ -1864,8 +1860,9 @@ public abstract class TaskAttemptImpl im
//send the cleanup event to containerLauncher
taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
taskAttempt.attemptId,
- taskAttempt.containerID, taskAttempt.containerMgrAddress,
- taskAttempt.containerToken,
+ taskAttempt.container.getId(), StringInterner
+ .weakIntern(taskAttempt.container.getNodeId().toString()),
+ taskAttempt.container.getContainerToken(),
ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1469669&r1=1469668&r2=1469669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Fri Apr 19 02:42:16 2013
@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -149,16 +148,13 @@ public class ContainerLauncherImpl exten
// Construct the actual Container
ContainerLaunchContext containerLaunchContext =
- event.getContainer();
+ event.getContainerLaunchContext();
- org.apache.hadoop.yarn.api.records.Container container =
- BuilderUtils.newContainer(containerID, null, null,
- event.getResource(), null, containerToken);
// Now launch the actual container
StartContainerRequest startRequest = Records
.newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainer(container);
+ startRequest.setContainer(event.getAllocatedContainer());
StartContainerResponse response = proxy.startContainer(startRequest);
ByteBuffer portInfo = response
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java?rev=1469669&r1=1469668&r2=1469669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java Fri Apr 19 02:42:16 2013
@@ -20,35 +20,34 @@ package org.apache.hadoop.mapreduce.v2.a
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerToken;
-import org.apache.hadoop.yarn.api.records.Resource;
public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
- private final ContainerLaunchContext container;
+ private final Container allocatedContainer;
+ private final ContainerLaunchContext containerLaunchContext;
private final Task task;
- private final Resource resource;
public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
- ContainerId containerID, String containerMgrAddress,
- ContainerToken containerToken,
- ContainerLaunchContext containerLaunchContext, Resource resource,
- Task remoteTask) {
- super(taskAttemptID, containerID, containerMgrAddress, containerToken,
- ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
- this.container = containerLaunchContext;
+ ContainerLaunchContext containerLaunchContext,
+ Container allocatedContainer, Task remoteTask) {
+ super(taskAttemptID, allocatedContainer.getId(), StringInterner
+ .weakIntern(allocatedContainer.getNodeId().toString()),
+ allocatedContainer.getContainerToken(),
+ ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
+ this.allocatedContainer = allocatedContainer;
+ this.containerLaunchContext = containerLaunchContext;
this.task = remoteTask;
- this.resource = resource;
}
- public ContainerLaunchContext getContainer() {
- return this.container;
+ public ContainerLaunchContext getContainerLaunchContext() {
+ return this.containerLaunchContext;
}
- public Resource getResource() {
- return this.resource;
+ public Container getAllocatedContainer() {
+ return this.allocatedContainer;
}
public Task getRemoteTask() {
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1469669&r1=1469668&r2=1469669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Fri Apr 19 02:42:16 2013
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
import junit.framework.Assert;
@@ -46,6 +47,11 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -411,7 +417,40 @@ public class TestMRApp {
TypeConverter.fromYarn(state);
}
}
-
+
+ private Container containerObtainedByContainerLauncher;
+ @Test
+ public void testContainerPassThrough() throws Exception {
+ MRApp app = new MRApp(0, 1, true, this.getClass().getName(), true) {
+ @Override
+ protected ContainerLauncher createContainerLauncher(AppContext context) {
+ return new MockContainerLauncher() {
+ @Override
+ public void handle(ContainerLauncherEvent event) {
+ if (event instanceof ContainerRemoteLaunchEvent) {
+ containerObtainedByContainerLauncher =
+ ((ContainerRemoteLaunchEvent) event).getAllocatedContainer();
+ }
+ super.handle(event);
+ }
+ };
+ };
+ };
+ Job job = app.submit(new Configuration());
+ app.waitForState(job, JobState.SUCCEEDED);
+ app.verifyCompleted();
+
+ Collection<Task> tasks = job.getTasks().values();
+ Collection<TaskAttempt> taskAttempts =
+ tasks.iterator().next().getAttempts().values();
+ TaskAttemptImpl taskAttempt =
+ (TaskAttemptImpl) taskAttempts.iterator().next();
+ // Container from RM should pass through to the launcher. Container object
+ // should be the same.
+ Assert.assertTrue(taskAttempt.container
+ == containerObtainedByContainerLauncher);
+ }
+
private final class MRAppWithHistory extends MRApp {
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, int startCount) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java?rev=1469669&r1=1469668&r2=1469669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java Fri Apr 19 02:42:16 2013
@@ -79,7 +79,8 @@ public class TestMapReduceChildJVM {
public void handle(ContainerLauncherEvent event) {
if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
ContainerRemoteLaunchEvent launchEvent = (ContainerRemoteLaunchEvent) event;
- ContainerLaunchContext launchContext = launchEvent.getContainer();
+ ContainerLaunchContext launchContext =
+ launchEvent.getContainerLaunchContext();
String cmdString = launchContext.getCommands().toString();
LOG.info("launchContext " + cmdString);
myCommandLine = cmdString;
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java?rev=1469669&r1=1469668&r2=1469669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java Fri Apr 19 02:42:16 2013
@@ -37,7 +37,6 @@ import org.apache.hadoop.mapreduce.MRJob
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -224,10 +223,6 @@ public class TestContainerLauncher {
@Test
public void testSlowNM() throws Exception {
- test();
- }
-
- private void test() throws Exception {
conf = new Configuration();
int maxAttempts = 1;
@@ -382,6 +377,15 @@ public class TestContainerLauncher {
@Override
public StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException {
+
+ // Validate that the container is what RM is giving.
+ Assert.assertEquals(MRApp.NM_HOST, request.getContainer().getNodeId()
+ .getHost());
+ Assert.assertEquals(MRApp.NM_PORT, request.getContainer().getNodeId()
+ .getPort());
+ Assert.assertEquals(MRApp.NM_HOST + ":" + MRApp.NM_HTTP_PORT, request
+ .getContainer().getNodeHttpAddress());
+
StartContainerResponse response = recordFactory
.newRecordInstance(StartContainerResponse.class);
status = recordFactory.newRecordInstance(ContainerStatus.class);
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1469669&r1=1469668&r2=1469669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Fri Apr 19 02:42:16 2013
@@ -392,6 +392,7 @@ public class TypeConverter {
FinalApplicationStatus finalApplicationStatus) {
switch (yarnApplicationState) {
case NEW:
+ case NEW_SAVING:
case SUBMITTED:
case ACCEPTED:
return State.PREP;
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java?rev=1469669&r1=1469668&r2=1469669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java Fri Apr 19 02:42:16 2013
@@ -23,6 +23,7 @@ import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -48,6 +49,9 @@ public class TestTypeConverter {
for (YarnApplicationState applicationState : YarnApplicationState.values()) {
TypeConverter.fromYarn(applicationState, FinalApplicationStatus.FAILED);
}
+ // ad hoc test of NEW_SAVING, which is newly added
+ Assert.assertEquals(State.PREP, TypeConverter.fromYarn(
+ YarnApplicationState.NEW_SAVING, FinalApplicationStatus.FAILED));
for (TaskType taskType : TaskType.values()) {
TypeConverter.fromYarn(taskType);
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1469669&r1=1469668&r2=1469669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Fri Apr 19 02:42:16 2013
@@ -659,8 +659,24 @@ public class Job extends JobContextImpl
startFrom, numEvents);
}
});
+ }
+
+ /**
+ * Get events indicating completion (success/failure) of component tasks.
+ *
+ * @param startFrom index to start fetching events from
+ * @return an array of {@link TaskCompletionEvent}s
+ * @throws IOException
+ */
+ public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom)
+ throws IOException {
+ try {
+ return getTaskCompletionEvents(startFrom, 10);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
}
-
+ }
+
/**
* Kill indicated task attempt.
*
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=1469669&r1=1469668&r2=1469669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Fri Apr 19 02:42:16 2013
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
@@ -150,9 +150,14 @@ public static final String OUTDIR = "map
* @param outputDir the {@link Path} of the output directory for
* the map-reduce job.
*/
- public static void setOutputPath(Job job, Path outputDir) throws IOException {
- outputDir = outputDir.getFileSystem(job.getConfiguration()).makeQualified(
- outputDir);
+ public static void setOutputPath(Job job, Path outputDir) {
+ try {
+ outputDir = outputDir.getFileSystem(job.getConfiguration()).makeQualified(
+ outputDir);
+ } catch (IOException e) {
+ // Throw the IOException as a RuntimeException to be compatible with MR1
+ throw new RuntimeException(e);
+ }
job.getConfiguration().set(FileOutputFormat.OUTDIR, outputDir.toString());
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1469669&r1=1469668&r2=1469669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Apr 19 02:42:16 2013
@@ -330,6 +330,14 @@
<description>The max percent (0-1) of running tasks that
can be speculatively re-executed at any time.</description>
</property>
+
+<property>
+ <name>mapreduce.job.map.output.collector.class</name>
+ <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
+ <description>
+ It defines the MapOutputCollector implementation to use.
+ </description>
+</property>
<property>
<name>mapreduce.job.speculative.slowtaskthreshold</name>
@@ -1037,11 +1045,89 @@
</property>
<property>
- <name>mapreduce.job.map.output.collector.class</name>
- <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
- <description>
- It defines the MapOutputCollector implementation to use.
+ <name>mapreduce.jobhistory.intermediate-done-dir</name>
+ <value>${yarn.app.mapreduce.am.staging-dir}/history/done_intermediate</value>
+ <description></description>
+</property>
+
+<property>
+ <name>mapreduce.jobhistory.done-dir</name>
+ <value>${yarn.app.mapreduce.am.staging-dir}/history/done</value>
+ <description></description>
+</property>
+
+<property>
+ <name>mapreduce.jobhistory.cleaner.enable</name>
+ <value>true</value>
+ <description></description>
+</property>
+
+<property>
+ <name>mapreduce.jobhistory.cleaner.interval-ms</name>
+ <value>86400000</value>
+ <description> How often the job history cleaner checks for files to delete,
+ in milliseconds. Defaults to 86400000 (one day). Files are only deleted if
+ they are older than mapreduce.jobhistory.max-age-ms.
+ </description>
+</property>
+
+<property>
+ <name>mapreduce.jobhistory.max-age-ms</name>
+ <value>604800000</value>
+ <description> Job history files older than this many milliseconds will
+ be deleted when the history cleaner runs. Defaults to 604800000 (1 week).
+ </description>
+</property>
+
+<property>
+ <name>mapreduce.jobhistory.client.thread-count</name>
+ <value>10</value>
+ <description>The number of threads to handle client API requests</description>
+</property>
+
+<property>
+ <name>mapreduce.jobhistory.datestring.cache.size</name>
+ <value>200000</value>
+ <description>Size of the date string cache. Affects the number of directories
+ which will be scanned to find a job.</description>
+</property>
+
+<property>
+ <name>mapreduce.jobhistory.joblist.cache.size</name>
+ <value>20000</value>
+ <description>Size of the job list cache</description>
+</property>
+
+<property>
+ <name>mapreduce.jobhistory.loadedjobs.cache.size</name>
+ <value>5</value>
+ <description>Size of the loaded job cache</description>
+</property>
+
+<property>
+ <name>mapreduce.jobhistory.move.interval-ms</name>
+ <value>180000</value>
+ <description>Scan for history files to move from intermediate done dir to done
+ dir at this frequency.
</description>
</property>
+<property>
+ <name>mapreduce.jobhistory.move.thread-count</name>
+ <value>3</value>
+ <description>The number of threads used to move files.</description>
+</property>
+
+<property>
+ <name>mapreduce.jobhistory.store.class</name>
+ <value></value>
+ <description>The HistoryStorage class to use to cache history data.</description>
+</property>
+
+<property>
+ <name>mapreduce.jobhistory.minicluster.fixed.ports</name>
+ <value>false</value>
+ <description>Whether to use fixed ports with the minicluster</description>
+</property>
+
</configuration>
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1469042-1469643
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1469669&r1=1469668&r2=1469669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Fri Apr 19 02:42:16 2013
@@ -234,6 +234,8 @@ public class ClientServiceDelegate {
throw RPCUtil.getRemoteException("User is not set in the application report");
}
if (application.getYarnApplicationState() == YarnApplicationState.NEW
+ || application.getYarnApplicationState() ==
+ YarnApplicationState.NEW_SAVING
|| application.getYarnApplicationState() == YarnApplicationState.SUBMITTED
|| application.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
realProxy = null;