You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2018/07/09 19:28:04 UTC

hadoop git commit: YARN-7899. [AMRMProxy] Stateful FederationInterceptor for pending requests. Contributed by Botong Huang.

Repository: hadoop
Updated Branches:
  refs/heads/trunk e12d93bfc -> ea9b60823


YARN-7899. [AMRMProxy] Stateful FederationInterceptor for pending requests. Contributed by Botong Huang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ea9b6082
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ea9b6082
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ea9b6082

Branch: refs/heads/trunk
Commit: ea9b608237e7f2cf9b1e36b0f78c9674ec84096f
Parents: e12d93b
Author: Giovanni Matteo Fumarola <gi...@apache.com>
Authored: Mon Jul 9 12:27:36 2018 -0700
Committer: Giovanni Matteo Fumarola <gi...@apache.com>
Committed: Mon Jul 9 12:27:36 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/client/AMRMClientUtils.java     |  91 ------------
 .../hadoop/yarn/server/AMRMClientRelayer.java   |   9 +-
 .../yarn/server/uam/UnmanagedAMPoolManager.java |  16 ++
 .../server/uam/UnmanagedApplicationManager.java |  40 ++---
 .../yarn/server/MockResourceManagerFacade.java  |  13 +-
 .../amrmproxy/FederationInterceptor.java        | 146 ++++++++++++++++---
 .../amrmproxy/BaseAMRMProxyTest.java            |   2 +
 .../amrmproxy/TestFederationInterceptor.java    |  17 +++
 8 files changed, 192 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea9b6082/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
