You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/12/12 23:59:12 UTC

[37/50] hadoop git commit: YARN-6704. Add support for work preserving NM restart when FederationInterceptor is enabled in AMRMProxyService. (Botong Huang via Subru).

YARN-6704. Add support for work preserving NM restart when FederationInterceptor is enabled in AMRMProxyService. (Botong Huang via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/670e8d4e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/670e8d4e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/670e8d4e

Branch: refs/heads/HDFS-7240
Commit: 670e8d4ec7e71fc3b054cd3b2826f869b649a788
Parents: 04b84da
Author: Subru Krishnan <su...@apache.org>
Authored: Fri Dec 8 15:39:18 2017 -0800
Committer: Subru Krishnan <su...@apache.org>
Committed: Fri Dec 8 15:39:18 2017 -0800

----------------------------------------------------------------------
 .../yarn/server/MockResourceManagerFacade.java  |  16 +-
 .../nodemanager/amrmproxy/AMRMProxyService.java |   5 +-
 .../amrmproxy/FederationInterceptor.java        | 271 +++++++++++++++++--
 .../amrmproxy/BaseAMRMProxyTest.java            |  15 +
 .../amrmproxy/TestFederationInterceptor.java    | 104 +++++++
 .../TestableFederationInterceptor.java          |   8 +-
 6 files changed, 387 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/670e8d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index b5727aa..15e1cea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -622,7 +623,20 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
 
     validateRunning();
 
-    return GetContainersResponse.newInstance(null);
+    ApplicationAttemptId attemptId = request.getApplicationAttemptId();
+    List<ContainerReport> containers = new ArrayList<>();
+    synchronized (applicationContainerIdMap) {
+      // Return the list of running containers that were being tracked for this
+      // application attempt
+      Assert.assertTrue("The application id is NOT registered: " + attemptId,
+          applicationContainerIdMap.containsKey(attemptId));
+      List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
+      for (ContainerId c : ids) {
+        containers.add(ContainerReport.newInstance(c, null, null, null, 0, 0,
+            null, null, 0, null, null));
+      }
+    }
+    return GetContainersResponse.newInstance(containers);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/670e8d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index ebd85bf..815e39b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -128,11 +128,8 @@ public class AMRMProxyService extends CompositeService implements
         new AMRMProxyTokenSecretManager(this.nmContext.getNMStateStore());
     this.secretManager.init(conf);
 
-    // Both second app attempt and NM restart within Federation need registry
     if (conf.getBoolean(YarnConfiguration.AMRM_PROXY_HA_ENABLED,
-        YarnConfiguration.DEFAULT_AMRM_PROXY_HA_ENABLED)
-        || conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED,
-            YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED)) {
+        YarnConfiguration.DEFAULT_AMRM_PROXY_HA_ENABLED)) {
       this.registry = FederationStateStoreFacade.createInstance(conf,
           YarnConfiguration.YARN_REGISTRY_CLASS,
           YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/670e8d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index ef5e061..9a53a50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -37,16 +37,23 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.PreemptionContract;
@@ -59,6 +66,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
@@ -90,6 +99,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   private static final Logger LOG =
       LoggerFactory.getLogger(FederationInterceptor.class);
 
+  public static final String NMSS_CLASS_PREFIX = "FederationInterceptor/";
+
+  public static final String NMSS_REG_REQUEST_KEY =
+      NMSS_CLASS_PREFIX + "registerRequest";
+  public static final String NMSS_REG_RESPONSE_KEY =
+      NMSS_CLASS_PREFIX + "registerResponse";
+
+  /*
+   * When AMRMProxy HA is enabled, secondary AMRMTokens will be stored in Yarn
+   * Registry. Otherwise, if NM recovery is enabled, the UAM tokens are stored
+   * in the local NMSS instead, under keys starting with this prefix.
+   */
+  public static final String NMSS_SECONDARY_SC_PREFIX =
+      NMSS_CLASS_PREFIX + "secondarySC/";
+  public static final String STRING_TO_BYTE_FORMAT = "UTF-8";
+
   /**
    * The home sub-cluster is the sub-cluster where the AM container is running
    * in.
@@ -187,14 +212,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     } catch (Exception ex) {
       throw new YarnRuntimeException(ex);
     }
-    // Add all app tokens for Yarn Registry access
-    if (this.registryClient != null && appContext.getCredentials() != null) {
-      this.appOwner.addCredentials(appContext.getCredentials());
+
+    if (appContext.getRegistryClient() != null) {
+      this.registryClient = new FederationRegistryClient(conf,
+          appContext.getRegistryClient(), this.appOwner);
+      // Add all app tokens for Yarn Registry access
+      if (appContext.getCredentials() != null) {
+        this.appOwner.addCredentials(appContext.getCredentials());
+      }
     }
 
     this.homeSubClusterId =
         SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
-    this.homeRM = createHomeRMProxy(appContext);
+    this.homeRM = createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
+        this.appOwner);
 
     this.federationFacade = FederationStateStoreFacade.getInstance();
     this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@@ -204,11 +235,137 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
     this.uamPool.init(conf);
     this.uamPool.start();
+  }
 
-    if (appContext.getRegistryClient() != null) {
-      this.registryClient = new FederationRegistryClient(conf,
-          appContext.getRegistryClient(), this.appOwner);
+  @Override
+  public void recover(Map<String, byte[]> recoveredDataMap) {
+    super.recover(recoveredDataMap);
+    LOG.info("Recovering data for FederationInterceptor");
+    if (recoveredDataMap == null) {
+      return;
     }
+
+    ApplicationAttemptId attemptId =
+        getApplicationContext().getApplicationAttemptId();
+    try {
+      if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
+        RegisterApplicationMasterRequestProto pb =
+            RegisterApplicationMasterRequestProto
+                .parseFrom(recoveredDataMap.get(NMSS_REG_REQUEST_KEY));
+        this.amRegistrationRequest =
+            new RegisterApplicationMasterRequestPBImpl(pb);
+        LOG.info("amRegistrationRequest recovered for {}", attemptId);
+      }
+      if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) {
+        RegisterApplicationMasterResponseProto pb =
+            RegisterApplicationMasterResponseProto
+                .parseFrom(recoveredDataMap.get(NMSS_REG_RESPONSE_KEY));
+        this.amRegistrationResponse =
+            new RegisterApplicationMasterResponsePBImpl(pb);
+        LOG.info("amRegistrationResponse recovered for {}", attemptId);
+      }
+
+      // Recover UAM amrmTokens from registry or NMSS
+      Map<String, Token<AMRMTokenIdentifier>> uamMap;
+      if (this.registryClient != null) {
+        uamMap = this.registryClient
+            .loadStateFromRegistry(attemptId.getApplicationId());
+        LOG.info("Found {} existing UAMs for application {} in Yarn Registry",
+            uamMap.size(), attemptId.getApplicationId());
+      } else {
+        uamMap = new HashMap<>();
+        for (Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
+          if (entry.getKey().startsWith(NMSS_SECONDARY_SC_PREFIX)) {
+            // entry for subClusterId -> UAM amrmToken
+            String scId =
+                entry.getKey().substring(NMSS_SECONDARY_SC_PREFIX.length());
+            Token<AMRMTokenIdentifier> amrmToken = new Token<>();
+            amrmToken.decodeFromUrlString(
+                new String(entry.getValue(), STRING_TO_BYTE_FORMAT));
+            uamMap.put(scId, amrmToken);
+            LOG.debug("Recovered UAM in {} from NMSS", scId);
+          }
+        }
+        LOG.info("Found {} existing UAMs for application {} in NMStateStore",
+            uamMap.size(), attemptId.getApplicationId());
+      }
+
+      // Re-attach the UAMs
+      int containers = 0;
+      for (Map.Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap
+          .entrySet()) {
+        SubClusterId subClusterId = SubClusterId.newInstance(entry.getKey());
+
+        // Create a config loaded with federation on and subclusterId
+        // for each UAM
+        YarnConfiguration config = new YarnConfiguration(getConf());
+        FederationProxyProviderUtil.updateConfForFederation(config,
+            subClusterId.getId());
+
+        try {
+          this.uamPool.reAttachUAM(subClusterId.getId(), config,
+              attemptId.getApplicationId(),
+              this.amRegistrationResponse.getQueue(),
+              getApplicationContext().getUser(), this.homeSubClusterId.getId(),
+              entry.getValue());
+
+          RegisterApplicationMasterResponse response =
+              this.uamPool.registerApplicationMaster(subClusterId.getId(),
+                  this.amRegistrationRequest);
+
+          // Running containers from secondary RMs
+          for (Container container : response
+              .getContainersFromPreviousAttempts()) {
+            containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
+            containers++;
+          }
+          LOG.info("Recovered {} running containers from UAM in {}",
+              response.getContainersFromPreviousAttempts().size(),
+              subClusterId);
+
+        } catch (Exception e) {
+          LOG.error(
+              "Error reattaching UAM to " + subClusterId + " for " + attemptId,
+              e);
+        }
+      }
+
+      // Get the running containers from home RM. Note that we will also get the
+      // AM container itself from here. We don't need it, but there is no harm
+      // in putting it in the map as well.
+      UserGroupInformation appSubmitter = UserGroupInformation
+          .createRemoteUser(getApplicationContext().getUser());
+      ApplicationClientProtocol rmClient =
+          createHomeRMProxy(getApplicationContext(),
+              ApplicationClientProtocol.class, appSubmitter);
+
+      GetContainersResponse response =
+          rmClient.getContainers(GetContainersRequest.newInstance(attemptId));
+      for (ContainerReport container : response.getContainerList()) {
+        containerIdToSubClusterIdMap.put(container.getContainerId(),
+            this.homeSubClusterId);
+        containers++;
+        LOG.debug("  From home RM {} running container {}",
+            this.homeSubClusterId, container.getContainerId());
+      }
+      LOG.info("{} running containers including AM recovered from home RM {}",
+          response.getContainerList().size(), this.homeSubClusterId);
+
+      LOG.info(
+          "In all {} UAMs {} running containers including AM recovered for {}",
+          uamMap.size(), containers, attemptId);
+
+      if (this.amRegistrationResponse != null) {
+        // Initialize the AMRMProxyPolicy
+        String queue = this.amRegistrationResponse.getQueue();
+        this.policyInterpreter =
+            FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
+                getConf(), this.federationFacade, this.homeSubClusterId);
+      }
+    } catch (IOException | YarnException e) {
+      throw new YarnRuntimeException(e);
+    }
+
   }
 
   /**
@@ -242,6 +399,19 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       // Save the registration request. This will be used for registering with
       // secondary sub-clusters using UAMs, as well as re-register later
       this.amRegistrationRequest = request;
+      if (getNMStateStore() != null) {
+        try {
+          RegisterApplicationMasterRequestPBImpl pb =
+              (RegisterApplicationMasterRequestPBImpl)
+                  this.amRegistrationRequest;
+          getNMStateStore().storeAMRMProxyAppContextEntry(
+              getApplicationContext().getApplicationAttemptId(),
+              NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray());
+        } catch (Exception e) {
+          LOG.error("Error storing AMRMProxy application context entry for "
+              + getApplicationContext().getApplicationAttemptId(), e);
+        }
+      }
     }
 
     /*
@@ -278,6 +448,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         getApplicationContext().getApplicationAttemptId().getApplicationId();
     reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);
 
+    if (getNMStateStore() != null) {
+      try {
+        RegisterApplicationMasterResponsePBImpl pb =
+            (RegisterApplicationMasterResponsePBImpl)
+                this.amRegistrationResponse;
+        getNMStateStore().storeAMRMProxyAppContextEntry(
+            getApplicationContext().getApplicationAttemptId(),
+            NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray());
+      } catch (Exception e) {
+        LOG.error("Error storing AMRMProxy application context entry for "
+            + getApplicationContext().getApplicationAttemptId(), e);
+      }
+    }
+
     // the queue this application belongs will be used for getting
     // AMRMProxy policy from state store.
     String queue = this.amRegistrationResponse.getQueue();
@@ -437,6 +621,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           if (uamResponse.getResponse() == null
               || !uamResponse.getResponse().getIsUnregistered()) {
             failedToUnRegister = true;
+          } else if (getNMStateStore() != null) {
+            getNMStateStore().removeAMRMProxyAppContextEntry(
+                getApplicationContext().getApplicationAttemptId(),
+                NMSS_SECONDARY_SC_PREFIX + uamResponse.getSubClusterId());
           }
         } catch (Throwable e) {
           failedToUnRegister = true;
@@ -496,6 +684,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     }
   }
 
+  @VisibleForTesting
+  protected FederationRegistryClient getRegistryClient() {
+    return this.registryClient;
+  }
+
   /**
    * Create the UAM pool manager for secondary sub-clsuters. For unit test to
    * override.
@@ -510,18 +703,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   }
 
   /**
-   * Returns instance of the ApplicationMasterProtocol proxy class that is used
-   * to connect to the Home resource manager.
+   * Create a proxy instance that is used to connect to the Home resource
+   * manager.
    *
    * @param appContext AMRMProxyApplicationContext
+   * @param protocol the protocol class for the proxy
+   * @param user the ugi for the proxy
+   * @param <T> the type of the proxy
    * @return the proxy created
    */
-  protected ApplicationMasterProtocol createHomeRMProxy(
-      AMRMProxyApplicationContext appContext) {
+  protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext,
+      Class<T> protocol, UserGroupInformation user) {
     try {
       return FederationProxyProviderUtil.createRMProxy(appContext.getConf(),
-          ApplicationMasterProtocol.class, this.homeSubClusterId, this.appOwner,
-          appContext.getAMRMToken());
+          protocol, this.homeSubClusterId, user, appContext.getAMRMToken());
     } catch (Exception ex) {
       throw new YarnRuntimeException(ex);
     }
@@ -810,17 +1005,31 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                 responses.add(response);
               }
 
-              // Save the new AMRMToken for the UAM in registry if present
+              // Save the new AMRMToken for the UAM if present
               if (response.getAMRMToken() != null) {
                 Token<AMRMTokenIdentifier> newToken = ConverterUtils
                     .convertFromYarn(response.getAMRMToken(), (Text) null);
-                // Update the token in registry
+                // Update the token in registry or NMSS
                 if (registryClient != null) {
                   registryClient
                       .writeAMRMTokenForUAM(
                           getApplicationContext().getApplicationAttemptId()
                               .getApplicationId(),
                           subClusterId.getId(), newToken);
+                } else if (getNMStateStore() != null) {
+                  try {
+                    getNMStateStore().storeAMRMProxyAppContextEntry(
+                        getApplicationContext().getApplicationAttemptId(),
+                        NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(),
+                        newToken.encodeToUrlString()
+                            .getBytes(STRING_TO_BYTE_FORMAT));
+                  } catch (IOException e) {
+                    LOG.error(
+                        "Error storing UAM token as AMRMProxy "
+                            + "context entry in NMSS for "
+                            + getApplicationContext().getApplicationAttemptId(),
+                        e);
+                  }
                 }
               }
 
@@ -925,12 +1134,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
             successfulRegistrations.put(uamResponse.getSubClusterId(),
                 uamResponse.getResponse());
 
+            // Save the UAM token in registry or NMSS
             if (registryClient != null) {
               registryClient.writeAMRMTokenForUAM(
                   getApplicationContext().getApplicationAttemptId()
                       .getApplicationId(),
                   uamResponse.getSubClusterId().getId(),
                   uamResponse.getUamToken());
+            } else if (getNMStateStore() != null) {
+              getNMStateStore().storeAMRMProxyAppContextEntry(
+                  getApplicationContext().getApplicationAttemptId(),
+                  NMSS_SECONDARY_SC_PREFIX
+                      + uamResponse.getSubClusterId().getId(),
+                  uamResponse.getUamToken().encodeToUrlString()
+                      .getBytes(STRING_TO_BYTE_FORMAT));
             }
           }
         } catch (Exception e) {
@@ -952,11 +1169,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   private AllocateResponse mergeAllocateResponses(
       AllocateResponse homeResponse) {
     // Timing issue, we need to remove the completed and then save the new ones.
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Remove containers: "
-          + homeResponse.getCompletedContainersStatuses());
-      LOG.debug("Adding containers: " + homeResponse.getAllocatedContainers());
-    }
     removeFinishedContainersFromCache(
         homeResponse.getCompletedContainersStatuses());
     cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
