You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2016/06/13 09:28:00 UTC

[40/51] [abbrv] hadoop git commit: YARN-5124. Modify AMRMClient to set the ExecutionType in the ResourceRequest. (asuresh)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index 6d93eb3..a556aa2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,20 +22,31 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -43,12 +54,23 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Validates End2End Distributed Scheduling flow which includes the AM
@@ -57,11 +79,70 @@ import java.util.List;
  * the NM and the DistributedSchedulingProtocol used by the framework to talk
  * to the DistributedSchedulingService running on the RM.
  */
-public class TestDistributedScheduling extends TestAMRMProxy {
+public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
 
   private static final Log LOG =
       LogFactory.getLog(TestDistributedScheduling.class);
 
+  protected MiniYARNCluster cluster;
+  protected YarnClient rmClient;
+  protected ApplicationMasterProtocol client;
+  protected Configuration conf;
+  protected Configuration yarnConf;
+  protected ApplicationAttemptId attemptId;
+  protected ApplicationId appId;
+
+  @Before
+  public void doBefore() throws Exception {
+    cluster = new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
+
+    conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
+    cluster.init(conf);
+    cluster.start();
+    yarnConf = cluster.getConfig();
+
+    // the client has to connect to AMRMProxy
+    yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
+    rmClient = YarnClient.createYarnClient();
+    rmClient.init(yarnConf);
+    rmClient.start();
+
+    // Submit application
+    attemptId = createApp(rmClient, cluster, conf);
+    appId = attemptId.getApplicationId();
+    client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
+  }
+
+  @After
+  public void doAfter() throws Exception {
+    if (client != null) {
+      try {
+        client.finishApplicationMaster(FinishApplicationMasterRequest
+            .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
+        rmClient.killApplication(attemptId.getApplicationId());
+        attemptId = null;
+      } catch (Exception e) {
+      }
+    }
+    if (rmClient != null) {
+      try {
+        rmClient.stop();
+      } catch (Exception e) {
+      }
+    }
+    if (cluster != null) {
+      try {
+        cluster.stop();
+      } catch (Exception e) {
+      }
+    }
+  }
+
+
   /**
    * Validates if Allocate Requests containing only OPPORTUNISTIC container
    * requests are satisfied instantly.
@@ -70,104 +151,63 @@ public class TestDistributedScheduling extends TestAMRMProxy {
    */
   @Test(timeout = 60000)
   public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
-    MiniYARNCluster cluster =
-        new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
-    YarnClient rmClient = null;
-    ApplicationMasterProtocol client;
-
-    try {
-      Configuration conf = new YarnConfiguration();
-      conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
-      conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
-      conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
-      cluster.init(conf);
-      cluster.start();
-      final Configuration yarnConf = cluster.getConfig();
-
-      // the client has to connect to AMRMProxy
-
-      yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
-          YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
-      rmClient = YarnClient.createYarnClient();
-      rmClient.init(yarnConf);
-      rmClient.start();
-
-      // Submit application
-
-      ApplicationId appId = createApp(rmClient, cluster);
-
-      client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
-
-      LOG.info("testDistributedSchedulingE2E - Register");
-
-      RegisterApplicationMasterResponse responseRegister =
-          client.registerApplicationMaster(RegisterApplicationMasterRequest
-              .newInstance(NetUtils.getHostname(), 1024, ""));
-
-      Assert.assertNotNull(responseRegister);
-      Assert.assertNotNull(responseRegister.getQueue());
-      Assert.assertNotNull(responseRegister.getApplicationACLs());
-      Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
-      Assert
-          .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
-      Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
-      Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
-
-      RMApp rmApp =
-          cluster.getResourceManager().getRMContext().getRMApps().get(appId);
-      Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
-
-      LOG.info("testDistributedSchedulingE2E - Allocate");
-
-      AllocateRequest request =
-          createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
-
-      // Replace 'ANY' requests with OPPORTUNISTIC aks and remove
-      // everything else
-      List<ResourceRequest> newAskList = new ArrayList<>();
-      for (ResourceRequest rr : request.getAskList()) {
-        if (ResourceRequest.ANY.equals(rr.getResourceName())) {
-          ResourceRequest newRR = ResourceRequest.newInstance(rr
-                  .getPriority(), rr.getResourceName(),
-              rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
-              rr.getNodeLabelExpression(),
-              ExecutionTypeRequest.newInstance(
-                  ExecutionType.OPPORTUNISTIC, true));
-          newAskList.add(newRR);
-        }
-      }
-      request.setAskList(newAskList);
-
-      AllocateResponse allocResponse = client.allocate(request);
-      Assert.assertNotNull(allocResponse);
-
-      // Ensure that all the requests are satisfied immediately
-      Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
-      // Verify that the allocated containers are OPPORTUNISTIC
-      for (Container allocatedContainer : allocResponse
-          .getAllocatedContainers()) {
-        ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
-            .newContainerTokenIdentifier(
-                allocatedContainer.getContainerToken());
-        Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-            containerTokenIdentifier.getExecutionType());
-      }
-
-      LOG.info("testDistributedSchedulingE2E - Finish");
-
-      FinishApplicationMasterResponse responseFinish =
-          client.finishApplicationMaster(FinishApplicationMasterRequest
-              .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
-
-      Assert.assertNotNull(responseFinish);
-
-    } finally {
-      if (rmClient != null) {
-        rmClient.stop();
+    LOG.info("testDistributedSchedulingE2E - Register");
+
+    RegisterApplicationMasterResponse responseRegister =
+        client.registerApplicationMaster(RegisterApplicationMasterRequest
+            .newInstance(NetUtils.getHostname(), 1024, ""));
+
+    Assert.assertNotNull(responseRegister);
+    Assert.assertNotNull(responseRegister.getQueue());
+    Assert.assertNotNull(responseRegister.getApplicationACLs());
+    Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
+    Assert
+        .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
+    Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
+    Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
+
+    RMApp rmApp =
+        cluster.getResourceManager().getRMContext().getRMApps().get(appId);
+    Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+
+    LOG.info("testDistributedSchedulingE2E - Allocate");
+
+    AllocateRequest request =
+        createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
+
+    // Replace 'ANY' requests with OPPORTUNISTIC asks and remove
+    // everything else
+    List<ResourceRequest> newAskList = new ArrayList<>();
+    for (ResourceRequest rr : request.getAskList()) {
+      if (ResourceRequest.ANY.equals(rr.getResourceName())) {
+        ResourceRequest newRR = ResourceRequest.newInstance(rr
+                .getPriority(), rr.getResourceName(),
+            rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
+            rr.getNodeLabelExpression(),
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true));
+        newAskList.add(newRR);
       }
-      cluster.stop();
     }
+    request.setAskList(newAskList);
+
+    AllocateResponse allocResponse = client.allocate(request);
+    Assert.assertNotNull(allocResponse);
+
+    // Ensure that all the requests are satisfied immediately
+    Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+    // Verify that the allocated containers are OPPORTUNISTIC
+    for (Container allocatedContainer : allocResponse
+        .getAllocatedContainers()) {
+      ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+          .newContainerTokenIdentifier(
+              allocatedContainer.getContainerToken());
+      Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+          containerTokenIdentifier.getExecutionType());
+    }
+
+    LOG.info("testDistributedSchedulingE2E - Finish");
   }
 
   /**
@@ -178,135 +218,305 @@ public class TestDistributedScheduling extends TestAMRMProxy {
    */
   @Test(timeout = 60000)
   public void testMixedExecutionTypeRequestE2E() throws Exception {
-    MiniYARNCluster cluster =
-        new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
-    YarnClient rmClient = null;
-    ApplicationMasterProtocol client;
+    LOG.info("testDistributedSchedulingE2E - Register");
+
+    RegisterApplicationMasterResponse responseRegister =
+        client.registerApplicationMaster(RegisterApplicationMasterRequest
+            .newInstance(NetUtils.getHostname(), 1024, ""));
+
+    Assert.assertNotNull(responseRegister);
+    Assert.assertNotNull(responseRegister.getQueue());
+    Assert.assertNotNull(responseRegister.getApplicationACLs());
+    Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
+    Assert
+        .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
+    Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
+    Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
+
+    RMApp rmApp =
+        cluster.getResourceManager().getRMContext().getRMApps().get(appId);
+    Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+
+    LOG.info("testDistributedSchedulingE2E - Allocate");
+
+    AllocateRequest request =
+        createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
+    List<ResourceRequest> askList = request.getAskList();
+    List<ResourceRequest> newAskList = new ArrayList<>(askList);
+
+    // Duplicate all ANY requests marking them as opportunistic
+    for (ResourceRequest rr : askList) {
+      if (ResourceRequest.ANY.equals(rr.getResourceName())) {
+        ResourceRequest newRR = ResourceRequest.newInstance(rr
+                .getPriority(), rr.getResourceName(),
+            rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
+            rr.getNodeLabelExpression(),
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true));
+        newAskList.add(newRR);
+      }
+    }
+    request.setAskList(newAskList);
+
+    AllocateResponse allocResponse = client.allocate(request);
+    Assert.assertNotNull(allocResponse);
+
+    // Ensure that all the requests are satisfied immediately
+    Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+    // Verify that the allocated containers are OPPORTUNISTIC
+    for (Container allocatedContainer : allocResponse
+        .getAllocatedContainers()) {
+      ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+          .newContainerTokenIdentifier(
+              allocatedContainer.getContainerToken());
+      Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+          containerTokenIdentifier.getExecutionType());
+    }
+
+    request.setAskList(new ArrayList<ResourceRequest>());
+    request.setResponseId(request.getResponseId() + 1);
 