index 387e399..5d4ab4a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
@@ -36,19 +36,9 @@ import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
-import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,87 +58,6 @@ public final class AMRMClientUtils {
   }
 
   /**
-   * Handle ApplicationNotRegistered exception and re-register.
-   *
-   * @param appId application Id
-   * @param rmProxy RM proxy instance
-   * @param registerRequest the AM re-register request
-   * @throws YarnException if re-register fails
-   */
-  public static void handleNotRegisteredExceptionAndReRegister(
-      ApplicationId appId, ApplicationMasterProtocol rmProxy,
-      RegisterApplicationMasterRequest registerRequest) throws YarnException {
-    LOG.info("App attempt {} not registered, most likely due to RM failover. "
-        + " Trying to re-register.", appId);
-    try {
-      rmProxy.registerApplicationMaster(registerRequest);
-    } catch (Exception e) {
-      if (e instanceof InvalidApplicationMasterRequestException
-          && e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) {
-        LOG.info("Concurrent thread successfully registered, moving on.");
-      } else {
-        LOG.error("Error trying to re-register AM", e);
-        throw new YarnException(e);
-      }
-    }
-  }
-
-  /**
-   * Helper method for client calling ApplicationMasterProtocol.allocate that
-   * handles re-register if RM fails over.
-   *
-   * @param request allocate request
-   * @param rmProxy RM proxy
-   * @param registerRequest the register request for re-register
-   * @param appId application id
-   * @return allocate response
-   * @throws YarnException if RM call fails
-   * @throws IOException if RM call fails
-   */
-  public static AllocateResponse allocateWithReRegister(AllocateRequest request,
-      ApplicationMasterProtocol rmProxy,
-      RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
-      throws YarnException, IOException {
-    try {
-      return rmProxy.allocate(request);
-    } catch (ApplicationMasterNotRegisteredException e) {
-      handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
-          registerRequest);
-      // reset responseId after re-register
-      request.setResponseId(0);
-      // retry allocate
-      return allocateWithReRegister(request, rmProxy, registerRequest, appId);
-    }
-  }
-
-  /**
-   * Helper method for client calling
-   * ApplicationMasterProtocol.finishApplicationMaster that handles re-register
-   * if RM fails over.
-   *
-   * @param request finishApplicationMaster request
-   * @param rmProxy RM proxy
-   * @param registerRequest the register request for re-register
-   * @param appId application id
-   * @return finishApplicationMaster response
-   * @throws YarnException if RM call fails
-   * @throws IOException if RM call fails
-   */
-  public static FinishApplicationMasterResponse finishAMWithReRegister(
-      FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
-      RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
-      throws YarnException, IOException {
-    try {
-      return rmProxy.finishApplicationMaster(request);
-    } catch (ApplicationMasterNotRegisteredException ex) {
-      handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
-          registerRequest);
-      // retry finishAM after re-register
-      return finishAMWithReRegister(request, rmProxy, registerRequest, appId);
-    }
-  }
-
-  /**
    * Create a proxy for the specified protocol.
    *
    * @param configuration Configuration to generate {@link ClientRMProxy}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea9b6082/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
index e8a7f64..0d1a27e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
@@ -147,6 +147,11 @@ public class AMRMClientRelayer extends AbstractService
     super.serviceStop();
   }
 
+  public void setAMRegistrationRequest(
+      RegisterApplicationMasterRequest registerRequest) {
+    this.amRegistrationRequest = registerRequest;
+  }
+
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       RegisterApplicationMasterRequest request)
@@ -259,8 +264,10 @@ public class AMRMClientRelayer extends AbstractService
           }
         }
 
-        // re register with RM, then retry allocate recursively
+        // re-register with RM, then retry allocate recursively
         registerApplicationMaster(this.amRegistrationRequest);
+        // Reset responseId after re-register
+        allocateRequest.setResponseId(0);
         return allocate(allocateRequest);
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea9b6082/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index 02eef29..5f9d81b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
 import org.apache.hadoop.yarn.util.AsyncCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -385,4 +386,19 @@ public class UnmanagedAMPoolManager extends AbstractService {
     return this.unmanagedAppMasterMap.containsKey(uamId);
   }
 
+  /**
+   * Return the rmProxy relayer of an UAM.
+   *
+   * @param uamId uam Id
+   * @return the rmProxy relayer
+   * @throws YarnException if fails
+   */
+  public AMRMClientRelayer getAMRMClientRelayer(String uamId)
+      throws YarnException {
+    if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
+      throw new YarnException("UAM " + uamId + " does not exist");
+    }
+    return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea9b6082/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 73795dc..856a818 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.AsyncCallback;
@@ -90,7 +91,7 @@ public class UnmanagedApplicationManager {
 
   private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
   private AMRequestHandlerThread handlerThread;
-  private ApplicationMasterProtocol rmProxy;
+  private AMRMClientRelayer rmProxyRelayer;
   private ApplicationId applicationId;
   private String submitter;
   private String appNameSuffix;
@@ -138,7 +139,7 @@ public class UnmanagedApplicationManager {
     this.appNameSuffix = appNameSuffix;
     this.handlerThread = new AMRequestHandlerThread();
     this.requestQueue = new LinkedBlockingQueue<>();
-    this.rmProxy = null;
+    this.rmProxyRelayer = null;
     this.connectionInitiated = false;
     this.registerRequest = null;
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
@@ -190,8 +191,9 @@ public class UnmanagedApplicationManager {
       throws IOException {
     this.userUgi = UserGroupInformation.createProxyUser(
         this.applicationId.toString(), UserGroupInformation.getCurrentUser());
-    this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
-        this.userUgi, amrmToken);
+    this.rmProxyRelayer =
+        new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class,
+            this.conf, this.userUgi, amrmToken));
   }
 
   /**
@@ -209,19 +211,18 @@ public class UnmanagedApplicationManager {
     // Save the register request for re-register later
     this.registerRequest = request;
 
-    // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM.
-    // We do not expect application already registered exception here
     LOG.info("Registering the Unmanaged application master {}",
         this.applicationId);
     RegisterApplicationMasterResponse response =
-        this.rmProxy.registerApplicationMaster(this.registerRequest);
+        this.rmProxyRelayer.registerApplicationMaster(this.registerRequest);
+    this.lastResponseId = 0;
 
     for (Container container : response.getContainersFromPreviousAttempts()) {
-      LOG.info("RegisterUAM returned existing running container "
+      LOG.debug("RegisterUAM returned existing running container "
           + container.getId());
     }
     for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
-      LOG.info("RegisterUAM returned existing NM token for node "
+      LOG.debug("RegisterUAM returned existing NM token for node "
           + nmToken.getNodeId());
     }
 
@@ -249,7 +250,7 @@ public class UnmanagedApplicationManager {
 
     this.handlerThread.shutdown();
 
-    if (this.rmProxy == null) {
+    if (this.rmProxyRelayer == null) {
       if (this.connectionInitiated) {
         // This is possible if the async launchUAM is still
         // blocked and retrying. Return a dummy response in this case.
@@ -261,8 +262,7 @@ public class UnmanagedApplicationManager {
             + "be called before createAndRegister");
       }
     }
-    return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy,
-        this.registerRequest, this.applicationId);
+    return this.rmProxyRelayer.finishApplicationMaster(request);
   }
 
   /**
@@ -308,7 +308,7 @@ public class UnmanagedApplicationManager {
     //
     // In case 2, we have already save the allocate request above, so if the
     // registration succeed later, no request is lost.
-    if (this.rmProxy == null) {
+    if (this.rmProxyRelayer == null) {
       if (this.connectionInitiated) {
         LOG.info("Unmanaged AM still not successfully launched/registered yet."
             + " Saving the allocate request and send later.");
@@ -329,6 +329,15 @@ public class UnmanagedApplicationManager {
   }
 
   /**
+   * Returns the rmProxy relayer of this UAM.
+   *
+   * @return rmProxy relayer of the UAM
+   */
+  public AMRMClientRelayer getAMRMClientRelayer() {
+    return this.rmProxyRelayer;
+  }
+
+  /**
    * Returns RM proxy for the specified protocol type. Unit test cases can
    * override this method and return mock proxy instances.
    *
@@ -592,10 +601,7 @@ public class UnmanagedApplicationManager {
           }
 
           request.setResponseId(lastResponseId);
-
-          AllocateResponse response = AMRMClientUtils.allocateWithReRegister(
-              request, rmProxy, registerRequest, applicationId);
-
+          AllocateResponse response = rmProxyRelayer.allocate(request);
           if (response == null) {
             throw new YarnException("Null allocateResponse from allocate");
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea9b6082/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 23cd3e2..9b4d91d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -251,8 +251,6 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
     ApplicationAttemptId attemptId = getAppIdentifier();
     LOG.info("Registering application attempt: " + attemptId);
 
-    shouldReRegisterNext = false;
-
     List<Container> containersFromPreviousAttempt = null;
 
     synchronized (applicationContainerIdMap) {
@@ -266,7 +264,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
             containersFromPreviousAttempt.add(Container.newInstance(containerId,
                 null, null, null, null, null));
           }
-        } else {
+        } else if (!shouldReRegisterNext) {
           throw new InvalidApplicationMasterRequestException(
               AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
         }
@@ -276,6 +274,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
       }
     }
 
+    shouldReRegisterNext = false;
+
     // Make sure we wait for certain test cases last in the method
     synchronized (syncObj) {
       syncObj.notifyAll();
@@ -339,13 +339,6 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
 
     validateRunning();
 
-    if (request.getAskList() != null && request.getAskList().size() > 0
-        && request.getReleaseList() != null
-        && request.getReleaseList().size() > 0) {
-      Assert.fail("The mock RM implementation does not support receiving "
-          + "askList and releaseList in the same heartbeat");
-    }
-
     ApplicationAttemptId attemptId = getAppIdentifier();
     LOG.info("Allocate from application attempt: " + attemptId);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea9b6082/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 5740749..645e47e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -62,14 +62,15 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
 import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
@@ -106,9 +107,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   public static final String NMSS_REG_RESPONSE_KEY =
       NMSS_CLASS_PREFIX + "registerResponse";
 
-  /*
+  /**
    * When AMRMProxy HA is enabled, secondary AMRMTokens will be stored in Yarn
-   * Registry. Otherwise if NM recovery is enabled, the UAM token are store in
+   * Registry. Otherwise if NM recovery is enabled, the UAM tokens are stored in
    * local NMSS instead under this directory name.
    */
   public static final String NMSS_SECONDARY_SC_PREFIX =
