You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2016/06/13 09:28:00 UTC
[40/51] [abbrv] hadoop git commit: YARN-5124. Modify AMRMClient to
set the ExecutionType in the ResourceRequest. (asuresh)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index 6d93eb3..a556aa2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,20 +22,31 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -43,12 +54,23 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Validates End2End Distributed Scheduling flow which includes the AM
@@ -57,11 +79,70 @@ import java.util.List;
* the NM and the DistributedSchedulingProtocol used by the framework to talk
* to the DistributedSchedulingService running on the RM.
*/
-public class TestDistributedScheduling extends TestAMRMProxy {
+public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
private static final Log LOG =
LogFactory.getLog(TestDistributedScheduling.class);
+ protected MiniYARNCluster cluster;
+ protected YarnClient rmClient;
+ protected ApplicationMasterProtocol client;
+ protected Configuration conf;
+ protected Configuration yarnConf;
+ protected ApplicationAttemptId attemptId;
+ protected ApplicationId appId;
+
+ @Before
+ public void doBefore() throws Exception {
+ cluster = new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
+
+ conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
+ cluster.init(conf);
+ cluster.start();
+ yarnConf = cluster.getConfig();
+
+ // the client has to connect to AMRMProxy
+ yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
+ rmClient = YarnClient.createYarnClient();
+ rmClient.init(yarnConf);
+ rmClient.start();
+
+ // Submit application
+ attemptId = createApp(rmClient, cluster, conf);
+ appId = attemptId.getApplicationId();
+ client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
+ }
+
+ @After
+ public void doAfter() throws Exception {
+ if (client != null) {
+ try {
+ client.finishApplicationMaster(FinishApplicationMasterRequest
+ .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
+ rmClient.killApplication(attemptId.getApplicationId());
+ attemptId = null;
+ } catch (Exception e) {
+ }
+ }
+ if (rmClient != null) {
+ try {
+ rmClient.stop();
+ } catch (Exception e) {
+ }
+ }
+ if (cluster != null) {
+ try {
+ cluster.stop();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+
/**
* Validates if Allocate Requests containing only OPPORTUNISTIC container
* requests are satisfied instantly.
@@ -70,104 +151,63 @@ public class TestDistributedScheduling extends TestAMRMProxy {
*/
@Test(timeout = 60000)
public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
- MiniYARNCluster cluster =
- new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
- YarnClient rmClient = null;
- ApplicationMasterProtocol client;
-
- try {
- Configuration conf = new YarnConfiguration();
- conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
- conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
- conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
- cluster.init(conf);
- cluster.start();
- final Configuration yarnConf = cluster.getConfig();
-
- // the client has to connect to AMRMProxy
-
- yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
- rmClient = YarnClient.createYarnClient();
- rmClient.init(yarnConf);
- rmClient.start();
-
- // Submit application
-
- ApplicationId appId = createApp(rmClient, cluster);
-
- client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
-
- LOG.info("testDistributedSchedulingE2E - Register");
-
- RegisterApplicationMasterResponse responseRegister =
- client.registerApplicationMaster(RegisterApplicationMasterRequest
- .newInstance(NetUtils.getHostname(), 1024, ""));
-
- Assert.assertNotNull(responseRegister);
- Assert.assertNotNull(responseRegister.getQueue());
- Assert.assertNotNull(responseRegister.getApplicationACLs());
- Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
- Assert
- .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
- Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
- Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
-
- RMApp rmApp =
- cluster.getResourceManager().getRMContext().getRMApps().get(appId);
- Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
-
- LOG.info("testDistributedSchedulingE2E - Allocate");
-
- AllocateRequest request =
- createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
-
- // Replace 'ANY' requests with OPPORTUNISTIC aks and remove
- // everything else
- List<ResourceRequest> newAskList = new ArrayList<>();
- for (ResourceRequest rr : request.getAskList()) {
- if (ResourceRequest.ANY.equals(rr.getResourceName())) {
- ResourceRequest newRR = ResourceRequest.newInstance(rr
- .getPriority(), rr.getResourceName(),
- rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
- rr.getNodeLabelExpression(),
- ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true));
- newAskList.add(newRR);
- }
- }
- request.setAskList(newAskList);
-
- AllocateResponse allocResponse = client.allocate(request);
- Assert.assertNotNull(allocResponse);
-
- // Ensure that all the requests are satisfied immediately
- Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
- // Verify that the allocated containers are OPPORTUNISTIC
- for (Container allocatedContainer : allocResponse
- .getAllocatedContainers()) {
- ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
- .newContainerTokenIdentifier(
- allocatedContainer.getContainerToken());
- Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
- containerTokenIdentifier.getExecutionType());
- }
-
- LOG.info("testDistributedSchedulingE2E - Finish");
-
- FinishApplicationMasterResponse responseFinish =
- client.finishApplicationMaster(FinishApplicationMasterRequest
- .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
-
- Assert.assertNotNull(responseFinish);
-
- } finally {
- if (rmClient != null) {
- rmClient.stop();
+ LOG.info("testDistributedSchedulingE2E - Register");
+
+ RegisterApplicationMasterResponse responseRegister =
+ client.registerApplicationMaster(RegisterApplicationMasterRequest
+ .newInstance(NetUtils.getHostname(), 1024, ""));
+
+ Assert.assertNotNull(responseRegister);
+ Assert.assertNotNull(responseRegister.getQueue());
+ Assert.assertNotNull(responseRegister.getApplicationACLs());
+ Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
+ Assert
+ .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
+ Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
+ Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
+
+ RMApp rmApp =
+ cluster.getResourceManager().getRMContext().getRMApps().get(appId);
+ Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+
+ LOG.info("testDistributedSchedulingE2E - Allocate");
+
+ AllocateRequest request =
+ createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
+
+ // Replace 'ANY' requests with OPPORTUNISTIC aks and remove
+ // everything else
+ List<ResourceRequest> newAskList = new ArrayList<>();
+ for (ResourceRequest rr : request.getAskList()) {
+ if (ResourceRequest.ANY.equals(rr.getResourceName())) {
+ ResourceRequest newRR = ResourceRequest.newInstance(rr
+ .getPriority(), rr.getResourceName(),
+ rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
+ rr.getNodeLabelExpression(),
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true));
+ newAskList.add(newRR);
}
- cluster.stop();
}
+ request.setAskList(newAskList);
+
+ AllocateResponse allocResponse = client.allocate(request);
+ Assert.assertNotNull(allocResponse);
+
+ // Ensure that all the requests are satisfied immediately
+ Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+ // Verify that the allocated containers are OPPORTUNISTIC
+ for (Container allocatedContainer : allocResponse
+ .getAllocatedContainers()) {
+ ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+ .newContainerTokenIdentifier(
+ allocatedContainer.getContainerToken());
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+ containerTokenIdentifier.getExecutionType());
+ }
+
+ LOG.info("testDistributedSchedulingE2E - Finish");
}
/**
@@ -178,135 +218,305 @@ public class TestDistributedScheduling extends TestAMRMProxy {
*/
@Test(timeout = 60000)
public void testMixedExecutionTypeRequestE2E() throws Exception {
- MiniYARNCluster cluster =
- new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
- YarnClient rmClient = null;
- ApplicationMasterProtocol client;
+ LOG.info("testDistributedSchedulingE2E - Register");
+
+ RegisterApplicationMasterResponse responseRegister =
+ client.registerApplicationMaster(RegisterApplicationMasterRequest
+ .newInstance(NetUtils.getHostname(), 1024, ""));
+
+ Assert.assertNotNull(responseRegister);
+ Assert.assertNotNull(responseRegister.getQueue());
+ Assert.assertNotNull(responseRegister.getApplicationACLs());
+ Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
+ Assert
+ .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
+ Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
+ Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
+
+ RMApp rmApp =
+ cluster.getResourceManager().getRMContext().getRMApps().get(appId);
+ Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+
+ LOG.info("testDistributedSchedulingE2E - Allocate");
+
+ AllocateRequest request =
+ createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
+ List<ResourceRequest> askList = request.getAskList();
+ List<ResourceRequest> newAskList = new ArrayList<>(askList);
+
+ // Duplicate all ANY requests marking them as opportunistic
+ for (ResourceRequest rr : askList) {
+ if (ResourceRequest.ANY.equals(rr.getResourceName())) {
+ ResourceRequest newRR = ResourceRequest.newInstance(rr
+ .getPriority(), rr.getResourceName(),
+ rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
+ rr.getNodeLabelExpression(),
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true));
+ newAskList.add(newRR);
+ }
+ }
+ request.setAskList(newAskList);
+
+ AllocateResponse allocResponse = client.allocate(request);
+ Assert.assertNotNull(allocResponse);
+
+ // Ensure that all the requests are satisfied immediately
+ Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+ // Verify that the allocated containers are OPPORTUNISTIC
+ for (Container allocatedContainer : allocResponse
+ .getAllocatedContainers()) {
+ ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+ .newContainerTokenIdentifier(
+ allocatedContainer.getContainerToken());
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+ containerTokenIdentifier.getExecutionType());
+ }
+
+ request.setAskList(new ArrayList<ResourceRequest>());
+ request.setResponseId(request.getResponseId() + 1);
+ Thread.sleep(1000);
+
+ // RM should allocate GUARANTEED containers within 2 calls to allocate()
+ allocResponse = client.allocate(request);
+ Assert.assertNotNull(allocResponse);
+ Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+ // Verify that the allocated containers are GUARANTEED
+ for (Container allocatedContainer : allocResponse
+ .getAllocatedContainers()) {
+ ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+ .newContainerTokenIdentifier(
+ allocatedContainer.getContainerToken());
+ Assert.assertEquals(ExecutionType.GUARANTEED,
+ containerTokenIdentifier.getExecutionType());
+ }
+
+ LOG.info("testDistributedSchedulingE2E - Finish");
+ }
+
+ /**
+ * Validates if AMRMClient can be used with Distributed Scheduling turned on.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 120000)
+ @SuppressWarnings("unchecked")
+ public void testAMRMClient() throws Exception {
+ AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
try {
- Configuration conf = new YarnConfiguration();
- conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
- conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
- conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
- cluster.init(conf);
- cluster.start();
- final Configuration yarnConf = cluster.getConfig();
-
- // the client has to connect to AMRMProxy
-
- yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
- rmClient = YarnClient.createYarnClient();
- rmClient.init(yarnConf);
- rmClient.start();
-
- // Submit application
-
- ApplicationId appId = createApp(rmClient, cluster);
-
- client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
-
- LOG.info("testDistributedSchedulingE2E - Register");
-
- RegisterApplicationMasterResponse responseRegister =
- client.registerApplicationMaster(RegisterApplicationMasterRequest
- .newInstance(NetUtils.getHostname(), 1024, ""));
-
- Assert.assertNotNull(responseRegister);
- Assert.assertNotNull(responseRegister.getQueue());
- Assert.assertNotNull(responseRegister.getApplicationACLs());
- Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
- Assert
- .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
- Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
- Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
-
- RMApp rmApp =
- cluster.getResourceManager().getRMContext().getRMApps().get(appId);
- Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
-
- LOG.info("testDistributedSchedulingE2E - Allocate");
-
- AllocateRequest request =
- createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
- List<ResourceRequest> askList = request.getAskList();
- List<ResourceRequest> newAskList = new ArrayList<>(askList);
-
- // Duplicate all ANY requests marking them as opportunistic
- for (ResourceRequest rr : askList) {
- if (ResourceRequest.ANY.equals(rr.getResourceName())) {
- ResourceRequest newRR = ResourceRequest.newInstance(rr
- .getPriority(), rr.getResourceName(),
- rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
- rr.getNodeLabelExpression(),
+ Priority priority = Priority.newInstance(1);
+ Priority priority2 = Priority.newInstance(2);
+ Resource capability = Resource.newInstance(1024, 1);
+
+ List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
+ String node = nodeReports.get(0).getNodeId().getHost();
+ String rack = nodeReports.get(0).getRackName();
+ String[] nodes = new String[]{node};
+ String[] racks = new String[]{rack};
+
+ // start am rm client
+ amClient = new AMRMClientImpl(client);
+ amClient.init(yarnConf);
+ amClient.start();
+ amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
+
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2,
+ true, null,
ExecutionTypeRequest.newInstance(
- ExecutionType.OPPORTUNISTIC, true));
- newAskList.add(newRR);
+ ExecutionType.OPPORTUNISTIC, true)));
+
+ int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
+ node, ExecutionType.GUARANTEED, capability).remoteRequest
+ .getNumContainers();
+ int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
+ rack, ExecutionType.GUARANTEED, capability).remoteRequest
+ .getNumContainers();
+ int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ .remoteRequest.getNumContainers();
+ int oppContainersRequestedAny =
+ amClient.remoteRequestsTable.get(priority2, ResourceRequest.ANY,
+ ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+ .getNumContainers();
+
+ assertEquals(2, containersRequestedNode);
+ assertEquals(2, containersRequestedRack);
+ assertEquals(2, containersRequestedAny);
+ assertEquals(1, oppContainersRequestedAny);
+
+ assertEquals(4, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // RM should allocate container within 2 calls to allocate()
+ int allocatedContainerCount = 0;
+ int iterationsLeft = 10;
+ Set<ContainerId> releases = new TreeSet<>();
+
+ amClient.getNMTokenCache().clearCache();
+ Assert.assertEquals(0,
+ amClient.getNMTokenCache().numberOfTokensInCache());
+ HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+ while (allocatedContainerCount <
+ (containersRequestedAny + oppContainersRequestedAny)
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ allocatedContainerCount += allocResponse.getAllocatedContainers()
+ .size();
+ for (Container container : allocResponse.getAllocatedContainers()) {
+ ContainerId rejectContainerId = container.getId();
+ releases.add(rejectContainerId);
}
- }
- request.setAskList(newAskList);
-
- AllocateResponse allocResponse = client.allocate(request);
- Assert.assertNotNull(allocResponse);
-
- // Ensure that all the requests are satisfied immediately
- Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
- // Verify that the allocated containers are OPPORTUNISTIC
- for (Container allocatedContainer : allocResponse
- .getAllocatedContainers()) {
- ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
- .newContainerTokenIdentifier(
- allocatedContainer.getContainerToken());
- Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
- containerTokenIdentifier.getExecutionType());
- }
-
- request.setAskList(new ArrayList<ResourceRequest>());
- request.setResponseId(request.getResponseId() + 1);
-
- Thread.sleep(1000);
- // RM should allocate GUARANTEED containers within 2 calls to allocate()
- allocResponse = client.allocate(request);
- Assert.assertNotNull(allocResponse);
- Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+ for (NMToken token : allocResponse.getNMTokens()) {
+ String nodeID = token.getNodeId().toString();
+ receivedNMTokens.put(nodeID, token.getToken());
+ }
- // Verify that the allocated containers are GUARANTEED
- for (Container allocatedContainer : allocResponse
- .getAllocatedContainers()) {
- ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
- .newContainerTokenIdentifier(
- allocatedContainer.getContainerToken());
- Assert.assertEquals(ExecutionType.GUARANTEED,
- containerTokenIdentifier.getExecutionType());
+ if (allocatedContainerCount < containersRequestedAny) {
+ // sleep to let NM's heartbeat to RM and trigger allocations
+ sleep(100);
+ }
}
- LOG.info("testDistributedSchedulingE2E - Finish");
+ assertEquals(allocatedContainerCount,
+ containersRequestedAny + oppContainersRequestedAny);
+ for (ContainerId rejectContainerId : releases) {
+ amClient.releaseAssignedContainer(rejectContainerId);
+ }
+ assertEquals(3, amClient.release.size());
+ assertEquals(0, amClient.ask.size());
+
+ // need to tell the AMRMClient that we dont need these resources anymore
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ assertEquals(4, amClient.ask.size());
+
+ // test RPC exception handling
+ amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
+ nodes, racks, priority));
+ amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
+ nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+
+ final AMRMClient amc = amClient;
+ ApplicationMasterProtocol realRM = amClient.rmClient;
+ try {
+ ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol
+ .class);
+ when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
+ new Answer<AllocateResponse>() {
+ public AllocateResponse answer(InvocationOnMock invocation)
+ throws Exception {
+ amc.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes,
+ racks, priority));
+ amc.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks,
+ priority));
+ amc.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null,
+ priority2, true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ throw new Exception();
+ }
+ });
+ amClient.rmClient = mockRM;
+ amClient.allocate(0.1f);
+ } catch (Exception ioe) {
+ } finally {
+ amClient.rmClient = realRM;
+ }
- FinishApplicationMasterResponse responseFinish =
- client.finishApplicationMaster(FinishApplicationMasterRequest
- .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
+ assertEquals(3, amClient.release.size());
+ assertEquals(6, amClient.ask.size());
+
+ iterationsLeft = 3;
+ // do a few iterations to ensure RM is not going send new containers
+ while (iterationsLeft-- > 0) {
+ // inform RM of rejection
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ // RM did not send new containers because AM does not need any
+ assertEquals(0, allocResponse.getAllocatedContainers().size());
+ if (allocResponse.getCompletedContainersStatuses().size() > 0) {
+ for (ContainerStatus cStatus : allocResponse
+ .getCompletedContainersStatuses()) {
+ if (releases.contains(cStatus.getContainerId())) {
+ assertEquals(cStatus.getState(), ContainerState.COMPLETE);
+ assertEquals(-100, cStatus.getExitStatus());
+ releases.remove(cStatus.getContainerId());
+ }
+ }
+ }
+ if (iterationsLeft > 0) {
+ // sleep to make sure NM's heartbeat
+ sleep(100);
+ }
+ }
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
- Assert.assertNotNull(responseFinish);
+ amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+ null, null);
} finally {
- if (rmClient != null) {
- rmClient.stop();
+ if (amClient != null && amClient.getServiceState() == Service.STATE
+ .STARTED) {
+ amClient.stop();
}
- cluster.stop();
}
}
- @Ignore
- @Override
- public void testAMRMProxyE2E() throws Exception { }
-
- @Ignore
- @Override
- public void testE2ETokenRenewal() throws Exception { }
-
- @Ignore
- @Override
- public void testE2ETokenSwap() throws Exception { }
+ private void sleep(int sleepTime) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index cd04130..969fb70 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -251,9 +252,9 @@ public class TestNMClient {
racks, priority));
}
- int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
- .get(ResourceRequest.ANY).get(capability).remoteRequest
- .getNumContainers();
+ int containersRequestedAny = rmClient.remoteRequestsTable.get(priority,
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ .remoteRequest.getNumContainers();
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
index fd56f4f..b0c4b97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
@@ -214,7 +214,8 @@ public class ResourceRequestPBImpl extends ResourceRequest {
+ ", # Containers: " + getNumContainers()
+ ", Location: " + getResourceName()
+ ", Relax Locality: " + getRelaxLocality()
- + ", Execution Spec: " + getExecutionTypeRequest() + "}";
+ + ", Execution Type Request: " + getExecutionTypeRequest()
+ + ", Node Label Expression: " + getNodeLabelExpression() + "}";
}
@Override
@@ -235,4 +236,4 @@ public class ResourceRequestPBImpl extends ResourceRequest {
}
builder.setNodeLabelExpression(nodeLabelExpression);
}
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org