You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/09/28 20:13:13 UTC

[1/2] hadoop git commit: YARN-6955. Handle concurrent register AM requests in FederationInterceptor. (Botong Huang via Subru).

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 913a64e4c -> ffcf5ba1c


YARN-6955. Handle concurrent register AM requests in FederationInterceptor. (Botong Huang via Subru).

(cherry picked from commit c61f2c419830e40bb47fb2b1fe1f7d6109ed29a9)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d6da014f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d6da014f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d6da014f

Branch: refs/heads/branch-2
Commit: d6da014f6783694bf6d9e77b2afd75cf99680de0
Parents: 913a64e
Author: Subru Krishnan <su...@apache.org>
Authored: Mon Aug 7 16:58:29 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Thu Sep 28 13:11:19 2017 -0700

----------------------------------------------------------------------
 .../dev-support/findbugs-exclude.xml            |  4 +-
 .../yarn/server/MockResourceManagerFacade.java  | 18 ++--
 .../amrmproxy/FederationInterceptor.java        | 43 ++++------
 .../amrmproxy/TestFederationInterceptor.java    | 88 ++++++++++++++++++--
 4 files changed, 110 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6da014f/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 73f1038..2664cd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -567,11 +567,9 @@
     <Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
   </Match>
 
-  <!-- Ignore false alert for RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE -->
   <Match>
     <Class name="org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor" />
-    <Method name="registerApplicationMaster" />
-    <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE" />
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
 
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6da014f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 68c55ac..e33d7e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -246,6 +246,16 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
 
     shouldReRegisterNext = false;
 
+    synchronized (applicationContainerIdMap) {
+      if (applicationContainerIdMap.containsKey(amrmToken)) {
+        throw new InvalidApplicationMasterRequestException(
+            AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
+      }
+      // Keep track of the containers that are returned to this application
+      applicationContainerIdMap.put(amrmToken, new ArrayList<ContainerId>());
+    }
+
+    // Do the wait that certain test cases rely on last in the method
     synchronized (syncObj) {
       syncObj.notifyAll();
       // We reuse the port number to indicate whether the unit test want us to
@@ -261,14 +271,6 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
       }
     }
 