@@ -119,8 +120,23 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    * The home sub-cluster is the sub-cluster where the AM container is running
    * in.
    */
-  private ApplicationMasterProtocol homeRM;
+  private AMRMClientRelayer homeRMRelayer;
   private SubClusterId homeSubClusterId;
+  private volatile int lastHomeResponseId;
+
+  /**
+   * A flag for work preserving NM restart. If we just recovered, we need to
+   * generate an {@link ApplicationMasterNotRegisteredException} exception back
+   * to AM (similar to what RM will do after its restart/fail-over) in its next
+   * allocate to trigger AM re-register (which we will shield from RM and just
+   * return our saved register response) and a full pending requests re-send, so
+   * that all the {@link AMRMClientRelayer} will be re-populated with all
+   * pending requests.
+   *
+   * TODO: When split-merge is not idempotent, this can lead to some
+   * over-allocation without a full cancel to RM.
+   */
+  private volatile boolean justRecovered;
 
   /**
    * UAM pool for secondary sub-clusters (ones other than home sub-cluster),
@@ -134,6 +150,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    */
   private UnmanagedAMPoolManager uamPool;
 
+  /**
+   * The rmProxy relayers for secondary sub-clusters that keep track of all
+   * pending requests.
+   */
+  private Map<String, AMRMClientRelayer> secondaryRelayers;
+
   /** Thread pool used for asynchronous operations. */
   private ExecutorService threadpool;
 
