You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2018/09/24 18:37:39 UTC
[1/2] hadoop git commit: YARN-8696. [AMRMProxy] FederationInterceptor
upgrade: home sub-cluster heartbeat async. Contributed by Botong Huang.
Repository: hadoop
Updated Branches:
refs/heads/trunk 8de5c923b -> 309092280
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
index 33617d4..78f6eb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@@ -26,18 +27,26 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Extends the FederationInterceptor and overrides methods to provide a testable
* implementation of FederationInterceptor.
*/
public class TestableFederationInterceptor extends FederationInterceptor {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestableFederationInterceptor.class);
+
private ConcurrentHashMap<String, MockResourceManagerFacade>
secondaryResourceManagers = new ConcurrentHashMap<>();
private AtomicInteger runningIndex = new AtomicInteger(0);
@@ -58,6 +67,12 @@ public class TestableFederationInterceptor extends FederationInterceptor {
return new TestableUnmanagedAMPoolManager(threadPool);
}
+  /**
+   * Supplies the home sub-cluster heartbeat handler used by the tests: a
+   * handler whose loop runs inside a UGI carrying an AMRM token for the
+   * current attempt.
+   */
+  @Override
+  protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
+      Configuration conf, ApplicationId appId) {
+    AMHeartbeatRequestHandler handler =
+        new TestableAMRequestHandlerThread(conf, appId);
+    return handler;
+  }
+
@SuppressWarnings("unchecked")
@Override
protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext,
@@ -109,6 +124,71 @@ public class TestableFederationInterceptor extends FederationInterceptor {
return secondaryResourceManagers;
}
+  /**
+   * Looks up the mock RM created for a secondary sub-cluster.
+   *
+   * @param scId sub-cluster id
+   * @return the mock RM for scId, or null if none has been created
+   */
+  protected MockResourceManagerFacade getSecondaryRM(String scId) {
+    MockResourceManagerFacade rm = secondaryResourceManagers.get(scId);
+    return rm;
+  }
+
+  /**
+   * Drain all async heartbeat threads. Comes in two flavors:
+   *
+   * 1. waitForAsyncHBThreadFinish == false. Only wait for the async threads to
+   * pick up all pending heartbeat requests; do not necessarily wait for the
+   * threads to finish processing the last request. This is used to make sure
+   * all new UAMs are launched by the async threads, while still finishing the
+   * drain when a (slow) RM is still processing the last heartbeat request.
+   *
+   * 2. waitForAsyncHBThreadFinish == true. Wait for all async threads to
+   * finish processing all heartbeat requests.
+   *
+   * @param waitForAsyncHBThreadFinish whether to also wait for the handler
+   *          threads to finish processing the requests they picked up
+   * @throws YarnException if a UAM queue cannot be queried, or if the calling
+   *           thread is interrupted while polling the queues
+   */
+  protected void drainAllAsyncQueue(boolean waitForAsyncHBThreadFinish)
+      throws YarnException {
+
+    LOG.info("waiting to drain home heartbeat handler");
+    if (waitForAsyncHBThreadFinish) {
+      getHomeHeartbeartHandler().drainHeartbeatThread();
+    } else {
+      while (getHomeHeartbeartHandler().getRequestQueueSize() > 0) {
+        pauseBeforeNextPoll();
+      }
+    }
+
+    LOG.info("waiting to drain UAM heartbeat handlers");
+    UnmanagedAMPoolManager uamPool = getUnmanagedAMPool();
+    if (waitForAsyncHBThreadFinish) {
+      uamPool.drainUAMHeartbeats();
+    } else {
+      while (hasPendingUAMRequests(uamPool)) {
+        pauseBeforeNextPoll();
+      }
+    }
+  }
+
+  /** Returns whether any UAM in the pool still has queued heartbeat requests. */
+  private static boolean hasPendingUAMRequests(UnmanagedAMPoolManager uamPool)
+      throws YarnException {
+    for (String scId : uamPool.getAllUAMIds()) {
+      if (uamPool.getRequestQueueSize(scId) > 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Sleeps briefly between polls; surfaces interruption instead of hiding it. */
+  private static void pauseBeforeNextPoll() throws YarnException {
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      // Restore the flag so callers/test runners can observe the interrupt,
+      // and stop polling rather than spinning on a lost interrupt.
+      Thread.currentThread().interrupt();
+      throw new YarnException("Interrupted while draining heartbeat queues", e);
+    }
+  }
+
+  /**
+   * Builds a remote-user UGI named after the attempt id and carrying an AMRM
+   * token identifier (key id 1) for that attempt.
+   *
+   * @param appAttemptId attempt whose identity the UGI represents
+   * @return the populated UGI
+   */
+  protected UserGroupInformation getUGIWithToken(
+      ApplicationAttemptId appAttemptId) {
+    AMRMTokenIdentifier identifier = new AMRMTokenIdentifier(appAttemptId, 1);
+    UserGroupInformation remoteUser =
+        UserGroupInformation.createRemoteUser(appAttemptId.toString());
+    remoteUser.addTokenIdentifier(identifier);
+    return remoteUser;
+  }
+
/**
* Extends the UnmanagedAMPoolManager and overrides methods to provide a
* testable implementation of UnmanagedAMPoolManager.
@@ -141,6 +221,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
super(conf, appId, queueName, submitter, appNameSuffix,
keepContainersAcrossApplicationAttempts, "TEST");
+ setHandlerThread(new TestableAMRequestHandlerThread(conf, appId));
}
/**
@@ -156,4 +237,30 @@ public class TestableFederationInterceptor extends FederationInterceptor {
YarnConfiguration.getClusterId(config));
}
}
+
+  /**
+   * Wraps the heartbeat handler thread so its loop runs as the application
+   * attempt's user, i.e. inside a UGI that carries the attempt's AMRM token.
+   * Non-static because it uses the enclosing interceptor's getUGIWithToken.
+   */
+  protected class TestableAMRequestHandlerThread
+      extends AMHeartbeatRequestHandler {
+    public TestableAMRequestHandlerThread(Configuration conf,
+        ApplicationId applicationId) {
+      super(conf, applicationId);
+    }
+
+    @Override
+    public void run() {
+      try {
+        getUGIWithToken(getAttemptId())
+            .doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() {
+                TestableAMRequestHandlerThread.super.run();
+                return null;
+              }
+            });
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.warn("Heartbeat handler thread interrupted", e);
+      } catch (Exception e) {
+        // Log rather than swallow: a handler thread that dies must be visible
+        // in test output, otherwise it looks identical to an idle one.
+        LOG.error("Heartbeat handler thread exited with exception", e);
+      }
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 6fe0aa9..70b7498 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -84,7 +84,6 @@ import com.google.common.annotations.VisibleForTesting;
public class ApplicationMasterService extends AbstractService implements
ApplicationMasterProtocol {
private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
- private static final int PRE_REGISTER_RESPONSE_ID = -1;
private final AMLivelinessMonitor amLivelinessMonitor;
private YarnScheduler rScheduler;
@@ -377,11 +376,6 @@ public class ApplicationMasterService extends AbstractService implements
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
- private int getNextResponseId(int responseId) {
- // Loop between 0 to Integer.MAX_VALUE
- return (responseId + 1) & Integer.MAX_VALUE;
- }
-
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
@@ -415,8 +409,8 @@ public class ApplicationMasterService extends AbstractService implements
}
// Normally request.getResponseId() == lastResponse.getResponseId()
- if (getNextResponseId(request.getResponseId()) == lastResponse
- .getResponseId()) {
+ if (AMRMClientUtils.getNextResponseId(
+ request.getResponseId()) == lastResponse.getResponseId()) {
// heartbeat one step old, simply return lastReponse
return lastResponse;
} else if (request.getResponseId() != lastResponse.getResponseId()) {
@@ -461,7 +455,8 @@ public class ApplicationMasterService extends AbstractService implements
* need to worry about unregister call occurring in between (which
* removes the lock object).
*/
- response.setResponseId(getNextResponseId(lastResponse.getResponseId()));
+ response.setResponseId(
+ AMRMClientUtils.getNextResponseId(lastResponse.getResponseId()));
lock.setAllocateResponse(response);
return response;
}
@@ -472,7 +467,7 @@ public class ApplicationMasterService extends AbstractService implements
recordFactory.newRecordInstance(AllocateResponse.class);
// set response id to -1 before application master for the following
// attemptID get registered
- response.setResponseId(PRE_REGISTER_RESPONSE_ID);
+ response.setResponseId(AMRMClientUtils.PRE_REGISTER_RESPONSE_ID);
LOG.info("Registering app attempt : " + attemptId);
responseMap.put(attemptId, new AllocateResponseLock(response));
rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: YARN-8696. [AMRMProxy] FederationInterceptor
upgrade: home sub-cluster heartbeat async. Contributed by Botong Huang.
Posted by gi...@apache.org.
YARN-8696. [AMRMProxy] FederationInterceptor upgrade: home sub-cluster heartbeat async. Contributed by Botong Huang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/30909228
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/30909228
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/30909228
Branch: refs/heads/trunk
Commit: 3090922805699b8374a359e92323884a4177dc4e
Parents: 8de5c92
Author: Giovanni Matteo Fumarola <gi...@apache.org>
Authored: Mon Sep 24 11:37:05 2018 -0700
Committer: Giovanni Matteo Fumarola <gi...@apache.org>
Committed: Mon Sep 24 11:37:05 2018 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 5 +
.../yarn/conf/TestYarnConfigurationFields.java | 2 +
.../hadoop/yarn/client/AMRMClientUtils.java | 7 +
.../yarn/server/AMHeartbeatRequestHandler.java | 23 +-
.../utils/FederationRegistryClient.java | 10 +-
.../yarn/server/uam/UnmanagedAMPoolManager.java | 15 +
.../server/uam/UnmanagedApplicationManager.java | 14 +
.../yarn/server/MockResourceManagerFacade.java | 72 ++--
.../amrmproxy/FederationInterceptor.java | 376 +++++++++++--------
.../amrmproxy/TestFederationInterceptor.java | 298 +++++++++------
.../TestableFederationInterceptor.java | 107 ++++++
.../ApplicationMasterService.java | 15 +-
12 files changed, 634 insertions(+), 310 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a82801d..d69ae57 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3221,6 +3221,11 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.federation.resolver."
+ "DefaultSubClusterResolverImpl";
+ // the maximum wait time for the first async heartbeat response
+ public static final String FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS =
+ FEDERATION_PREFIX + "amrmproxy.hb.maximum.wait.ms";
+ public static final long DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS = 5000;
+
// AMRMProxy split-merge timeout for active sub-clusters. We will not route
// new asks to expired sub-clusters.
public static final String FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index d63933c..6f781fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -106,6 +106,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
configurationPropsToSkipCompare
+ .add(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS);
+ configurationPropsToSkipCompare
.add(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
// Federation StateStore ZK implementation configs to be ignored
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
index b8319cd..34a9b34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
@@ -52,6 +52,8 @@ public final class AMRMClientUtils {
private static final Logger LOG =
LoggerFactory.getLogger(AMRMClientUtils.class);
+  /** Response id given to an attempt's allocate response before its AM registers. */
+ public static final int PRE_REGISTER_RESPONSE_ID = -1;
+
public static final String APP_ALREADY_REGISTERED_MESSAGE =
"Application Master is already registered : ";
@@ -152,6 +154,11 @@ public final class AMRMClientUtils {
}
}
+  /**
+   * Returns the response id that follows {@code responseId}. Ids cycle through
+   * 0..Integer.MAX_VALUE: masking off the sign bit maps the overflow of
+   * MAX_VALUE + 1 back to 0, and maps PRE_REGISTER_RESPONSE_ID (-1) to 0.
+   */
+  public static int getNextResponseId(int responseId) {
+    int incremented = responseId + 1;
+    return incremented & Integer.MAX_VALUE;
+  }
+
public static void addToOutstandingSchedulingRequests(
Collection<SchedulingRequest> requests,
Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java
index 42227bb..380c216 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java
@@ -47,6 +47,9 @@ public class AMHeartbeatRequestHandler extends Thread {
// Indication flag for the thread to keep running
private volatile boolean keepRunning;
+ // For unit test draining
+ private volatile boolean isThreadWaiting;
+
private Configuration conf;
private ApplicationId applicationId;
@@ -61,6 +64,7 @@ public class AMHeartbeatRequestHandler extends Thread {
this.setUncaughtExceptionHandler(
new HeartBeatThreadUncaughtExceptionHandler());
this.keepRunning = true;
+ this.isThreadWaiting = false;
this.conf = conf;
this.applicationId = applicationId;
@@ -82,12 +86,15 @@ public class AMHeartbeatRequestHandler extends Thread {
while (keepRunning) {
AsyncAllocateRequestInfo requestInfo;
try {
- requestInfo = requestQueue.take();
+ this.isThreadWaiting = true;
+ requestInfo = this.requestQueue.take();
+ this.isThreadWaiting = false;
+
if (requestInfo == null) {
throw new YarnException(
"Null requestInfo taken from request queue");
}
- if (!keepRunning) {
+ if (!this.keepRunning) {
break;
}
@@ -98,7 +105,7 @@ public class AMHeartbeatRequestHandler extends Thread {
throw new YarnException("Null allocateRequest from requestInfo");
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:"
+ LOG.debug("Sending Heartbeat to RM. AskList:"
+ ((request.getAskList() == null) ? " empty"
: request.getAskList().size()));
}
@@ -182,6 +189,16 @@ public class AMHeartbeatRequestHandler extends Thread {
}
@VisibleForTesting
+  /**
+   * Blocks until the request queue is empty and the handler thread is parked
+   * in take(). Test-only helper.
+   *
+   * NOTE(review): run() clears isThreadWaiting only after take() returns, so
+   * there is a short window where this may report "drained" while the last
+   * request is still being sent. It also never returns if the thread has
+   * exited, since isThreadWaiting then stays false.
+   */
+  public void drainHeartbeatThread() {
+    while (!this.isThreadWaiting || this.requestQueue.size() > 0) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        // Preserve the interrupt and stop waiting instead of spinning forever.
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+ @VisibleForTesting
public int getRequestQueueSize() {
return this.requestQueue.size();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
index 6624318..13545c9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
@@ -78,7 +78,7 @@ public class FederationRegistryClient {
*
* @return the list of known applications
*/
- public List<String> getAllApplications() {
+ public synchronized List<String> getAllApplications() {
// Suppress the exception here because it is valid that the entry does not
// exist
List<String> applications = null;
@@ -99,7 +99,7 @@ public class FederationRegistryClient {
* For testing, delete all application records in registry.
*/
@VisibleForTesting
- public void cleanAllApplications() {
+ public synchronized void cleanAllApplications() {
try {
removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null),
true, false);
@@ -115,7 +115,7 @@ public class FederationRegistryClient {
* @param token the UAM of the application
* @return whether the amrmToken is added or updated to a new value
*/
- public boolean writeAMRMTokenForUAM(ApplicationId appId,
+ public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId,
String subClusterId, Token<AMRMTokenIdentifier> token) {
Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
this.appSubClusterTokenMap.get(appId);
@@ -154,7 +154,7 @@ public class FederationRegistryClient {
* @param appId application id
* @return the sub-cluster to UAM token mapping
*/
- public Map<String, Token<AMRMTokenIdentifier>>
+ public synchronized Map<String, Token<AMRMTokenIdentifier>>
loadStateFromRegistry(ApplicationId appId) {
Map<String, Token<AMRMTokenIdentifier>> retMap = new HashMap<>();
// Suppress the exception here because it is valid that the entry does not
@@ -203,7 +203,7 @@ public class FederationRegistryClient {
*
* @param appId application id
*/
- public void removeAppFromRegistry(ApplicationId appId) {
+ public synchronized void removeAppFromRegistry(ApplicationId appId) {
Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
this.appSubClusterTokenMap.get(appId);
LOG.info("Removing all registry entries for {}", appId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index d708ced..d5a0168 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -407,4 +407,19 @@ public class UnmanagedAMPoolManager extends AbstractService {
return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer();
}
+  /**
+   * Returns the number of heartbeat requests queued for one UAM.
+   *
+   * @param uamId id of the UAM (sub-cluster id)
+   * @return size of that UAM's pending heartbeat request queue
+   * @throws YarnException if no UAM with this id is in the pool
+   */
+  @VisibleForTesting
+  public int getRequestQueueSize(String uamId) throws YarnException {
+    // Single lookup so a concurrent removal cannot slip in between a
+    // containsKey() check and the get().
+    UnmanagedApplicationManager uam = this.unmanagedAppMasterMap.get(uamId);
+    if (uam == null) {
+      throw new YarnException("UAM " + uamId + " does not exist");
+    }
+    return uam.getRequestQueueSize();
+  }
+
+  /**
+   * Waits, one UAM at a time, for each pooled UAM's heartbeat handler to
+   * drain its request queue.
+   */
+  @VisibleForTesting
+  public void drainUAMHeartbeats() {
+    for (UnmanagedApplicationManager manager
+        : this.unmanagedAppMasterMap.values()) {
+      manager.drainHeartbeatThread();
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 7c1e154..91d5d6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -225,6 +225,10 @@ public class UnmanagedApplicationManager {
LOG.debug("RegisterUAM returned existing NM token for node "
+ nmToken.getNodeId());
}
+ LOG.info(
+ "RegisterUAM returned {} existing running container and {} NM tokens",
+ response.getContainersFromPreviousAttempts().size(),
+ response.getNMTokensFromPreviousAttempts().size());
// Only when register succeed that we start the heartbeat thread
this.heartbeatHandler.setDaemon(true);
@@ -516,4 +520,14 @@ public class UnmanagedApplicationManager {
public int getRequestQueueSize() {
return this.heartbeatHandler.getRequestQueueSize();
}
+
+  /**
+   * Replaces the heartbeat handler thread so tests can install a subclass.
+   * Intended to be called before registration, since registration is where
+   * the handler thread is started.
+   */
+  @VisibleForTesting
+  protected void setHandlerThread(AMHeartbeatRequestHandler thread) {
+    heartbeatHandler = thread;
+  }
+
+  /** Blocks until this UAM's heartbeat handler has drained its request queue. */
+  @VisibleForTesting
+  protected void drainHeartbeatThread() {
+    AMHeartbeatRequestHandler handler = this.heartbeatHandler;
+    handler.drainHeartbeatThread();
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 958b1f1..50a4bff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -189,8 +189,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
private HashSet<ApplicationId> applicationMap = new HashSet<>();
private HashSet<ApplicationId> keepContainerOnUams = new HashSet<>();
- private HashMap<ApplicationAttemptId,
- List<ContainerId>> applicationContainerIdMap = new HashMap<>();
+ private HashMap<ApplicationId, List<ContainerId>> applicationContainerIdMap =
+ new HashMap<>();
+ private int rmId;
private AtomicInteger containerIndex = new AtomicInteger(0);
private Configuration conf;
private int subClusterId;
@@ -203,6 +204,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
private boolean shouldReRegisterNext = false;
+ private boolean shouldWaitForSyncNextAllocate = false;
+
// For unit test synchronization
private static Object syncObj = new Object();
@@ -218,6 +221,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public MockResourceManagerFacade(Configuration conf, int startContainerIndex,
int subClusterId, boolean isRunning) {
this.conf = conf;
+ this.rmId = startContainerIndex;
this.containerIndex.set(startContainerIndex);
this.subClusterId = subClusterId;
this.isRunning = isRunning;
@@ -259,17 +263,17 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
validateRunning();
ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Registering application attempt: " + attemptId);
+ ApplicationId appId = attemptId.getApplicationId();
List<Container> containersFromPreviousAttempt = null;
synchronized (applicationContainerIdMap) {
- if (applicationContainerIdMap.containsKey(attemptId)) {
- if (keepContainerOnUams.contains(attemptId.getApplicationId())) {
+ if (applicationContainerIdMap.containsKey(appId)) {
+ if (keepContainerOnUams.contains(appId)) {
// For UAM with the keepContainersFromPreviousAttempt flag, return all
// running containers
containersFromPreviousAttempt = new ArrayList<>();
- for (ContainerId containerId : applicationContainerIdMap
- .get(attemptId)) {
+ for (ContainerId containerId : applicationContainerIdMap.get(appId)) {
containersFromPreviousAttempt.add(Container.newInstance(containerId,
null, null, null, null, null));
}
@@ -279,7 +283,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
}
} else {
// Keep track of the containers that are returned to this application
- applicationContainerIdMap.put(attemptId, new ArrayList<ContainerId>());
+ applicationContainerIdMap.put(appId, new ArrayList<ContainerId>());
}
}
@@ -314,6 +318,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Finishing application attempt: " + attemptId);
+ ApplicationId appId = attemptId.getApplicationId();
if (shouldReRegisterNext) {
String message = "AM is not registered, should re-register.";
@@ -324,8 +329,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
synchronized (applicationContainerIdMap) {
// Remove the containers that were being tracked for this application
Assert.assertTrue("The application id is NOT registered: " + attemptId,
- applicationContainerIdMap.containsKey(attemptId));
- applicationContainerIdMap.remove(attemptId);
+ applicationContainerIdMap.containsKey(appId));
+ applicationContainerIdMap.remove(appId);
}
return FinishApplicationMasterResponse.newInstance(
@@ -350,6 +355,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Allocate from application attempt: " + attemptId);
+ ApplicationId appId = attemptId.getApplicationId();
if (shouldReRegisterNext) {
String message = "AM is not registered, should re-register.";
@@ -357,6 +363,21 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
throw new ApplicationMasterNotRegisteredException(message);
}
+ // Wait for signal for certain test cases
+ synchronized (syncObj) {
+ if (shouldWaitForSyncNextAllocate) {
+ shouldWaitForSyncNextAllocate = false;
+
+ LOG.info("Allocate call in RM start waiting");
+ try {
+ syncObj.wait();
+ LOG.info("Allocate call in RM wait finished");
+ } catch (InterruptedException e) {
+ LOG.info("Allocate call in RM wait interrupted", e);
+ }
+ }
+ }
+
ArrayList<Container> containerList = new ArrayList<Container>();
if (request.getAskList() != null) {
for (ResourceRequest rr : request.getAskList()) {
@@ -381,9 +402,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
// will need it in future
Assert.assertTrue(
"The application id is Not registered before allocate(): "
- + attemptId,
- applicationContainerIdMap.containsKey(attemptId));
- List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
+ + appId,
+ applicationContainerIdMap.containsKey(appId));
+ List<ContainerId> ids = applicationContainerIdMap.get(appId);
ids.add(containerId);
}
}
@@ -395,12 +416,10 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
&& request.getReleaseList().size() > 0) {
LOG.info("Releasing containers: " + request.getReleaseList().size());
synchronized (applicationContainerIdMap) {
- Assert
- .assertTrue(
- "The application id is not registered before allocate(): "
- + attemptId,
- applicationContainerIdMap.containsKey(attemptId));
- List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
+ Assert.assertTrue(
+ "The application id is not registered before allocate(): " + appId,
+ applicationContainerIdMap.containsKey(appId));
+ List<ContainerId> ids = applicationContainerIdMap.get(appId);
for (ContainerId id : request.getReleaseList()) {
boolean found = false;
@@ -426,7 +445,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
+ " for application attempt: " + conf.get("AMRMTOKEN"));
// Always issue a new AMRMToken as if RM rolled master key
- Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], "");
+ Token newAMRMToken = Token.newInstance(new byte[0],
+ Integer.toString(this.rmId), new byte[0], "");
return AllocateResponse.newInstance(0, completedList, containerList,
new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
@@ -434,6 +454,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
new ArrayList<UpdatedContainer>());
}
+ public void setWaitForSyncNextAllocate(boolean wait) {
+ synchronized (syncObj) {
+ shouldWaitForSyncNextAllocate = wait;
+ }
+ }
+
@Override
public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request) throws YarnException, IOException {
@@ -624,14 +650,14 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
validateRunning();
- ApplicationAttemptId attemptId = request.getApplicationAttemptId();
+ ApplicationId appId = request.getApplicationAttemptId().getApplicationId();
List<ContainerReport> containers = new ArrayList<>();
synchronized (applicationContainerIdMap) {
// Return the list of running containers that were being tracked for this
// application
- Assert.assertTrue("The application id is NOT registered: " + attemptId,
- applicationContainerIdMap.containsKey(attemptId));
- List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
+ Assert.assertTrue("The application id is NOT registered: " + appId,
+ applicationContainerIdMap.containsKey(appId));
+ List<ContainerId> ids = applicationContainerIdMap.get(appId);
for (ContainerId c : ids) {
containers.add(ContainerReport.newInstance(c, null, null, null, 0, 0,
null, null, 0, null, null));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 1bf882f..c02296d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -62,14 +64,18 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
@@ -80,9 +86,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
-import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,6 +122,17 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
NMSS_CLASS_PREFIX + "secondarySC/";
public static final String STRING_TO_BYTE_FORMAT = "UTF-8";
+ private static final RecordFactory RECORD_FACTORY =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ /**
+ * From AM's perspective, FederationInterceptor behaves exactly the same as
+ * YarnRM (ApplicationMasterService). This is to remember the last heart beat
+ * response, used to handle duplicate heart beat and responseId from AM.
+ */
+ private AllocateResponse lastAllocateResponse;
+ private final Object lastAllocateResponseLock = new Object();
+
private ApplicationAttemptId attemptId;
/**
@@ -124,7 +141,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private AMRMClientRelayer homeRMRelayer;
private SubClusterId homeSubClusterId;
- private volatile int lastHomeResponseId;
+ private AMHeartbeatRequestHandler homeHeartbeartHandler;
/**
* UAM pool for secondary sub-clusters (ones other than home sub-cluster),
@@ -146,7 +163,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
/**
* Stores the AllocateResponses that are received asynchronously from all the
- * sub-cluster resource managers except the home RM.
+ * sub-cluster resource managers, including home RM.
*/
private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
@@ -194,14 +211,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
/** The policy used to split requests among sub-clusters. */
private FederationAMRMProxyPolicy policyInterpreter;
- /**
- * The proxy ugi used to talk to home RM, loaded with the up-to-date AMRMToken
- * issued by home RM.
- */
- private UserGroupInformation appOwner;
-
private FederationRegistryClient registryClient;
+ // the maximum wait time for the first async heart beat response
+ private long heartbeatMaxWaitTimeMs;
+
+ private MonotonicClock clock = new MonotonicClock();
+
/**
* Creates an instance of the FederationInterceptor class.
*/
@@ -213,7 +229,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.secondaryRelayers = new ConcurrentHashMap<>();
this.amRegistrationRequest = null;
this.amRegistrationResponse = null;
- this.lastHomeResponseId = Integer.MAX_VALUE;
this.justRecovered = false;
}
@@ -233,8 +248,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
setConf(conf);
}
+ // The proxy ugi used to talk to home RM as well as Yarn Registry, loaded
+ // with the up-to-date AMRMToken issued by home RM.
+ UserGroupInformation appOwner;
try {
- this.appOwner = UserGroupInformation.createProxyUser(appContext.getUser(),
+ appOwner = UserGroupInformation.createProxyUser(appContext.getUser(),
UserGroupInformation.getCurrentUser());
} catch (Exception ex) {
throw new YarnRuntimeException(ex);
@@ -242,10 +260,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
if (appContext.getRegistryClient() != null) {
this.registryClient = new FederationRegistryClient(conf,
- appContext.getRegistryClient(), this.appOwner);
+ appContext.getRegistryClient(), appOwner);
// Add all app tokens for Yarn Registry access
if (appContext.getCredentials() != null) {
- this.appOwner.addCredentials(appContext.getCredentials());
+ appOwner.addCredentials(appContext.getCredentials());
}
}
@@ -254,9 +272,21 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.homeSubClusterId =
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
- ApplicationMasterProtocol.class, this.appOwner), appId,
+ ApplicationMasterProtocol.class, appOwner), appId,
this.homeSubClusterId.toString());
+ this.homeHeartbeartHandler = createHomeHeartbeartHandler(conf, appId);
+ this.homeHeartbeartHandler.setAMRMClientRelayer(this.homeRMRelayer);
+ this.homeHeartbeartHandler.setUGI(appOwner);
+ this.homeHeartbeartHandler.setDaemon(true);
+ this.homeHeartbeartHandler.start();
+
+ // set lastResponseId to -1 before application master registers
+ this.lastAllocateResponse =
+ RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
+ this.lastAllocateResponse
+ .setResponseId(AMRMClientUtils.PRE_REGISTER_RESPONSE_ID);
+
this.federationFacade = FederationStateStoreFacade.getInstance();
this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@@ -265,6 +295,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.uamPool.init(conf);
this.uamPool.start();
+
+ this.heartbeatMaxWaitTimeMs =
+ conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS,
+ YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS);
}
@Override
@@ -272,6 +306,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
super.recover(recoveredDataMap);
LOG.info("Recovering data for FederationInterceptor for {}",
this.attemptId);
+ this.justRecovered = true;
+
if (recoveredDataMap == null) {
return;
}
@@ -294,9 +330,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.amRegistrationResponse =
new RegisterApplicationMasterResponsePBImpl(pb);
LOG.info("amRegistrationResponse recovered for {}", this.attemptId);
- // Trigger re-register and full pending re-send only if we have a
- // saved register response. This should always be true though.
- this.justRecovered = true;
}
// Recover UAM amrmTokens from registry or NMSS
@@ -355,6 +388,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
.getContainersFromPreviousAttempts()) {
containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
containers++;
+ LOG.debug(" From subcluster " + subClusterId
+ + " running container " + container.getId());
}
LOG.info("Recovered {} running containers from UAM in {}",
response.getContainersFromPreviousAttempts().size(),
@@ -384,7 +419,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
LOG.debug(" From home RM " + this.homeSubClusterId
+ " running container " + container.getContainerId());
}
- LOG.info("{} running containers including AM recovered from home RM ",
+ LOG.info("{} running containers including AM recovered from home RM {}",
response.getContainerList().size(), this.homeSubClusterId);
LOG.info(
@@ -411,8 +446,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
* so that when AM registers more than once, it returns the same register
* success response instead of throwing
* {@link InvalidApplicationMasterRequestException}. Furthermore, we present
- * to AM as if we are the RM that never fails over. When actual RM fails over,
- * we always re-register automatically.
+ * to AM as if we are the RM that never fails over (except when AMRMProxy
+ * restarts). When actual RM fails over, we always re-register automatically.
*
* We did this because FederationInterceptor can receive concurrent register
* requests from AM because of timeout between AM and AMRMProxy, which is
@@ -425,6 +460,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
public synchronized RegisterApplicationMasterResponse
registerApplicationMaster(RegisterApplicationMasterRequest request)
throws YarnException, IOException {
+
+ // Reset the heartbeat responseId to zero upon register
+ synchronized (this.lastAllocateResponseLock) {
+ this.lastAllocateResponse.setResponseId(0);
+ }
+ this.justRecovered = false;
+
// If AM is calling with a different request, complain
if (this.amRegistrationRequest != null) {
if (!this.amRegistrationRequest.equals(request)) {
@@ -524,34 +566,34 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
@Override
public AllocateResponse allocate(AllocateRequest request)
- throws YarnException {
+ throws YarnException, IOException {
Preconditions.checkArgument(this.policyInterpreter != null,
"Allocate should be called after registerApplicationMaster");
- if (this.justRecovered && this.lastHomeResponseId == Integer.MAX_VALUE) {
- // Save the responseId home RM is expecting
- this.lastHomeResponseId = request.getResponseId();
-
+ if (this.justRecovered) {
throw new ApplicationMasterNotRegisteredException(
"AMRMProxy just restarted and recovered for " + this.attemptId
+ ". AM should re-register and full re-send pending requests.");
}
- // Override responseId in the request in two cases:
- //
- // 1. After we just recovered after an NM restart and AM's responseId is
- // reset due to the exception we generate. We need to override the
- // responseId to the one homeRM expects.
- //
- // 2. After homeRM fail-over, the allocate response with reseted responseId
- // might not be returned successfully back to AM because of RPC connection
- // timeout between AM and AMRMProxy. In this case, we remember and reset the
- // responseId for AM.
- if (this.justRecovered
- || request.getResponseId() > this.lastHomeResponseId) {
- LOG.warn("Setting allocate responseId for {} from {} to {}",
- this.attemptId, request.getResponseId(), this.lastHomeResponseId);
- request.setResponseId(this.lastHomeResponseId);
+ // Check responseId and handle duplicate heartbeat exactly same as RM
+ synchronized (this.lastAllocateResponseLock) {
+ LOG.info("Heartbeat from " + this.attemptId + " with responseId "
+ + request.getResponseId() + " when we are expecting "
+ + this.lastAllocateResponse.getResponseId());
+ // Normally request.getResponseId() == lastResponse.getResponseId()
+ if (AMRMClientUtils.getNextResponseId(
+ request.getResponseId()) == this.lastAllocateResponse
+ .getResponseId()) {
+ // heartbeat one step old, simply return lastReponse
+ return this.lastAllocateResponse;
+ } else if (request.getResponseId() != this.lastAllocateResponse
+ .getResponseId()) {
+ throw new InvalidApplicationMasterRequestException(
+ AMRMClientUtils.assembleInvalidResponseIdExceptionMessage(attemptId,
+ this.lastAllocateResponse.getResponseId(),
+ request.getResponseId()));
+ }
}
try {
@@ -560,71 +602,55 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
Map<SubClusterId, AllocateRequest> requests =
splitAllocateRequest(request);
- // Send the requests to the secondary sub-cluster resource managers.
- // These secondary requests are send asynchronously and the responses will
- // be collected and merged with the home response. In addition, it also
- // return the newly registered Unmanaged AMs.
- Registrations newRegistrations =
- sendRequestsToSecondaryResourceManagers(requests);
-
- // Send the request to the home RM and get the response
- AllocateRequest homeRequest = requests.get(this.homeSubClusterId);
- LOG.info("{} heartbeating to home RM with responseId {}", this.attemptId,
- homeRequest.getResponseId());
-
- AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest);
-
- // Reset the flag after the first successful homeRM allocate response,
- // otherwise keep overriding the responseId of new allocate request
- if (this.justRecovered) {
- this.justRecovered = false;
+ /**
+ * Send the requests to the all sub-cluster resource managers. All
+ * requests are synchronously triggered but sent asynchronously. Later the
+ * responses will be collected and merged. In addition, it also returns
+ * the newly registered UAMs.
+ */
+ Registrations newRegistrations = sendRequestsToResourceManagers(requests);
+
+ // Wait for the first async response to arrive
+ long startTime = this.clock.getTime();
+ synchronized (this.asyncResponseSink) {
+ try {
+ this.asyncResponseSink.wait(this.heartbeatMaxWaitTimeMs);
+ } catch (InterruptedException e) {
+ }
}
+ long firstResponseTime = this.clock.getTime() - startTime;
- // Notify policy of home response
+ // An extra brief wait for other async heart beats, so that most of their
+ // responses can make it back to AM in the same heart beat round trip.
try {
- this.policyInterpreter.notifyOfResponse(this.homeSubClusterId,
- homeResponse);
- } catch (YarnException e) {
- LOG.warn("notifyOfResponse for policy failed for home sub-cluster "
- + this.homeSubClusterId, e);
+ Thread.sleep(firstResponseTime);
+ } catch (InterruptedException e) {
}
- // If the resource manager sent us a new token, add to the current user
- if (homeResponse.getAMRMToken() != null) {
- LOG.debug("Received new AMRMToken");
- YarnServerSecurityUtils.updateAMRMToken(homeResponse.getAMRMToken(),
- this.appOwner, getConf());
- }
+ // Prepare the response to AM
+ AllocateResponse response =
+ RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
- // Merge the responses from home and secondary sub-cluster RMs
- homeResponse = mergeAllocateResponses(homeResponse);
+ // Merge all responses from response sink
+ mergeAllocateResponses(response);
// Merge the containers and NMTokens from the new registrations into
- // the homeResponse.
+ // the response
if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) {
- homeResponse = mergeRegistrationResponses(homeResponse,
+ mergeRegistrationResponses(response,
newRegistrations.getSuccessfulRegistrations());
}
- LOG.info("{} heartbeat response from home RM with responseId {}",
- this.attemptId, homeResponse.getResponseId());
-
- // Update lastHomeResponseId in three cases:
- // 1. The normal responseId increments
- // 2. homeResponse.getResponseId() == 1. This happens when homeRM fails
- // over, AMRMClientRelayer auto re-register and full re-send for homeRM.
- // 3. lastHomeResponseId == MAX_INT. This is the initial case or
- // responseId about to overflow and wrap around
- if (homeResponse.getResponseId() == this.lastHomeResponseId + 1
- || homeResponse.getResponseId() == 1
- || this.lastHomeResponseId == Integer.MAX_VALUE) {
- this.lastHomeResponseId = homeResponse.getResponseId();
+ // update the responseId and return the final response to AM
+ synchronized (this.lastAllocateResponseLock) {
+ response.setResponseId(AMRMClientUtils
+ .getNextResponseId(this.lastAllocateResponse.getResponseId()));
+ this.lastAllocateResponse = response;
}
-
- // return the final response to the application master.
- return homeResponse;
- } catch (IOException ex) {
- LOG.error("Exception encountered while processing heart beat", ex);
+ return response;
+ } catch (Throwable ex) {
+ LOG.error("Exception encountered while processing heart beat for "
+ + this.attemptId, ex);
throw new YarnException(ex);
}
}
@@ -696,6 +722,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
FinishApplicationMasterResponse homeResponse =
this.homeRMRelayer.finishApplicationMaster(request);
+ // Stop the home heartbeat thread
+ this.homeHeartbeartHandler.shutdown();
+
if (subClusterIds.size() > 0) {
// Wait for other sub-cluster resource managers to return the
// response and merge it with the home response
@@ -758,10 +787,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
this.threadpool = null;
}
- homeRMRelayer.shutdown();
- for(AMRMClientRelayer relayer : secondaryRelayers.values()){
+
+ // Stop the home heartbeat thread
+ this.homeHeartbeartHandler.shutdown();
+ this.homeRMRelayer.shutdown();
+ for (AMRMClientRelayer relayer : this.secondaryRelayers.values()) {
relayer.shutdown();
}
+
super.shutdown();
}
@@ -781,8 +814,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
@VisibleForTesting
- protected int getLastHomeResponseId() {
- return this.lastHomeResponseId;
+ protected ApplicationAttemptId getAttemptId() {
+ return this.attemptId;
+ }
+
+ @VisibleForTesting
+ protected AMHeartbeatRequestHandler getHomeHeartbeartHandler() {
+ return this.homeHeartbeartHandler;
}
/**
@@ -798,6 +836,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
return new UnmanagedAMPoolManager(threadPool);
}
+ @VisibleForTesting
+ protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
+ Configuration conf, ApplicationId appId) {
+ return new AMHeartbeatRequestHandler(conf, appId);
+ }
+
/**
* Create a proxy instance that is used to connect to the Home resource
* manager.
@@ -872,7 +916,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
+ "Reattaching in parallel", uamMap.size(), appId);
ExecutorCompletionService<RegisterApplicationMasterResponse>
- completionService = new ExecutorCompletionService<>(threadpool);
+ completionService = new ExecutorCompletionService<>(this.threadpool);
for (Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) {
final SubClusterId subClusterId =
@@ -1047,16 +1091,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
/**
* This methods sends the specified AllocateRequests to the appropriate
- * sub-cluster resource managers.
+ * sub-cluster resource managers asynchronously.
*
* @param requests contains the heart beat requests to send to the resource
- * manager keyed by the resource manager address
+ * manager keyed by the sub-cluster id
* @return the registration responses from the newly added sub-cluster
* resource managers
* @throws YarnException
* @throws IOException
*/
- private Registrations sendRequestsToSecondaryResourceManagers(
+ private Registrations sendRequestsToResourceManagers(
Map<SubClusterId, AllocateRequest> requests)
throws YarnException, IOException {
@@ -1065,32 +1109,29 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
Registrations registrations = registerWithNewSubClusters(requests.keySet());
// Now that all the registrations are done, send the allocation request
- // to the sub-cluster RMs using the Unmanaged application masters
- // asynchronously and don't wait for the response. The responses will
- // arrive asynchronously and will be added to the response sink. These
- // responses will be sent to the application master in some future heart
- // beat response.
+ // to the sub-cluster RMs asynchronously and don't wait for the response.
+ // The responses will arrive asynchronously and will be added to the
+ // response sink, then merged and sent to the application master.
for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) {
- final SubClusterId subClusterId = entry.getKey();
+ SubClusterId subClusterId = entry.getKey();
if (subClusterId.equals(this.homeSubClusterId)) {
- // Skip the request for the home sub-cluster resource manager.
- // It will be handled separately in the allocate() method
- continue;
- }
-
- if (!this.uamPool.hasUAMId(subClusterId.getId())) {
- // TODO: This means that the registration for this sub-cluster RM
- // failed. For now, we ignore the resource requests and continue
- // but we need to fix this and handle this situation. One way would
- // be to send the request to another RM by consulting the policy.
- LOG.warn("Unmanaged AM registration not found for sub-cluster {}",
- subClusterId);
- continue;
+ // Request for the home sub-cluster resource manager
+ this.homeHeartbeartHandler.allocateAsync(entry.getValue(),
+ new HeartbeatCallBack(this.homeSubClusterId, false));
+ } else {
+ if (!this.uamPool.hasUAMId(subClusterId.getId())) {
+ // TODO: This means that the registration for this sub-cluster RM
+ // failed. For now, we ignore the resource requests and continue
+ // but we need to fix this and handle this situation. One way would
+ // be to send the request to another RM by consulting the policy.
+ LOG.warn("Unmanaged AM registration not found for sub-cluster {}",
+ subClusterId);
+ continue;
+ }
+ this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
+ new HeartbeatCallBack(subClusterId, true));
}
-
- this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
- new HeartbeatCallBack(subClusterId));
}
return registrations;
@@ -1123,7 +1164,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.amRegistrationRequest;
final AMRMProxyApplicationContext appContext = getApplicationContext();
ExecutorCompletionService<RegisterApplicationMasterResponseInfo>
- completionService = new ExecutorCompletionService<>(threadpool);
+ completionService = new ExecutorCompletionService<>(this.threadpool);
for (final String subClusterId : newSubClusters) {
completionService
@@ -1208,21 +1249,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
/**
- * Merges the responses from other sub-clusters that we received
- * asynchronously with the specified home cluster response and keeps track of
- * the containers received from each sub-cluster resource managers.
+ * Merge the responses from all sub-clusters that we received asynchronously
+ * and keeps track of the containers received from each sub-cluster resource
+ * managers.
*/
- private AllocateResponse mergeAllocateResponses(
- AllocateResponse homeResponse) {
- // Timing issue, we need to remove the completed and then save the new ones.
- removeFinishedContainersFromCache(
- homeResponse.getCompletedContainersStatuses());
- cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
- this.homeSubClusterId);
-
+ private void mergeAllocateResponses(AllocateResponse mergedResponse) {
synchronized (this.asyncResponseSink) {
- for (Entry<SubClusterId, List<AllocateResponse>> entry : asyncResponseSink
- .entrySet()) {
+ for (Entry<SubClusterId, List<AllocateResponse>> entry :
+ this.asyncResponseSink.entrySet()) {
SubClusterId subClusterId = entry.getKey();
List<AllocateResponse> responses = entry.getValue();
if (responses.size() > 0) {
@@ -1231,14 +1265,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
response.getCompletedContainersStatuses());
cacheAllocatedContainers(response.getAllocatedContainers(),
subClusterId);
- mergeAllocateResponse(homeResponse, response, subClusterId);
+ mergeAllocateResponse(mergedResponse, response, subClusterId);
}
responses.clear();
}
}
}
-
- return homeResponse;
}
/**
@@ -1256,11 +1288,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
/**
- * Helper method for merging the responses from the secondary sub cluster RMs
- * with the home response to return to the AM.
+ * Helper method for merging the registration responses from the secondary sub
+ * cluster RMs into the allocate response to return to the AM.
*/
- private AllocateResponse mergeRegistrationResponses(
- AllocateResponse homeResponse,
+ private void mergeRegistrationResponses(AllocateResponse homeResponse,
Map<SubClusterId, RegisterApplicationMasterResponse> registrations) {
for (Entry<SubClusterId, RegisterApplicationMasterResponse> entry :
@@ -1292,13 +1323,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
}
-
- return homeResponse;
}
private void mergeAllocateResponse(AllocateResponse homeResponse,
AllocateResponse otherResponse, SubClusterId otherRMAddress) {
+ if (otherResponse.getAMRMToken() != null) {
+ // Propagate only the new amrmToken from home sub-cluster back to
+ // AMRMProxyService
+ if (otherRMAddress.equals(this.homeSubClusterId)) {
+ homeResponse.setAMRMToken(otherResponse.getAMRMToken());
+ } else {
+ throw new YarnRuntimeException(
+ "amrmToken from UAM " + otherRMAddress + " should be null here");
+ }
+ }
+
if (!isNullOrEmpty(otherResponse.getAllocatedContainers())) {
if (!isNullOrEmpty(homeResponse.getAllocatedContainers())) {
homeResponse.getAllocatedContainers()
@@ -1406,9 +1446,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
SubClusterId subClusterId) {
for (Container container : containers) {
LOG.debug("Adding container {}", container);
- if (containerIdToSubClusterIdMap.containsKey(container.getId())) {
+
+ if (this.containerIdToSubClusterIdMap.containsKey(container.getId())) {
SubClusterId existingSubClusterId =
- containerIdToSubClusterIdMap.get(container.getId());
+ this.containerIdToSubClusterIdMap.get(container.getId());
if (existingSubClusterId.equals(subClusterId)) {
/*
* When RM fails over, the new RM master might send out the same
@@ -1441,7 +1482,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
- containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
+ this.containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
}
}
@@ -1463,7 +1504,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
newRequest.setProgress(originalAMRequest.getProgress());
requestMap.put(subClusterId, newRequest);
}
-
return newRequest;
}
@@ -1472,7 +1512,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private static AllocateRequest createAllocateRequest() {
AllocateRequest request =
- AllocateRequest.newInstance(0, 0, null, null, null);
+ RECORD_FACTORY.newRecordInstance(AllocateRequest.class);
request.setAskList(new ArrayList<ResourceRequest>());
request.setReleaseList(new ArrayList<ContainerId>());
ResourceBlacklistRequest blackList =
@@ -1526,6 +1566,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
@VisibleForTesting
+ protected UnmanagedAMPoolManager getUnmanagedAMPool() {
+ return this.uamPool;
+ }
+
+ @VisibleForTesting
public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() {
return this.asyncResponseSink;
}
@@ -1535,9 +1580,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private class HeartbeatCallBack implements AsyncCallback<AllocateResponse> {
private SubClusterId subClusterId;
+ private boolean isUAM;
- HeartbeatCallBack(SubClusterId subClusterId) {
+ HeartbeatCallBack(SubClusterId subClusterId, boolean isUAM) {
this.subClusterId = subClusterId;
+ this.isUAM = isUAM;
}
@Override
@@ -1551,16 +1598,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
asyncResponseSink.put(subClusterId, responses);
}
responses.add(response);
+ // Notify main thread about the response arrival
+ asyncResponseSink.notifyAll();
}
// Save the new AMRMToken for the UAM if present
- if (response.getAMRMToken() != null) {
+ if (this.isUAM && response.getAMRMToken() != null) {
Token<AMRMTokenIdentifier> newToken = ConverterUtils
.convertFromYarn(response.getAMRMToken(), (Text) null);
+ // Do not further propagate the new amrmToken for UAM
+ response.setAMRMToken(null);
+
// Update the token in registry or NMSS
if (registryClient != null) {
- registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
- subClusterId.getId(), newToken);
+ if (registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
+ subClusterId.getId(), newToken)) {
+ try {
+ AMRMTokenIdentifier identifier = new AMRMTokenIdentifier();
+ identifier.readFields(new DataInputStream(
+ new ByteArrayInputStream(newToken.getIdentifier())));
+ LOG.info(
+ "Received new UAM amrmToken with keyId {} and "
+ + "service {} from {} for {}, written to Registry",
+ identifier.getKeyId(), newToken.getService(), subClusterId,
+ attemptId);
+ } catch (IOException e) {
+ }
+ }
} else if (getNMStateStore() != null) {
try {
getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
@@ -1573,11 +1637,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
- // Notify policy of secondary sub-cluster responses
+ // Notify policy of allocate response
try {
policyInterpreter.notifyOfResponse(subClusterId, response);
} catch (YarnException e) {
- LOG.warn("notifyOfResponse for policy failed for home sub-cluster "
+ LOG.warn("notifyOfResponse for policy failed for sub-cluster "
+ subClusterId, e);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/30909228/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index a837eed..407ae83 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Executors;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -95,6 +97,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
private int testAppId;
private ApplicationAttemptId attemptId;
+ private volatile int lastResponseId;
+
@Override
public void setUp() throws IOException {
super.setUp();
@@ -120,6 +124,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(),
attemptId, "test-user", null, null, null, registry));
interceptor.cleanupRegistry();
+
+ lastResponseId = 0;
}
@Override
@@ -174,8 +180,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
private List<Container> getContainersAndAssert(int numberOfResourceRequests,
int numberOfAllocationExcepted) throws Exception {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
- allocateRequest.setResponseId(1);
-
List<Container> containers =
new ArrayList<Container>(numberOfResourceRequests);
List<ResourceRequest> askList =
@@ -187,22 +191,31 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
allocateRequest.setAskList(askList);
+ allocateRequest.setResponseId(lastResponseId);
AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull("allocate() returned null response", allocateResponse);
+ checkAMRMToken(allocateResponse.getAMRMToken());
+ lastResponseId = allocateResponse.getResponseId();
containers.addAll(allocateResponse.getAllocatedContainers());
LOG.info("Number of allocated containers in the original request: "
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
+ // Make sure this request is picked up by all async heartbeat handlers
+ interceptor.drainAllAsyncQueue(false);
+
// Send max 10 heart beats to receive all the containers. If not, we will
// fail the test
int numHeartbeat = 0;
while (containers.size() < numberOfAllocationExcepted
&& numHeartbeat++ < 10) {
- allocateResponse =
- interceptor.allocate(Records.newRecord(AllocateRequest.class));
+ allocateRequest = Records.newRecord(AllocateRequest.class);
+ allocateRequest.setResponseId(lastResponseId);
+ allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull("allocate() returned null response",
allocateResponse);
+ checkAMRMToken(allocateResponse.getAMRMToken());
+ lastResponseId = allocateResponse.getResponseId();
containers.addAll(allocateResponse.getAllocatedContainers());
@@ -220,8 +233,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
throws Exception {
Assert.assertTrue(containers.size() > 0);
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
- allocateRequest.setResponseId(1);
-
List<ContainerId> relList = new ArrayList<ContainerId>(containers.size());
for (Container container : containers) {
relList.add(container.getId());
@@ -229,8 +240,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
allocateRequest.setReleaseList(relList);
+ allocateRequest.setResponseId(lastResponseId);
AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull(allocateResponse);
+ checkAMRMToken(allocateResponse.getAMRMToken());
+ lastResponseId = allocateResponse.getResponseId();
// The release request will be split and handled by the corresponding UAM.
// The release containers returned by the mock resource managers will be
@@ -244,14 +258,21 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
LOG.info("Number of containers received in the original request: "
+ Integer.toString(newlyFinished.size()));
+ // Make sure this request is picked up by all async heartbeat handlers
+ interceptor.drainAllAsyncQueue(false);
+
// Send max 10 heart beats to receive all the containers. If not, we will
// fail the test
int numHeartbeat = 0;
while (containersForReleasedContainerIds.size() < relList.size()
&& numHeartbeat++ < 10) {
- allocateResponse =
- interceptor.allocate(Records.newRecord(AllocateRequest.class));
+ allocateRequest = Records.newRecord(AllocateRequest.class);
+ allocateRequest.setResponseId(lastResponseId);
+ allocateResponse = interceptor.allocate(allocateRequest);
Assert.assertNotNull(allocateResponse);
+ checkAMRMToken(allocateResponse.getAMRMToken());
+ lastResponseId = allocateResponse.getResponseId();
+
newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
@@ -267,65 +288,81 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
containersForReleasedContainerIds.size());
}
+ private void checkAMRMToken(Token amrmToken) {
+ if (amrmToken != null) {
+ // The token should be the one issued by home MockRM
+ Assert.assertTrue(amrmToken.getKind().equals(Integer.toString(0)));
+ }
+ }
+
@Test
public void testMultipleSubClusters() throws Exception {
+ UserGroupInformation ugi =
+ interceptor.getUGIWithToken(interceptor.getAttemptId());
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ // Register the application
+ RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(0);
+ registerReq.setTrackingUrl("");
- // Register the application
- RegisterApplicationMasterRequest registerReq =
- Records.newRecord(RegisterApplicationMasterRequest.class);
- registerReq.setHost(Integer.toString(testAppId));
- registerReq.setRpcPort(0);
- registerReq.setTrackingUrl("");
+ RegisterApplicationMasterResponse registerResponse =
+ interceptor.registerApplicationMaster(registerReq);
+ Assert.assertNotNull(registerResponse);
+ lastResponseId = 0;
- RegisterApplicationMasterResponse registerResponse =
- interceptor.registerApplicationMaster(registerReq);
- Assert.assertNotNull(registerResponse);
+ Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+ // Allocate the first batch of containers, with sc1 and sc2 active
+ registerSubCluster(SubClusterId.newInstance("SC-1"));
+ registerSubCluster(SubClusterId.newInstance("SC-2"));
+
+ int numberOfContainers = 3;
+ List<Container> containers =
+ getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
+ Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
- Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+ // Allocate the second batch of containers, with sc1 and sc3 active
+ deRegisterSubCluster(SubClusterId.newInstance("SC-2"));
+ registerSubCluster(SubClusterId.newInstance("SC-3"));
- // Allocate the first batch of containers, with sc1 and sc2 active
- registerSubCluster(SubClusterId.newInstance("SC-1"));
- registerSubCluster(SubClusterId.newInstance("SC-2"));
+ numberOfContainers = 1;
+ containers.addAll(
+ getContainersAndAssert(numberOfContainers, numberOfContainers * 2));
+ Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
- int numberOfContainers = 3;
- List<Container> containers =
- getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
- Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
-
- // Allocate the second batch of containers, with sc1 and sc3 active
- deRegisterSubCluster(SubClusterId.newInstance("SC-2"));
- registerSubCluster(SubClusterId.newInstance("SC-3"));
-
- numberOfContainers = 1;
- containers.addAll(
- getContainersAndAssert(numberOfContainers, numberOfContainers * 2));
- Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
-
- // Allocate the third batch of containers with only in home sub-cluster
- // active
- deRegisterSubCluster(SubClusterId.newInstance("SC-1"));
- deRegisterSubCluster(SubClusterId.newInstance("SC-3"));
- registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
-
- numberOfContainers = 2;
- containers.addAll(
- getContainersAndAssert(numberOfContainers, numberOfContainers * 1));
- Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
-
- // Release all containers
- releaseContainersAndAssert(containers);
-
- // Finish the application
- FinishApplicationMasterRequest finishReq =
- Records.newRecord(FinishApplicationMasterRequest.class);
- finishReq.setDiagnostics("");
- finishReq.setTrackingUrl("");
- finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
-
- FinishApplicationMasterResponse finshResponse =
- interceptor.finishApplicationMaster(finishReq);
- Assert.assertNotNull(finshResponse);
- Assert.assertEquals(true, finshResponse.getIsUnregistered());
+ // Allocate the third batch of containers with only in home sub-cluster
+ // active
+ deRegisterSubCluster(SubClusterId.newInstance("SC-1"));
+ deRegisterSubCluster(SubClusterId.newInstance("SC-3"));
+ registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
+
+ numberOfContainers = 2;
+ containers.addAll(
+ getContainersAndAssert(numberOfContainers, numberOfContainers * 1));
+ Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
+
+ // Release all containers
+ releaseContainersAndAssert(containers);
+
+ // Finish the application
+ FinishApplicationMasterRequest finishReq =
+ Records.newRecord(FinishApplicationMasterRequest.class);
+ finishReq.setDiagnostics("");
+ finishReq.setTrackingUrl("");
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+ FinishApplicationMasterResponse finshResponse =
+ interceptor.finishApplicationMaster(finishReq);
+ Assert.assertNotNull(finshResponse);
+ Assert.assertEquals(true, finshResponse.getIsUnregistered());
+
+ return null;
+ }
+ });
}
/*
@@ -333,49 +370,58 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
*/
@Test
public void testReregister() throws Exception {
+ UserGroupInformation ugi =
+ interceptor.getUGIWithToken(interceptor.getAttemptId());
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
- // Register the application
- RegisterApplicationMasterRequest registerReq =
- Records.newRecord(RegisterApplicationMasterRequest.class);
- registerReq.setHost(Integer.toString(testAppId));
- registerReq.setRpcPort(0);
- registerReq.setTrackingUrl("");
+ // Register the application
+ RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(0);
+ registerReq.setTrackingUrl("");
- RegisterApplicationMasterResponse registerResponse =
- interceptor.registerApplicationMaster(registerReq);
- Assert.assertNotNull(registerResponse);
+ RegisterApplicationMasterResponse registerResponse =
+ interceptor.registerApplicationMaster(registerReq);
+ Assert.assertNotNull(registerResponse);
+ lastResponseId = 0;
- Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+ Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
- // Allocate the first batch of containers
- registerSubCluster(SubClusterId.newInstance("SC-1"));
- registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
+ // Allocate the first batch of containers
+ registerSubCluster(SubClusterId.newInstance("SC-1"));
+ registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
- interceptor.setShouldReRegisterNext();
+ interceptor.setShouldReRegisterNext();
- int numberOfContainers = 3;
- List<Container> containers =
- getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
- Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+ int numberOfContainers = 3;
+ List<Container> containers =
+ getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
+ Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
- interceptor.setShouldReRegisterNext();
+ interceptor.setShouldReRegisterNext();
- // Release all containers
- releaseContainersAndAssert(containers);
+ // Release all containers
+ releaseContainersAndAssert(containers);
- interceptor.setShouldReRegisterNext();
+ interceptor.setShouldReRegisterNext();
- // Finish the application
- FinishApplicationMasterRequest finishReq =
- Records.newRecord(FinishApplicationMasterRequest.class);
- finishReq.setDiagnostics("");
- finishReq.setTrackingUrl("");
- finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+ // Finish the application
+ FinishApplicationMasterRequest finishReq =
+ Records.newRecord(FinishApplicationMasterRequest.class);
+ finishReq.setDiagnostics("");
+ finishReq.setTrackingUrl("");
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
- FinishApplicationMasterResponse finshResponse =
- interceptor.finishApplicationMaster(finishReq);
- Assert.assertNotNull(finshResponse);
- Assert.assertEquals(true, finshResponse.getIsUnregistered());
+ FinishApplicationMasterResponse finshResponse =
+ interceptor.finishApplicationMaster(finishReq);
+ Assert.assertNotNull(finshResponse);
+ Assert.assertEquals(true, finshResponse.getIsUnregistered());
+ return null;
+ }
+ });
}
/*
@@ -442,6 +488,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
// Use port number 1001 to let mock RM block in the register call
response = interceptor.registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 1001, null));
+ lastResponseId = 0;
} catch (Exception e) {
LOG.info("Register thread exception", e);
response = null;
@@ -460,9 +507,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
testRecover(null);
}
- public void testRecover(RegistryOperations registryObj) throws Exception {
- ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId);
- userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() {
+ protected void testRecover(final RegistryOperations registryObj)
+ throws Exception {
+ UserGroupInformation ugi =
+ interceptor.getUGIWithToken(interceptor.getAttemptId());
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
interceptor = new TestableFederationInterceptor();
@@ -480,6 +529,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
+ lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
@@ -492,6 +542,9 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+ // Make sure all async hb threads are done
+ interceptor.drainAllAsyncQueue(true);
+
// Prepare for Federation Interceptor restart and recover
Map<String, byte[]> recoveredDataMap =
recoverDataMapForAppAttempt(nmStateStore, attemptId);
@@ -517,22 +570,21 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
interceptor.recover(recoveredDataMap);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
- Assert.assertEquals(Integer.MAX_VALUE,
- interceptor.getLastHomeResponseId());
// The first allocate call expects a fail-over exception and re-register
- int responseId = 10;
- AllocateRequest allocateRequest =
- Records.newRecord(AllocateRequest.class);
- allocateRequest.setResponseId(responseId);
try {
- interceptor.allocate(allocateRequest);
+ AllocateRequest allocateRequest =
+ Records.newRecord(AllocateRequest.class);
+ allocateRequest.setResponseId(lastResponseId);
+ AllocateResponse allocateResponse =
+ interceptor.allocate(allocateRequest);
+ lastResponseId = allocateResponse.getResponseId();
Assert.fail("Expecting an ApplicationMasterNotRegisteredException "
+ " after FederationInterceptor restarts and recovers");
} catch (ApplicationMasterNotRegisteredException e) {
}
interceptor.registerApplicationMaster(registerReq);
- Assert.assertEquals(responseId, interceptor.getLastHomeResponseId());
+ lastResponseId = 0;
// Release all containers
releaseContainersAndAssert(containers);
@@ -614,6 +666,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
+ lastResponseId = 0;
}
}
@@ -629,6 +682,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
+ lastResponseId = 0;
// Register the application second time with a different request obj
registerReq = Records.newRecord(RegisterApplicationMasterRequest.class);
@@ -637,6 +691,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
registerReq.setTrackingUrl("different");
try {
registerResponse = interceptor.registerApplicationMaster(registerReq);
+ lastResponseId = 0;
Assert.fail("Should throw if a different request obj is used");
} catch (YarnException e) {
}
@@ -689,20 +744,22 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
@Test
public void testSecondAttempt() throws Exception {
- ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId);
- userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() {
+ final RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(testAppId);
+ registerReq.setTrackingUrl("");
+
+ UserGroupInformation ugi =
+ interceptor.getUGIWithToken(interceptor.getAttemptId());
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// Register the application
- RegisterApplicationMasterRequest registerReq =
- Records.newRecord(RegisterApplicationMasterRequest.class);
- registerReq.setHost(Integer.toString(testAppId));
- registerReq.setRpcPort(testAppId);
- registerReq.setTrackingUrl("");
-
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
+ lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
@@ -714,10 +771,13 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
for (Container c : containers) {
- System.out.println(c.getId() + " ha");
+ LOG.info("Allocated container " + c.getId());
}
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+ // Make sure all async hb threads are done
+ interceptor.drainAllAsyncQueue(true);
+
// Preserve the mock RM instances for secondaries
ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
interceptor.getSecondaryRMs();
@@ -729,8 +789,20 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
interceptor = new TestableFederationInterceptor(null, secondaries);
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
getConf(), attemptId, "test-user", null, null, null, registry));
- registerResponse = interceptor.registerApplicationMaster(registerReq);
+ return null;
+ }
+ });
+ // Update the ugi with new attemptId
+ ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ RegisterApplicationMasterResponse registerResponse =
+ interceptor.registerApplicationMaster(registerReq);
+ lastResponseId = 0;
+
+ int numberOfContainers = 3;
// Should re-attach secondaries and get the three running containers
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(numberOfContainers,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org