+    Thread.sleep(1000);
+
+    // RM should allocate GUARANTEED containers within 2 calls to allocate()
+    allocResponse = client.allocate(request);
+    Assert.assertNotNull(allocResponse);
+    Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+    // Verify that the allocated containers are GUARANTEED
+    for (Container allocatedContainer : allocResponse
+        .getAllocatedContainers()) {
+      ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+          .newContainerTokenIdentifier(
+              allocatedContainer.getContainerToken());
+      Assert.assertEquals(ExecutionType.GUARANTEED,
+          containerTokenIdentifier.getExecutionType());
+    }
+
+    LOG.info("testDistributedSchedulingE2E - Finish");
+  }
+
+  /**
+   * Validates if AMRMClient can be used with Distributed Scheduling turned on.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  @SuppressWarnings("unchecked")
+  public void testAMRMClient() throws Exception {
+    AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
     try {
-      Configuration conf = new YarnConfiguration();
-      conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
-      conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
-      conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
-      cluster.init(conf);
-      cluster.start();
-      final Configuration yarnConf = cluster.getConfig();
-
-      // the client has to connect to AMRMProxy
-
-      yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
-          YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
-      rmClient = YarnClient.createYarnClient();
-      rmClient.init(yarnConf);
-      rmClient.start();
-
-      // Submit application
-
-      ApplicationId appId = createApp(rmClient, cluster);
-
-      client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
-
-      LOG.info("testDistributedSchedulingE2E - Register");
-
-      RegisterApplicationMasterResponse responseRegister =
-          client.registerApplicationMaster(RegisterApplicationMasterRequest
-              .newInstance(NetUtils.getHostname(), 1024, ""));
-
-      Assert.assertNotNull(responseRegister);
-      Assert.assertNotNull(responseRegister.getQueue());
-      Assert.assertNotNull(responseRegister.getApplicationACLs());
-      Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
-      Assert
-          .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
-      Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
-      Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
-
-      RMApp rmApp =
-          cluster.getResourceManager().getRMContext().getRMApps().get(appId);
-      Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
-
-      LOG.info("testDistributedSchedulingE2E - Allocate");
-
-      AllocateRequest request =
-          createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
-      List<ResourceRequest> askList = request.getAskList();
-      List<ResourceRequest> newAskList = new ArrayList<>(askList);
-
-      // Duplicate all ANY requests marking them as opportunistic
-      for (ResourceRequest rr : askList) {
-        if (ResourceRequest.ANY.equals(rr.getResourceName())) {
-          ResourceRequest newRR = ResourceRequest.newInstance(rr
-              .getPriority(), rr.getResourceName(),
-              rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
-              rr.getNodeLabelExpression(),
+      Priority priority = Priority.newInstance(1);
+      Priority priority2 = Priority.newInstance(2);
+      Resource capability = Resource.newInstance(1024, 1);
+
+      List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
+      String node = nodeReports.get(0).getNodeId().getHost();
+      String rack = nodeReports.get(0).getRackName();
+      String[] nodes = new String[]{node};
+      String[] racks = new String[]{rack};
+
+      // start am rm client
+      amClient = new AMRMClientImpl(client);
+      amClient.init(yarnConf);
+      amClient.start();
+      amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
+
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      amClient.addContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.addContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.addContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.addContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.addContainerRequest(
+          new AMRMClient.ContainerRequest(capability, null, null, priority2,
+              true, null,
+              ExecutionTypeRequest.newInstance(
+                  ExecutionType.OPPORTUNISTIC, true)));
+      amClient.addContainerRequest(
+          new AMRMClient.ContainerRequest(capability, null, null, priority2,
+              true, null,
+              ExecutionTypeRequest.newInstance(
+                  ExecutionType.OPPORTUNISTIC, true)));
+
+      amClient.removeContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.removeContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.removeContainerRequest(
+          new AMRMClient.ContainerRequest(capability, null, null, priority2,
+              true, null,
               ExecutionTypeRequest.newInstance(
-                  ExecutionType.OPPORTUNISTIC, true));
-          newAskList.add(newRR);
+                  ExecutionType.OPPORTUNISTIC, true)));
+
+      int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
+          node, ExecutionType.GUARANTEED, capability).remoteRequest
+          .getNumContainers();
+      int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
+          rack, ExecutionType.GUARANTEED, capability).remoteRequest
+          .getNumContainers();
+      int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
+          ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+          .remoteRequest.getNumContainers();
+      int oppContainersRequestedAny =
+          amClient.remoteRequestsTable.get(priority2, ResourceRequest.ANY,
+              ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+              .getNumContainers();
+
+      assertEquals(2, containersRequestedNode);
+      assertEquals(2, containersRequestedRack);
+      assertEquals(2, containersRequestedAny);
+      assertEquals(1, oppContainersRequestedAny);
+
+      assertEquals(4, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      // RM should allocate container within 2 calls to allocate()
+      int allocatedContainerCount = 0;
+      int iterationsLeft = 10;
+      Set<ContainerId> releases = new TreeSet<>();
+
+      amClient.getNMTokenCache().clearCache();
+      Assert.assertEquals(0,
+          amClient.getNMTokenCache().numberOfTokensInCache());
+      HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+      while (allocatedContainerCount <
+          (containersRequestedAny + oppContainersRequestedAny)
+          && iterationsLeft-- > 0) {
+        AllocateResponse allocResponse = amClient.allocate(0.1f);
+        assertEquals(0, amClient.ask.size());
+        assertEquals(0, amClient.release.size());
+
+        allocatedContainerCount += allocResponse.getAllocatedContainers()
+            .size();
+        for (Container container : allocResponse.getAllocatedContainers()) {
+          ContainerId rejectContainerId = container.getId();
+          releases.add(rejectContainerId);
         }
-      }
-      request.setAskList(newAskList);
-
-      AllocateResponse allocResponse = client.allocate(request);
-      Assert.assertNotNull(allocResponse);
-
-      // Ensure that all the requests are satisfied immediately
-      Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
-      // Verify that the allocated containers are OPPORTUNISTIC
-      for (Container allocatedContainer : allocResponse
-          .getAllocatedContainers()) {
-        ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
-            .newContainerTokenIdentifier(
-            allocatedContainer.getContainerToken());
-        Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-            containerTokenIdentifier.getExecutionType());
-      }
-
-      request.setAskList(new ArrayList<ResourceRequest>());
-      request.setResponseId(request.getResponseId() + 1);
-
-      Thread.sleep(1000);
 
-      // RM should allocate GUARANTEED containers within 2 calls to allocate()
-      allocResponse = client.allocate(request);
-      Assert.assertNotNull(allocResponse);
-      Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+        for (NMToken token : allocResponse.getNMTokens()) {
+          String nodeID = token.getNodeId().toString();
+          receivedNMTokens.put(nodeID, token.getToken());
+        }
 
-      // Verify that the allocated containers are GUARANTEED
-      for (Container allocatedContainer : allocResponse
-          .getAllocatedContainers()) {
-        ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
-            .newContainerTokenIdentifier(
-                allocatedContainer.getContainerToken());
-        Assert.assertEquals(ExecutionType.GUARANTEED,
-            containerTokenIdentifier.getExecutionType());
+        if (allocatedContainerCount < containersRequestedAny) {
+          // sleep to let the NM heartbeat to the RM and trigger allocations
+          sleep(100);
+        }
       }
 
-      LOG.info("testDistributedSchedulingE2E - Finish");
+      assertEquals(allocatedContainerCount,
+          containersRequestedAny + oppContainersRequestedAny);
+      for (ContainerId rejectContainerId : releases) {
+        amClient.releaseAssignedContainer(rejectContainerId);
+      }
+      assertEquals(3, amClient.release.size());
+      assertEquals(0, amClient.ask.size());
+
+      // need to tell the AMRMClient that we don't need these resources anymore
+      amClient.removeContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.removeContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.removeContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
+              true, null,
+              ExecutionTypeRequest.newInstance(
+                  ExecutionType.OPPORTUNISTIC, true)));
+      assertEquals(4, amClient.ask.size());
+
+      // test RPC exception handling
+      amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
+          nodes, racks, priority));
+      amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
+          nodes, racks, priority));
+      amClient.addContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
+              true, null,
+              ExecutionTypeRequest.newInstance(
+                  ExecutionType.OPPORTUNISTIC, true)));
+
+      final AMRMClient amc = amClient;
+      ApplicationMasterProtocol realRM = amClient.rmClient;
+      try {
+        ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol
+            .class);
+        when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
+            new Answer<AllocateResponse>() {
+              public AllocateResponse answer(InvocationOnMock invocation)
+                  throws Exception {
+                amc.removeContainerRequest(
+                    new AMRMClient.ContainerRequest(capability, nodes,
+                        racks, priority));
+                amc.removeContainerRequest(
+                    new AMRMClient.ContainerRequest(capability, nodes, racks,
+                        priority));
+                amc.removeContainerRequest(
+                    new AMRMClient.ContainerRequest(capability, null, null,
+                        priority2, true, null,
+                        ExecutionTypeRequest.newInstance(
+                            ExecutionType.OPPORTUNISTIC, true)));
+                throw new Exception();
+              }
+            });
+        amClient.rmClient = mockRM;
+        amClient.allocate(0.1f);
+      } catch (Exception ioe) {
+      } finally {
+        amClient.rmClient = realRM;
+      }
 
-      FinishApplicationMasterResponse responseFinish =
-          client.finishApplicationMaster(FinishApplicationMasterRequest
-              .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
+      assertEquals(3, amClient.release.size());
+      assertEquals(6, amClient.ask.size());
+
+      iterationsLeft = 3;
+      // do a few iterations to ensure RM is not going to send new containers
+      while (iterationsLeft-- > 0) {
+        // inform RM of rejection
+        AllocateResponse allocResponse = amClient.allocate(0.1f);
+        // RM did not send new containers because AM does not need any
+        assertEquals(0, allocResponse.getAllocatedContainers().size());
+        if (allocResponse.getCompletedContainersStatuses().size() > 0) {
+          for (ContainerStatus cStatus : allocResponse
+              .getCompletedContainersStatuses()) {
+            if (releases.contains(cStatus.getContainerId())) {
+              assertEquals(cStatus.getState(), ContainerState.COMPLETE);
+              assertEquals(-100, cStatus.getExitStatus());
+              releases.remove(cStatus.getContainerId());
+            }
+          }
+        }
+        if (iterationsLeft > 0) {
+          // sleep to make sure the NM heartbeats
+          sleep(100);
+        }
+      }
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
 
-      Assert.assertNotNull(responseFinish);
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
 
     } finally {
-      if (rmClient != null) {
-        rmClient.stop();
+      if (amClient != null && amClient.getServiceState() == Service.STATE
+          .STARTED) {
+        amClient.stop();
       }
-      cluster.stop();
     }
   }
 
-  @Ignore
-  @Override
-  public void testAMRMProxyE2E() throws Exception { }
-
-  @Ignore
-  @Override
-  public void testE2ETokenRenewal() throws Exception { }
-
-  @Ignore
-  @Override
-  public void testE2ETokenSwap() throws Exception { }
+  private void sleep(int sleepTime) {
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index cd04130..969fb70 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -251,9 +252,9 @@ public class TestNMClient {
           racks, priority));
     }
 
-    int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
-        .get(ResourceRequest.ANY).get(capability).remoteRequest
-        .getNumContainers();
+    int containersRequestedAny = rmClient.remoteRequestsTable.get(priority,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+        .remoteRequest.getNumContainers();
 
     // RM should allocate container within 2 calls to allocate()
     int allocatedContainerCount = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
index fd56f4f..b0c4b97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
@@ -214,7 +214,8 @@ public class ResourceRequestPBImpl extends  ResourceRequest {
         + ", # Containers: " + getNumContainers()
         + ", Location: " + getResourceName()
         + ", Relax Locality: " + getRelaxLocality()
-        + ", Execution Spec: " + getExecutionTypeRequest() + "}";
+        + ", Execution Type Request: " + getExecutionTypeRequest()
+        + ", Node Label Expression: " + getNodeLabelExpression() + "}";
   }
 
   @Override
@@ -235,4 +236,4 @@ public class ResourceRequestPBImpl extends  ResourceRequest {
     }
     builder.setNodeLabelExpression(nodeLabelExpression);
   }
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org