@@ -186,8 +208,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     this.asyncResponseSink = new ConcurrentHashMap<>();
     this.threadpool = Executors.newCachedThreadPool();
     this.uamPool = createUnmanagedAMPoolManager(this.threadpool);
+    this.secondaryRelayers = new ConcurrentHashMap<>();
     this.amRegistrationRequest = null;
     this.amRegistrationResponse = null;
+    this.lastHomeResponseId = Integer.MAX_VALUE;
+    this.justRecovered = false;
   }
 
   /**
@@ -224,8 +249,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
     this.homeSubClusterId =
         SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
-    this.homeRM = createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
-        this.appOwner);
+    this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
+        ApplicationMasterProtocol.class, this.appOwner));
 
     this.federationFacade = FederationStateStoreFacade.getInstance();
     this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@@ -240,13 +265,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   @Override
   public void recover(Map<String, byte[]> recoveredDataMap) {
     super.recover(recoveredDataMap);
-    LOG.info("Recovering data for FederationInterceptor");
+    ApplicationAttemptId attemptId =
+        getApplicationContext().getApplicationAttemptId();
+    LOG.info("Recovering data for FederationInterceptor for {}", attemptId);
     if (recoveredDataMap == null) {
       return;
     }
-
-    ApplicationAttemptId attemptId =
-        getApplicationContext().getApplicationAttemptId();
     try {
       if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
         RegisterApplicationMasterRequestProto pb =
@@ -255,6 +279,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         this.amRegistrationRequest =
             new RegisterApplicationMasterRequestPBImpl(pb);
         LOG.info("amRegistrationRequest recovered for {}", attemptId);
+
+        // Give the register request to homeRMRelayer for future re-registration
+        this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest);
       }
       if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) {
         RegisterApplicationMasterResponseProto pb =
@@ -263,6 +290,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         this.amRegistrationResponse =
             new RegisterApplicationMasterResponsePBImpl(pb);
         LOG.info("amRegistrationResponse recovered for {}", attemptId);
+        // Trigger re-register and full pending re-send only if we have a
+        // saved register response. This should always be true though.
+        this.justRecovered = true;
       }
 
       // Recover UAM amrmTokens from registry or NMSS
@@ -309,6 +339,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
               getApplicationContext().getUser(), this.homeSubClusterId.getId(),
               entry.getValue());
 
+          this.secondaryRelayers.put(subClusterId.getId(),
+              this.uamPool.getAMRMClientRelayer(subClusterId.getId()));
+
           RegisterApplicationMasterResponse response =
               this.uamPool.registerApplicationMaster(subClusterId.getId(),
                   this.amRegistrationRequest);
@@ -436,7 +469,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
      * the other sub-cluster RM will be done lazily as needed later.
      */
     this.amRegistrationResponse =
