You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/09/28 20:13:13 UTC
[1/2] hadoop git commit: YARN-6955. Handle concurrent register AM
requests in FederationInterceptor. (Botong Huang via Subru).
Repository: hadoop
Updated Branches:
refs/heads/branch-2 913a64e4c -> ffcf5ba1c
YARN-6955. Handle concurrent register AM requests in FederationInterceptor. (Botong Huang via Subru).
(cherry picked from commit c61f2c419830e40bb47fb2b1fe1f7d6109ed29a9)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d6da014f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d6da014f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d6da014f
Branch: refs/heads/branch-2
Commit: d6da014f6783694bf6d9e77b2afd75cf99680de0
Parents: 913a64e
Author: Subru Krishnan <su...@apache.org>
Authored: Mon Aug 7 16:58:29 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Thu Sep 28 13:11:19 2017 -0700
----------------------------------------------------------------------
.../dev-support/findbugs-exclude.xml | 4 +-
.../yarn/server/MockResourceManagerFacade.java | 18 ++--
.../amrmproxy/FederationInterceptor.java | 43 ++++------
.../amrmproxy/TestFederationInterceptor.java | 88 ++++++++++++++++++--
4 files changed, 110 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6da014f/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 73f1038..2664cd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -567,11 +567,9 @@
<Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
</Match>
- <!-- Ignore false alert for RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE -->
<Match>
<Class name="org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor" />
- <Method name="registerApplicationMaster" />
- <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE" />
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6da014f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 68c55ac..e33d7e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -246,6 +246,16 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
shouldReRegisterNext = false;
+ synchronized (applicationContainerIdMap) {
+ if (applicationContainerIdMap.containsKey(amrmToken)) {
+ throw new InvalidApplicationMasterRequestException(
+ AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
+ }
+ // Keep track of the containers that are returned to this application
+ applicationContainerIdMap.put(amrmToken, new ArrayList<ContainerId>());
+ }
+
+ // Make sure we wait for certain test cases last in the method
synchronized (syncObj) {
syncObj.notifyAll();
// We reuse the port number to indicate whether the unit test want us to
@@ -261,14 +271,6 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
}
}
- synchronized (applicationContainerIdMap) {
- if (applicationContainerIdMap.containsKey(amrmToken)) {
- throw new InvalidApplicationMasterRequestException(
- AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
- }
- // Keep track of the containers that are returned to this application
- applicationContainerIdMap.put(amrmToken, new ArrayList<ContainerId>());
- }
return RegisterApplicationMasterResponse.newInstance(null, null, null, null,
null, request.getHost(), null);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6da014f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index ffe47f4..28724aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -208,22 +208,25 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
* requests from AM because of timeout between AM and AMRMProxy, which is
* shorter than the timeout + failOver between FederationInterceptor
* (AMRMProxy) and RM.
+ *
+ * For the same reason, this method needs to be synchronized.
*/
@Override
- public RegisterApplicationMasterResponse registerApplicationMaster(
- RegisterApplicationMasterRequest request)
- throws YarnException, IOException {
+ public synchronized RegisterApplicationMasterResponse
+ registerApplicationMaster(RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
// If AM is calling with a different request, complain
- if (this.amRegistrationRequest != null
- && !this.amRegistrationRequest.equals(request)) {
- throw new YarnException("A different request body recieved. AM should"
- + " not call registerApplicationMaster with different request body");
+ if (this.amRegistrationRequest != null) {
+ if (!this.amRegistrationRequest.equals(request)) {
+ throw new YarnException("AM should not call "
+ + "registerApplicationMaster with a different request body");
+ }
+ } else {
+ // Save the registration request. This will be used for registering with
+ // secondary sub-clusters using UAMs, as well as re-register later
+ this.amRegistrationRequest = request;
}
- // Save the registration request. This will be used for registering with
- // secondary sub-clusters using UAMs, as well as re-register later
- this.amRegistrationRequest = request;
-
/*
* Present to AM as if we are the RM that never fails over. When actual RM
* fails over, we always re-register automatically.
@@ -245,22 +248,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
* is running and will breaks the elasticity feature. The registration with
* the other sub-cluster RM will be done lazily as needed later.
*/
- try {
- this.amRegistrationResponse =
- this.homeRM.registerApplicationMaster(request);
- } catch (InvalidApplicationMasterRequestException e) {
- if (e.getMessage()
- .contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) {
- // Some other register thread might have succeeded in the meantime
- if (this.amRegistrationResponse != null) {
- LOG.info("Other concurrent thread registered successfully, "
- + "simply return the same success register response");
- return this.amRegistrationResponse;
- }
- }
- // This is a real issue, throw back to AM
- throw e;
- }
+ this.amRegistrationResponse =
+ this.homeRM.registerApplicationMaster(request);
// the queue this application belongs will be used for getting
// AMRMProxy policy from state store.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6da014f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 4e15323..34b0741 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -21,6 +21,11 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -36,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
@@ -234,7 +240,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
- registerReq.setRpcPort(testAppId);
+ registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
@@ -298,7 +304,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
- registerReq.setRpcPort(testAppId);
+ registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
@@ -338,6 +344,78 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
Assert.assertEquals(true, finshResponse.getIsUnregistered());
}
+ /*
+ * Test concurrent register threads. This is possible because the timeout
+ * between AM and AMRMProxy is shorter than the timeout + failOver between
+ * FederationInterceptor (AMRMProxy) and RM. When first call is blocked due to
+ * RM failover and AM timeout, it will call us resulting in a second register
+ * thread.
+ */
+ @Test(timeout = 5000)
+ public void testConcurrentRegister()
+ throws InterruptedException, ExecutionException {
+ ExecutorService threadpool = Executors.newCachedThreadPool();
+ ExecutorCompletionService<RegisterApplicationMasterResponse> compSvc =
+ new ExecutorCompletionService<>(threadpool);
+
+ Object syncObj = MockResourceManagerFacade.getSyncObj();
+
+ // Two register threads
+ synchronized (syncObj) {
+ // Make sure first thread will block within RM, before the second thread
+ // starts
+ LOG.info("Starting first register thread");
+ compSvc.submit(new ConcurrentRegisterAMCallable());
+
+ try {
+ LOG.info("Test main starts waiting for the first thread to block");
+ syncObj.wait();
+ LOG.info("Test main wait finished");
+ } catch (Exception e) {
+ LOG.info("Test main wait interrupted", e);
+ }
+ }
+
+ // The second thread will get already registered exception from RM.
+ LOG.info("Starting second register thread");
+ compSvc.submit(new ConcurrentRegisterAMCallable());
+
+ // Notify the first register thread to return
+ LOG.info("Let first blocked register thread move on");
+ synchronized (syncObj) {
+ syncObj.notifyAll();
+ }
+
+ // Both thread should return without exception
+ RegisterApplicationMasterResponse response = compSvc.take().get();
+ Assert.assertNotNull(response);
+
+ response = compSvc.take().get();
+ Assert.assertNotNull(response);
+
+ threadpool.shutdown();
+ }
+
+ /**
+ * A callable that calls registerAM to RM with blocking.
+ */
+ public class ConcurrentRegisterAMCallable
+ implements Callable<RegisterApplicationMasterResponse> {
+ @Override
+ public RegisterApplicationMasterResponse call() throws Exception {
+ RegisterApplicationMasterResponse response = null;
+ try {
+ // Use port number 1001 to let mock RM block in the register call
+ response = interceptor.registerApplicationMaster(
+ RegisterApplicationMasterRequest.newInstance(null, 1001, null));
+ } catch (Exception e) {
+ LOG.info("Register thread exception", e);
+ response = null;
+ }
+ return response;
+ }
+ }
+
@Test
public void testRequestInterceptorChainCreation() throws Exception {
RequestInterceptor root =
@@ -381,7 +459,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
- registerReq.setRpcPort(testAppId);
+ registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
for (int i = 0; i < 2; i++) {
@@ -397,7 +475,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
- registerReq.setRpcPort(testAppId);
+ registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
@@ -407,7 +485,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
// Register the application second time with a different request obj
registerReq = Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
- registerReq.setRpcPort(testAppId);
+ registerReq.setRpcPort(0);
registerReq.setTrackingUrl("different");
try {
registerResponse = interceptor.registerApplicationMaster(registerReq);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: YARN-6962. Add support for updateContainers
when allocating using FederationInterceptor. (Botong Huang via Subru).
Posted by su...@apache.org.
YARN-6962. Add support for updateContainers when allocating using FederationInterceptor. (Botong Huang via Subru).
(cherry picked from commit ca669f9f8bc7abe5b7d4648c589aa1756bd336d1)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ffcf5ba1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ffcf5ba1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ffcf5ba1
Branch: refs/heads/branch-2
Commit: ffcf5ba1cec5cc0e1d805144d8514b69585221c0
Parents: d6da014
Author: Subru Krishnan <su...@apache.org>
Authored: Thu Sep 28 13:04:03 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Thu Sep 28 13:11:32 2017 -0700
----------------------------------------------------------------------
.../amrmproxy/FederationInterceptor.java | 86 +++++++++++++-------
.../amrmproxy/TestFederationInterceptor.java | 54 ++++++++++++
2 files changed, 111 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffcf5ba1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 28724aa..33cfca3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -540,30 +540,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
- if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
- request.getResourceBlacklistRequest().getBlacklistAdditions())) {
- for (String resourceName : request.getResourceBlacklistRequest()
- .getBlacklistAdditions()) {
- SubClusterId subClusterId = getSubClusterForNode(resourceName);
- if (subClusterId != null) {
- AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
- subClusterId, request, requestMap);
- newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
- .add(resourceName);
+ if (request.getResourceBlacklistRequest() != null) {
+ if (!isNullOrEmpty(
+ request.getResourceBlacklistRequest().getBlacklistAdditions())) {
+ for (String resourceName : request.getResourceBlacklistRequest()
+ .getBlacklistAdditions()) {
+ SubClusterId subClusterId = getSubClusterForNode(resourceName);
+ if (subClusterId != null) {
+ AllocateRequest newRequest =
+ findOrCreateAllocateRequestForSubCluster(subClusterId, request,
+ requestMap);
+ newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
+ .add(resourceName);
+ }
}
}
- }
-
- if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
- request.getResourceBlacklistRequest().getBlacklistRemovals())) {
- for (String resourceName : request.getResourceBlacklistRequest()
- .getBlacklistRemovals()) {
- SubClusterId subClusterId = getSubClusterForNode(resourceName);
- if (subClusterId != null) {
- AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
- subClusterId, request, requestMap);
- newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
- .add(resourceName);
+ if (!isNullOrEmpty(
+ request.getResourceBlacklistRequest().getBlacklistRemovals())) {
+ for (String resourceName : request.getResourceBlacklistRequest()
+ .getBlacklistRemovals()) {
+ SubClusterId subClusterId = getSubClusterForNode(resourceName);
+ if (subClusterId != null) {
+ AllocateRequest newRequest =
+ findOrCreateAllocateRequestForSubCluster(subClusterId, request,
+ requestMap);
+ newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
+ .add(resourceName);
+ }
}
}
}
@@ -896,13 +899,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
- if (!isNullOrEmpty(otherResponse.getNMTokens())) {
- if (!isNullOrEmpty(homeResponse.getNMTokens())) {
- homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
- } else {
- homeResponse.setNMTokens(otherResponse.getNMTokens());
- }
- }
+ homeResponse.setNumClusterNodes(
+ homeResponse.getNumClusterNodes() + otherResponse.getNumClusterNodes());
PreemptionMessage homePreempMessage = homeResponse.getPreemptionMessage();
PreemptionMessage otherPreempMessage = otherResponse.getPreemptionMessage();
@@ -935,6 +933,31 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
spar1.getContainers().addAll(spar2.getContainers());
}
}
+
+ if (!isNullOrEmpty(otherResponse.getNMTokens())) {
+ if (!isNullOrEmpty(homeResponse.getNMTokens())) {
+ homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
+ } else {
+ homeResponse.setNMTokens(otherResponse.getNMTokens());
+ }
+ }
+
+ if (!isNullOrEmpty(otherResponse.getUpdatedContainers())) {
+ if (!isNullOrEmpty(homeResponse.getUpdatedContainers())) {
+ homeResponse.getUpdatedContainers()
+ .addAll(otherResponse.getUpdatedContainers());
+ } else {
+ homeResponse.setUpdatedContainers(otherResponse.getUpdatedContainers());
+ }
+ }
+
+ if (!isNullOrEmpty(otherResponse.getUpdateErrors())) {
+ if (!isNullOrEmpty(homeResponse.getUpdateErrors())) {
+ homeResponse.getUpdateErrors().addAll(otherResponse.getUpdateErrors());
+ } else {
+ homeResponse.setUpdateErrors(otherResponse.getUpdateErrors());
+ }
+ }
}
/**
@@ -1052,6 +1075,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
return this.uamPool.getAllUAMIds().size();
}
+ @VisibleForTesting
+ public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() {
+ return this.asyncResponseSink;
+ }
+
/**
* Private structure for encapsulating SubClusterId and
* RegisterApplicationMasterResponse instances.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffcf5ba1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 34b0741..3db0e35 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
@@ -36,8 +38,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerError;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -493,4 +502,49 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
} catch (YarnException e) {
}
}
+
+ @Test
+ public void testAllocateResponse() throws Exception {
+ interceptor.registerApplicationMaster(
+ RegisterApplicationMasterRequest.newInstance(null, 0, null));
+ AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+
+ Map<SubClusterId, List<AllocateResponse>> asyncResponseSink =
+ interceptor.getAsyncResponseSink();
+
+ ContainerId cid = ContainerId.newContainerId(attemptId, 0);
+ ContainerStatus cStatus = Records.newRecord(ContainerStatus.class);
+ cStatus.setContainerId(cid);
+ Container container =
+ Container.newInstance(cid, null, null, null, null, null);
+
+ AllocateResponse response = Records.newRecord(AllocateResponse.class);
+ response.setAllocatedContainers(Collections.singletonList(container));
+ response.setCompletedContainersStatuses(Collections.singletonList(cStatus));
+ response.setUpdatedNodes(
+ Collections.singletonList(Records.newRecord(NodeReport.class)));
+ response.setNMTokens(
+ Collections.singletonList(Records.newRecord(NMToken.class)));
+ response.setUpdatedContainers(
+ Collections.singletonList(Records.newRecord(UpdatedContainer.class)));
+ response.setUpdateErrors(Collections
+ .singletonList(Records.newRecord(UpdateContainerError.class)));
+ response.setAvailableResources(Records.newRecord(Resource.class));
+ response.setPreemptionMessage(Records.newRecord(PreemptionMessage.class));
+
+ List<AllocateResponse> list = new ArrayList<>();
+ list.add(response);
+ asyncResponseSink.put(SubClusterId.newInstance("SC-1"), list);
+
+ response = interceptor.allocate(allocateRequest);
+
+ Assert.assertEquals(1, response.getAllocatedContainers().size());
+ Assert.assertNotNull(response.getAvailableResources());
+ Assert.assertEquals(1, response.getCompletedContainersStatuses().size());
+ Assert.assertEquals(1, response.getUpdatedNodes().size());
+ Assert.assertNotNull(response.getPreemptionMessage());
+ Assert.assertEquals(1, response.getNMTokens().size());
+ Assert.assertEquals(1, response.getUpdatedContainers().size());
+ Assert.assertEquals(1, response.getUpdateErrors().size());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org