You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/08/31 16:10:37 UTC
[11/47] hadoop git commit: YARN-8705. Refactor the UAM heartbeat
thread in preparation for YARN-8696. Contributed by Botong Huang.
YARN-8705. Refactor the UAM heartbeat thread in preparation for YARN-8696. Contributed by Botong Huang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f1525825
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f1525825
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f1525825
Branch: refs/heads/HDFS-12943
Commit: f1525825623a1307b5aa55c456b6afa3e0c61135
Parents: 7b1fa56
Author: Giovanni Matteo Fumarola <gi...@microsoft.com>
Authored: Mon Aug 27 10:32:22 2018 -0700
Committer: Giovanni Matteo Fumarola <gi...@microsoft.com>
Committed: Mon Aug 27 10:32:22 2018 -0700
----------------------------------------------------------------------
.../yarn/server/AMHeartbeatRequestHandler.java | 227 +++++++++++++++++
.../server/uam/UnmanagedApplicationManager.java | 170 ++-----------
.../amrmproxy/FederationInterceptor.java | 245 +++++++++----------
3 files changed, 358 insertions(+), 284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1525825/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java
new file mode 100644
index 0000000..42227bb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java
@@ -0,0 +1,227 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.apache.hadoop.yarn.util.AsyncCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Extends Thread and provides an implementation that is used for processing the
+ * AM heart beat request asynchronously and sending back the response using the
+ * callback method registered with the system.
+ */
+public class AMHeartbeatRequestHandler extends Thread {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(AMHeartbeatRequestHandler.class);
+
+ // Indication flag for the thread to keep running
+ private volatile boolean keepRunning;
+
+ private Configuration conf;
+ private ApplicationId applicationId;
+
+ private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
+ private AMRMClientRelayer rmProxyRelayer;
+ private UserGroupInformation userUgi;
+ private int lastResponseId;
+
+ public AMHeartbeatRequestHandler(Configuration conf,
+ ApplicationId applicationId) {
+ super("AMHeartbeatRequestHandler Heartbeat Handler Thread");
+ this.setUncaughtExceptionHandler(
+ new HeartBeatThreadUncaughtExceptionHandler());
+ this.keepRunning = true;
+
+ this.conf = conf;
+ this.applicationId = applicationId;
+ this.requestQueue = new LinkedBlockingQueue<>();
+
+ resetLastResponseId();
+ }
+
+ /**
+ * Shutdown the thread.
+ */
+ public void shutdown() {
+ this.keepRunning = false;
+ this.interrupt();
+ }
+
+ @Override
+ public void run() {
+ while (keepRunning) {
+ AsyncAllocateRequestInfo requestInfo;
+ try {
+ requestInfo = requestQueue.take();
+ if (requestInfo == null) {
+ throw new YarnException(
+ "Null requestInfo taken from request queue");
+ }
+ if (!keepRunning) {
+ break;
+ }
+
+ // change the response id before forwarding the allocate request as we
+ // could have different values for each UAM
+ AllocateRequest request = requestInfo.getRequest();
+ if (request == null) {
+ throw new YarnException("Null allocateRequest from requestInfo");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:"
+ + ((request.getAskList() == null) ? " empty"
+ : request.getAskList().size()));
+ }
+
+ request.setResponseId(lastResponseId);
+ AllocateResponse response = rmProxyRelayer.allocate(request);
+ if (response == null) {
+ throw new YarnException("Null allocateResponse from allocate");
+ }
+
+ lastResponseId = response.getResponseId();
+ // update token if RM has reissued/renewed
+ if (response.getAMRMToken() != null) {
+ LOG.debug("Received new AMRMToken");
+ YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(),
+ userUgi, conf);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received Heartbeat reply from RM. Allocated Containers:"
+ + ((response.getAllocatedContainers() == null) ? " empty"
+ : response.getAllocatedContainers().size()));
+ }
+
+ if (requestInfo.getCallback() == null) {
+ throw new YarnException("Null callback from requestInfo");
+ }
+ requestInfo.getCallback().callback(response);
+ } catch (InterruptedException ex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Interrupted while waiting for queue", ex);
+ }
+ } catch (Throwable ex) {
+ LOG.warn(
+ "Error occurred while processing heart beat for " + applicationId,
+ ex);
+ }
+ }
+
+ LOG.info("AMHeartbeatRequestHandler thread for {} is exiting",
+ applicationId);
+ }
+
+ /**
+ * Reset the lastResponseId to zero.
+ */
+ public void resetLastResponseId() {
+ this.lastResponseId = 0;
+ }
+
+ /**
+ * Set the AMRMClientRelayer for RM connection.
+ */
+ public void setAMRMClientRelayer(AMRMClientRelayer relayer) {
+ this.rmProxyRelayer = relayer;
+ }
+
+ /**
+ * Set the UGI for RM connection.
+ */
+ public void setUGI(UserGroupInformation ugi) {
+ this.userUgi = ugi;
+ }
+
+ /**
+ * Sends the specified heart beat request to the resource manager and invokes
+ * the callback asynchronously with the response.
+ *
+ * @param request the allocate request
+ * @param callback the callback method for the request
+ * @throws YarnException if registerAM is not called yet
+ */
+ public void allocateAsync(AllocateRequest request,
+ AsyncCallback<AllocateResponse> callback) throws YarnException {
+ try {
+ this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback));
+ } catch (InterruptedException ex) {
+ // Should not happen as we have MAX_INT queue length
+ LOG.debug("Interrupted while waiting to put on response queue", ex);
+ }
+ }
+
+ @VisibleForTesting
+ public int getRequestQueueSize() {
+ return this.requestQueue.size();
+ }
+
+ /**
+ * Data structure that encapsulates AllocateRequest and AsyncCallback
+ * instance.
+ */
+ public static class AsyncAllocateRequestInfo {
+ private AllocateRequest request;
+ private AsyncCallback<AllocateResponse> callback;
+
+ public AsyncAllocateRequestInfo(AllocateRequest request,
+ AsyncCallback<AllocateResponse> callback) {
+ Preconditions.checkArgument(request != null,
+ "AllocateRequest cannot be null");
+ Preconditions.checkArgument(callback != null, "Callback cannot be null");
+
+ this.request = request;
+ this.callback = callback;
+ }
+
+ public AsyncCallback<AllocateResponse> getCallback() {
+ return this.callback;
+ }
+
+ public AllocateRequest getRequest() {
+ return this.request;
+ }
+ }
+
+ /**
+ * Uncaught exception handler for the background heartbeat thread.
+ */
+ public class HeartBeatThreadUncaughtExceptionHandler
+ implements UncaughtExceptionHandler {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.error("Heartbeat thread {} for application {} crashed!", t.getName(),
+ applicationId, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1525825/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index abdec19..78dcfb6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -19,11 +19,8 @@
package org.apache.hadoop.yarn.server.uam;
import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
import java.util.EnumSet;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -63,9 +60,9 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.slf4j.Logger;
@@ -89,8 +86,7 @@ public class UnmanagedApplicationManager {
public static final String APP_NAME = "UnmanagedAM";
private static final String DEFAULT_QUEUE_CONFIG = "uam.default.queue.name";
- private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
- private AMRequestHandlerThread handlerThread;
+ private AMHeartbeatRequestHandler heartbeatHandler;
private AMRMClientRelayer rmProxyRelayer;
private ApplicationId applicationId;
private String submitter;
@@ -99,7 +95,6 @@ public class UnmanagedApplicationManager {
private String queueName;
private UserGroupInformation userUgi;
private RegisterApplicationMasterRequest registerRequest;
- private int lastResponseId;
private ApplicationClientProtocol rmClient;
private long asyncApiPollIntervalMillis;
private RecordFactory recordFactory;
@@ -137,8 +132,8 @@ public class UnmanagedApplicationManager {
this.queueName = queueName;
this.submitter = submitter;
this.appNameSuffix = appNameSuffix;
- this.handlerThread = new AMRequestHandlerThread();
- this.requestQueue = new LinkedBlockingQueue<>();
+ this.heartbeatHandler =
+ new AMHeartbeatRequestHandler(this.conf, this.applicationId);
this.rmProxyRelayer = null;
this.connectionInitiated = false;
this.registerRequest = null;
@@ -194,6 +189,9 @@ public class UnmanagedApplicationManager {
this.rmProxyRelayer =
new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class,
this.conf, this.userUgi, amrmToken), this.applicationId);
+
+ this.heartbeatHandler.setAMRMClientRelayer(this.rmProxyRelayer);
+ this.heartbeatHandler.setUGI(this.userUgi);
}
/**
@@ -215,7 +213,7 @@ public class UnmanagedApplicationManager {
this.applicationId);
RegisterApplicationMasterResponse response =
this.rmProxyRelayer.registerApplicationMaster(this.registerRequest);
- this.lastResponseId = 0;
+ this.heartbeatHandler.resetLastResponseId();
for (Container container : response.getContainersFromPreviousAttempts()) {
LOG.debug("RegisterUAM returned existing running container "
@@ -227,12 +225,9 @@ public class UnmanagedApplicationManager {
}
// Only when register succeed that we start the heartbeat thread
- this.handlerThread.setUncaughtExceptionHandler(
- new HeartBeatThreadUncaughtExceptionHandler());
- this.handlerThread.setDaemon(true);
- this.handlerThread.start();
+ this.heartbeatHandler.setDaemon(true);
+ this.heartbeatHandler.start();
- this.lastResponseId = 0;
return response;
}
@@ -248,7 +243,7 @@ public class UnmanagedApplicationManager {
FinishApplicationMasterRequest request)
throws YarnException, IOException {
- this.handlerThread.shutdown();
+ this.heartbeatHandler.shutdown();
if (this.rmProxyRelayer == null) {
if (this.connectionInitiated) {
@@ -277,7 +272,7 @@ public class UnmanagedApplicationManager {
KillApplicationRequest request =
KillApplicationRequest.newInstance(this.applicationId);
- this.handlerThread.shutdown();
+ this.heartbeatHandler.shutdown();
if (this.rmClient == null) {
this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
@@ -296,12 +291,8 @@ public class UnmanagedApplicationManager {
*/
public void allocateAsync(AllocateRequest request,
AsyncCallback<AllocateResponse> callback) throws YarnException {
- try {
- this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback));
- } catch (InterruptedException ex) {
- // Should not happen as we have MAX_INT queue length
- LOG.debug("Interrupted while waiting to put on response queue", ex);
- }
+ this.heartbeatHandler.allocateAsync(request, callback);
+
// Two possible cases why the UAM is not successfully registered yet:
// 1. launchUAM is not called at all. Should throw here.
// 2. launchUAM is called but hasn't successfully returned.
@@ -519,139 +510,8 @@ public class UnmanagedApplicationManager {
return this.rmClient.getApplicationReport(request).getApplicationReport();
}
- /**
- * Data structure that encapsulates AllocateRequest and AsyncCallback
- * instance.
- */
- public static class AsyncAllocateRequestInfo {
- private AllocateRequest request;
- private AsyncCallback<AllocateResponse> callback;
-
- public AsyncAllocateRequestInfo(AllocateRequest request,
- AsyncCallback<AllocateResponse> callback) {
- Preconditions.checkArgument(request != null,
- "AllocateRequest cannot be null");
- Preconditions.checkArgument(callback != null, "Callback cannot be null");
-
- this.request = request;
- this.callback = callback;
- }
-
- public AsyncCallback<AllocateResponse> getCallback() {
- return this.callback;
- }
-
- public AllocateRequest getRequest() {
- return this.request;
- }
- }
-
@VisibleForTesting
public int getRequestQueueSize() {
- return this.requestQueue.size();
- }
-
- /**
- * Extends Thread and provides an implementation that is used for processing
- * the AM heart beat request asynchronously and sending back the response
- * using the callback method registered with the system.
- */
- public class AMRequestHandlerThread extends Thread {
-
- // Indication flag for the thread to keep running
- private volatile boolean keepRunning;
-
- public AMRequestHandlerThread() {
- super("UnmanagedApplicationManager Heartbeat Handler Thread");
- this.keepRunning = true;
- }
-
- /**
- * Shutdown the thread.
- */
- public void shutdown() {
- this.keepRunning = false;
- this.interrupt();
- }
-
- @Override
- public void run() {
- while (keepRunning) {
- AsyncAllocateRequestInfo requestInfo;
- try {
- requestInfo = requestQueue.take();
- if (requestInfo == null) {
- throw new YarnException(
- "Null requestInfo taken from request queue");
- }
- if (!keepRunning) {
- break;
- }
-
- // change the response id before forwarding the allocate request as we
- // could have different values for each UAM
- AllocateRequest request = requestInfo.getRequest();
- if (request == null) {
- throw new YarnException("Null allocateRequest from requestInfo");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:"
- + ((request.getAskList() == null) ? " empty"
- : request.getAskList().size()));
- }
-
- request.setResponseId(lastResponseId);
- AllocateResponse response = rmProxyRelayer.allocate(request);
- if (response == null) {
- throw new YarnException("Null allocateResponse from allocate");
- }
-
- lastResponseId = response.getResponseId();
- // update token if RM has reissued/renewed
- if (response.getAMRMToken() != null) {
- LOG.debug("Received new AMRMToken");
- YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(),
- userUgi, conf);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received Heartbeat reply from RM. Allocated Containers:"
- + ((response.getAllocatedContainers() == null) ? " empty"
- : response.getAllocatedContainers().size()));
- }
-
- if (requestInfo.getCallback() == null) {
- throw new YarnException("Null callback from requestInfo");
- }
- requestInfo.getCallback().callback(response);
- } catch (InterruptedException ex) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Interrupted while waiting for queue", ex);
- }
- } catch (IOException ex) {
- LOG.warn("IO Error occurred while processing heart beat for "
- + applicationId, ex);
- } catch (Throwable ex) {
- LOG.warn(
- "Error occurred while processing heart beat for " + applicationId,
- ex);
- }
- }
-
- LOG.info("UnmanagedApplicationManager has been stopped for {}. "
- + "AMRequestHandlerThread thread is exiting", applicationId);
- }
- }
-
- /**
- * Uncaught exception handler for the background heartbeat thread.
- */
- protected class HeartBeatThreadUncaughtExceptionHandler
- implements UncaughtExceptionHandler {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- LOG.error("Heartbeat thread {} for application {} crashed!",
- t.getName(), applicationId, e);
- }
+ return this.heartbeatHandler.getRequestQueueSize();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1525825/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 65a2277..eb818f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -116,6 +116,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
NMSS_CLASS_PREFIX + "secondarySC/";
public static final String STRING_TO_BYTE_FORMAT = "UTF-8";
+ private ApplicationAttemptId attemptId;
+
/**
* The home sub-cluster is the sub-cluster where the AM container is running
* in.
@@ -125,20 +127,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private volatile int lastHomeResponseId;
/**
- * A flag for work preserving NM restart. If we just recovered, we need to
- * generate an {@link ApplicationMasterNotRegisteredException} exception back
- * to AM (similar to what RM will do after its restart/fail-over) in its next
- * allocate to trigger AM re-register (which we will shield from RM and just
- * return our saved register response) and a full pending requests re-send, so
- * that all the {@link AMRMClientRelayer} will be re-populated with all
- * pending requests.
- *
- * TODO: When split-merge is not idempotent, this can lead to some
- * over-allocation without a full cancel to RM.
- */
- private volatile boolean justRecovered;
-
- /**
* UAM pool for secondary sub-clusters (ones other than home sub-cluster),
* using subClusterId as uamId. One UAM is created per sub-cluster RM except
* the home RM.
@@ -156,15 +144,29 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private Map<String, AMRMClientRelayer> secondaryRelayers;
- /** Thread pool used for asynchronous operations. */
- private ExecutorService threadpool;
-
/**
* Stores the AllocateResponses that are received asynchronously from all the
* sub-cluster resource managers except the home RM.
*/
private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
+ /** Thread pool used for asynchronous operations. */
+ private ExecutorService threadpool;
+
+ /**
+ * A flag for work preserving NM restart. If we just recovered, we need to
+ * generate an {@link ApplicationMasterNotRegisteredException} exception back
+ * to AM (similar to what RM will do after its restart/fail-over) in its next
+ * allocate to trigger AM re-register (which we will shield from RM and just
+ * return our saved register response) and a full pending requests re-send, so
+ * that all the {@link AMRMClientRelayer} will be re-populated with all
+ * pending requests.
+ *
+ * TODO: When split-merge is not idempotent, this can lead to some
+ * over-allocation without a full cancel to RM.
+ */
+ private volatile boolean justRecovered;
+
/**
* Used to keep track of the container Id and the sub cluster RM that created
* the container, so that we know which sub-cluster to forward later requests
@@ -179,7 +181,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private RegisterApplicationMasterRequest amRegistrationRequest;
/**
- * The original registration response from home RM. This instance is reused
+ * The original registration response returned to AM. This instance is reused
* for duplicate register request from AM, triggered by timeout between AM and
* AMRMProxy.
*/
@@ -247,12 +249,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
+ this.attemptId = appContext.getApplicationAttemptId();
+ ApplicationId appId = this.attemptId.getApplicationId();
this.homeSubClusterId =
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
- this.homeRMRelayer = new AMRMClientRelayer(
- createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
- this.appOwner),
- getApplicationContext().getApplicationAttemptId().getApplicationId());
+ this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
+ ApplicationMasterProtocol.class, this.appOwner), appId);
this.federationFacade = FederationStateStoreFacade.getInstance();
this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@@ -267,9 +269,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
@Override
public void recover(Map<String, byte[]> recoveredDataMap) {
super.recover(recoveredDataMap);
- ApplicationAttemptId attemptId =
- getApplicationContext().getApplicationAttemptId();
- LOG.info("Recovering data for FederationInterceptor for {}", attemptId);
+ LOG.info("Recovering data for FederationInterceptor for {}",
+ this.attemptId);
if (recoveredDataMap == null) {
return;
}
@@ -280,7 +281,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
.parseFrom(recoveredDataMap.get(NMSS_REG_REQUEST_KEY));
this.amRegistrationRequest =
new RegisterApplicationMasterRequestPBImpl(pb);
- LOG.info("amRegistrationRequest recovered for {}", attemptId);
+ LOG.info("amRegistrationRequest recovered for {}", this.attemptId);
// Give the register request to homeRMRelayer for future re-registration
this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest);
@@ -291,7 +292,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
.parseFrom(recoveredDataMap.get(NMSS_REG_RESPONSE_KEY));
this.amRegistrationResponse =
new RegisterApplicationMasterResponsePBImpl(pb);
- LOG.info("amRegistrationResponse recovered for {}", attemptId);
+ LOG.info("amRegistrationResponse recovered for {}", this.attemptId);
// Trigger re-register and full pending re-send only if we have a
// saved register response. This should always be true though.
this.justRecovered = true;
@@ -301,9 +302,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
Map<String, Token<AMRMTokenIdentifier>> uamMap;
if (this.registryClient != null) {
uamMap = this.registryClient
- .loadStateFromRegistry(attemptId.getApplicationId());
+ .loadStateFromRegistry(this.attemptId.getApplicationId());
LOG.info("Found {} existing UAMs for application {} in Yarn Registry",
- uamMap.size(), attemptId.getApplicationId());
+ uamMap.size(), this.attemptId.getApplicationId());
} else {
uamMap = new HashMap<>();
for (Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
@@ -319,7 +320,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
LOG.info("Found {} existing UAMs for application {} in NMStateStore",
- uamMap.size(), attemptId.getApplicationId());
+ uamMap.size(), this.attemptId.getApplicationId());
}
// Re-attach the UAMs
@@ -336,7 +337,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
try {
this.uamPool.reAttachUAM(subClusterId.getId(), config,
- attemptId.getApplicationId(),
+ this.attemptId.getApplicationId(),
this.amRegistrationResponse.getQueue(),
getApplicationContext().getUser(), this.homeSubClusterId.getId(),
entry.getValue());
@@ -359,9 +360,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
subClusterId);
} catch (Exception e) {
- LOG.error(
- "Error reattaching UAM to " + subClusterId + " for " + attemptId,
- e);
+ LOG.error("Error reattaching UAM to " + subClusterId + " for "
+ + this.attemptId, e);
}
}
@@ -374,8 +374,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
createHomeRMProxy(getApplicationContext(),
ApplicationClientProtocol.class, appSubmitter);
- GetContainersResponse response =
- rmClient.getContainers(GetContainersRequest.newInstance(attemptId));
+ GetContainersResponse response = rmClient
+ .getContainers(GetContainersRequest.newInstance(this.attemptId));
for (ContainerReport container : response.getContainerList()) {
containerIdToSubClusterIdMap.put(container.getContainerId(),
this.homeSubClusterId);
@@ -388,7 +388,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
LOG.info(
"In all {} UAMs {} running containers including AM recovered for {}",
- uamMap.size(), containers, attemptId);
+ uamMap.size(), containers, this.attemptId);
if (this.amRegistrationResponse != null) {
// Initialize the AMRMProxyPolicy
@@ -439,12 +439,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
RegisterApplicationMasterRequestPBImpl pb =
(RegisterApplicationMasterRequestPBImpl)
this.amRegistrationRequest;
- getNMStateStore().storeAMRMProxyAppContextEntry(
- getApplicationContext().getApplicationAttemptId(),
+ getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray());
} catch (Exception e) {
LOG.error("Error storing AMRMProxy application context entry for "
- + getApplicationContext().getApplicationAttemptId(), e);
+ + this.attemptId, e);
}
}
}
@@ -479,8 +478,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.homeSubClusterId);
}
- ApplicationId appId =
- getApplicationContext().getApplicationAttemptId().getApplicationId();
+ ApplicationId appId = this.attemptId.getApplicationId();
reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);
if (getNMStateStore() != null) {
@@ -488,12 +486,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
RegisterApplicationMasterResponsePBImpl pb =
(RegisterApplicationMasterResponsePBImpl)
this.amRegistrationResponse;
- getNMStateStore().storeAMRMProxyAppContextEntry(
- getApplicationContext().getApplicationAttemptId(),
+ getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray());
} catch (Exception e) {
LOG.error("Error storing AMRMProxy application context entry for "
- + getApplicationContext().getApplicationAttemptId(), e);
+ + this.attemptId, e);
}
}
@@ -535,8 +532,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.lastHomeResponseId = request.getResponseId();
throw new ApplicationMasterNotRegisteredException(
- "AMRMProxy just restarted and recovered for "
- + getApplicationContext().getApplicationAttemptId()
+ "AMRMProxy just restarted and recovered for " + this.attemptId
+ ". AM should re-register and full re-send pending requests.");
}
@@ -553,8 +549,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
if (this.justRecovered
|| request.getResponseId() > this.lastHomeResponseId) {
LOG.warn("Setting allocate responseId for {} from {} to {}",
- getApplicationContext().getApplicationAttemptId(),
- request.getResponseId(), this.lastHomeResponseId);
+ this.attemptId, request.getResponseId(), this.lastHomeResponseId);
request.setResponseId(this.lastHomeResponseId);
}
@@ -573,8 +568,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// Send the request to the home RM and get the response
AllocateRequest homeRequest = requests.get(this.homeSubClusterId);
- LOG.info("{} heartbeating to home RM with responseId {}",
- getApplicationContext().getApplicationAttemptId(),
+ LOG.info("{} heartbeating to home RM with responseId {}", this.attemptId,
homeRequest.getResponseId());
AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest);
@@ -612,8 +606,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
LOG.info("{} heartbeat response from home RM with responseId {}",
- getApplicationContext().getApplicationAttemptId(),
- homeResponse.getResponseId());
+ this.attemptId, homeResponse.getResponseId());
// Update lastHomeResponseId in three cases:
// 1. The normal responseId increments
@@ -676,15 +669,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
secondaryRelayers.remove(subClusterId);
if (getNMStateStore() != null) {
- getNMStateStore().removeAMRMProxyAppContextEntry(
- getApplicationContext().getApplicationAttemptId(),
+ getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
NMSS_SECONDARY_SC_PREFIX + subClusterId);
}
}
} catch (Throwable e) {
LOG.warn("Failed to finish unmanaged application master: "
+ "RM address: " + subClusterId + " ApplicationId: "
- + getApplicationContext().getApplicationAttemptId(), e);
+ + attemptId, e);
}
return new FinishApplicationMasterResponseInfo(uamResponse,
subClusterId);
@@ -720,8 +712,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
} catch (Throwable e) {
failedToUnRegister = true;
LOG.warn("Failed to finish unmanaged application master: "
- + " ApplicationId: "
- + getApplicationContext().getApplicationAttemptId(), e);
+ + " ApplicationId: " + this.attemptId, e);
}
}
}
@@ -733,8 +724,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// attempt will be launched.
this.uamPool.stop();
if (this.registryClient != null) {
- this.registryClient.removeAppFromRegistry(getApplicationContext()
- .getApplicationAttemptId().getApplicationId());
+ this.registryClient
+ .removeAppFromRegistry(this.attemptId.getApplicationId());
}
}
return homeResponse;
@@ -755,12 +746,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
public void shutdown() {
// Do not stop uamPool service and kill UAMs here because of possible second
// app attempt
- if (threadpool != null) {
+ if (this.threadpool != null) {
try {
- threadpool.shutdown();
+ this.threadpool.shutdown();
} catch (Throwable ex) {
}
- threadpool = null;
+ this.threadpool = null;
}
super.shutdown();
}
@@ -1090,59 +1081,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
- new AsyncCallback<AllocateResponse>() {
- @Override
- public void callback(AllocateResponse response) {
- synchronized (asyncResponseSink) {
- List<AllocateResponse> responses = null;
- if (asyncResponseSink.containsKey(subClusterId)) {
- responses = asyncResponseSink.get(subClusterId);
- } else {
- responses = new ArrayList<>();
- asyncResponseSink.put(subClusterId, responses);
- }
- responses.add(response);
- }
-
- // Save the new AMRMToken for the UAM if present
- if (response.getAMRMToken() != null) {
- Token<AMRMTokenIdentifier> newToken = ConverterUtils
- .convertFromYarn(response.getAMRMToken(), (Text) null);
- // Update the token in registry or NMSS
- if (registryClient != null) {
- registryClient
- .writeAMRMTokenForUAM(
- getApplicationContext().getApplicationAttemptId()
- .getApplicationId(),
- subClusterId.getId(), newToken);
- } else if (getNMStateStore() != null) {
- try {
- getNMStateStore().storeAMRMProxyAppContextEntry(
- getApplicationContext().getApplicationAttemptId(),
- NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(),
- newToken.encodeToUrlString()
- .getBytes(STRING_TO_BYTE_FORMAT));
- } catch (IOException e) {
- LOG.error(
- "Error storing UAM token as AMRMProxy "
- + "context entry in NMSS for "
- + getApplicationContext().getApplicationAttemptId(),
- e);
- }
- }
- }
-
- // Notify policy of secondary sub-cluster responses
- try {
- policyInterpreter.notifyOfResponse(subClusterId, response);
- } catch (YarnException e) {
- LOG.warn(
- "notifyOfResponse for policy failed for home sub-cluster "
- + subClusterId,
- e);
- }
- }
- });
+ new HeartbeatCallBack(subClusterId));
}
return registrations;
@@ -1195,7 +1134,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
try {
// For appNameSuffix, use subClusterId of the home sub-cluster
token = uamPool.launchUAM(subClusterId, config,
- appContext.getApplicationAttemptId().getApplicationId(),
+ attemptId.getApplicationId(),
amRegistrationResponse.getQueue(), appContext.getUser(),
homeSubClusterId.toString(), true);
@@ -1206,8 +1145,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
registerRequest);
} catch (Throwable e) {
LOG.error("Failed to register application master: "
- + subClusterId + " Application: "
- + appContext.getApplicationAttemptId(), e);
+ + subClusterId + " Application: " + attemptId, e);
}
return new RegisterApplicationMasterResponseInfo(uamResponse,
SubClusterId.newInstance(subClusterId), token);
@@ -1232,20 +1170,18 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
} else {
LOG.info("Successfully registered unmanaged application master: "
+ uamResponse.getSubClusterId() + " ApplicationId: "
- + getApplicationContext().getApplicationAttemptId());
+ + this.attemptId);
successfulRegistrations.put(uamResponse.getSubClusterId(),
uamResponse.getResponse());
// Save the UAM token in registry or NMSS
if (registryClient != null) {
registryClient.writeAMRMTokenForUAM(
- getApplicationContext().getApplicationAttemptId()
- .getApplicationId(),
+ this.attemptId.getApplicationId(),
uamResponse.getSubClusterId().getId(),
uamResponse.getUamToken());
} else if (getNMStateStore() != null) {
- getNMStateStore().storeAMRMProxyAppContextEntry(
- getApplicationContext().getApplicationAttemptId(),
+ getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
NMSS_SECONDARY_SC_PREFIX
+ uamResponse.getSubClusterId().getId(),
uamResponse.getUamToken().encodeToUrlString()
@@ -1254,8 +1190,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
} catch (Exception e) {
LOG.warn("Failed to register unmanaged application master: "
- + " ApplicationId: "
- + getApplicationContext().getApplicationAttemptId(), e);
+ + " ApplicationId: " + this.attemptId, e);
}
}
}
@@ -1490,9 +1425,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
"Duplicate containerID found in the allocated containers. This"
+ " can happen if the RM epoch is not configured properly."
+ " ContainerId: " + container.getId().toString()
- + " ApplicationId: "
- + getApplicationContext().getApplicationAttemptId()
- + " From RM: " + subClusterId
+ + " ApplicationId: " + this.attemptId + " From RM: "
+ + subClusterId
+ " . Previous container was from sub-cluster: "
+ existingSubClusterId);
}
@@ -1588,6 +1522,59 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
/**
+ * Async callback handler for heartbeat responses from secondary (UAM) sub-clusters.
+ */
+ private class HeartbeatCallBack implements AsyncCallback<AllocateResponse> {
+ // Sub-cluster whose UAM heartbeat responses this callback handles.
+ private final SubClusterId subClusterId;
+
+ HeartbeatCallBack(SubClusterId subClusterId) {
+ this.subClusterId = subClusterId;
+ }
+
+ @Override
+ public void callback(AllocateResponse response) {
+ // Append the response to this sub-cluster's list in asyncResponseSink.
+ synchronized (asyncResponseSink) {
+ List<AllocateResponse> responses = asyncResponseSink.get(subClusterId);
+ if (responses == null) {
+ responses = new ArrayList<>();
+ asyncResponseSink.put(subClusterId, responses);
+ }
+ responses.add(response);
+ }
+
+ // Save the new AMRMToken for the UAM if present
+ if (response.getAMRMToken() != null) {
+ Token<AMRMTokenIdentifier> newToken = ConverterUtils
+ .convertFromYarn(response.getAMRMToken(), (Text) null);
+ // Update the token in the registry if configured, otherwise in NMSS
+ if (registryClient != null) {
+ registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
+ subClusterId.getId(), newToken);
+ } else if (getNMStateStore() != null) {
+ try {
+ getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
+ NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(),
+ newToken.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
+ } catch (IOException e) {
+ LOG.error("Error storing UAM token as AMRMProxy "
+ + "context entry in NMSS for " + attemptId, e);
+ }
+ }
+ }
+
+ // Notify policy of secondary sub-cluster responses
+ try {
+ policyInterpreter.notifyOfResponse(subClusterId, response);
+ } catch (YarnException e) {
+ LOG.warn("notifyOfResponse for policy failed for secondary sub-cluster "
+ + subClusterId, e);
+ }
+ }
+ }
+
+ /**
* Private structure for encapsulating SubClusterId and
* RegisterApplicationMasterResponse instances.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org