-        this.homeRM.registerApplicationMaster(request);
+        this.homeRMRelayer.registerApplicationMaster(request);
     if (this.amRegistrationResponse
         .getContainersFromPreviousAttempts() != null) {
       cacheAllocatedContainers(
@@ -495,6 +528,34 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     Preconditions.checkArgument(this.policyInterpreter != null,
         "Allocate should be called after registerApplicationMaster");
 
+    if (this.justRecovered && this.lastHomeResponseId == Integer.MAX_VALUE) {
+      // Save the responseId home RM is expecting
+      this.lastHomeResponseId = request.getResponseId();
+
+      throw new ApplicationMasterNotRegisteredException(
+          "AMRMProxy just restarted and recovered for "
+              + getApplicationContext().getApplicationAttemptId()
+              + ". AM should re-register and full re-send pending requests.");
+    }
+
+    // Override responseId in the request in two cases:
+    //
+    // 1. After we just recovered after an NM restart and AM's responseId is
+    // reset due to the exception we generate. We need to override the
+    // responseId to the one homeRM expects.
+    //
+    // 2. After homeRM fail-over, the allocate response with reset responseId
+    // might not be returned successfully back to AM because of RPC connection
+    // timeout between AM and AMRMProxy. In this case, we remember and reset the
+    // responseId for AM.
+    if (this.justRecovered
+        || request.getResponseId() > this.lastHomeResponseId) {
+      LOG.warn("Setting allocate responseId for {} from {} to {}",
+          getApplicationContext().getApplicationAttemptId(),
+          request.getResponseId(), this.lastHomeResponseId);
+      request.setResponseId(this.lastHomeResponseId);
+    }
+
     try {
       // Split the heart beat request into multiple requests, one for each
       // sub-cluster RM that is used by this application.
@@ -509,10 +570,18 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           sendRequestsToSecondaryResourceManagers(requests);
 
       // Send the request to the home RM and get the response
-      AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
-          requests.get(this.homeSubClusterId), this.homeRM,
-          this.amRegistrationRequest,
-          getApplicationContext().getApplicationAttemptId().getApplicationId());
+      AllocateRequest homeRequest = requests.get(this.homeSubClusterId);
+      LOG.info("{} heartbeating to home RM with responseId {}",
+          getApplicationContext().getApplicationAttemptId(),
+          homeRequest.getResponseId());
+
+      AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest);
+
+      // Reset the flag after the first successful homeRM allocate response,
+      // otherwise keep overriding the responseId of new allocate request
+      if (this.justRecovered) {
+        this.justRecovered = false;
+      }
 
       // Notify policy of home response
       try {
@@ -540,6 +609,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
             newRegistrations.getSuccessfulRegistrations());
       }
 
