You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ml...@apache.org on 2013/04/16 23:33:42 UTC

svn commit: r1468619 - in /incubator/tez/trunk/tez-yarn-application/src: main/java/org/apache/hadoop/mapreduce/v2/app2/local/ main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ test/java/org/apache/hadoop/mapreduce/v2/app2/ test/java/org/apache/hadoop/mapreduce/v2/app2/rm/

Author: mliddell
Date: Tue Apr 16 21:33:41 2013
New Revision: 1468619

URL: http://svn.apache.org/r1468619
Log:
TEZ-4: Integrate with YARN's removal of AMResponse

Modified:
    incubator/tez/trunk/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java
    incubator/tez/trunk/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
    incubator/tez/trunk/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java
    incubator/tez/trunk/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java

Modified: incubator/tez/trunk/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java
URL: http://svn.apache.org/viewvc/incubator/tez/trunk/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java?rev=1468619&r1=1468618&r2=1468619&view=diff
==============================================================================
--- incubator/tez/trunk/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java (original)
+++ incubator/tez/trunk/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/local/LocalContainerRequestor.java Tue Apr 16 21:33:41 2013
@@ -36,7 +36,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -79,10 +78,10 @@ public class LocalContainerRequestor ext
         this.applicationAttemptId, this.lastResponseID, super
             .getApplicationProgress(), new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>());
