You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2018/09/24 18:37:39 UTC

[1/2] hadoop git commit: YARN-8696. [AMRMProxy] FederationInterceptor upgrade: home sub-cluster heartbeat async. Contributed by Botong Huang.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 8de5c923b -> 309092280


http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
index 33617d4..78f6eb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -26,18 +27,26 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
 import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
 import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
 import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Extends the FederationInterceptor and overrides methods to provide a testable
  * implementation of FederationInterceptor.
  */
 public class TestableFederationInterceptor extends FederationInterceptor {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestableFederationInterceptor.class);
+
   private ConcurrentHashMap<String, MockResourceManagerFacade>
       secondaryResourceManagers = new ConcurrentHashMap<>();
   private AtomicInteger runningIndex = new AtomicInteger(0);
@@ -58,6 +67,12 @@ public class TestableFederationInterceptor extends FederationInterceptor {
     return new TestableUnmanagedAMPoolManager(threadPool);
   }
 
+  @Override
+  protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
+      Configuration conf, ApplicationId appId) {
+    return new TestableAMRequestHandlerThread(conf, appId);
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext,
@@ -109,6 +124,71 @@ public class TestableFederationInterceptor extends FederationInterceptor {
     return secondaryResourceManagers;
   }
 
+  protected MockResourceManagerFacade getSecondaryRM(String scId) {
+    return secondaryResourceManagers.get(scId);
+  }
+
+  /**
+   * Drain all async heartbeat threads. Comes in two flavors:
+   *
+   * 1. waitForAsyncHBThreadFinish == false. Only wait for the async threads to
+   * pick up all pending heartbeat requests, without necessarily waiting for
+   * them to finish processing the last request. This ensures all new UAMs are
+   * launched by the async threads, while still letting the drain return even
+   * though a (slow) RM may still be processing the last heartbeat
+   * request.
+   *
+   * 2. waitForAsyncHBThreadFinish == true. Wait for all async threads to finish
+   * processing all heartbeat requests.
+   */
+  protected void drainAllAsyncQueue(boolean waitForAsyncHBThreadFinish)
+      throws YarnException {
+
+    LOG.info("waiting to drain home heartbeat handler");
+    if (waitForAsyncHBThreadFinish) {
+      getHomeHeartbeartHandler().drainHeartbeatThread();
+    } else {
+      while (getHomeHeartbeartHandler().getRequestQueueSize() > 0) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+
+    LOG.info("waiting to drain UAM heartbeat handlers");
+    UnmanagedAMPoolManager uamPool = getUnmanagedAMPool();
+    if (waitForAsyncHBThreadFinish) {
+      getUnmanagedAMPool().drainUAMHeartbeats();
+    } else {
+      while (true) {
+        boolean done = true;
+        for (String scId : uamPool.getAllUAMIds()) {
+          if (uamPool.getRequestQueueSize(scId) > 0) {
+            done = false;
+            break;
+          }
+        }
+        if (done) {
+          break;
+        }
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+  }
+
+  protected UserGroupInformation getUGIWithToken(
+      ApplicationAttemptId appAttemptId) {
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(appAttemptId.toString());
+    AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, 1);
+    ugi.addTokenIdentifier(token);
+    return ugi;
+  }
+
   /**
    * Extends the UnmanagedAMPoolManager and overrides methods to provide a
    * testable implementation of UnmanagedAMPoolManager.
@@ -141,6 +221,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
         String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
       super(conf, appId, queueName, submitter, appNameSuffix,
           keepContainersAcrossApplicationAttempts, "TEST");
+      setHandlerThread(new TestableAMRequestHandlerThread(conf, appId));
     }
 
     /**
@@ -156,4 +237,30 @@ public class TestableFederationInterceptor extends FederationInterceptor {
           YarnConfiguration.getClusterId(config));
     }
   }
+
+  /**
+   * Wrap the handler thread so that it issues its calls as the same user.
+   */
+  protected class TestableAMRequestHandlerThread
+      extends AMHeartbeatRequestHandler {
+    public TestableAMRequestHandlerThread(Configuration conf,
+        ApplicationId applicationId) {
+      super(conf, applicationId);
+    }
+
+    @Override
+    public void run() {
+      try {
+        getUGIWithToken(getAttemptId())
+            .doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() {
+                TestableAMRequestHandlerThread.super.run();
+                return null;
+              }
+            });
+      } catch (Exception e) {
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 6fe0aa9..70b7498 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -84,7 +84,6 @@ import com.google.common.annotations.VisibleForTesting;
 public class ApplicationMasterService extends AbstractService implements
     ApplicationMasterProtocol {
   private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
-  private static final int PRE_REGISTER_RESPONSE_ID = -1;
 
   private final AMLivelinessMonitor amLivelinessMonitor;
   private YarnScheduler rScheduler;
@@ -377,11 +376,6 @@ public class ApplicationMasterService extends AbstractService implements
   protected static final Allocation EMPTY_ALLOCATION = new Allocation(
       EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
 
-  private int getNextResponseId(int responseId) {
-    // Loop between 0 to Integer.MAX_VALUE
-    return (responseId + 1) & Integer.MAX_VALUE;
-  }
-
   @Override
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnException, IOException {
@@ -415,8 +409,8 @@ public class ApplicationMasterService extends AbstractService implements
       }
 
       // Normally request.getResponseId() == lastResponse.getResponseId()
-      if (getNextResponseId(request.getResponseId()) == lastResponse
-          .getResponseId()) {
+      if (AMRMClientUtils.getNextResponseId(
+          request.getResponseId()) == lastResponse.getResponseId()) {
         // heartbeat one step old, simply return lastReponse
         return lastResponse;
       } else if (request.getResponseId() != lastResponse.getResponseId()) {
@@ -461,7 +455,8 @@ public class ApplicationMasterService extends AbstractService implements
        * need to worry about unregister call occurring in between (which
        * removes the lock object).
        */
-      response.setResponseId(getNextResponseId(lastResponse.getResponseId()));
+      response.setResponseId(
+          AMRMClientUtils.getNextResponseId(lastResponse.getResponseId()));
       lock.setAllocateResponse(response);
       return response;
     }
@@ -472,7 +467,7 @@ public class ApplicationMasterService extends AbstractService implements
         recordFactory.newRecordInstance(AllocateResponse.class);
     // set response id to -1 before application master for the following
     // attemptID get registered
-    response.setResponseId(PRE_REGISTER_RESPONSE_ID);
+    response.setResponseId(AMRMClientUtils.PRE_REGISTER_RESPONSE_ID);
     LOG.info("Registering app attempt : " + attemptId);
     responseMap.put(attemptId, new AllocateResponseLock(response));
     rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: YARN-8696. [AMRMProxy] FederationInterceptor upgrade: home sub-cluster heartbeat async. Contributed by Botong Huang.

Posted by gi...@apache.org.
YARN-8696. [AMRMProxy] FederationInterceptor upgrade: home sub-cluster heartbeat async. Contributed by Botong Huang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/30909228
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/30909228
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/30909228

Branch: refs/heads/trunk
Commit: 3090922805699b8374a359e92323884a4177dc4e
Parents: 8de5c92
Author: Giovanni Matteo Fumarola <gi...@apache.org>
Authored: Mon Sep 24 11:37:05 2018 -0700
Committer: Giovanni Matteo Fumarola <gi...@apache.org>
Committed: Mon Sep 24 11:37:05 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   5 +
 .../yarn/conf/TestYarnConfigurationFields.java  |   2 +
 .../hadoop/yarn/client/AMRMClientUtils.java     |   7 +
 .../yarn/server/AMHeartbeatRequestHandler.java  |  23 +-
 .../utils/FederationRegistryClient.java         |  10 +-
 .../yarn/server/uam/UnmanagedAMPoolManager.java |  15 +
 .../server/uam/UnmanagedApplicationManager.java |  14 +
 .../yarn/server/MockResourceManagerFacade.java  |  72 ++--
 .../amrmproxy/FederationInterceptor.java        | 376 +++++++++++--------
 .../amrmproxy/TestFederationInterceptor.java    | 298 +++++++++------
 .../TestableFederationInterceptor.java          | 107 ++++++
 .../ApplicationMasterService.java               |  15 +-
 12 files changed, 634 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a82801d..d69ae57 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3221,6 +3221,11 @@ public class YarnConfiguration extends Configuration {
       "org.apache.hadoop.yarn.server.federation.resolver."
           + "DefaultSubClusterResolverImpl";
 
+  // the maximum wait time for the first async heartbeat response
+  public static final String FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS =
+      FEDERATION_PREFIX + "amrmproxy.hb.maximum.wait.ms";
+  public static final long DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS = 5000;
+
   // AMRMProxy split-merge timeout for active sub-clusters. We will not route
   // new asks to expired sub-clusters.
   public static final String FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index d63933c..6f781fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -106,6 +106,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
     configurationPropsToSkipCompare
         .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
     configurationPropsToSkipCompare
+        .add(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS);
+    configurationPropsToSkipCompare
         .add(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
 
     // Federation StateStore ZK implementation configs to be ignored

http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
index b8319cd..34a9b34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
@@ -52,6 +52,8 @@ public final class AMRMClientUtils {
   private static final Logger LOG =
       LoggerFactory.getLogger(AMRMClientUtils.class);
 
+  public static final int PRE_REGISTER_RESPONSE_ID = -1;
+
   public static final String APP_ALREADY_REGISTERED_MESSAGE =
       "Application Master is already registered : ";
 
@@ -152,6 +154,11 @@ public final class AMRMClientUtils {
     }
   }
 
+  public static int getNextResponseId(int responseId) {
+    // Loop between 0 to Integer.MAX_VALUE
+    return (responseId + 1) & Integer.MAX_VALUE;
+  }
+
   public static void addToOutstandingSchedulingRequests(
       Collection<SchedulingRequest> requests,
       Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java
index 42227bb..380c216 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java
@@ -47,6 +47,9 @@ public class AMHeartbeatRequestHandler extends Thread {
   // Indication flag for the thread to keep running
   private volatile boolean keepRunning;
 
+  // True while the thread is blocked waiting on the request queue (test drain)
+  private volatile boolean isThreadWaiting;
+
   private Configuration conf;
   private ApplicationId applicationId;
 
@@ -61,6 +64,7 @@ public class AMHeartbeatRequestHandler extends Thread {
     this.setUncaughtExceptionHandler(
         new HeartBeatThreadUncaughtExceptionHandler());
     this.keepRunning = true;
+    this.isThreadWaiting = false;
 
     this.conf = conf;
     this.applicationId = applicationId;
@@ -82,12 +86,15 @@ public class AMHeartbeatRequestHandler extends Thread {
     while (keepRunning) {
       AsyncAllocateRequestInfo requestInfo;
       try {
-        requestInfo = requestQueue.take();
+        this.isThreadWaiting = true;
+        requestInfo = this.requestQueue.take();
+        this.isThreadWaiting = false;
+
         if (requestInfo == null) {
           throw new YarnException(
               "Null requestInfo taken from request queue");
         }
-        if (!keepRunning) {
+        if (!this.keepRunning) {
           break;
         }
 
@@ -98,7 +105,7 @@ public class AMHeartbeatRequestHandler extends Thread {
           throw new YarnException("Null allocateRequest from requestInfo");
         }
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:"
+          LOG.debug("Sending Heartbeat to RM. AskList:"
               + ((request.getAskList() == null) ? " empty"
                   : request.getAskList().size()));
         }
@@ -182,6 +189,16 @@ public class AMHeartbeatRequestHandler extends Thread {
   }
 
   @VisibleForTesting
+  public void drainHeartbeatThread() {
+    while (!this.isThreadWaiting || this.requestQueue.size() > 0) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+
+  @VisibleForTesting
   public int getRequestQueueSize() {
     return this.requestQueue.size();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
index 6624318..13545c9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
@@ -78,7 +78,7 @@ public class FederationRegistryClient {
    *
    * @return the list of known applications
    */
-  public List<String> getAllApplications() {
+  public synchronized List<String> getAllApplications() {
     // Suppress the exception here because it is valid that the entry does not
     // exist
     List<String> applications = null;
@@ -99,7 +99,7 @@ public class FederationRegistryClient {
    * For testing, delete all application records in registry.
    */
   @VisibleForTesting
-  public void cleanAllApplications() {
+  public synchronized void cleanAllApplications() {
     try {
       removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null),
           true, false);
@@ -115,7 +115,7 @@ public class FederationRegistryClient {
    * @param token the UAM of the application
    * @return whether the amrmToken is added or updated to a new value
    */
-  public boolean writeAMRMTokenForUAM(ApplicationId appId,
+  public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId,
       String subClusterId, Token<AMRMTokenIdentifier> token) {
     Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
         this.appSubClusterTokenMap.get(appId);
@@ -154,7 +154,7 @@ public class FederationRegistryClient {
    * @param appId application id
    * @return the sub-cluster to UAM token mapping
    */
-  public Map<String, Token<AMRMTokenIdentifier>>
+  public synchronized Map<String, Token<AMRMTokenIdentifier>>
       loadStateFromRegistry(ApplicationId appId) {
     Map<String, Token<AMRMTokenIdentifier>> retMap = new HashMap<>();
     // Suppress the exception here because it is valid that the entry does not
@@ -203,7 +203,7 @@ public class FederationRegistryClient {
    *
    * @param appId application id
    */
-  public void removeAppFromRegistry(ApplicationId appId) {
+  public synchronized void removeAppFromRegistry(ApplicationId appId) {
     Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
         this.appSubClusterTokenMap.get(appId);
     LOG.info("Removing all registry entries for {}", appId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index d708ced..d5a0168 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -407,4 +407,19 @@ public class UnmanagedAMPoolManager extends AbstractService {
     return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer();
   }
 
+  @VisibleForTesting
+  public int getRequestQueueSize(String uamId) throws YarnException {
+    if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
+      throw new YarnException("UAM " + uamId + " does not exist");
+    }
+    return this.unmanagedAppMasterMap.get(uamId).getRequestQueueSize();
+  }
+
+  @VisibleForTesting
+  public void drainUAMHeartbeats() {
+    for (UnmanagedApplicationManager uam : this.unmanagedAppMasterMap
+        .values()) {
+      uam.drainHeartbeatThread();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 7c1e154..91d5d6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -225,6 +225,10 @@ public class UnmanagedApplicationManager {
       LOG.debug("RegisterUAM returned existing NM token for node "
           + nmToken.getNodeId());
     }
+    LOG.info(
+        "RegisterUAM returned {} existing running container and {} NM tokens",
+        response.getContainersFromPreviousAttempts().size(),
+        response.getNMTokensFromPreviousAttempts().size());
 
     // Only when register succeed that we start the heartbeat thread
     this.heartbeatHandler.setDaemon(true);
@@ -516,4 +520,14 @@ public class UnmanagedApplicationManager {
   public int getRequestQueueSize() {
     return this.heartbeatHandler.getRequestQueueSize();
   }
+
+  @VisibleForTesting
+  protected void setHandlerThread(AMHeartbeatRequestHandler thread) {
+    this.heartbeatHandler = thread;
+  }
+
+  @VisibleForTesting
+  protected void drainHeartbeatThread() {
+    this.heartbeatHandler.drainHeartbeatThread();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 958b1f1..50a4bff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -189,8 +189,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
 
   private HashSet<ApplicationId> applicationMap = new HashSet<>();
   private HashSet<ApplicationId> keepContainerOnUams = new HashSet<>();
-  private HashMap<ApplicationAttemptId,
-      List<ContainerId>> applicationContainerIdMap = new HashMap<>();
+  private HashMap<ApplicationId, List<ContainerId>> applicationContainerIdMap =
+      new HashMap<>();
+  private int rmId;
   private AtomicInteger containerIndex = new AtomicInteger(0);
   private Configuration conf;
   private int subClusterId;
@@ -203,6 +204,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
 
   private boolean shouldReRegisterNext = false;
 
+  private boolean shouldWaitForSyncNextAllocate = false;
+
   // For unit test synchronization
   private static Object syncObj = new Object();
 
@@ -218,6 +221,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   public MockResourceManagerFacade(Configuration conf, int startContainerIndex,
       int subClusterId, boolean isRunning) {
     this.conf = conf;
+    this.rmId = startContainerIndex;
     this.containerIndex.set(startContainerIndex);
     this.subClusterId = subClusterId;
     this.isRunning = isRunning;
@@ -259,17 +263,17 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
     validateRunning();
     ApplicationAttemptId attemptId = getAppIdentifier();
     LOG.info("Registering application attempt: " + attemptId);
+    ApplicationId appId = attemptId.getApplicationId();
 
     List<Container> containersFromPreviousAttempt = null;
 
     synchronized (applicationContainerIdMap) {
-      if (applicationContainerIdMap.containsKey(attemptId)) {
-        if (keepContainerOnUams.contains(attemptId.getApplicationId())) {
+      if (applicationContainerIdMap.containsKey(appId)) {
+        if (keepContainerOnUams.contains(appId)) {
           // For UAM with the keepContainersFromPreviousAttempt flag, return all
           // running containers
           containersFromPreviousAttempt = new ArrayList<>();
-          for (ContainerId containerId : applicationContainerIdMap
-              .get(attemptId)) {
+          for (ContainerId containerId : applicationContainerIdMap.get(appId)) {
             containersFromPreviousAttempt.add(Container.newInstance(containerId,
                 null, null, null, null, null));
           }
@@ -279,7 +283,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
         }
       } else {
         // Keep track of the containers that are returned to this application
-        applicationContainerIdMap.put(attemptId, new ArrayList<ContainerId>());
+        applicationContainerIdMap.put(appId, new ArrayList<ContainerId>());
       }
     }
 
@@ -314,6 +318,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
 
     ApplicationAttemptId attemptId = getAppIdentifier();
     LOG.info("Finishing application attempt: " + attemptId);
+    ApplicationId appId = attemptId.getApplicationId();
 
     if (shouldReRegisterNext) {
       String message = "AM is not registered, should re-register.";
@@ -324,8 +329,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
     synchronized (applicationContainerIdMap) {
       // Remove the containers that were being tracked for this application
       Assert.assertTrue("The application id is NOT registered: " + attemptId,
-          applicationContainerIdMap.containsKey(attemptId));
-      applicationContainerIdMap.remove(attemptId);
+          applicationContainerIdMap.containsKey(appId));
+      applicationContainerIdMap.remove(appId);
     }
 
     return FinishApplicationMasterResponse.newInstance(
@@ -350,6 +355,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
 
     ApplicationAttemptId attemptId = getAppIdentifier();
     LOG.info("Allocate from application attempt: " + attemptId);
+    ApplicationId appId = attemptId.getApplicationId();
 
     if (shouldReRegisterNext) {
       String message = "AM is not registered, should re-register.";
@@ -357,6 +363,21 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
       throw new ApplicationMasterNotRegisteredException(message);
     }
 
+    // Wait for signal for certain test cases
+    synchronized (syncObj) {
+      if (shouldWaitForSyncNextAllocate) {
+        shouldWaitForSyncNextAllocate = false;
+
+        LOG.info("Allocate call in RM start waiting");
+        try {
+          syncObj.wait();
+          LOG.info("Allocate call in RM wait finished");
+        } catch (InterruptedException e) {
+          LOG.info("Allocate call in RM wait interrupted", e);
+        }
+      }
+    }
+
     ArrayList<Container> containerList = new ArrayList<Container>();
     if (request.getAskList() != null) {
       for (ResourceRequest rr : request.getAskList()) {
@@ -381,9 +402,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
             // will need it in future
             Assert.assertTrue(
                 "The application id is Not registered before allocate(): "
-                    + attemptId,
-                applicationContainerIdMap.containsKey(attemptId));
-            List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
+                    + appId,
+                applicationContainerIdMap.containsKey(appId));
+            List<ContainerId> ids = applicationContainerIdMap.get(appId);
             ids.add(containerId);
           }
         }
@@ -395,12 +416,10 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
         && request.getReleaseList().size() > 0) {
       LOG.info("Releasing containers: " + request.getReleaseList().size());
       synchronized (applicationContainerIdMap) {
-        Assert
-            .assertTrue(
-                "The application id is not registered before allocate(): "
-                    + attemptId,
-                applicationContainerIdMap.containsKey(attemptId));
-        List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
+        Assert.assertTrue(
+            "The application id is not registered before allocate(): " + appId,
+            applicationContainerIdMap.containsKey(appId));
+        List<ContainerId> ids = applicationContainerIdMap.get(appId);
 
         for (ContainerId id : request.getReleaseList()) {
           boolean found = false;
@@ -426,7 +445,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
         + " for application attempt: " + conf.get("AMRMTOKEN"));
 
     // Always issue a new AMRMToken as if RM rolled master key
-    Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], "");
+    Token newAMRMToken = Token.newInstance(new byte[0],
+        Integer.toString(this.rmId), new byte[0], "");
 
     return AllocateResponse.newInstance(0, completedList, containerList,
         new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
@@ -434,6 +454,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
         new ArrayList<UpdatedContainer>());
   }
 
+  public void setWaitForSyncNextAllocate(boolean wait) { // hook for tests
+    synchronized (syncObj) { // same lock allocate() checks the flag under
+      shouldWaitForSyncNextAllocate = wait; // one-shot: allocate() resets it
+    }
+  }
+
   @Override
   public GetApplicationReportResponse getApplicationReport(
       GetApplicationReportRequest request) throws YarnException, IOException {
@@ -624,14 +650,14 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
 
     validateRunning();
 
-    ApplicationAttemptId attemptId = request.getApplicationAttemptId();
+    ApplicationId appId = request.getApplicationAttemptId().getApplicationId();
     List<ContainerReport> containers = new ArrayList<>();
     synchronized (applicationContainerIdMap) {
       // Return the list of running containers that were being tracked for this
       // application
-      Assert.assertTrue("The application id is NOT registered: " + attemptId,
-          applicationContainerIdMap.containsKey(attemptId));
-      List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
+      Assert.assertTrue("The application id is NOT registered: " + appId,
+          applicationContainerIdMap.containsKey(appId));
+      List<ContainerId> ids = applicationContainerIdMap.get(appId);
       for (ContainerId c : ids) {
         containers.add(ContainerReport.newInstance(c, null, null, null, 0, 0,
             null, null, 0, null, null));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 1bf882f..c02296d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -62,14 +64,18 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
 import org.apache.hadoop.yarn.server.AMRMClientRelayer;
 import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
@@ -80,9 +86,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
-import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.AsyncCallback;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,6 +122,17 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       NMSS_CLASS_PREFIX + "secondarySC/";
   public static final String STRING_TO_BYTE_FORMAT = "UTF-8";
 
+  private static final RecordFactory RECORD_FACTORY =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  /**
+   * From AM's perspective, FederationInterceptor behaves exactly the same as
+   * YarnRM (ApplicationMasterService). This is to remember the last heart beat
+   * response, used to handle duplicate heart beat and responseId from AM.
+   */
+  private AllocateResponse lastAllocateResponse;
+  private final Object lastAllocateResponseLock = new Object();
+
   private ApplicationAttemptId attemptId;
 
   /**
@@ -124,7 +141,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    */
   private AMRMClientRelayer homeRMRelayer;
   private SubClusterId homeSubClusterId;
-  private volatile int lastHomeResponseId;
+  private AMHeartbeatRequestHandler homeHeartbeartHandler;
 
   /**
    * UAM pool for secondary sub-clusters (ones other than home sub-cluster),
@@ -146,7 +163,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
   /**
    * Stores the AllocateResponses that are received asynchronously from all the
-   * sub-cluster resource managers except the home RM.
+   * sub-cluster resource managers, including home RM.
    */
   private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
 
@@ -194,14 +211,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   /** The policy used to split requests among sub-clusters. */
   private FederationAMRMProxyPolicy policyInterpreter;
 
-  /**
-   * The proxy ugi used to talk to home RM, loaded with the up-to-date AMRMToken
-   * issued by home RM.
-   */
-  private UserGroupInformation appOwner;
-
   private FederationRegistryClient registryClient;
 
+  // the maximum wait time for the first async heart beat response
+  private long heartbeatMaxWaitTimeMs;
+
+  private MonotonicClock clock = new MonotonicClock();
+
   /**
    * Creates an instance of the FederationInterceptor class.
    */
@@ -213,7 +229,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     this.secondaryRelayers = new ConcurrentHashMap<>();
     this.amRegistrationRequest = null;
     this.amRegistrationResponse = null;
-    this.lastHomeResponseId = Integer.MAX_VALUE;
     this.justRecovered = false;
   }
 
@@ -233,8 +248,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       setConf(conf);
     }
 
+    // The proxy ugi used to talk to home RM as well as Yarn Registry, loaded
+    // with the up-to-date AMRMToken issued by home RM.
+    UserGroupInformation appOwner;
     try {
-      this.appOwner = UserGroupInformation.createProxyUser(appContext.getUser(),
+      appOwner = UserGroupInformation.createProxyUser(appContext.getUser(),
           UserGroupInformation.getCurrentUser());
     } catch (Exception ex) {
       throw new YarnRuntimeException(ex);
@@ -242,10 +260,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
     if (appContext.getRegistryClient() != null) {
       this.registryClient = new FederationRegistryClient(conf,
-          appContext.getRegistryClient(), this.appOwner);
+          appContext.getRegistryClient(), appOwner);
       // Add all app tokens for Yarn Registry access
       if (appContext.getCredentials() != null) {
-        this.appOwner.addCredentials(appContext.getCredentials());
+        appOwner.addCredentials(appContext.getCredentials());
       }
     }
 
@@ -254,9 +272,21 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     this.homeSubClusterId =
         SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
     this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
-        ApplicationMasterProtocol.class, this.appOwner), appId,
+        ApplicationMasterProtocol.class, appOwner), appId,
         this.homeSubClusterId.toString());
 
+    this.homeHeartbeartHandler = createHomeHeartbeartHandler(conf, appId);
+    this.homeHeartbeartHandler.setAMRMClientRelayer(this.homeRMRelayer);
+    this.homeHeartbeartHandler.setUGI(appOwner);
+    this.homeHeartbeartHandler.setDaemon(true);
+    this.homeHeartbeartHandler.start();
+
+    // set lastResponseId to -1 before application master registers
+    this.lastAllocateResponse =
+        RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
+    this.lastAllocateResponse
+        .setResponseId(AMRMClientUtils.PRE_REGISTER_RESPONSE_ID);
+
     this.federationFacade = FederationStateStoreFacade.getInstance();
     this.subClusterResolver = this.federationFacade.getSubClusterResolver();
 
@@ -265,6 +295,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
     this.uamPool.init(conf);
     this.uamPool.start();
+
+    this.heartbeatMaxWaitTimeMs =
+        conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS,
+            YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS);
   }
 
   @Override
@@ -272,6 +306,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     super.recover(recoveredDataMap);
     LOG.info("Recovering data for FederationInterceptor for {}",
         this.attemptId);
+    this.justRecovered = true;
+
     if (recoveredDataMap == null) {
       return;
     }
@@ -294,9 +330,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         this.amRegistrationResponse =
             new RegisterApplicationMasterResponsePBImpl(pb);
         LOG.info("amRegistrationResponse recovered for {}", this.attemptId);
-        // Trigger re-register and full pending re-send only if we have a
-        // saved register response. This should always be true though.
-        this.justRecovered = true;
       }
 
       // Recover UAM amrmTokens from registry or NMSS
@@ -355,6 +388,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
               .getContainersFromPreviousAttempts()) {
             containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
             containers++;
+            LOG.debug("  From subcluster {} running container {}",
+                subClusterId, container.getId());
           }
           LOG.info("Recovered {} running containers from UAM in {}",
               response.getContainersFromPreviousAttempts().size(),
@@ -384,7 +419,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         LOG.debug("  From home RM " + this.homeSubClusterId
             + " running container " + container.getContainerId());
       }
-      LOG.info("{} running containers including AM recovered from home RM ",
+      LOG.info("{} running containers including AM recovered from home RM {}",
           response.getContainerList().size(), this.homeSubClusterId);
 
       LOG.info(
@@ -411,8 +446,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    * so that when AM registers more than once, it returns the same register
    * success response instead of throwing
    * {@link InvalidApplicationMasterRequestException}. Furthermore, we present
-   * to AM as if we are the RM that never fails over. When actual RM fails over,
-   * we always re-register automatically.
+   * to AM as if we are the RM that never fails over (except when AMRMProxy
+   * restarts). When actual RM fails over, we always re-register automatically.
    *
    * We did this because FederationInterceptor can receive concurrent register
    * requests from AM because of timeout between AM and AMRMProxy, which is
@@ -425,6 +460,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   public synchronized RegisterApplicationMasterResponse
       registerApplicationMaster(RegisterApplicationMasterRequest request)
           throws YarnException, IOException {
+
+    // Reset the heartbeat responseId to zero upon register
+    synchronized (this.lastAllocateResponseLock) {
+      this.lastAllocateResponse.setResponseId(0);
+    }
+    this.justRecovered = false;
+
     // If AM is calling with a different request, complain
     if (this.amRegistrationRequest != null) {
       if (!this.amRegistrationRequest.equals(request)) {
@@ -524,34 +566,34 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    */
   @Override
   public AllocateResponse allocate(AllocateRequest request)
-      throws YarnException {
+      throws YarnException, IOException {
     Preconditions.checkArgument(this.policyInterpreter != null,
         "Allocate should be called after registerApplicationMaster");
 
-    if (this.justRecovered && this.lastHomeResponseId == Integer.MAX_VALUE) {
-      // Save the responseId home RM is expecting
-      this.lastHomeResponseId = request.getResponseId();
-
+    if (this.justRecovered) {
       throw new ApplicationMasterNotRegisteredException(
           "AMRMProxy just restarted and recovered for " + this.attemptId
               + ". AM should re-register and full re-send pending requests.");
     }
 
-    // Override responseId in the request in two cases:
-    //
-    // 1. After we just recovered after an NM restart and AM's responseId is
-    // reset due to the exception we generate. We need to override the
-    // responseId to the one homeRM expects.
-    //
-    // 2. After homeRM fail-over, the allocate response with reseted responseId
-    // might not be returned successfully back to AM because of RPC connection
-    // timeout between AM and AMRMProxy. In this case, we remember and reset the
-    // responseId for AM.
-    if (this.justRecovered
-        || request.getResponseId() > this.lastHomeResponseId) {
-      LOG.warn("Setting allocate responseId for {} from {} to {}",
-          this.attemptId, request.getResponseId(), this.lastHomeResponseId);
-      request.setResponseId(this.lastHomeResponseId);
+    // Check responseId and handle duplicate heartbeat exactly same as RM
+    synchronized (this.lastAllocateResponseLock) {
+      LOG.info("Heartbeat from {} with responseId {} when we are expecting {}",
+          this.attemptId, request.getResponseId(),
+          this.lastAllocateResponse.getResponseId());
+      // Normally request.getResponseId() == lastResponse.getResponseId()
+      if (AMRMClientUtils.getNextResponseId(
+          request.getResponseId()) == this.lastAllocateResponse
+              .getResponseId()) {
+        // heartbeat one step old, simply return lastResponse
+        return this.lastAllocateResponse;
+      } else if (request.getResponseId() != this.lastAllocateResponse
+          .getResponseId()) {
+        throw new InvalidApplicationMasterRequestException(
+            AMRMClientUtils.assembleInvalidResponseIdExceptionMessage(attemptId,
+                this.lastAllocateResponse.getResponseId(),
+                request.getResponseId()));
+      }
     }
 
     try {
@@ -560,71 +602,55 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       Map<SubClusterId, AllocateRequest> requests =
           splitAllocateRequest(request);
 
-      // Send the requests to the secondary sub-cluster resource managers.
-      // These secondary requests are send asynchronously and the responses will
-      // be collected and merged with the home response. In addition, it also
-      // return the newly registered Unmanaged AMs.
-      Registrations newRegistrations =
-          sendRequestsToSecondaryResourceManagers(requests);
-
-      // Send the request to the home RM and get the response
-      AllocateRequest homeRequest = requests.get(this.homeSubClusterId);
-      LOG.info("{} heartbeating to home RM with responseId {}", this.attemptId,
-          homeRequest.getResponseId());
-
-      AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest);
-
-      // Reset the flag after the first successful homeRM allocate response,
-      // otherwise keep overriding the responseId of new allocate request
-      if (this.justRecovered) {
-        this.justRecovered = false;
+      // Send the requests to all the sub-cluster resource managers. All
+      // requests are synchronously triggered but sent asynchronously. Later
+      // the responses will be collected and merged. This also returns the
+      // newly registered UAMs.
+      Registrations newRegistrations = sendRequestsToResourceManagers(requests);
+
+      // Wait for the first async response to arrive
+      long startTime = this.clock.getTime();
+      synchronized (this.asyncResponseSink) {
+        try {
+          this.asyncResponseSink.wait(this.heartbeatMaxWaitTimeMs);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
       }
+      long firstResponseTime = this.clock.getTime() - startTime;
 
-      // Notify policy of home response
+      // An extra brief wait for other async heart beats, so that most of their
+      // responses can make it back to AM in the same heart beat round trip.
       try {
-        this.policyInterpreter.notifyOfResponse(this.homeSubClusterId,
-            homeResponse);
-      } catch (YarnException e) {
-        LOG.warn("notifyOfResponse for policy failed for home sub-cluster "
-            + this.homeSubClusterId, e);
+        Thread.sleep(firstResponseTime);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
       }
 
-      // If the resource manager sent us a new token, add to the current user
-      if (homeResponse.getAMRMToken() != null) {
-        LOG.debug("Received new AMRMToken");
-        YarnServerSecurityUtils.updateAMRMToken(homeResponse.getAMRMToken(),
-            this.appOwner, getConf());
-      }
+      // Prepare the response to AM
+      AllocateResponse response =
+          RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
 
-      // Merge the responses from home and secondary sub-cluster RMs
-      homeResponse = mergeAllocateResponses(homeResponse);
+      // Merge all responses from response sink
+      mergeAllocateResponses(response);
 
       // Merge the containers and NMTokens from the new registrations into
-      // the homeResponse.
+      // the response
       if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) {
-        homeResponse = mergeRegistrationResponses(homeResponse,
+        mergeRegistrationResponses(response,
             newRegistrations.getSuccessfulRegistrations());
       }
 
-      LOG.info("{} heartbeat response from home RM with responseId {}",
-          this.attemptId, homeResponse.getResponseId());
-
-      // Update lastHomeResponseId in three cases:
-      // 1. The normal responseId increments
-      // 2. homeResponse.getResponseId() == 1. This happens when homeRM fails
-      // over, AMRMClientRelayer auto re-register and full re-send for homeRM.
-      // 3. lastHomeResponseId == MAX_INT. This is the initial case or
-      // responseId about to overflow and wrap around
-      if (homeResponse.getResponseId() == this.lastHomeResponseId + 1
-          || homeResponse.getResponseId() == 1
-          || this.lastHomeResponseId == Integer.MAX_VALUE) {
-        this.lastHomeResponseId = homeResponse.getResponseId();
+      // update the responseId and return the final response to AM
+      synchronized (this.lastAllocateResponseLock) {
+        response.setResponseId(AMRMClientUtils
+            .getNextResponseId(this.lastAllocateResponse.getResponseId()));
+        this.lastAllocateResponse = response;
       }
-
-      // return the final response to the application master.
-      return homeResponse;
-    } catch (IOException ex) {
-      LOG.error("Exception encountered while processing heart beat", ex);
+      return response;
+    } catch (Throwable ex) {
+      LOG.error("Exception encountered while processing heart beat for "
+          + this.attemptId, ex);
       throw new YarnException(ex);
     }
   }
@@ -696,6 +722,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     FinishApplicationMasterResponse homeResponse =
         this.homeRMRelayer.finishApplicationMaster(request);
 
+    // Stop the home heartbeat thread
+    this.homeHeartbeartHandler.shutdown();
+
     if (subClusterIds.size() > 0) {
       // Wait for other sub-cluster resource managers to return the
       // response and merge it with the home response
@@ -758,10 +787,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       }
       this.threadpool = null;
     }
-    homeRMRelayer.shutdown();
-    for(AMRMClientRelayer relayer : secondaryRelayers.values()){
+
+    // Stop the home heartbeat thread
+    this.homeHeartbeartHandler.shutdown();
+    this.homeRMRelayer.shutdown();
+    for (AMRMClientRelayer relayer : this.secondaryRelayers.values()) {
       relayer.shutdown();
     }
+
     super.shutdown();
   }
 
@@ -781,8 +814,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   }
 
   @VisibleForTesting
-  protected int getLastHomeResponseId() {
-    return this.lastHomeResponseId;
+  protected ApplicationAttemptId getAttemptId() {
+    return this.attemptId;
+  }
+
+  @VisibleForTesting
+  protected AMHeartbeatRequestHandler getHomeHeartbeartHandler() {
+    return this.homeHeartbeartHandler; // async heartbeat sender to home RM
   }
 
   /**
@@ -798,6 +836,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     return new UnmanagedAMPoolManager(threadPool);
   }
 
+  @VisibleForTesting // protected factory so tests can substitute a handler
+  protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
+      Configuration conf, ApplicationId appId) {
+    return new AMHeartbeatRequestHandler(conf, appId); // not yet started
+  }
+
   /**
    * Create a proxy instance that is used to connect to the Home resource
    * manager.
@@ -872,7 +916,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         + "Reattaching in parallel", uamMap.size(), appId);
 
     ExecutorCompletionService<RegisterApplicationMasterResponse>
-        completionService = new ExecutorCompletionService<>(threadpool);
+        completionService = new ExecutorCompletionService<>(this.threadpool);
 
     for (Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) {
       final SubClusterId subClusterId =
@@ -1047,16 +1091,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
   /**
    * This methods sends the specified AllocateRequests to the appropriate
-   * sub-cluster resource managers.
+   * sub-cluster resource managers asynchronously.
    *
    * @param requests contains the heart beat requests to send to the resource
-   *          manager keyed by the resource manager address
+   *          manager keyed by the sub-cluster id
    * @return the registration responses from the newly added sub-cluster
    *         resource managers
    * @throws YarnException
    * @throws IOException
    */
-  private Registrations sendRequestsToSecondaryResourceManagers(
+  private Registrations sendRequestsToResourceManagers(
       Map<SubClusterId, AllocateRequest> requests)
       throws YarnException, IOException {
 
@@ -1065,32 +1109,29 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     Registrations registrations = registerWithNewSubClusters(requests.keySet());
 
     // Now that all the registrations are done, send the allocation request
-    // to the sub-cluster RMs using the Unmanaged application masters
-    // asynchronously and don't wait for the response. The responses will
-    // arrive asynchronously and will be added to the response sink. These
-    // responses will be sent to the application master in some future heart
-    // beat response.
+    // to the sub-cluster RMs asynchronously and don't wait for the response.
+    // The responses will arrive asynchronously and will be added to the
+    // response sink, then merged and sent to the application master.
     for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) {
-      final SubClusterId subClusterId = entry.getKey();
+      SubClusterId subClusterId = entry.getKey();
 
       if (subClusterId.equals(this.homeSubClusterId)) {
-        // Skip the request for the home sub-cluster resource manager.
-        // It will be handled separately in the allocate() method
-        continue;
-      }
-
-      if (!this.uamPool.hasUAMId(subClusterId.getId())) {
-        // TODO: This means that the registration for this sub-cluster RM
-        // failed. For now, we ignore the resource requests and continue
-        // but we need to fix this and handle this situation. One way would
-        // be to send the request to another RM by consulting the policy.
-        LOG.warn("Unmanaged AM registration not found for sub-cluster {}",
-            subClusterId);
-        continue;
+        // Request for the home sub-cluster resource manager
+        this.homeHeartbeartHandler.allocateAsync(entry.getValue(),
+            new HeartbeatCallBack(this.homeSubClusterId, false));
+      } else {
+        if (!this.uamPool.hasUAMId(subClusterId.getId())) {
+          // TODO: This means that the registration for this sub-cluster RM
+          // failed. For now, we ignore the resource requests and continue
+          // but we need to fix this and handle this situation. One way would
+          // be to send the request to another RM by consulting the policy.
+          LOG.warn("Unmanaged AM registration not found for sub-cluster {}",
+              subClusterId);
+          continue;
+        }
+        this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
+            new HeartbeatCallBack(subClusterId, true));
       }
-
-      this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
-          new HeartbeatCallBack(subClusterId));
     }
 
     return registrations;
@@ -1123,7 +1164,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           this.amRegistrationRequest;
       final AMRMProxyApplicationContext appContext = getApplicationContext();
       ExecutorCompletionService<RegisterApplicationMasterResponseInfo>
-          completionService = new ExecutorCompletionService<>(threadpool);
+          completionService = new ExecutorCompletionService<>(this.threadpool);
 
       for (final String subClusterId : newSubClusters) {
         completionService
@@ -1208,21 +1249,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   }
 
   /**
-   * Merges the responses from other sub-clusters that we received
-   * asynchronously with the specified home cluster response and keeps track of
-   * the containers received from each sub-cluster resource managers.
+   * Merge the responses from all sub-clusters that we received asynchronously
+   * and keep track of the containers received from each sub-cluster resource
+   * managers.
    */
-  private AllocateResponse mergeAllocateResponses(
-      AllocateResponse homeResponse) {
-    // Timing issue, we need to remove the completed and then save the new ones.
-    removeFinishedContainersFromCache(
-        homeResponse.getCompletedContainersStatuses());
-    cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
-        this.homeSubClusterId);
-
+  private void mergeAllocateResponses(AllocateResponse mergedResponse) {
     synchronized (this.asyncResponseSink) {
-      for (Entry<SubClusterId, List<AllocateResponse>> entry : asyncResponseSink
-          .entrySet()) {
+      for (Entry<SubClusterId, List<AllocateResponse>> entry :
+          this.asyncResponseSink.entrySet()) {
         SubClusterId subClusterId = entry.getKey();
         List<AllocateResponse> responses = entry.getValue();
         if (responses.size() > 0) {
@@ -1231,14 +1265,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                 response.getCompletedContainersStatuses());
             cacheAllocatedContainers(response.getAllocatedContainers(),
                 subClusterId);
-            mergeAllocateResponse(homeResponse, response, subClusterId);
+            mergeAllocateResponse(mergedResponse, response, subClusterId);
           }
           responses.clear();
         }
       }
     }
-
-    return homeResponse;
   }
 
   /**
@@ -1256,11 +1288,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   }
 
   /**
-   * Helper method for merging the responses from the secondary sub cluster RMs
-   * with the home response to return to the AM.
+   * Helper method for merging the registration responses from the secondary
+   * sub-cluster RMs into the allocate response to return to the AM.
    */
-  private AllocateResponse mergeRegistrationResponses(
-      AllocateResponse homeResponse,
+  private void mergeRegistrationResponses(AllocateResponse homeResponse,
       Map<SubClusterId, RegisterApplicationMasterResponse> registrations) {
 
     for (Entry<SubClusterId, RegisterApplicationMasterResponse> entry :
@@ -1292,13 +1323,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         }
       }
     }
-
-    return homeResponse;
   }
 
   private void mergeAllocateResponse(AllocateResponse homeResponse,
       AllocateResponse otherResponse, SubClusterId otherRMAddress) {
 
+    if (otherResponse.getAMRMToken() != null) {
+      // Propagate only the new amrmToken from home sub-cluster back to
+      // AMRMProxyService
+      if (otherRMAddress.equals(this.homeSubClusterId)) {
+        homeResponse.setAMRMToken(otherResponse.getAMRMToken());
+      } else {
+        throw new YarnRuntimeException(
+            "amrmToken from UAM " + otherRMAddress + " should be null here");
+      }
+    }
+
     if (!isNullOrEmpty(otherResponse.getAllocatedContainers())) {
       if (!isNullOrEmpty(homeResponse.getAllocatedContainers())) {
         homeResponse.getAllocatedContainers()
@@ -1406,9 +1446,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       SubClusterId subClusterId) {
     for (Container container : containers) {
       LOG.debug("Adding container {}", container);
-      if (containerIdToSubClusterIdMap.containsKey(container.getId())) {
+
+      if (this.containerIdToSubClusterIdMap.containsKey(container.getId())) {
         SubClusterId existingSubClusterId =
-            containerIdToSubClusterIdMap.get(container.getId());
+            this.containerIdToSubClusterIdMap.get(container.getId());
         if (existingSubClusterId.equals(subClusterId)) {
           /*
            * When RM fails over, the new RM master might send out the same
@@ -1441,7 +1482,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         }
       }
 
-      containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
+      this.containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
     }
   }
 
@@ -1463,7 +1504,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       newRequest.setProgress(originalAMRequest.getProgress());
       requestMap.put(subClusterId, newRequest);
     }
-
     return newRequest;
   }
 
@@ -1472,7 +1512,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    */
   private static AllocateRequest createAllocateRequest() {
     AllocateRequest request =
-        AllocateRequest.newInstance(0, 0, null, null, null);
+        RECORD_FACTORY.newRecordInstance(AllocateRequest.class);
     request.setAskList(new ArrayList<ResourceRequest>());
     request.setReleaseList(new ArrayList<ContainerId>());
     ResourceBlacklistRequest blackList =
@@ -1526,6 +1566,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   }
 
   @VisibleForTesting
+  protected UnmanagedAMPoolManager getUnmanagedAMPool() {
+    return this.uamPool;
+  }
+
+  @VisibleForTesting
   public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() {
     return this.asyncResponseSink;
   }
@@ -1535,9 +1580,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    */
   private class HeartbeatCallBack implements AsyncCallback<AllocateResponse> {
     private SubClusterId subClusterId;
+    private boolean isUAM;
 
-    HeartbeatCallBack(SubClusterId subClusterId) {
+    HeartbeatCallBack(SubClusterId subClusterId, boolean isUAM) {
       this.subClusterId = subClusterId;
+      this.isUAM = isUAM;
     }
 
     @Override
@@ -1551,16 +1598,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           asyncResponseSink.put(subClusterId, responses);
         }
         responses.add(response);
+        // Notify main thread about the response arrival
+        asyncResponseSink.notifyAll();
       }
 
       // Save the new AMRMToken for the UAM if present
-      if (response.getAMRMToken() != null) {
+      if (this.isUAM && response.getAMRMToken() != null) {
         Token<AMRMTokenIdentifier> newToken = ConverterUtils
             .convertFromYarn(response.getAMRMToken(), (Text) null);
+        // Do not further propagate the new amrmToken for UAM
+        response.setAMRMToken(null);
+
         // Update the token in registry or NMSS
         if (registryClient != null) {
-          registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
-              subClusterId.getId(), newToken);
+          if (registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
+              subClusterId.getId(), newToken)) {
+            try {
+              AMRMTokenIdentifier identifier = new AMRMTokenIdentifier();
+              identifier.readFields(new DataInputStream(
+                  new ByteArrayInputStream(newToken.getIdentifier())));
+              LOG.info("Received new UAM amrmToken with keyId {} and "
+                  + "service {} from {} for {}, written to Registry",
+                  identifier.getKeyId(), newToken.getService(), subClusterId,
+                  attemptId);
+            } catch (IOException e) {
+              LOG.warn("Failed to decode new UAM amrmToken identifier", e);
+            }
+          }
         } else if (getNMStateStore() != null) {
           try {
             getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
@@ -1573,11 +1637,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         }
       }
 
-      // Notify policy of secondary sub-cluster responses
+      // Notify policy of allocate response
       try {
         policyInterpreter.notifyOfResponse(subClusterId, response);
       } catch (YarnException e) {
-        LOG.warn("notifyOfResponse for policy failed for home sub-cluster "
+        LOG.warn("notifyOfResponse for policy failed for sub-cluster "
             + subClusterId, e);
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index a837eed..407ae83 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Executors;
 
 import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -95,6 +97,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
   private int testAppId;
   private ApplicationAttemptId attemptId;
 
+  private volatile int lastResponseId;
+
   @Override
   public void setUp() throws IOException {
     super.setUp();
@@ -120,6 +124,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(),
         attemptId, "test-user", null, null, null, registry));
     interceptor.cleanupRegistry();
+
+    lastResponseId = 0;
   }
 
   @Override
@@ -174,8 +180,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
   private List<Container> getContainersAndAssert(int numberOfResourceRequests,
       int numberOfAllocationExcepted) throws Exception {
     AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
-    allocateRequest.setResponseId(1);
-
     List<Container> containers =
         new ArrayList<Container>(numberOfResourceRequests);
     List<ResourceRequest> askList =
@@ -187,22 +191,31 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
 
     allocateRequest.setAskList(askList);
 
+    allocateRequest.setResponseId(lastResponseId);
     AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
     Assert.assertNotNull("allocate() returned null response", allocateResponse);
+    checkAMRMToken(allocateResponse.getAMRMToken());
+    lastResponseId = allocateResponse.getResponseId();
 
     containers.addAll(allocateResponse.getAllocatedContainers());
     LOG.info("Number of allocated containers in the original request: "
         + Integer.toString(allocateResponse.getAllocatedContainers().size()));
 
+    // Make sure this request is picked up by all async heartbeat handlers
+    interceptor.drainAllAsyncQueue(false);
+
     // Send max 10 heart beats to receive all the containers. If not, we will
     // fail the test
     int numHeartbeat = 0;
     while (containers.size() < numberOfAllocationExcepted
         && numHeartbeat++ < 10) {
-      allocateResponse =
-          interceptor.allocate(Records.newRecord(AllocateRequest.class));
+      allocateRequest = Records.newRecord(AllocateRequest.class);
+      allocateRequest.setResponseId(lastResponseId);
+      allocateResponse = interceptor.allocate(allocateRequest);
       Assert.assertNotNull("allocate() returned null response",
           allocateResponse);
+      checkAMRMToken(allocateResponse.getAMRMToken());
+      lastResponseId = allocateResponse.getResponseId();
 
       containers.addAll(allocateResponse.getAllocatedContainers());
 
@@ -220,8 +233,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
       throws Exception {
     Assert.assertTrue(containers.size() > 0);
     AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
-    allocateRequest.setResponseId(1);
-
     List<ContainerId> relList = new ArrayList<ContainerId>(containers.size());
     for (Container container : containers) {
       relList.add(container.getId());
@@ -229,8 +240,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
 
     allocateRequest.setReleaseList(relList);
 
+    allocateRequest.setResponseId(lastResponseId);
     AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
     Assert.assertNotNull(allocateResponse);
+    checkAMRMToken(allocateResponse.getAMRMToken());
+    lastResponseId = allocateResponse.getResponseId();
 
     // The release request will be split and handled by the corresponding UAM.
     // The release containers returned by the mock resource managers will be
@@ -244,14 +258,21 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     LOG.info("Number of containers received in the original request: "
         + Integer.toString(newlyFinished.size()));
 
+    // Make sure this request is picked up by all async heartbeat handlers
+    interceptor.drainAllAsyncQueue(false);
+
     // Send max 10 heart beats to receive all the containers. If not, we will
     // fail the test
     int numHeartbeat = 0;
     while (containersForReleasedContainerIds.size() < relList.size()
         && numHeartbeat++ < 10) {
-      allocateResponse =
-          interceptor.allocate(Records.newRecord(AllocateRequest.class));
+      allocateRequest = Records.newRecord(AllocateRequest.class);
+      allocateRequest.setResponseId(lastResponseId);
+      allocateResponse = interceptor.allocate(allocateRequest);
       Assert.assertNotNull(allocateResponse);
+      checkAMRMToken(allocateResponse.getAMRMToken());
+      lastResponseId = allocateResponse.getResponseId();
+
       newlyFinished = getCompletedContainerIds(
           allocateResponse.getCompletedContainersStatuses());
       containersForReleasedContainerIds.addAll(newlyFinished);
@@ -267,65 +288,81 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         containersForReleasedContainerIds.size());
   }
 
+  private void checkAMRMToken(Token amrmToken) {
+    if (amrmToken != null) {
+      // The token should be the one issued by home MockRM
+      Assert.assertEquals(Integer.toString(0), amrmToken.getKind());
+    }
+  }
+
   @Test
   public void testMultipleSubClusters() throws Exception {
+    UserGroupInformation ugi =
+        interceptor.getUGIWithToken(interceptor.getAttemptId());
+    ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        // Register the application
+        RegisterApplicationMasterRequest registerReq =
+            Records.newRecord(RegisterApplicationMasterRequest.class);
+        registerReq.setHost(Integer.toString(testAppId));
+        registerReq.setRpcPort(0);
+        registerReq.setTrackingUrl("");
 
-    // Register the application
-    RegisterApplicationMasterRequest registerReq =
-        Records.newRecord(RegisterApplicationMasterRequest.class);
-    registerReq.setHost(Integer.toString(testAppId));
-    registerReq.setRpcPort(0);
-    registerReq.setTrackingUrl("");
+        RegisterApplicationMasterResponse registerResponse =
+            interceptor.registerApplicationMaster(registerReq);
+        Assert.assertNotNull(registerResponse);
+        lastResponseId = 0;
 
-    RegisterApplicationMasterResponse registerResponse =
-        interceptor.registerApplicationMaster(registerReq);
-    Assert.assertNotNull(registerResponse);
+        Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+        // Allocate the first batch of containers, with sc1 and sc2 active
+        registerSubCluster(SubClusterId.newInstance("SC-1"));
+        registerSubCluster(SubClusterId.newInstance("SC-2"));
+
+        int numberOfContainers = 3;
+        List<Container> containers =
+            getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
+        Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
 
-    Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+        // Allocate the second batch of containers, with sc1 and sc3 active
+        deRegisterSubCluster(SubClusterId.newInstance("SC-2"));
+        registerSubCluster(SubClusterId.newInstance("SC-3"));
 
-    // Allocate the first batch of containers, with sc1 and sc2 active
-    registerSubCluster(SubClusterId.newInstance("SC-1"));
-    registerSubCluster(SubClusterId.newInstance("SC-2"));
+        numberOfContainers = 1;
+        containers.addAll(
+            getContainersAndAssert(numberOfContainers, numberOfContainers * 2));
+        Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
 
-    int numberOfContainers = 3;
-    List<Container> containers =
-        getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
-    Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
-
-    // Allocate the second batch of containers, with sc1 and sc3 active
-    deRegisterSubCluster(SubClusterId.newInstance("SC-2"));
-    registerSubCluster(SubClusterId.newInstance("SC-3"));
-
-    numberOfContainers = 1;
-    containers.addAll(
-        getContainersAndAssert(numberOfContainers, numberOfContainers * 2));
-    Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
-
-    // Allocate the third batch of containers with only in home sub-cluster
-    // active
-    deRegisterSubCluster(SubClusterId.newInstance("SC-1"));
-    deRegisterSubCluster(SubClusterId.newInstance("SC-3"));
-    registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
-
-    numberOfContainers = 2;
-    containers.addAll(
-        getContainersAndAssert(numberOfContainers, numberOfContainers * 1));
-    Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
-
-    // Release all containers
-    releaseContainersAndAssert(containers);
-
-    // Finish the application
-    FinishApplicationMasterRequest finishReq =
-        Records.newRecord(FinishApplicationMasterRequest.class);
-    finishReq.setDiagnostics("");
-    finishReq.setTrackingUrl("");
-    finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
-
-    FinishApplicationMasterResponse finshResponse =
-        interceptor.finishApplicationMaster(finishReq);
-    Assert.assertNotNull(finshResponse);
-    Assert.assertEquals(true, finshResponse.getIsUnregistered());
+        // Allocate the third batch of containers with only in home sub-cluster
+        // active
+        deRegisterSubCluster(SubClusterId.newInstance("SC-1"));
+        deRegisterSubCluster(SubClusterId.newInstance("SC-3"));
+        registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
+
+        numberOfContainers = 2;
+        containers.addAll(
+            getContainersAndAssert(numberOfContainers, numberOfContainers * 1));
+        Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
+
+        // Release all containers
+        releaseContainersAndAssert(containers);
+
+        // Finish the application
+        FinishApplicationMasterRequest finishReq =
+            Records.newRecord(FinishApplicationMasterRequest.class);
+        finishReq.setDiagnostics("");
+        finishReq.setTrackingUrl("");
+        finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+        FinishApplicationMasterResponse finshResponse =
+            interceptor.finishApplicationMaster(finishReq);
+        Assert.assertNotNull(finshResponse);
+        Assert.assertEquals(true, finshResponse.getIsUnregistered());
+
+        return null;
+      }
+    });
   }
 
   /*
@@ -333,49 +370,58 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
    */
   @Test
   public void testReregister() throws Exception {
+    UserGroupInformation ugi =
+        interceptor.getUGIWithToken(interceptor.getAttemptId());
+    ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
 
-    // Register the application
-    RegisterApplicationMasterRequest registerReq =
-        Records.newRecord(RegisterApplicationMasterRequest.class);
-    registerReq.setHost(Integer.toString(testAppId));
-    registerReq.setRpcPort(0);
-    registerReq.setTrackingUrl("");
+        // Register the application
+        RegisterApplicationMasterRequest registerReq =
+            Records.newRecord(RegisterApplicationMasterRequest.class);
+        registerReq.setHost(Integer.toString(testAppId));
+        registerReq.setRpcPort(0);
+        registerReq.setTrackingUrl("");
 
-    RegisterApplicationMasterResponse registerResponse =
-        interceptor.registerApplicationMaster(registerReq);
-    Assert.assertNotNull(registerResponse);
+        RegisterApplicationMasterResponse registerResponse =
+            interceptor.registerApplicationMaster(registerReq);
+        Assert.assertNotNull(registerResponse);
+        lastResponseId = 0;
 
-    Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+        Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
 
-    // Allocate the first batch of containers
-    registerSubCluster(SubClusterId.newInstance("SC-1"));
-    registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
+        // Allocate the first batch of containers
+        registerSubCluster(SubClusterId.newInstance("SC-1"));
+        registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
 
-    interceptor.setShouldReRegisterNext();
+        interceptor.setShouldReRegisterNext();
 
-    int numberOfContainers = 3;
-    List<Container> containers =
-        getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
-    Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+        int numberOfContainers = 3;
+        List<Container> containers =
+            getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
+        Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
 
-    interceptor.setShouldReRegisterNext();
+        interceptor.setShouldReRegisterNext();
 
-    // Release all containers
-    releaseContainersAndAssert(containers);
+        // Release all containers
+        releaseContainersAndAssert(containers);
 
-    interceptor.setShouldReRegisterNext();
+        interceptor.setShouldReRegisterNext();
 
-    // Finish the application
-    FinishApplicationMasterRequest finishReq =
-        Records.newRecord(FinishApplicationMasterRequest.class);
-    finishReq.setDiagnostics("");
-    finishReq.setTrackingUrl("");
-    finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+        // Finish the application
+        FinishApplicationMasterRequest finishReq =
+            Records.newRecord(FinishApplicationMasterRequest.class);
+        finishReq.setDiagnostics("");
+        finishReq.setTrackingUrl("");
+        finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
 
-    FinishApplicationMasterResponse finshResponse =
-        interceptor.finishApplicationMaster(finishReq);
-    Assert.assertNotNull(finshResponse);
-    Assert.assertEquals(true, finshResponse.getIsUnregistered());
+        FinishApplicationMasterResponse finshResponse =
+            interceptor.finishApplicationMaster(finishReq);
+        Assert.assertNotNull(finshResponse);
+        Assert.assertEquals(true, finshResponse.getIsUnregistered());
+        return null;
+      }
+    });
   }
 
   /*
@@ -442,6 +488,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         // Use port number 1001 to let mock RM block in the register call
         response = interceptor.registerApplicationMaster(
             RegisterApplicationMasterRequest.newInstance(null, 1001, null));
+        lastResponseId = 0;
       } catch (Exception e) {
         LOG.info("Register thread exception", e);
         response = null;
@@ -460,9 +507,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     testRecover(null);
   }
 
-  public void testRecover(RegistryOperations registryObj) throws Exception {
-    ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId);
-    userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() {
+  protected void testRecover(final RegistryOperations registryObj)
+      throws Exception {
+    UserGroupInformation ugi =
+        interceptor.getUGIWithToken(interceptor.getAttemptId());
+    ugi.doAs(new PrivilegedExceptionAction<Object>() {
       @Override
       public Object run() throws Exception {
         interceptor = new TestableFederationInterceptor();
@@ -480,6 +529,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         RegisterApplicationMasterResponse registerResponse =
             interceptor.registerApplicationMaster(registerReq);
         Assert.assertNotNull(registerResponse);
+        lastResponseId = 0;
 
         Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
 
@@ -492,6 +542,9 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
             getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
 
+        // Make sure all async hb threads are done
+        interceptor.drainAllAsyncQueue(true);
+
         // Prepare for Federation Interceptor restart and recover
         Map<String, byte[]> recoveredDataMap =
             recoverDataMapForAppAttempt(nmStateStore, attemptId);
@@ -517,22 +570,21 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         interceptor.recover(recoveredDataMap);
 
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
-        Assert.assertEquals(Integer.MAX_VALUE,
-            interceptor.getLastHomeResponseId());
 
         // The first allocate call expects a fail-over exception and re-register
-        int responseId = 10;
-        AllocateRequest allocateRequest =
-            Records.newRecord(AllocateRequest.class);
-        allocateRequest.setResponseId(responseId);
         try {
-          interceptor.allocate(allocateRequest);
+          AllocateRequest allocateRequest =
+              Records.newRecord(AllocateRequest.class);
+          allocateRequest.setResponseId(lastResponseId);
+          AllocateResponse allocateResponse =
+              interceptor.allocate(allocateRequest);
+          lastResponseId = allocateResponse.getResponseId();
           Assert.fail("Expecting an ApplicationMasterNotRegisteredException  "
               + " after FederationInterceptor restarts and recovers");
         } catch (ApplicationMasterNotRegisteredException e) {
         }
         interceptor.registerApplicationMaster(registerReq);
-        Assert.assertEquals(responseId, interceptor.getLastHomeResponseId());
+        lastResponseId = 0;
 
         // Release all containers
         releaseContainersAndAssert(containers);
@@ -614,6 +666,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
       RegisterApplicationMasterResponse registerResponse =
           interceptor.registerApplicationMaster(registerReq);
       Assert.assertNotNull(registerResponse);
+      lastResponseId = 0;
     }
   }
 
@@ -629,6 +682,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     RegisterApplicationMasterResponse registerResponse =
         interceptor.registerApplicationMaster(registerReq);
     Assert.assertNotNull(registerResponse);
+    lastResponseId = 0;
 
     // Register the application second time with a different request obj
     registerReq = Records.newRecord(RegisterApplicationMasterRequest.class);
@@ -637,6 +691,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     registerReq.setTrackingUrl("different");
     try {
       registerResponse = interceptor.registerApplicationMaster(registerReq);
+      lastResponseId = 0;
       Assert.fail("Should throw if a different request obj is used");
     } catch (YarnException e) {
     }
@@ -689,20 +744,22 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
 
   @Test
   public void testSecondAttempt() throws Exception {
-    ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId);
-    userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() {
+    final RegisterApplicationMasterRequest registerReq =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    registerReq.setHost(Integer.toString(testAppId));
+    registerReq.setRpcPort(testAppId);
+    registerReq.setTrackingUrl("");
+
+    UserGroupInformation ugi =
+        interceptor.getUGIWithToken(interceptor.getAttemptId());
+    ugi.doAs(new PrivilegedExceptionAction<Object>() {
       @Override
       public Object run() throws Exception {
         // Register the application
-        RegisterApplicationMasterRequest registerReq =
-            Records.newRecord(RegisterApplicationMasterRequest.class);
-        registerReq.setHost(Integer.toString(testAppId));
-        registerReq.setRpcPort(testAppId);
-        registerReq.setTrackingUrl("");
-
         RegisterApplicationMasterResponse registerResponse =
             interceptor.registerApplicationMaster(registerReq);
         Assert.assertNotNull(registerResponse);
+        lastResponseId = 0;
 
         Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
 
@@ -714,10 +771,13 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         List<Container> containers =
             getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
         for (Container c : containers) {
-          System.out.println(c.getId() + " ha");
+          LOG.info("Allocated container " + c.getId());
         }
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
 
+        // Make sure all async hb threads are done
+        interceptor.drainAllAsyncQueue(true);
+
         // Preserve the mock RM instances for secondaries
         ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
             interceptor.getSecondaryRMs();
@@ -729,8 +789,20 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         interceptor = new TestableFederationInterceptor(null, secondaries);
         interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
             getConf(), attemptId, "test-user", null, null, null, registry));
-        registerResponse = interceptor.registerApplicationMaster(registerReq);
+        return null;
+      }
+    });
 
+    // Update the ugi with new attemptId
+    ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+    ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        RegisterApplicationMasterResponse registerResponse =
+            interceptor.registerApplicationMaster(registerReq);
+        lastResponseId = 0;
+
+        int numberOfContainers = 3;
         // Should re-attach secondaries and get the three running containers
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
         Assert.assertEquals(numberOfContainers,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org