You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/07/24 05:42:54 UTC

svn commit: r1506392 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/s...

Author: vinodkv
Date: Wed Jul 24 03:42:52 2013
New Revision: 1506392

URL: http://svn.apache.org/r1506392
Log:
YARN-926. Modified ContainerManagerProtcol APIs to take in requests for multiple containers. Contributed by Jian He.
MAPREDUCE-5412. Update MR app to use multiple containers API of ContainerManager after YARN-926. Contributed by Jian He.
svn merge --ignore-ancestry -c 1506391 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1506392&r1=1506391&r2=1506392&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Jul 24 03:42:52 2013
@@ -450,6 +450,9 @@ Release 2.1.0-beta - 2013-07-02
     MAPREDUCE-5325. MR changes related to YARN-727. ClientRMProtocol.getAllApplications
     should accept ApplicationType as a parameter. (Xuan Gong via hitesh)
 
+    MAPREDUCE-5412. Update MR app to use multiple containers API of
+    ContainerManager after YARN-926. (Jian He via vinodkv)
+
   BREAKDOWN OF HADOOP-8562 SUBTASKS
 
     MAPREDUCE-4739. Some MapReduce tests fail to find winutils.

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1506392&r1=1506391&r2=1506392&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Wed Jul 24 03:42:52 2013
@@ -20,7 +20,9 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -44,14 +46,15 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
 import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.util.Records;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -139,13 +142,18 @@ public class ContainerLauncherImpl exten
           event.getContainerLaunchContext();
 
         // Now launch the actual container