-    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
-    AMResponse response;
+    
+    AllocateResponse response;
     try {
-      response = allocateResponse.getAMResponse();
+      response = scheduler.allocate(allocateRequest);
       // Reset retry count if no exception occurred.
       retrystartTime = System.currentTimeMillis();
     } catch (Exception e) {

Modified: incubator/tez/trunk/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/incubator/tez/trunk/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java?rev=1468619&r1=1468618&r2=1468619&view=diff
==============================================================================
--- incubator/tez/trunk/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java (original)
+++ incubator/tez/trunk/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java Tue Apr 16 21:33:41 2013
@@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,6 +57,8 @@ import org.apache.hadoop.yarn.util.Build
 import org.apache.hadoop.yarn.util.Records;
 
 
+
+
 /**
  * Keeps the data structures to send container requests to RM.
  */
@@ -319,7 +320,7 @@ public class RMContainerRequestor extend
     int headRoom = getAvailableResources() != null ? getAvailableResources()
         .getMemory() : 0;// first time it would be null
     int lastClusterNmCount = clusterNmCount;
-    AMResponse response = errorCheckedMakeRemoteRequest();
+    AllocateResponse response = errorCheckedMakeRemoteRequest();
     
     int newHeadRoom = getAvailableResources() != null ? getAvailableResources()
         .getMemory() : 0;
@@ -373,8 +374,8 @@ public class RMContainerRequestor extend
 
   
   @SuppressWarnings("unchecked")
-  protected AMResponse errorCheckedMakeRemoteRequest() throws Exception {
-    AMResponse response = null;
+  protected AllocateResponse errorCheckedMakeRemoteRequest() throws Exception {
+	AllocateResponse response = null;
     try {
       response = makeRemoteRequest();
       // Reset retry count if no exception occurred.
@@ -405,7 +406,7 @@ public class RMContainerRequestor extend
   }
   
   
-  protected AMResponse makeRemoteRequest() throws Exception {
+  protected AllocateResponse makeRemoteRequest() throws Exception {
     List<ContainerId> clonedReleaseList = cloneAndClearReleaseList();
     List<ResourceRequest> clonedAskList = cloneAndClearAskList();
 
@@ -419,20 +420,20 @@ public class RMContainerRequestor extend
       rePopulateListsOnError(clonedReleaseList, clonedAskList);
       throw e;
     }
-    AMResponse response = allocateResponse.getAMResponse();
-    lastResponseID = response.getResponseId();
-    availableResources = response.getAvailableResources();
+    
+    lastResponseID = allocateResponse.getResponseId();
+    availableResources = allocateResponse.getAvailableResources();
     clusterNmCount = allocateResponse.getNumClusterNodes();
 
     if (clonedAskList.size() > 0 || clonedReleaseList.size() > 0) {
       LOG.info("getResources() for " + applicationId + ":" + " ask="
           + clonedAskList.size() + " release= " + clonedReleaseList.size() 
-          + " newContainers=" + response.getAllocatedContainers().size() 
-          + " finishedContainers="+ response.getCompletedContainersStatuses().size()
+          + " newContainers=" + allocateResponse.getAllocatedContainers().size() 
+          + " finishedContainers="+ allocateResponse.getCompletedContainersStatuses().size()
           + " resourcelimit=" + availableResources + " knownNMs=" + clusterNmCount);
     }
 
-    return response;
+    return allocateResponse;
   }
 
   @Override

Modified: incubator/tez/trunk/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/incubator/tez/trunk/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java?rev=1468619&r1=1468618&r2=1468619&view=diff
==============================================================================
--- incubator/tez/trunk/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java (original)
+++ incubator/tez/trunk/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java Tue Apr 16 21:33:41 2013
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -301,10 +300,8 @@ public class MRAppBenchmark {
                   }
                 }
 
-                AMResponse amResponse = Records.newRecord(AMResponse.class);
-                amResponse.setAllocatedContainers(containers);
-                amResponse.setResponseId(request.getResponseId() + 1);
-                response.setAMResponse(amResponse);
+                response.setAllocatedContainers(containers);
+                response.setResponseId(request.getResponseId() + 1);
                 response.setNumClusterNodes(350);
                 return response;
               }

Modified: incubator/tez/trunk/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java
URL: http://svn.apache.org/viewvc/incubator/tez/trunk/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java?rev=1468619&r1=1468618&r2=1468619&view=diff
==============================================================================
--- incubator/tez/trunk/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java (original)
+++ incubator/tez/trunk/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/rm/TestRMContainerRequestor.java Tue Apr 16 21:33:41 2013
@@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -167,12 +166,10 @@ public class TestRMContainerRequestor {
   }
   
   private AMRMProtocolForFailedAllocate createAMRMProtocolForFailedAllocate() {
-    AMResponse amResponse = 
-        newAMResponse(new ArrayList<Container>(),
+    AllocateResponse allocateResponse = 
+        newAllocateResponse(new ArrayList<Container>(),
             BuilderUtils.newResource(1024, 1), new ArrayList<ContainerStatus>(),
-            false, 1, new ArrayList<NodeReport>());
-    AllocateResponse allocateResponse = newAllocateResponse(
-        amResponse, 2);
+            false, 1, new ArrayList<NodeReport>(), 2);
     return new AMRMProtocolForFailedAllocate(allocateResponse);
   }
 
@@ -238,12 +235,10 @@ public class TestRMContainerRequestor {
     public AMRMProtocol createSchedulerProxy() {
       if (amRmProtocol == null) {
         amRmProtocol = mock(AMRMProtocol.class);
-        AMResponse amResponse = newAMResponse(
+        AllocateResponse allocateResponse  = newAllocateResponse(
             new ArrayList<Container>(), BuilderUtils.newResource(1024, 1),
             new ArrayList<ContainerStatus>(), false, 1,
-            new ArrayList<NodeReport>());
-        AllocateResponse allocateResponse = newAllocateResponse(
-            amResponse, 2);
+            new ArrayList<NodeReport>(), 2);
         try {
           when(amRmProtocol.allocate(any(AllocateRequest.class))).thenReturn(allocateResponse);
         } catch (YarnRemoteException e) {
@@ -288,12 +283,10 @@ public class TestRMContainerRequestor {
     public AllocateResponse allocate(AllocateRequest request)
         throws YarnRemoteException {
       this.allocateRequest = request;
-      AMResponse amResponse = newAMResponse(
+      AllocateResponse allocateResponse = newAllocateResponse(
           new ArrayList<Container>(), BuilderUtils.newResource(1024, 1),
           new ArrayList<ContainerStatus>(), false, 1,
-          new ArrayList<NodeReport>());
-      AllocateResponse allocateResponse = newAllocateResponse(
-          amResponse, 2);
+          new ArrayList<NodeReport>(),2);
       return allocateResponse;
     }
   }
@@ -343,24 +336,17 @@ public class TestRMContainerRequestor {
     return allocateRequest;
   }
   
-  public static AllocateResponse newAllocateResponse(AMResponse amResponse,
-      int numNodes) {
-    AllocateResponse response = Records.newRecord(AllocateResponse.class);
-    response.setAMResponse(amResponse);
-    response.setNumClusterNodes(numNodes);
-    return response;
-  }
-  
-  public static AMResponse newAMResponse(List<Container> allocated,
+  public static AllocateResponse newAllocateResponse(List<Container> allocated,
       Resource available, List<ContainerStatus> completed, boolean reboot,
-      int responseId, List<NodeReport> nodeUpdates) {
-    AMResponse amResponse = Records.newRecord(AMResponse.class);
+      int responseId, List<NodeReport> nodeUpdates, int numNodes) {
+	AllocateResponse amResponse = Records.newRecord(AllocateResponse.class);
     amResponse.setAllocatedContainers(allocated);
     amResponse.setAvailableResources(available);
     amResponse.setCompletedContainersStatuses(completed);
     amResponse.setReboot(reboot);
     amResponse.setResponseId(responseId);
     amResponse.setUpdatedNodes(nodeUpdates);
+    amResponse.setNumClusterNodes(numNodes);
     return amResponse;
   }
 }