+      LOG.info("{} heartbeat response from home RM with responseId {}",
+          getApplicationContext().getApplicationAttemptId(),
+          homeResponse.getResponseId());
+
+      // Update lastHomeResponseId in three cases:
+      // 1. The normal responseId increments
+      // 2. homeResponse.getResponseId() == 1. This happens when homeRM fails
+      // over, AMRMClientRelayer auto re-register and full re-send for homeRM.
+      // 3. lastHomeResponseId == MAX_INT. This is the initial case or
+      // responseId about to overflow and wrap around
+      if (homeResponse.getResponseId() == this.lastHomeResponseId + 1
+          || homeResponse.getResponseId() == 1
+          || this.lastHomeResponseId == Integer.MAX_VALUE) {
+        this.lastHomeResponseId = homeResponse.getResponseId();
+      }
+
       // return the final response to the application master.
       return homeResponse;
     } catch (IOException ex) {
@@ -584,6 +669,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
             try {
               uamResponse =
                   uamPool.finishApplicationMaster(subClusterId, finishRequest);
+
+              if (uamResponse.getIsUnregistered()) {
+                secondaryRelayers.remove(subClusterId);
+
+                if (getNMStateStore() != null) {
+                  getNMStateStore().removeAMRMProxyAppContextEntry(
+                      getApplicationContext().getApplicationAttemptId(),
+                      NMSS_SECONDARY_SC_PREFIX + subClusterId);
+                }
+              }
             } catch (Throwable e) {
               LOG.warn("Failed to finish unmanaged application master: "
                   + "RM address: " + subClusterId + " ApplicationId: "
@@ -600,9 +695,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     // asynchronously by other sub-cluster resource managers, send the same
     // request to the home resource manager on this thread.
     FinishApplicationMasterResponse homeResponse =
-        AMRMClientUtils.finishAMWithReRegister(request, this.homeRM,
-            this.amRegistrationRequest, getApplicationContext()
-                .getApplicationAttemptId().getApplicationId());
+        this.homeRMRelayer.finishApplicationMaster(request);
 
     if (subClusterIds.size() > 0) {
       // Wait for other sub-cluster resource managers to return the
@@ -621,10 +714,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           if (uamResponse.getResponse() == null
               || !uamResponse.getResponse().getIsUnregistered()) {
             failedToUnRegister = true;
-          } else if (getNMStateStore() != null) {
-            getNMStateStore().removeAMRMProxyAppContextEntry(
-                getApplicationContext().getApplicationAttemptId(),
-                NMSS_SECONDARY_SC_PREFIX + uamResponse.getSubClusterId());
           }
         } catch (Throwable e) {
           failedToUnRegister = true;
@@ -689,6 +778,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     return this.registryClient;
   }
 
+  @VisibleForTesting
+  protected int getLastHomeResponseId() {
+    return this.lastHomeResponseId;
+  }
+
   /**
    * Create the UAM pool manager for secondary sub-clsuters. For unit test to
    * override.
@@ -800,6 +894,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                     getApplicationContext().getUser(), homeSubClusterId.getId(),
                     amrmToken);
 
+                secondaryRelayers.put(subClusterId.getId(),
+                    uamPool.getAMRMClientRelayer(subClusterId.getId()));
+
                 response = uamPool.registerApplicationMaster(
                     subClusterId.getId(), amRegistrationRequest);
 
@@ -1098,7 +1195,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                   token = uamPool.launchUAM(subClusterId, config,
                       appContext.getApplicationAttemptId().getApplicationId(),
                       amRegistrationResponse.getQueue(), appContext.getUser(),
-                      homeSubClusterId.toString(), registryClient != null);
+                      homeSubClusterId.toString(), true);
+
+                  secondaryRelayers.put(subClusterId,
+                      uamPool.getAMRMClientRelayer(subClusterId));
 
                   uamResponse = uamPool.registerApplicationMaster(subClusterId,
                       registerRequest);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea9b6082/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 677732d..2794857 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -536,6 +537,7 @@ public abstract class BaseAMRMProxyTest {
     capability.setMemorySize(memory);
     capability.setVirtualCores(vCores);
     req.setCapability(capability);
+    req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance());
     if (labelExpression != null) {
       req.setNodeLabelExpression(labelExpression);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea9b6082/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index eefaba1..a837eed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
@@ -516,6 +517,22 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         interceptor.recover(recoveredDataMap);
 
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+        Assert.assertEquals(Integer.MAX_VALUE,
+            interceptor.getLastHomeResponseId());
+
+        // Expect the first allocate to throw on fail-over, then re-register
+        int responseId = 10;
+        AllocateRequest allocateRequest =
+            Records.newRecord(AllocateRequest.class);
+        allocateRequest.setResponseId(responseId);
+        try {
+          interceptor.allocate(allocateRequest);
+          Assert.fail("Expecting an ApplicationMasterNotRegisteredException  "
+              + " after FederationInterceptor restarts and recovers");
+        } catch (ApplicationMasterNotRegisteredException e) {
+        }
+        interceptor.registerApplicationMaster(registerReq);
+        Assert.assertEquals(responseId, interceptor.getLastHomeResponseId());
 
         // Release all containers
         releaseContainersAndAssert(containers);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org