You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/07/24 05:42:54 UTC
svn commit: r1506392 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/
Author: vinodkv
Date: Wed Jul 24 03:42:52 2013
New Revision: 1506392
URL: http://svn.apache.org/r1506392
Log:
YARN-926. Modified ContainerManagementProtocol APIs to take in requests for multiple containers. Contributed by Jian He.
MAPREDUCE-5412. Update MR app to use multiple containers API of ContainerManager after YARN-926. Contributed by Jian He.
svn merge --ignore-ancestry -c 1506391 ../../trunk/
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1506392&r1=1506391&r2=1506392&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Jul 24 03:42:52 2013
@@ -450,6 +450,9 @@ Release 2.1.0-beta - 2013-07-02
MAPREDUCE-5325. MR changes related to YARN-727. ClientRMProtocol.getAllApplications
should accept ApplicationType as a parameter. (Xuan Gong via hitesh)
+ MAPREDUCE-5412. Update MR app to use multiple containers API of
+ ContainerManager after YARN-926. (Jian He via vinodkv)
+
BREAKDOWN OF HADOOP-8562 SUBTASKS
MAPREDUCE-4739. Some MapReduce tests fail to find winutils.
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1506392&r1=1506391&r2=1506392&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Wed Jul 24 03:42:52 2013
@@ -20,7 +20,9 @@ package org.apache.hadoop.mapreduce.v2.a
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -44,14 +46,15 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.util.Records;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -139,13 +142,18 @@ public class ContainerLauncherImpl exten
event.getContainerLaunchContext();
// Now launch the actual container
- StartContainerRequest startRequest = Records
- .newRecord(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
- startRequest.setContainerToken(event.getContainerToken());
- StartContainerResponse response =
- proxy.getContainerManagementProtocol().startContainer(startRequest);
-
+ StartContainerRequest startRequest =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ event.getContainerToken());
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(startRequest);
+ StartContainersRequest requestList = StartContainersRequest.newInstance(list);
+ StartContainersResponse response =
+ proxy.getContainerManagementProtocol().startContainers(requestList);
+ if (response.getFailedRequests() != null
+ && response.getFailedRequests().containsKey(containerID)) {
+ throw response.getFailedRequests().get(containerID).deSerialize();
+ }
ByteBuffer portInfo =
response.getAllServicesMetaData().get(
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
@@ -192,13 +200,17 @@ public class ContainerLauncherImpl exten
proxy = getCMProxy(this.containerMgrAddress, this.containerID);
// kill the remote container if already launched
- StopContainerRequest stopRequest = Records
- .newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(this.containerID);
- proxy.getContainerManagementProtocol().stopContainer(stopRequest);
-
+ List<ContainerId> ids = new ArrayList<ContainerId>();
+ ids.add(this.containerID);
+ StopContainersRequest request = StopContainersRequest.newInstance(ids);
+ StopContainersResponse response =
+ proxy.getContainerManagementProtocol().stopContainers(request);
+ if (response.getFailedRequests() != null
+ && response.getFailedRequests().containsKey(this.containerID)) {
+ throw response.getFailedRequests().get(this.containerID)
+ .deSerialize();
+ }
} catch (Throwable t) {
-
// ignore the cleanup failure
String message = "cleanup failed for container "
+ this.containerID + " : "
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java?rev=1506392&r1=1506391&r2=1506392&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java Wed Jul 24 03:42:52 2013
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
@@ -52,12 +54,13 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -393,18 +396,18 @@ public class TestContainerLauncher {
private ContainerStatus status = null;
@Override
- public GetContainerStatusResponse getContainerStatus(
- GetContainerStatusRequest request) throws IOException {
- GetContainerStatusResponse response = recordFactory
- .newRecordInstance(GetContainerStatusResponse.class);
- response.setStatus(status);
- return response;
+ public GetContainerStatusesResponse getContainerStatuses(
+ GetContainerStatusesRequest request) throws IOException {
+ List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
+ statuses.add(status);
+ return GetContainerStatusesResponse.newInstance(statuses, null);
}
@Override
- public StartContainerResponse startContainer(StartContainerRequest request)
+ public StartContainersResponse startContainers(StartContainersRequest requests)
throws IOException {
+ StartContainerRequest request = requests.getStartContainerRequests().get(0);
ContainerTokenIdentifier containerTokenIdentifier =
MRApp.newContainerTokenIdentifier(request.getContainerToken());
@@ -412,8 +415,8 @@ public class TestContainerLauncher {
Assert.assertEquals(MRApp.NM_HOST + ":" + MRApp.NM_PORT,
containerTokenIdentifier.getNmHostAddress());
- StartContainerResponse response = recordFactory
- .newRecordInstance(StartContainerResponse.class);
+ StartContainersResponse response = recordFactory
+ .newRecordInstance(StartContainersResponse.class);
status = recordFactory.newRecordInstance(ContainerStatus.class);
try {
// make the thread sleep to look like it's not going to respond
@@ -429,7 +432,7 @@ public class TestContainerLauncher {
}
@Override
- public StopContainerResponse stopContainer(StopContainerRequest request)
+ public StopContainersResponse stopContainers(StopContainersRequest request)
throws IOException {
Exception e = new Exception("Dummy function", new Exception(
"Dummy function cause"));
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1506392&r1=1506391&r2=1506392&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Wed Jul 24 03:42:52 2013
@@ -45,12 +45,12 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -162,8 +162,8 @@ public class TestContainerLauncherImpl {
try {
ContainerId contId = makeContainerId(0l, 0, 0, 1);
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
- StartContainerResponse startResp =
- recordFactory.newRecordInstance(StartContainerResponse.class);
+ StartContainersResponse startResp =
+ recordFactory.newRecordInstance(StartContainersResponse.class);
startResp.setAllServicesMetaData(serviceResponse);
@@ -176,14 +176,14 @@ public class TestContainerLauncherImpl {
.thenReturn(contId);
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
- when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
+ when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
when(mockLaunchEvent.getContainerToken()).thenReturn(
createNewContainerToken(contId, cmAddress));
ut.handle(mockLaunchEvent);
ut.waitForPoolToIdle();
- verify(mockCM).startContainer(any(StartContainerRequest.class));
+ verify(mockCM).startContainers(any(StartContainersRequest.class));
LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent =
@@ -198,7 +198,7 @@ public class TestContainerLauncherImpl {
ut.waitForPoolToIdle();
- verify(mockCM).stopContainer(any(StopContainerRequest.class));
+ verify(mockCM).stopContainers(any(StopContainersRequest.class));
} finally {
ut.stop();
}
@@ -224,8 +224,8 @@ public class TestContainerLauncherImpl {
ContainerId contId = makeContainerId(0l, 0, 0, 1);
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
String cmAddress = "127.0.0.1:8000";
- StartContainerResponse startResp =
- recordFactory.newRecordInstance(StartContainerResponse.class);
+ StartContainersResponse startResp =
+ recordFactory.newRecordInstance(StartContainersResponse.class);
startResp.setAllServicesMetaData(serviceResponse);
LOG.info("inserting cleanup event");
@@ -241,7 +241,7 @@ public class TestContainerLauncherImpl {
ut.waitForPoolToIdle();
- verify(mockCM, never()).stopContainer(any(StopContainerRequest.class));
+ verify(mockCM, never()).stopContainers(any(StopContainersRequest.class));
LOG.info("inserting launch event");
ContainerRemoteLaunchEvent mockLaunchEvent =
@@ -252,14 +252,14 @@ public class TestContainerLauncherImpl {
.thenReturn(contId);
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
- when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
+ when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
when(mockLaunchEvent.getContainerToken()).thenReturn(
createNewContainerToken(contId, cmAddress));
ut.handle(mockLaunchEvent);
ut.waitForPoolToIdle();
- verify(mockCM, never()).startContainer(any(StartContainerRequest.class));
+ verify(mockCM, never()).startContainers(any(StartContainersRequest.class));
} finally {
ut.stop();
}
@@ -286,8 +286,8 @@ public class TestContainerLauncherImpl {
ContainerId contId = makeContainerId(0l, 0, 0, 1);
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
String cmAddress = "127.0.0.1:8000";
- StartContainerResponse startResp =
- recordFactory.newRecordInstance(StartContainerResponse.class);
+ StartContainersResponse startResp =
+ recordFactory.newRecordInstance(StartContainersResponse.class);
startResp.setAllServicesMetaData(serviceResponse);
LOG.info("inserting launch event");
@@ -299,20 +299,20 @@ public class TestContainerLauncherImpl {
.thenReturn(contId);
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
- when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
+ when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
when(mockLaunchEvent.getContainerToken()).thenReturn(
createNewContainerToken(contId, cmAddress));
ut.handle(mockLaunchEvent);
ut.waitForPoolToIdle();
- verify(mockCM).startContainer(any(StartContainerRequest.class));
+ verify(mockCM).startContainers(any(StartContainersRequest.class));
// skip cleanup and make sure stop kills the container
} finally {
ut.stop();
- verify(mockCM).stopContainer(any(StopContainerRequest.class));
+ verify(mockCM).stopContainers(any(StopContainersRequest.class));
}
}
@@ -341,8 +341,8 @@ public class TestContainerLauncherImpl {
ContainerId contId = makeContainerId(0l, 0, 0, 1);
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
String cmAddress = "127.0.0.1:8000";
- StartContainerResponse startResp =
- recordFactory.newRecordInstance(StartContainerResponse.class);
+ StartContainersResponse startResp =
+ recordFactory.newRecordInstance(StartContainersResponse.class);
startResp.setAllServicesMetaData(serviceResponse);
@@ -415,7 +415,7 @@ public class TestContainerLauncherImpl {
this.completeLaunchBarrier = completeLaunchBarrier;
}
@Override
- public StartContainerResponse startContainer(StartContainerRequest request)
+ public StartContainersResponse startContainers(StartContainersRequest request)
throws IOException {
try {
startLaunchBarrier.await();
@@ -433,16 +433,14 @@ public class TestContainerLauncherImpl {
}
@Override
- public StopContainerResponse stopContainer(StopContainerRequest request)
+ public StopContainersResponse stopContainers(StopContainersRequest request)
throws IOException {
-
return null;
}
@Override
- public GetContainerStatusResponse getContainerStatus(
- GetContainerStatusRequest request) throws IOException {
-
+ public GetContainerStatusesResponse getContainerStatuses(
+ GetContainerStatusesRequest request) throws IOException {
return null;
}
}