-    synchronized (applicationContainerIdMap) {
-      if (applicationContainerIdMap.containsKey(amrmToken)) {
-        throw new InvalidApplicationMasterRequestException(
-            AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
-      }
-      // Keep track of the containers that are returned to this application
-      applicationContainerIdMap.put(amrmToken, new ArrayList<ContainerId>());
-    }
     return RegisterApplicationMasterResponse.newInstance(null, null, null, null,
         null, request.getHost(), null);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6da014f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index ffe47f4..28724aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -208,22 +208,25 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    * requests from AM because of timeout between AM and AMRMProxy, which is
    * shorter than the timeout + failOver between FederationInterceptor
    * (AMRMProxy) and RM.
+   *
+   * For the same reason, this method needs to be synchronized.
    */
   @Override
-  public RegisterApplicationMasterResponse registerApplicationMaster(
-      RegisterApplicationMasterRequest request)
-      throws YarnException, IOException {
+  public synchronized RegisterApplicationMasterResponse
+      registerApplicationMaster(RegisterApplicationMasterRequest request)
+          throws YarnException, IOException {
     // If AM is calling with a different request, complain
-    if (this.amRegistrationRequest != null
-        && !this.amRegistrationRequest.equals(request)) {
-      throw new YarnException("A different request body recieved. AM should"
-          + " not call registerApplicationMaster with different request body");
+    if (this.amRegistrationRequest != null) {
+      if (!this.amRegistrationRequest.equals(request)) {
+        throw new YarnException("AM should not call "
+            + "registerApplicationMaster with a different request body");
+      }
+    } else {
+      // Save the registration request. This will be used for registering with
+      // secondary sub-clusters using UAMs, as well as re-register later
+      this.amRegistrationRequest = request;
     }
 
-    // Save the registration request. This will be used for registering with
-    // secondary sub-clusters using UAMs, as well as re-register later
-    this.amRegistrationRequest = request;
-
     /*
      * Present to AM as if we are the RM that never fails over. When actual RM
      * fails over, we always re-register automatically.
@@ -245,22 +248,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
      * is running and will breaks the elasticity feature. The registration with
      * the other sub-cluster RM will be done lazily as needed later.
      */
-    try {
-      this.amRegistrationResponse =
-          this.homeRM.registerApplicationMaster(request);
-    } catch (InvalidApplicationMasterRequestException e) {
-      if (e.getMessage()
-          .contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) {
-        // Some other register thread might have succeeded in the meantime
-        if (this.amRegistrationResponse != null) {
-          LOG.info("Other concurrent thread registered successfully, "
-              + "simply return the same success register response");
-          return this.amRegistrationResponse;
-        }
-      }
-      // This is a real issue, throw back to AM
-      throw e;
-    }
+    this.amRegistrationResponse =
+        this.homeRM.registerApplicationMaster(request);
 
     // the queue this application belongs will be used for getting
     // AMRMProxy policy from state store.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6da014f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 4e15323..34b0741 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -21,6 +21,11 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -36,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
 import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
 import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
@@ -234,7 +240,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     RegisterApplicationMasterRequest registerReq =
         Records.newRecord(RegisterApplicationMasterRequest.class);
     registerReq.setHost(Integer.toString(testAppId));
-    registerReq.setRpcPort(testAppId);
+    registerReq.setRpcPort(0);
     registerReq.setTrackingUrl("");
 
     RegisterApplicationMasterResponse registerResponse =
@@ -298,7 +304,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     RegisterApplicationMasterRequest registerReq =
         Records.newRecord(RegisterApplicationMasterRequest.class);
     registerReq.setHost(Integer.toString(testAppId));
-    registerReq.setRpcPort(testAppId);
+    registerReq.setRpcPort(0);
     registerReq.setTrackingUrl("");
 
     RegisterApplicationMasterResponse registerResponse =
@@ -338,6 +344,78 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     Assert.assertEquals(true, finshResponse.getIsUnregistered());
   }
 
+  /*
+   * Test concurrent register threads. This is possible because the timeout
+   * between AM and AMRMProxy is shorter than the timeout + failOver between
+   * FederationInterceptor (AMRMProxy) and RM. When the first call is blocked
+   * due to RM failover and the AM times out, the AM will call us again,
+   * resulting in a second register thread.
+   */
+  @Test(timeout = 5000)
+  public void testConcurrentRegister()
+      throws InterruptedException, ExecutionException {
+    ExecutorService threadpool = Executors.newCachedThreadPool();
+    ExecutorCompletionService<RegisterApplicationMasterResponse> compSvc =
+        new ExecutorCompletionService<>(threadpool);
+
+    Object syncObj = MockResourceManagerFacade.getSyncObj();
+
+    // Two register threads
+    synchronized (syncObj) {
+      // Make sure first thread will block within RM, before the second thread
+      // starts
+      LOG.info("Starting first register thread");
+      compSvc.submit(new ConcurrentRegisterAMCallable());
+
+      try {
+        LOG.info("Test main starts waiting for the first thread to block");
+        syncObj.wait();
+        LOG.info("Test main wait finished");
+      } catch (Exception e) {
+        LOG.info("Test main wait interrupted", e);
+      }
+    }
+
+    // The second thread will get already registered exception from RM.
+    LOG.info("Starting second register thread");
+    compSvc.submit(new ConcurrentRegisterAMCallable());
+
+    // Notify the first register thread to return
+    LOG.info("Let first blocked register thread move on");
+    synchronized (syncObj) {
+      syncObj.notifyAll();
+    }
+
+    // Both threads should return without exception
+    RegisterApplicationMasterResponse response = compSvc.take().get();
+    Assert.assertNotNull(response);
+
+    response = compSvc.take().get();
+    Assert.assertNotNull(response);
+
+    threadpool.shutdown();
+  }
+
+  /**
+   * A callable that calls registerAM and blocks inside the mock RM.
+   */
+  public class ConcurrentRegisterAMCallable
+      implements Callable<RegisterApplicationMasterResponse> {
+    @Override
+    public RegisterApplicationMasterResponse call() throws Exception {
+      RegisterApplicationMasterResponse response = null;
+      try {
+        // Use port number 1001 to let mock RM block in the register call
+        response = interceptor.registerApplicationMaster(
+            RegisterApplicationMasterRequest.newInstance(null, 1001, null));
+      } catch (Exception e) {
+        LOG.info("Register thread exception", e);
+        response = null;
+      }
+      return response;
+    }
+  }
+
   @Test
   public void testRequestInterceptorChainCreation() throws Exception {
     RequestInterceptor root =
@@ -381,7 +459,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     RegisterApplicationMasterRequest registerReq =
         Records.newRecord(RegisterApplicationMasterRequest.class);
     registerReq.setHost(Integer.toString(testAppId));
-    registerReq.setRpcPort(testAppId);
+    registerReq.setRpcPort(0);
     registerReq.setTrackingUrl("");
 
     for (int i = 0; i < 2; i++) {
@@ -397,7 +475,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     RegisterApplicationMasterRequest registerReq =
         Records.newRecord(RegisterApplicationMasterRequest.class);
     registerReq.setHost(Integer.toString(testAppId));
-    registerReq.setRpcPort(testAppId);
+    registerReq.setRpcPort(0);
     registerReq.setTrackingUrl("");
 
     RegisterApplicationMasterResponse registerResponse =
@@ -407,7 +485,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     // Register the application second time with a different request obj
     registerReq = Records.newRecord(RegisterApplicationMasterRequest.class);
     registerReq.setHost(Integer.toString(testAppId));
-    registerReq.setRpcPort(testAppId);
+    registerReq.setRpcPort(0);
     registerReq.setTrackingUrl("different");
     try {
       registerResponse = interceptor.registerApplicationMaster(registerReq);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: YARN-6962. Add support for updateContainers when allocating using FederationInterceptor. (Botong Huang via Subru).

Posted by su...@apache.org.
YARN-6962. Add support for updateContainers when allocating using FederationInterceptor. (Botong Huang via Subru).

(cherry picked from commit ca669f9f8bc7abe5b7d4648c589aa1756bd336d1)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ffcf5ba1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ffcf5ba1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ffcf5ba1

Branch: refs/heads/branch-2
Commit: ffcf5ba1cec5cc0e1d805144d8514b69585221c0
Parents: d6da014
Author: Subru Krishnan <su...@apache.org>
Authored: Thu Sep 28 13:04:03 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Thu Sep 28 13:11:32 2017 -0700

----------------------------------------------------------------------
 .../amrmproxy/FederationInterceptor.java        | 86 +++++++++++++-------
 .../amrmproxy/TestFederationInterceptor.java    | 54 ++++++++++++
 2 files changed, 111 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffcf5ba1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 28724aa..33cfca3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -540,30 +540,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       }
     }
 
-    if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
-        request.getResourceBlacklistRequest().getBlacklistAdditions())) {
-      for (String resourceName : request.getResourceBlacklistRequest()
-          .getBlacklistAdditions()) {
-        SubClusterId subClusterId = getSubClusterForNode(resourceName);
-        if (subClusterId != null) {
-          AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
-              subClusterId, request, requestMap);
-          newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
-              .add(resourceName);
+    if (request.getResourceBlacklistRequest() != null) {
+      if (!isNullOrEmpty(
+          request.getResourceBlacklistRequest().getBlacklistAdditions())) {
+        for (String resourceName : request.getResourceBlacklistRequest()
+            .getBlacklistAdditions()) {
+          SubClusterId subClusterId = getSubClusterForNode(resourceName);
+          if (subClusterId != null) {
+            AllocateRequest newRequest =
+                findOrCreateAllocateRequestForSubCluster(subClusterId, request,
+                    requestMap);
+            newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
+                .add(resourceName);
+          }
         }
       }
-    }
-
-    if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
-        request.getResourceBlacklistRequest().getBlacklistRemovals())) {
-      for (String resourceName : request.getResourceBlacklistRequest()
-          .getBlacklistRemovals()) {
-        SubClusterId subClusterId = getSubClusterForNode(resourceName);
-        if (subClusterId != null) {
-          AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
-              subClusterId, request, requestMap);
-          newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
-              .add(resourceName);
+      if (!isNullOrEmpty(
+          request.getResourceBlacklistRequest().getBlacklistRemovals())) {
+        for (String resourceName : request.getResourceBlacklistRequest()
+            .getBlacklistRemovals()) {
+          SubClusterId subClusterId = getSubClusterForNode(resourceName);
+          if (subClusterId != null) {
+            AllocateRequest newRequest =
+                findOrCreateAllocateRequestForSubCluster(subClusterId, request,
+                    requestMap);
+            newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
+                .add(resourceName);
+          }
         }
       }
     }
@@ -896,13 +899,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       }
     }
 
-    if (!isNullOrEmpty(otherResponse.getNMTokens())) {
-      if (!isNullOrEmpty(homeResponse.getNMTokens())) {
-        homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
-      } else {
-        homeResponse.setNMTokens(otherResponse.getNMTokens());
-      }
-    }
+    homeResponse.setNumClusterNodes(
+        homeResponse.getNumClusterNodes() + otherResponse.getNumClusterNodes());
 
     PreemptionMessage homePreempMessage = homeResponse.getPreemptionMessage();
     PreemptionMessage otherPreempMessage = otherResponse.getPreemptionMessage();
@@ -935,6 +933,31 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         spar1.getContainers().addAll(spar2.getContainers());
       }
     }