@@ -989,6 +1201,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   private void removeFinishedContainersFromCache(
       List<ContainerStatus> finishedContainers) {
     for (ContainerStatus container : finishedContainers) {
+      LOG.debug("Completed container {}", container);
       if (containerIdToSubClusterIdMap
           .containsKey(container.getContainerId())) {
         containerIdToSubClusterIdMap.remove(container.getContainerId());
@@ -1146,12 +1359,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   private void cacheAllocatedContainers(List<Container> containers,
       SubClusterId subClusterId) {
     for (Container container : containers) {
+      LOG.debug("Adding container {}", container);
       if (containerIdToSubClusterIdMap.containsKey(container.getId())) {
         SubClusterId existingSubClusterId =
             containerIdToSubClusterIdMap.get(container.getId());
         if (existingSubClusterId.equals(subClusterId)) {
-          // When RM fails over, the new RM master might send out the same
-          // container allocation more than once. Just move on in this case.
+          /*
+           * When RM fails over, the new RM master might send out the same
+           * container allocation more than once.
+           *
+           * It is also possible because of a recent NM restart with NM recovery
+           * enabled. We recover running containers from RM. But RM might not
+           * have notified AM of some of these containers yet. When RM does
+           * notify, we will already have these containers in the map.
+           *
+           * In either case, just warn and move on.
+           */
           LOG.warn(
               "Duplicate containerID: {} found in the allocated containers"
                   + " from same sub-cluster: {}, so ignoring.",
@@ -1226,7 +1449,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    */
   private boolean warnIfNotExists(ContainerId containerId, String actionName) {
     if (!this.containerIdToSubClusterIdMap.containsKey(containerId)) {
-      LOG.error("AM is trying to {} a container {} that does not exist. ",
+      LOG.error(
+          "AM is trying to {} a container {} that does not exist. Might happen "
+              + "shortly after NM restart when NM recovery is enabled",
           actionName, containerId.toString());
       return false;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/670e8d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index da1d047..0319dbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -180,6 +181,20 @@ public abstract class BaseAMRMProxyTest {
     return new NMContext(null, null, null, null, stateStore, false, this.conf);
   }
 
+  // Test utility: returns the recovered data map for attemptId, or null
+  protected Map<String, byte[]> recoverDataMapForAppAttempt(
+      NMStateStoreService nmStateStore, ApplicationAttemptId attemptId)
+      throws IOException {
+    RecoveredAMRMProxyState state = nmStateStore.loadAMRMProxyState();
+    for (Map.Entry<ApplicationAttemptId, Map<String, byte[]>> entry : state
+        .getAppContexts().entrySet()) {
+      if (entry.getKey().equals(attemptId)) {
+        return entry.getValue();
+      }
+    }
+    return null;
+  }
+
   protected List<ContainerId> getCompletedContainerIds(
       List<ContainerStatus> containerStatus) {
     List<ContainerId> ret = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/670e8d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index aa7ed69..eefaba1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -450,6 +450,104 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
   }
 
   @Test
+  public void testRecoverWithAMRMProxyHA() throws Exception {
+    testRecover(registry);
+  }
+
+  @Test
+  public void testRecoverWithoutAMRMProxyHA() throws Exception {
+    testRecover(null);
+  }
+
+  public void testRecover(RegistryOperations registryObj) throws Exception {
+    ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId);
+    userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        interceptor = new TestableFederationInterceptor();
+        interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
+            getConf(), attemptId, "test-user", null, null, null, registryObj));
+        interceptor.cleanupRegistry();
+
+        // Register the application
+        RegisterApplicationMasterRequest registerReq =
+            Records.newRecord(RegisterApplicationMasterRequest.class);
+        registerReq.setHost(Integer.toString(testAppId));
+        registerReq.setRpcPort(testAppId);
+        registerReq.setTrackingUrl("");
+
+        RegisterApplicationMasterResponse registerResponse =
+            interceptor.registerApplicationMaster(registerReq);
+        Assert.assertNotNull(registerResponse);
+
+        Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+        // Allocate one batch of containers
+        registerSubCluster(SubClusterId.newInstance("SC-1"));
+        registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
+
+        int numberOfContainers = 3;
+        List<Container> containers =
+            getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
+        Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+        // Simulate an NM restart: load the state persisted for this attempt
+        Map<String, byte[]> recoveredDataMap =
+            recoverDataMapForAppAttempt(nmStateStore, attemptId);
+        String scEntry =
+            FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + "SC-1";
+        if (registryObj == null) {
+          Assert.assertTrue(recoveredDataMap.containsKey(scEntry));
+        } else {
+          // When AMRMProxy HA is enabled, NMSS should not have the UAM token,
+          // it should be in the Registry instead
+          Assert.assertFalse(recoveredDataMap.containsKey(scEntry));
+        }
+
+        // Preserve the mock RM instances
+        MockResourceManagerFacade homeRM = interceptor.getHomeRM();
+        ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
+            interceptor.getSecondaryRMs();
+
+        // Create a new interceptor instance and recover
+        interceptor = new TestableFederationInterceptor(homeRM, secondaries);
+        interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
+            getConf(), attemptId, "test-user", null, null, null, registryObj));
+        interceptor.recover(recoveredDataMap);
+
+        Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+        // Release all containers
+        releaseContainersAndAssert(containers);
+
+        // Finish the application
+        FinishApplicationMasterRequest finishReq =
+            Records.newRecord(FinishApplicationMasterRequest.class);
+        finishReq.setDiagnostics("");
+        finishReq.setTrackingUrl("");
+        finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+        FinishApplicationMasterResponse finishResponse =
+            interceptor.finishApplicationMaster(finishReq);
+        Assert.assertNotNull(finishResponse);
+        Assert.assertTrue(finishResponse.getIsUnregistered());
+
+        // After the application succeeds, the registry/NMSS entry should be
+        // cleaned up
+        if (registryObj != null) {
+          Assert.assertEquals(0,
+              interceptor.getRegistryClient().getAllApplications().size());
+        } else {
+          recoveredDataMap =
+              recoverDataMapForAppAttempt(nmStateStore, attemptId);
+          Assert.assertFalse(recoveredDataMap.containsKey(scEntry));
+        }
+        return null;
+      }
+    });
+  }
+
+  @Test
   public void testRequestInterceptorChainCreation() throws Exception {
     RequestInterceptor root =
         super.getAMRMProxyService().createRequestInterceptorChain();
@@ -636,6 +734,12 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
             interceptor.finishApplicationMaster(finishReq);
         Assert.assertNotNull(finshResponse);
         Assert.assertEquals(true, finshResponse.getIsUnregistered());
+
+        // After the application succeeds, the registry entry should be deleted
+        if (interceptor.getRegistryClient() != null) {
+          Assert.assertEquals(0,
+              interceptor.getRegistryClient().getAllApplications().size());
+        }
         return null;
       }
     });

http://git-wip-us.apache.org/repos/asf/hadoop/blob/670e8d4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
index 23c80ae..1088c69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -59,16 +58,17 @@ public class TestableFederationInterceptor extends FederationInterceptor {
     return new TestableUnmanagedAMPoolManager(threadPool);
   }
 
+  /** Returns the lazily created mock RM, cast to the requested protocol. */
   @Override
-  protected ApplicationMasterProtocol createHomeRMProxy(
-      AMRMProxyApplicationContext appContext) {
+  protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext,
+      Class<T> protocol, UserGroupInformation user) {
     synchronized (this) {
       if (mockRm == null) {
         mockRm = new MockResourceManagerFacade(
             new YarnConfiguration(super.getConf()), 0);
       }
     }
-    return mockRm;
+    return protocol.cast(mockRm);
   }
 
   @SuppressWarnings("unchecked")


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org