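You are viewing a plain-text version of this content. The canonical version is available in the original mailing-list archive (its hyperlink is not preserved in this plain-text rendering).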
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2018/08/27 17:33:11 UTC

hadoop git commit: YARN-8705. Refactor the UAM heartbeat thread in preparation for YARN-8696. Contributed by Botong Huang.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 7b1fa5693 -> f15258256


YARN-8705. Refactor the UAM heartbeat thread in preparation for YARN-8696. Contributed by Botong Huang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f1525825
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f1525825
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f1525825

Branch: refs/heads/trunk
Commit: f1525825623a1307b5aa55c456b6afa3e0c61135
Parents: 7b1fa56
Author: Giovanni Matteo Fumarola <gi...@microsoft.com>
Authored: Mon Aug 27 10:32:22 2018 -0700
Committer: Giovanni Matteo Fumarola <gi...@microsoft.com>
Committed: Mon Aug 27 10:32:22 2018 -0700

----------------------------------------------------------------------
 .../yarn/server/AMHeartbeatRequestHandler.java  | 227 +++++++++++++++++
 .../server/uam/UnmanagedApplicationManager.java | 170 ++-----------
 .../amrmproxy/FederationInterceptor.java        | 245 +++++++++----------
 3 files changed, 358 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1525825/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java
new file mode 100644
index 0000000..42227bb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java
@@ -0,0 +1,227 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.apache.hadoop.yarn.util.AsyncCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Extends Thread and provides an implementation that is used for processing the
+ * AM heart beat request asynchronously and sending back the response using the
+ * callback method registered with the system.
+ */
+public class AMHeartbeatRequestHandler extends Thread {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(AMHeartbeatRequestHandler.class);
+
+  // Indication flag for the thread to keep running
+  private volatile boolean keepRunning;
+
+  private Configuration conf;
+  private ApplicationId applicationId;
+
+  private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
+  private AMRMClientRelayer rmProxyRelayer;
+  private UserGroupInformation userUgi;
+  private int lastResponseId;
+
+  public AMHeartbeatRequestHandler(Configuration conf,
+      ApplicationId applicationId) {
+    super("AMHeartbeatRequestHandler Heartbeat Handler Thread");
+    this.setUncaughtExceptionHandler(
+        new HeartBeatThreadUncaughtExceptionHandler());
+    this.keepRunning = true;
+
+    this.conf = conf;
+    this.applicationId = applicationId;
+    this.requestQueue = new LinkedBlockingQueue<>();
+
+    resetLastResponseId();
+  }
+
+  /**
+   * Shutdown the thread.
+   */
+  public void shutdown() {
+    this.keepRunning = false;
+    this.interrupt();
+  }
+
+  @Override
+  public void run() {
+    while (keepRunning) {
+      AsyncAllocateRequestInfo requestInfo;
+      try {
+        requestInfo = requestQueue.take();
+        if (requestInfo == null) {
+          throw new YarnException(
+              "Null requestInfo taken from request queue");
+        }
+        if (!keepRunning) {
+          break;
+        }
+
+        // change the response id before forwarding the allocate request as we
+        // could have different values for each UAM
+        AllocateRequest request = requestInfo.getRequest();
+        if (request == null) {
+          throw new YarnException("Null allocateRequest from requestInfo");
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:"
+              + ((request.getAskList() == null) ? " empty"
+                  : request.getAskList().size()));
+        }
+
+        request.setResponseId(lastResponseId);
+        AllocateResponse response = rmProxyRelayer.allocate(request);
+        if (response == null) {
+          throw new YarnException("Null allocateResponse from allocate");
+        }
+
+        lastResponseId = response.getResponseId();
+        // update token if RM has reissued/renewed
+        if (response.getAMRMToken() != null) {
+          LOG.debug("Received new AMRMToken");
+          YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(),
+              userUgi, conf);
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received Heartbeat reply from RM. Allocated Containers:"
+              + ((response.getAllocatedContainers() == null) ? " empty"
+                  : response.getAllocatedContainers().size()));
+        }
+
+        if (requestInfo.getCallback() == null) {
+          throw new YarnException("Null callback from requestInfo");
+        }
+        requestInfo.getCallback().callback(response);
+      } catch (InterruptedException ex) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Interrupted while waiting for queue", ex);
+        }
+      } catch (Throwable ex) {
+        LOG.warn(
+            "Error occurred while processing heart beat for " + applicationId,
+            ex);
+      }
+    }
+
+    LOG.info("AMHeartbeatRequestHandler thread for {} is exiting",
+        applicationId);
+  }
+
+  /**
+   * Reset the lastResponseId to zero.
+   */
+  public void resetLastResponseId() {
+    this.lastResponseId = 0;
+  }
+
+  /**
+   * Set the AMRMClientRelayer for RM connection.
+   */
+  public void setAMRMClientRelayer(AMRMClientRelayer relayer) {
+    this.rmProxyRelayer = relayer;
+  }
+
+  /**
+   * Set the UGI for RM connection.
+   */
+  public void setUGI(UserGroupInformation ugi) {
+    this.userUgi = ugi;
+  }
+
+  /**
+   * Sends the specified heart beat request to the resource manager and invokes
+   * the callback asynchronously with the response.
+   *
+   * @param request the allocate request
+   * @param callback the callback method for the request
+   * @throws YarnException if registerAM is not called yet
+   */
+  public void allocateAsync(AllocateRequest request,
+      AsyncCallback<AllocateResponse> callback) throws YarnException {
+    try {
+      this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback));
+    } catch (InterruptedException ex) {
+      // Should not happen as we have MAX_INT queue length
+      LOG.debug("Interrupted while waiting to put on response queue", ex);
+    }
+  }
+
+  @VisibleForTesting
+  public int getRequestQueueSize() {
+    return this.requestQueue.size();
+  }
+
+  /**
+   * Data structure that encapsulates AllocateRequest and AsyncCallback
+   * instance.
+   */
+  public static class AsyncAllocateRequestInfo {
+    private AllocateRequest request;
+    private AsyncCallback<AllocateResponse> callback;
+
+    public AsyncAllocateRequestInfo(AllocateRequest request,
+        AsyncCallback<AllocateResponse> callback) {
+      Preconditions.checkArgument(request != null,
+          "AllocateRequest cannot be null");
+      Preconditions.checkArgument(callback != null, "Callback cannot be null");
+
+      this.request = request;
+      this.callback = callback;
+    }
+
+    public AsyncCallback<AllocateResponse> getCallback() {
+      return this.callback;
+    }
+
+    public AllocateRequest getRequest() {
+      return this.request;
+    }
+  }
+
+  /**
+   * Uncaught exception handler for the background heartbeat thread.
+   */
+  public class HeartBeatThreadUncaughtExceptionHandler
+      implements UncaughtExceptionHandler {
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+      LOG.error("Heartbeat thread {} for application {} crashed!", t.getName(),
+          applicationId, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1525825/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index abdec19..78dcfb6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -19,11 +19,8 @@
 package org.apache.hadoop.yarn.server.uam;
 
 import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.EnumSet;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -63,9 +60,9 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
 import org.apache.hadoop.yarn.server.AMRMClientRelayer;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.AsyncCallback;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.slf4j.Logger;
@@ -89,8 +86,7 @@ public class UnmanagedApplicationManager {
   public static final String APP_NAME = "UnmanagedAM";
   private static final String DEFAULT_QUEUE_CONFIG = "uam.default.queue.name";
 
-  private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
-  private AMRequestHandlerThread handlerThread;
+  private AMHeartbeatRequestHandler heartbeatHandler;
   private AMRMClientRelayer rmProxyRelayer;
   private ApplicationId applicationId;
   private String submitter;
@@ -99,7 +95,6 @@ public class UnmanagedApplicationManager {
   private String queueName;
   private UserGroupInformation userUgi;
   private RegisterApplicationMasterRequest registerRequest;
-  private int lastResponseId;
   private ApplicationClientProtocol rmClient;
   private long asyncApiPollIntervalMillis;
   private RecordFactory recordFactory;
@@ -137,8 +132,8 @@ public class UnmanagedApplicationManager {
     this.queueName = queueName;
     this.submitter = submitter;
     this.appNameSuffix = appNameSuffix;
-    this.handlerThread = new AMRequestHandlerThread();
-    this.requestQueue = new LinkedBlockingQueue<>();
+    this.heartbeatHandler =
+        new AMHeartbeatRequestHandler(this.conf, this.applicationId);
     this.rmProxyRelayer = null;
     this.connectionInitiated = false;
     this.registerRequest = null;
@@ -194,6 +189,9 @@ public class UnmanagedApplicationManager {
     this.rmProxyRelayer =
         new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class,
             this.conf, this.userUgi, amrmToken), this.applicationId);
+
+    this.heartbeatHandler.setAMRMClientRelayer(this.rmProxyRelayer);
+    this.heartbeatHandler.setUGI(this.userUgi);
   }
 
   /**
@@ -215,7 +213,7 @@ public class UnmanagedApplicationManager {
         this.applicationId);
     RegisterApplicationMasterResponse response =
         this.rmProxyRelayer.registerApplicationMaster(this.registerRequest);
-    this.lastResponseId = 0;
+    this.heartbeatHandler.resetLastResponseId();
 
     for (Container container : response.getContainersFromPreviousAttempts()) {
       LOG.debug("RegisterUAM returned existing running container "
@@ -227,12 +225,9 @@ public class UnmanagedApplicationManager {
     }
 
     // Only when register succeed that we start the heartbeat thread
-    this.handlerThread.setUncaughtExceptionHandler(
-        new HeartBeatThreadUncaughtExceptionHandler());
-    this.handlerThread.setDaemon(true);
-    this.handlerThread.start();
+    this.heartbeatHandler.setDaemon(true);
+    this.heartbeatHandler.start();
 
-    this.lastResponseId = 0;
     return response;
   }
 
@@ -248,7 +243,7 @@ public class UnmanagedApplicationManager {
       FinishApplicationMasterRequest request)
       throws YarnException, IOException {
 
-    this.handlerThread.shutdown();
+    this.heartbeatHandler.shutdown();
 
     if (this.rmProxyRelayer == null) {
       if (this.connectionInitiated) {
@@ -277,7 +272,7 @@ public class UnmanagedApplicationManager {
     KillApplicationRequest request =
         KillApplicationRequest.newInstance(this.applicationId);
 
-    this.handlerThread.shutdown();
+    this.heartbeatHandler.shutdown();
 
     if (this.rmClient == null) {
       this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
@@ -296,12 +291,8 @@ public class UnmanagedApplicationManager {
    */
   public void allocateAsync(AllocateRequest request,
       AsyncCallback<AllocateResponse> callback) throws YarnException {
-    try {
-      this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback));
-    } catch (InterruptedException ex) {
-      // Should not happen as we have MAX_INT queue length
-      LOG.debug("Interrupted while waiting to put on response queue", ex);
-    }
+    this.heartbeatHandler.allocateAsync(request, callback);
+
     // Two possible cases why the UAM is not successfully registered yet:
     // 1. launchUAM is not called at all. Should throw here.
     // 2. launchUAM is called but hasn't successfully returned.
@@ -519,139 +510,8 @@ public class UnmanagedApplicationManager {
     return this.rmClient.getApplicationReport(request).getApplicationReport();
   }
 
-  /**
-   * Data structure that encapsulates AllocateRequest and AsyncCallback
-   * instance.
-   */
-  public static class AsyncAllocateRequestInfo {
-    private AllocateRequest request;
-    private AsyncCallback<AllocateResponse> callback;
-
-    public AsyncAllocateRequestInfo(AllocateRequest request,
-        AsyncCallback<AllocateResponse> callback) {
-      Preconditions.checkArgument(request != null,
-          "AllocateRequest cannot be null");
-      Preconditions.checkArgument(callback != null, "Callback cannot be null");
-
-      this.request = request;
-      this.callback = callback;
-    }
-
-    public AsyncCallback<AllocateResponse> getCallback() {
-      return this.callback;
-    }
-
-    public AllocateRequest getRequest() {
-      return this.request;
-    }
-  }
-
   @VisibleForTesting
   public int getRequestQueueSize() {
-    return this.requestQueue.size();
-  }
-
-  /**
-   * Extends Thread and provides an implementation that is used for processing
-   * the AM heart beat request asynchronously and sending back the response
-   * using the callback method registered with the system.
-   */
-  public class AMRequestHandlerThread extends Thread {
-
-    // Indication flag for the thread to keep running
-    private volatile boolean keepRunning;
-
-    public AMRequestHandlerThread() {
-      super("UnmanagedApplicationManager Heartbeat Handler Thread");
-      this.keepRunning = true;
-    }
-
-    /**
-     * Shutdown the thread.
-     */
-    public void shutdown() {
-      this.keepRunning = false;
-      this.interrupt();
-    }
-
-    @Override
-    public void run() {
-      while (keepRunning) {
-        AsyncAllocateRequestInfo requestInfo;
-        try {
-          requestInfo = requestQueue.take();
-          if (requestInfo == null) {
-            throw new YarnException(
-                "Null requestInfo taken from request queue");
-          }
-          if (!keepRunning) {
-            break;
-          }
-
-          // change the response id before forwarding the allocate request as we
-          // could have different values for each UAM
-          AllocateRequest request = requestInfo.getRequest();
-          if (request == null) {
-            throw new YarnException("Null allocateRequest from requestInfo");
-          }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:"
-                + ((request.getAskList() == null) ? " empty"
-                    : request.getAskList().size()));
-          }
-
-          request.setResponseId(lastResponseId);
-          AllocateResponse response = rmProxyRelayer.allocate(request);
-          if (response == null) {
-            throw new YarnException("Null allocateResponse from allocate");
-          }
-
-          lastResponseId = response.getResponseId();
-          // update token if RM has reissued/renewed
-          if (response.getAMRMToken() != null) {
-            LOG.debug("Received new AMRMToken");
-            YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(),
-                userUgi, conf);
-          }
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Received Heartbeat reply from RM. Allocated Containers:"
-                + ((response.getAllocatedContainers() == null) ? " empty"
-                    : response.getAllocatedContainers().size()));
-          }
-
-          if (requestInfo.getCallback() == null) {
-            throw new YarnException("Null callback from requestInfo");
-          }
-          requestInfo.getCallback().callback(response);
-        } catch (InterruptedException ex) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Interrupted while waiting for queue", ex);
-          }
-        } catch (IOException ex) {
-          LOG.warn("IO Error occurred while processing heart beat for "
-              + applicationId, ex);
-        } catch (Throwable ex) {
-          LOG.warn(
-              "Error occurred while processing heart beat for " + applicationId,
-              ex);
-        }
-      }
-
-      LOG.info("UnmanagedApplicationManager has been stopped for {}. "
-          + "AMRequestHandlerThread thread is exiting", applicationId);
-    }
-  }
-
-  /**
-   * Uncaught exception handler for the background heartbeat thread.
-   */
-  protected class HeartBeatThreadUncaughtExceptionHandler
-      implements UncaughtExceptionHandler {
-    @Override
-    public void uncaughtException(Thread t, Throwable e) {
-      LOG.error("Heartbeat thread {} for application {} crashed!",
-          t.getName(), applicationId, e);
-    }
+    return this.heartbeatHandler.getRequestQueueSize();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1525825/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 65a2277..eb818f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -116,6 +116,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       NMSS_CLASS_PREFIX + "secondarySC/";
   public static final String STRING_TO_BYTE_FORMAT = "UTF-8";
 
+  private ApplicationAttemptId attemptId;
+
   /**
    * The home sub-cluster is the sub-cluster where the AM container is running
    * in.
@@ -125,20 +127,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   private volatile int lastHomeResponseId;
 
   /**
-   * A flag for work preserving NM restart. If we just recovered, we need to
-   * generate an {@link ApplicationMasterNotRegisteredException} exception back
-   * to AM (similar to what RM will do after its restart/fail-over) in its next
-   * allocate to trigger AM re-register (which we will shield from RM and just
-   * return our saved register response) and a full pending requests re-send, so
-   * that all the {@link AMRMClientRelayer} will be re-populated with all
-   * pending requests.
-   *
-   * TODO: When split-merge is not idempotent, this can lead to some
-   * over-allocation without a full cancel to RM.
-   */
-  private volatile boolean justRecovered;
-
-  /**
    * UAM pool for secondary sub-clusters (ones other than home sub-cluster),
    * using subClusterId as uamId. One UAM is created per sub-cluster RM except
    * the home RM.
@@ -156,15 +144,29 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    */
   private Map<String, AMRMClientRelayer> secondaryRelayers;
 
-  /** Thread pool used for asynchronous operations. */
-  private ExecutorService threadpool;
-
   /**
    * Stores the AllocateResponses that are received asynchronously from all the
    * sub-cluster resource managers except the home RM.
    */
   private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
 
+  /** Thread pool used for asynchronous operations. */
+  private ExecutorService threadpool;
+
+  /**
+   * A flag for work preserving NM restart. If we just recovered, we need to
+   * generate an {@link ApplicationMasterNotRegisteredException} exception back
+   * to AM (similar to what RM will do after its restart/fail-over) in its next
+   * allocate to trigger AM re-register (which we will shield from RM and just
+   * return our saved register response) and a full pending requests re-send, so
+   * that all the {@link AMRMClientRelayer} will be re-populated with all
+   * pending requests.
+   *
+   * TODO: When split-merge is not idempotent, this can lead to some
+   * over-allocation without a full cancel to RM.
+   */
+  private volatile boolean justRecovered;
+
   /**
    * Used to keep track of the container Id and the sub cluster RM that created
    * the container, so that we know which sub-cluster to forward later requests
@@ -179,7 +181,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   private RegisterApplicationMasterRequest amRegistrationRequest;
 
   /**
-   * The original registration response from home RM. This instance is reused
+   * The original registration response returned to AM. This instance is reused
    * for duplicate register request from AM, triggered by timeout between AM and
    * AMRMProxy.
    */
@@ -247,12 +249,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       }
     }
 
+    this.attemptId = appContext.getApplicationAttemptId();
+    ApplicationId appId = this.attemptId.getApplicationId();
     this.homeSubClusterId =
         SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
-    this.homeRMRelayer = new AMRMClientRelayer(
-        createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
-            this.appOwner),
-        getApplicationContext().getApplicationAttemptId().getApplicationId());
+    this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
+        ApplicationMasterProtocol.class, this.appOwner), appId);
 
     this.federationFacade = FederationStateStoreFacade.getInstance();
     this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@@ -267,9 +269,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   @Override
   public void recover(Map<String, byte[]> recoveredDataMap) {
     super.recover(recoveredDataMap);
-    ApplicationAttemptId attemptId =
-        getApplicationContext().getApplicationAttemptId();
-    LOG.info("Recovering data for FederationInterceptor for {}", attemptId);
+    LOG.info("Recovering data for FederationInterceptor for {}",
+        this.attemptId);
     if (recoveredDataMap == null) {
       return;
     }
@@ -280,7 +281,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                 .parseFrom(recoveredDataMap.get(NMSS_REG_REQUEST_KEY));
         this.amRegistrationRequest =
             new RegisterApplicationMasterRequestPBImpl(pb);
-        LOG.info("amRegistrationRequest recovered for {}", attemptId);
+        LOG.info("amRegistrationRequest recovered for {}", this.attemptId);
 
         // Give the register request to homeRMRelayer for future re-registration
         this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest);
@@ -291,7 +292,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                 .parseFrom(recoveredDataMap.get(NMSS_REG_RESPONSE_KEY));
         this.amRegistrationResponse =
             new RegisterApplicationMasterResponsePBImpl(pb);
-        LOG.info("amRegistrationResponse recovered for {}", attemptId);
+        LOG.info("amRegistrationResponse recovered for {}", this.attemptId);
         // Trigger re-register and full pending re-send only if we have a
         // saved register response. This should always be true though.
         this.justRecovered = true;
@@ -301,9 +302,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       Map<String, Token<AMRMTokenIdentifier>> uamMap;
       if (this.registryClient != null) {
         uamMap = this.registryClient
-            .loadStateFromRegistry(attemptId.getApplicationId());
+            .loadStateFromRegistry(this.attemptId.getApplicationId());
         LOG.info("Found {} existing UAMs for application {} in Yarn Registry",
-            uamMap.size(), attemptId.getApplicationId());
+            uamMap.size(), this.attemptId.getApplicationId());
       } else {
         uamMap = new HashMap<>();
         for (Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
@@ -319,7 +320,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           }
         }
         LOG.info("Found {} existing UAMs for application {} in NMStateStore",
-            uamMap.size(), attemptId.getApplicationId());
+            uamMap.size(), this.attemptId.getApplicationId());
       }
 
       // Re-attach the UAMs
@@ -336,7 +337,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
         try {
           this.uamPool.reAttachUAM(subClusterId.getId(), config,
-              attemptId.getApplicationId(),
+              this.attemptId.getApplicationId(),
               this.amRegistrationResponse.getQueue(),
               getApplicationContext().getUser(), this.homeSubClusterId.getId(),
               entry.getValue());
@@ -359,9 +360,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
               subClusterId);
 
         } catch (Exception e) {
-          LOG.error(
-              "Error reattaching UAM to " + subClusterId + " for " + attemptId,
-              e);
+          LOG.error("Error reattaching UAM to " + subClusterId + " for "
+              + this.attemptId, e);
         }
       }
 
@@ -374,8 +374,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           createHomeRMProxy(getApplicationContext(),
               ApplicationClientProtocol.class, appSubmitter);
 
-      GetContainersResponse response =
-          rmClient.getContainers(GetContainersRequest.newInstance(attemptId));
+      GetContainersResponse response = rmClient
+          .getContainers(GetContainersRequest.newInstance(this.attemptId));
       for (ContainerReport container : response.getContainerList()) {
         containerIdToSubClusterIdMap.put(container.getContainerId(),
             this.homeSubClusterId);
@@ -388,7 +388,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
       LOG.info(
           "In all {} UAMs {} running containers including AM recovered for {}",
-          uamMap.size(), containers, attemptId);
+          uamMap.size(), containers, this.attemptId);
 
       if (this.amRegistrationResponse != null) {
         // Initialize the AMRMProxyPolicy
@@ -439,12 +439,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           RegisterApplicationMasterRequestPBImpl pb =
               (RegisterApplicationMasterRequestPBImpl)
                   this.amRegistrationRequest;
-          getNMStateStore().storeAMRMProxyAppContextEntry(
-              getApplicationContext().getApplicationAttemptId(),
+          getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
               NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray());
         } catch (Exception e) {
           LOG.error("Error storing AMRMProxy application context entry for "
-              + getApplicationContext().getApplicationAttemptId(), e);
+              + this.attemptId, e);
         }
       }
     }
@@ -479,8 +478,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           this.homeSubClusterId);
     }
 
-    ApplicationId appId =
-        getApplicationContext().getApplicationAttemptId().getApplicationId();
+    ApplicationId appId = this.attemptId.getApplicationId();
     reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);
 
     if (getNMStateStore() != null) {
@@ -488,12 +486,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         RegisterApplicationMasterResponsePBImpl pb =
             (RegisterApplicationMasterResponsePBImpl)
                 this.amRegistrationResponse;
-        getNMStateStore().storeAMRMProxyAppContextEntry(
-            getApplicationContext().getApplicationAttemptId(),
+        getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
             NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray());
       } catch (Exception e) {
         LOG.error("Error storing AMRMProxy application context entry for "
-            + getApplicationContext().getApplicationAttemptId(), e);
+            + this.attemptId, e);
       }
     }
 
@@ -535,8 +532,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       this.lastHomeResponseId = request.getResponseId();
 
       throw new ApplicationMasterNotRegisteredException(
-          "AMRMProxy just restarted and recovered for "
-              + getApplicationContext().getApplicationAttemptId()
+          "AMRMProxy just restarted and recovered for " + this.attemptId
               + ". AM should re-register and full re-send pending requests.");
     }
 
@@ -553,8 +549,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     if (this.justRecovered
         || request.getResponseId() > this.lastHomeResponseId) {
       LOG.warn("Setting allocate responseId for {} from {} to {}",
-          getApplicationContext().getApplicationAttemptId(),
-          request.getResponseId(), this.lastHomeResponseId);
+          this.attemptId, request.getResponseId(), this.lastHomeResponseId);
       request.setResponseId(this.lastHomeResponseId);
     }
 
@@ -573,8 +568,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
       // Send the request to the home RM and get the response
       AllocateRequest homeRequest = requests.get(this.homeSubClusterId);
-      LOG.info("{} heartbeating to home RM with responseId {}",
-          getApplicationContext().getApplicationAttemptId(),
+      LOG.info("{} heartbeating to home RM with responseId {}", this.attemptId,
           homeRequest.getResponseId());
 
       AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest);
@@ -612,8 +606,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       }
 
       LOG.info("{} heartbeat response from home RM with responseId {}",
-          getApplicationContext().getApplicationAttemptId(),
-          homeResponse.getResponseId());
+          this.attemptId, homeResponse.getResponseId());
 
       // Update lastHomeResponseId in three cases:
       // 1. The normal responseId increments
@@ -676,15 +669,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                 secondaryRelayers.remove(subClusterId);
 
                 if (getNMStateStore() != null) {
-                  getNMStateStore().removeAMRMProxyAppContextEntry(
-                      getApplicationContext().getApplicationAttemptId(),
+                  getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
                       NMSS_SECONDARY_SC_PREFIX + subClusterId);
                 }
               }
             } catch (Throwable e) {
               LOG.warn("Failed to finish unmanaged application master: "
                   + "RM address: " + subClusterId + " ApplicationId: "
-                  + getApplicationContext().getApplicationAttemptId(), e);
+                  + attemptId, e);
             }
             return new FinishApplicationMasterResponseInfo(uamResponse,
                 subClusterId);
@@ -720,8 +712,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         } catch (Throwable e) {
           failedToUnRegister = true;
           LOG.warn("Failed to finish unmanaged application master: "
-              + " ApplicationId: "
-              + getApplicationContext().getApplicationAttemptId(), e);
+              + " ApplicationId: " + this.attemptId, e);
         }
       }
     }
@@ -733,8 +724,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       // attempt will be launched.
       this.uamPool.stop();
       if (this.registryClient != null) {
-        this.registryClient.removeAppFromRegistry(getApplicationContext()
-            .getApplicationAttemptId().getApplicationId());
+        this.registryClient
+            .removeAppFromRegistry(this.attemptId.getApplicationId());
       }
     }
     return homeResponse;
@@ -755,12 +746,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   public void shutdown() {
     // Do not stop uamPool service and kill UAMs here because of possible second
     // app attempt
-    if (threadpool != null) {
+    if (this.threadpool != null) {
       try {
-        threadpool.shutdown();
+        this.threadpool.shutdown(); // best-effort; failures are swallowed below
       } catch (Throwable ex) {
       }
-      threadpool = null;
+      this.threadpool = null; // later calls skip the pool shutdown
     }
     super.shutdown();
   }
@@ -1090,59 +1081,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       }
 
       this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
-          new AsyncCallback<AllocateResponse>() {
-            @Override
-            public void callback(AllocateResponse response) {
-              synchronized (asyncResponseSink) {
-                List<AllocateResponse> responses = null;
-                if (asyncResponseSink.containsKey(subClusterId)) {
-                  responses = asyncResponseSink.get(subClusterId);
-                } else {
-                  responses = new ArrayList<>();
-                  asyncResponseSink.put(subClusterId, responses);
-                }
-                responses.add(response);
-              }
-
-              // Save the new AMRMToken for the UAM if present
-              if (response.getAMRMToken() != null) {
-                Token<AMRMTokenIdentifier> newToken = ConverterUtils
-                    .convertFromYarn(response.getAMRMToken(), (Text) null);
-                // Update the token in registry or NMSS
-                if (registryClient != null) {
-                  registryClient
-                      .writeAMRMTokenForUAM(
-                          getApplicationContext().getApplicationAttemptId()
-                              .getApplicationId(),
-                          subClusterId.getId(), newToken);
-                } else if (getNMStateStore() != null) {
-                  try {
-                    getNMStateStore().storeAMRMProxyAppContextEntry(
-                        getApplicationContext().getApplicationAttemptId(),
-                        NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(),
-                        newToken.encodeToUrlString()
-                            .getBytes(STRING_TO_BYTE_FORMAT));
-                  } catch (IOException e) {
-                    LOG.error(
-                        "Error storing UAM token as AMRMProxy "
-                            + "context entry in NMSS for "
-                            + getApplicationContext().getApplicationAttemptId(),
-                        e);
-                  }
-                }
-              }
-
-              // Notify policy of secondary sub-cluster responses
-              try {
-                policyInterpreter.notifyOfResponse(subClusterId, response);
-              } catch (YarnException e) {
-                LOG.warn(
-                    "notifyOfResponse for policy failed for home sub-cluster "
-                        + subClusterId,
-                    e);
-              }
-            }
-          });
+          new HeartbeatCallBack(subClusterId));
     }
 
     return registrations;
@@ -1195,7 +1134,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                 try {
                   // For appNameSuffix, use subClusterId of the home sub-cluster
                   token = uamPool.launchUAM(subClusterId, config,
-                      appContext.getApplicationAttemptId().getApplicationId(),
+                      attemptId.getApplicationId(),
                       amRegistrationResponse.getQueue(), appContext.getUser(),
                       homeSubClusterId.toString(), true);
 
@@ -1206,8 +1145,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                       registerRequest);
                 } catch (Throwable e) {
                   LOG.error("Failed to register application master: "
-                      + subClusterId + " Application: "
-                      + appContext.getApplicationAttemptId(), e);
+                      + subClusterId + " Application: " + attemptId, e);
                 }
                 return new RegisterApplicationMasterResponseInfo(uamResponse,
                     SubClusterId.newInstance(subClusterId), token);
@@ -1232,20 +1170,18 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           } else {
             LOG.info("Successfully registered unmanaged application master: "
                 + uamResponse.getSubClusterId() + " ApplicationId: "
-                + getApplicationContext().getApplicationAttemptId());
+                + this.attemptId);
             successfulRegistrations.put(uamResponse.getSubClusterId(),
                 uamResponse.getResponse());
 
             // Save the UAM token in registry or NMSS
             if (registryClient != null) {
               registryClient.writeAMRMTokenForUAM(
-                  getApplicationContext().getApplicationAttemptId()
-                      .getApplicationId(),
+                  this.attemptId.getApplicationId(),
                   uamResponse.getSubClusterId().getId(),
                   uamResponse.getUamToken());
             } else if (getNMStateStore() != null) {
-              getNMStateStore().storeAMRMProxyAppContextEntry(
-                  getApplicationContext().getApplicationAttemptId(),
+              getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
                   NMSS_SECONDARY_SC_PREFIX
                       + uamResponse.getSubClusterId().getId(),
                   uamResponse.getUamToken().encodeToUrlString()
@@ -1254,8 +1190,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           }
         } catch (Exception e) {
           LOG.warn("Failed to register unmanaged application master: "
-              + " ApplicationId: "
-              + getApplicationContext().getApplicationAttemptId(), e);
+              + " ApplicationId: " + this.attemptId, e);
         }
       }
     }
@@ -1490,9 +1425,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
               "Duplicate containerID found in the allocated containers. This"
                   + " can happen if the RM epoch is not configured properly."
                   + " ContainerId: " + container.getId().toString()
-                  + " ApplicationId: "
-                  + getApplicationContext().getApplicationAttemptId()
-                  + " From RM: " + subClusterId
+                  + " ApplicationId: " + this.attemptId + " From RM: "
+                  + subClusterId
                   + " . Previous container was from sub-cluster: "
                   + existingSubClusterId);
         }
@@ -1588,6 +1522,59 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   }
 
   /**
+   * Async callback for heartbeat (allocate) responses from one sub-cluster.
+   */
+  private class HeartbeatCallBack implements AsyncCallback<AllocateResponse> {
+    /** Sub-cluster whose allocate responses this callback handles. */
+    private final SubClusterId subClusterId;
+
+    HeartbeatCallBack(SubClusterId subClusterId) {
+      this.subClusterId = subClusterId;
+    }
+
+    /**
+     * Buffers the response, persists a new AMRMToken carried by it (registry
+     * or NMSS), and notifies the policy interpreter of the response.
+     */
+    @Override
+    public void callback(AllocateResponse response) {
+      // Append to this sub-cluster's list, creating the list on first use.
+      synchronized (asyncResponseSink) {
+        asyncResponseSink.computeIfAbsent(subClusterId, k -> new ArrayList<>())
+            .add(response);
+      }
+
+      // Save the new AMRMToken for the UAM if present
+      if (response.getAMRMToken() != null) {
+        Token<AMRMTokenIdentifier> newToken = ConverterUtils
+            .convertFromYarn(response.getAMRMToken(), (Text) null);
+        // Update the token in registry or NMSS
+        if (registryClient != null) {
+          registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
+              subClusterId.getId(), newToken);
+        } else if (getNMStateStore() != null) {
+          try {
+            getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
+                NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(),
+                newToken.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
+          } catch (IOException e) {
+            LOG.error("Error storing UAM token as AMRMProxy "
+                + "context entry in NMSS for " + attemptId, e);
+          }
+        }
+      }
+
+      // Notify policy of secondary sub-cluster responses
+      try {
+        policyInterpreter.notifyOfResponse(subClusterId, response);
+      } catch (YarnException e) {
+        LOG.warn("notifyOfResponse for policy failed for sub-cluster "
+            + subClusterId, e);
+      }
+    }
+  }
+
+  /**
    * Private structure for encapsulating SubClusterId and
    * RegisterApplicationMasterResponse instances.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org