-        StartContainerRequest startRequest = Records
-          .newRecord(StartContainerRequest.class);
-        startRequest.setContainerLaunchContext(containerLaunchContext);
-        startRequest.setContainerToken(event.getContainerToken());
-        StartContainerResponse response =
-            proxy.getContainerManagementProtocol().startContainer(startRequest);
-
+        StartContainerRequest startRequest =
+            StartContainerRequest.newInstance(containerLaunchContext,
+              event.getContainerToken());
+        List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+        list.add(startRequest);
+        StartContainersRequest requestList = StartContainersRequest.newInstance(list);
+        StartContainersResponse response =
+            proxy.getContainerManagementProtocol().startContainers(requestList);
+        if (response.getFailedRequests() != null
+            && response.getFailedRequests().containsKey(containerID)) {
+          throw response.getFailedRequests().get(containerID).deSerialize();
+        }
         ByteBuffer portInfo =
             response.getAllServicesMetaData().get(
                 ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
@@ -192,13 +200,17 @@ public class ContainerLauncherImpl exten
           proxy = getCMProxy(this.containerMgrAddress, this.containerID);
 
           // kill the remote container if already launched
-          StopContainerRequest stopRequest = Records
-              .newRecord(StopContainerRequest.class);
-          stopRequest.setContainerId(this.containerID);
-          proxy.getContainerManagementProtocol().stopContainer(stopRequest);
-
+          List<ContainerId> ids = new ArrayList<ContainerId>();
+          ids.add(this.containerID);
+          StopContainersRequest request = StopContainersRequest.newInstance(ids);
+          StopContainersResponse response =
+              proxy.getContainerManagementProtocol().stopContainers(request);
+          if (response.getFailedRequests() != null
+              && response.getFailedRequests().containsKey(this.containerID)) {
+            throw response.getFailedRequests().get(this.containerID)
+              .deSerialize();
+          }
         } catch (Throwable t) {
-
           // ignore the cleanup failure
           String message = "cleanup failed for container "
               + this.containerID + " : "

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java?rev=1506392&r1=1506391&r2=1506392&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java Wed Jul 24 03:42:52 2013
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -52,12 +54,13 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -393,18 +396,18 @@ public class TestContainerLauncher {
     private ContainerStatus status = null;
 
     @Override
-    public GetContainerStatusResponse getContainerStatus(
-        GetContainerStatusRequest request) throws IOException {
-      GetContainerStatusResponse response = recordFactory
-          .newRecordInstance(GetContainerStatusResponse.class);
-      response.setStatus(status);
-      return response;
+    public GetContainerStatusesResponse getContainerStatuses(
+        GetContainerStatusesRequest request) throws IOException {
+      List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
+      statuses.add(status);
+      return GetContainerStatusesResponse.newInstance(statuses, null);
     }
 
     @Override
-    public StartContainerResponse startContainer(StartContainerRequest request)
+    public StartContainersResponse startContainers(StartContainersRequest requests)
         throws IOException {
 
+      StartContainerRequest request = requests.getStartContainerRequests().get(0);
       ContainerTokenIdentifier containerTokenIdentifier =
           MRApp.newContainerTokenIdentifier(request.getContainerToken());
 
@@ -412,8 +415,8 @@ public class TestContainerLauncher {
       Assert.assertEquals(MRApp.NM_HOST + ":" + MRApp.NM_PORT,
         containerTokenIdentifier.getNmHostAddress());
 
-      StartContainerResponse response = recordFactory
-          .newRecordInstance(StartContainerResponse.class);
+      StartContainersResponse response = recordFactory
+          .newRecordInstance(StartContainersResponse.class);
       status = recordFactory.newRecordInstance(ContainerStatus.class);
       try {
         // make the thread sleep to look like its not going to respond
@@ -429,7 +432,7 @@ public class TestContainerLauncher {
     }
 
     @Override
-    public StopContainerResponse stopContainer(StopContainerRequest request)
+    public StopContainersResponse stopContainers(StopContainersRequest request)
         throws IOException {
       Exception e = new Exception("Dummy function", new Exception(
           "Dummy function cause"));

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1506392&r1=1506391&r2=1506392&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Wed Jul 24 03:42:52 2013
@@ -45,12 +45,12 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -162,8 +162,8 @@ public class TestContainerLauncherImpl {
     try {
       ContainerId contId = makeContainerId(0l, 0, 0, 1);
       TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
-      StartContainerResponse startResp = 
-        recordFactory.newRecordInstance(StartContainerResponse.class);
+      StartContainersResponse startResp =
+        recordFactory.newRecordInstance(StartContainersResponse.class);
       startResp.setAllServicesMetaData(serviceResponse);
       
 
@@ -176,14 +176,14 @@ public class TestContainerLauncherImpl {
         .thenReturn(contId);
       when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
       when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
-      when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
+      when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
       when(mockLaunchEvent.getContainerToken()).thenReturn(
           createNewContainerToken(contId, cmAddress));
       ut.handle(mockLaunchEvent);
       
       ut.waitForPoolToIdle();
       
-      verify(mockCM).startContainer(any(StartContainerRequest.class));
+      verify(mockCM).startContainers(any(StartContainersRequest.class));
       
       LOG.info("inserting cleanup event");
       ContainerLauncherEvent mockCleanupEvent = 
@@ -198,7 +198,7 @@ public class TestContainerLauncherImpl {
       
       ut.waitForPoolToIdle();
       
-      verify(mockCM).stopContainer(any(StopContainerRequest.class));
+      verify(mockCM).stopContainers(any(StopContainersRequest.class));
     } finally {
       ut.stop();
     }
@@ -224,8 +224,8 @@ public class TestContainerLauncherImpl {
       ContainerId contId = makeContainerId(0l, 0, 0, 1);
       TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
       String cmAddress = "127.0.0.1:8000";
-      StartContainerResponse startResp = 
-        recordFactory.newRecordInstance(StartContainerResponse.class);
+      StartContainersResponse startResp =
+        recordFactory.newRecordInstance(StartContainersResponse.class);
       startResp.setAllServicesMetaData(serviceResponse);
 
       LOG.info("inserting cleanup event");
@@ -241,7 +241,7 @@ public class TestContainerLauncherImpl {
       
       ut.waitForPoolToIdle();
       
-      verify(mockCM, never()).stopContainer(any(StopContainerRequest.class));
+      verify(mockCM, never()).stopContainers(any(StopContainersRequest.class));
 
       LOG.info("inserting launch event");
       ContainerRemoteLaunchEvent mockLaunchEvent = 
@@ -252,14 +252,14 @@ public class TestContainerLauncherImpl {
         .thenReturn(contId);
       when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
       when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
-      when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
+      when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
       when(mockLaunchEvent.getContainerToken()).thenReturn(
           createNewContainerToken(contId, cmAddress));
       ut.handle(mockLaunchEvent);
       
       ut.waitForPoolToIdle();
       
-      verify(mockCM, never()).startContainer(any(StartContainerRequest.class));
+      verify(mockCM, never()).startContainers(any(StartContainersRequest.class));
     } finally {
       ut.stop();
     }
@@ -286,8 +286,8 @@ public class TestContainerLauncherImpl {
       ContainerId contId = makeContainerId(0l, 0, 0, 1);
       TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
       String cmAddress = "127.0.0.1:8000";
-      StartContainerResponse startResp =
-        recordFactory.newRecordInstance(StartContainerResponse.class);
+      StartContainersResponse startResp =
+        recordFactory.newRecordInstance(StartContainersResponse.class);
       startResp.setAllServicesMetaData(serviceResponse);
 
       LOG.info("inserting launch event");
@@ -299,20 +299,20 @@ public class TestContainerLauncherImpl {
         .thenReturn(contId);
       when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
       when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
-      when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp);
+      when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
       when(mockLaunchEvent.getContainerToken()).thenReturn(
           createNewContainerToken(contId, cmAddress));
       ut.handle(mockLaunchEvent);
 
       ut.waitForPoolToIdle();
 
-      verify(mockCM).startContainer(any(StartContainerRequest.class));
+      verify(mockCM).startContainers(any(StartContainersRequest.class));
 
       // skip cleanup and make sure stop kills the container
 
     } finally {
       ut.stop();
-      verify(mockCM).stopContainer(any(StopContainerRequest.class));
+      verify(mockCM).stopContainers(any(StopContainersRequest.class));
     }
   }
   
@@ -341,8 +341,8 @@ public class TestContainerLauncherImpl {
       ContainerId contId = makeContainerId(0l, 0, 0, 1);
       TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
       String cmAddress = "127.0.0.1:8000";
-      StartContainerResponse startResp = 
-        recordFactory.newRecordInstance(StartContainerResponse.class);
+      StartContainersResponse startResp =
+        recordFactory.newRecordInstance(StartContainersResponse.class);
       startResp.setAllServicesMetaData(serviceResponse);
       
      
@@ -415,7 +415,7 @@ public class TestContainerLauncherImpl {
       this.completeLaunchBarrier = completeLaunchBarrier;
     }
     @Override
-    public StartContainerResponse startContainer(StartContainerRequest request)
+    public StartContainersResponse startContainers(StartContainersRequest request)
         throws IOException {
       try {
         startLaunchBarrier.await();
@@ -433,16 +433,14 @@ public class TestContainerLauncherImpl {
     }
 
     @Override
-    public StopContainerResponse stopContainer(StopContainerRequest request)
+    public StopContainersResponse stopContainers(StopContainersRequest request)
         throws IOException {
-    
       return null;
     }
 
     @Override
-    public GetContainerStatusResponse getContainerStatus(
-        GetContainerStatusRequest request) throws IOException {
-    
+    public GetContainerStatusesResponse getContainerStatuses(
+        GetContainerStatusesRequest request) throws IOException {
       return null;
     }
   }