+
+    if (!isNullOrEmpty(otherResponse.getNMTokens())) {
+      if (!isNullOrEmpty(homeResponse.getNMTokens())) {
+        homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
+      } else {
+        homeResponse.setNMTokens(otherResponse.getNMTokens());
+      }
+    }
+
+    if (!isNullOrEmpty(otherResponse.getUpdatedContainers())) {
+      if (!isNullOrEmpty(homeResponse.getUpdatedContainers())) {
+        homeResponse.getUpdatedContainers()
+            .addAll(otherResponse.getUpdatedContainers());
+      } else {
+        homeResponse.setUpdatedContainers(otherResponse.getUpdatedContainers());
+      }
+    }
+
+    if (!isNullOrEmpty(otherResponse.getUpdateErrors())) {
+      if (!isNullOrEmpty(homeResponse.getUpdateErrors())) {
+        homeResponse.getUpdateErrors().addAll(otherResponse.getUpdateErrors());
+      } else {
+        homeResponse.setUpdateErrors(otherResponse.getUpdateErrors());
+      }
+    }
   }
 
   /**
@@ -1052,6 +1075,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     return this.uamPool.getAllUAMIds().size();
   }
 
+  @VisibleForTesting
+  public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() {
+    return this.asyncResponseSink;
+  }
+
   /**
    * Private structure for encapsulating SubClusterId and
    * RegisterApplicationMasterResponse instances.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ffcf5ba1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 34b0741..3db0e35 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
@@ -36,8 +38,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerError;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -493,4 +502,49 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     } catch (YarnException e) {
     }
   }
+
+  @Test
+  public void testAllocateResponse() throws Exception {
+    interceptor.registerApplicationMaster(
+        RegisterApplicationMasterRequest.newInstance(null, 0, null));
+    AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+
+    Map<SubClusterId, List<AllocateResponse>> asyncResponseSink =
+        interceptor.getAsyncResponseSink();
+
+    ContainerId cid = ContainerId.newContainerId(attemptId, 0);
+    ContainerStatus cStatus = Records.newRecord(ContainerStatus.class);
+    cStatus.setContainerId(cid);
+    Container container =
+        Container.newInstance(cid, null, null, null, null, null);
+
+    AllocateResponse response = Records.newRecord(AllocateResponse.class);
+    response.setAllocatedContainers(Collections.singletonList(container));
+    response.setCompletedContainersStatuses(Collections.singletonList(cStatus));
+    response.setUpdatedNodes(
+        Collections.singletonList(Records.newRecord(NodeReport.class)));
+    response.setNMTokens(
+        Collections.singletonList(Records.newRecord(NMToken.class)));
+    response.setUpdatedContainers(
+        Collections.singletonList(Records.newRecord(UpdatedContainer.class)));
+    response.setUpdateErrors(Collections
+        .singletonList(Records.newRecord(UpdateContainerError.class)));
+    response.setAvailableResources(Records.newRecord(Resource.class));
+    response.setPreemptionMessage(Records.newRecord(PreemptionMessage.class));
+
+    List<AllocateResponse> list = new ArrayList<>();
+    list.add(response);
+    asyncResponseSink.put(SubClusterId.newInstance("SC-1"), list);
+
+    response = interceptor.allocate(allocateRequest);
+
+    Assert.assertEquals(1, response.getAllocatedContainers().size());
+    Assert.assertNotNull(response.getAvailableResources());
+    Assert.assertEquals(1, response.getCompletedContainersStatuses().size());
+    Assert.assertEquals(1, response.getUpdatedNodes().size());
+    Assert.assertNotNull(response.getPreemptionMessage());
+    Assert.assertEquals(1, response.getNMTokens().size());
+    Assert.assertEquals(1, response.getUpdatedContainers().size());
+    Assert.assertEquals(1, response.getUpdateErrors().size());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org