You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/06/07 21:49:41 UTC
[50/50] [abbrv] hadoop git commit: YARN-6511. Federation:
transparently spanning application across multiple sub-clusters. (Botong
Huang via Subru).
YARN-6511. Federation: transparently spanning application across multiple sub-clusters. (Botong Huang via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e3ed6365
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e3ed6365
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e3ed6365
Branch: refs/heads/YARN-2915
Commit: e3ed6365bb17cf09ddc26440c80e210bf024b450
Parents: 031febd
Author: Subru Krishnan <su...@apache.org>
Authored: Wed Jun 7 14:45:51 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Wed Jun 7 14:46:41 2017 -0700
----------------------------------------------------------------------
.../policies/FederationPolicyUtils.java | 168 +++++
.../federation/policies/RouterPolicyFacade.java | 21 +-
.../amrmproxy/FederationInterceptor.java | 685 ++++++++++++++++++-
.../amrmproxy/TestFederationInterceptor.java | 252 +++++++
.../TestableFederationInterceptor.java | 6 +
5 files changed, 1096 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3ed6365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
new file mode 100644
index 0000000..37ce942
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for Federation policy.
+ */
+@Private
+public final class FederationPolicyUtils {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FederationPolicyUtils.class);
+
+ /** Disable constructor. */
+ private FederationPolicyUtils() {
+ }
+
+ /**
+ * A utilize method to instantiate a policy manager class given the type
+ * (class name) from {@link SubClusterPolicyConfiguration}.
+ *
+ * @param newType class name of the policy manager to create
+ * @return Policy manager
+ * @throws FederationPolicyInitializationException if fails
+ */
+ public static FederationPolicyManager instantiatePolicyManager(String newType)
+ throws FederationPolicyInitializationException {
+ FederationPolicyManager federationPolicyManager = null;
+ try {
+ // create policy instance and set queue
+ Class<?> c = Class.forName(newType);
+ federationPolicyManager = (FederationPolicyManager) c.newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new FederationPolicyInitializationException(e);
+ } catch (InstantiationException e) {
+ throw new FederationPolicyInitializationException(e);
+ } catch (IllegalAccessException e) {
+ throw new FederationPolicyInitializationException(e);
+ }
+ return federationPolicyManager;
+ }
+
+ /**
+ * Get Federation policy configuration from state store, using default queue
+ * and configuration as fallback.
+ *
+ * @param queue the queue of the application
+ * @param conf the Yarn configuration
+ * @param federationFacade state store facade
+ * @return SubClusterPolicyConfiguration recreated
+ */
+ public static SubClusterPolicyConfiguration loadPolicyConfiguration(
+ String queue, Configuration conf,
+ FederationStateStoreFacade federationFacade) {
+
+ // The facade might cache this request, based on its parameterization
+ SubClusterPolicyConfiguration configuration = null;
+ if (queue != null) {
+ try {
+ configuration = federationFacade.getPolicyConfiguration(queue);
+ } catch (YarnException e) {
+ LOG.warn("Failed to get policy from FederationFacade with queue "
+ + queue + ": " + e.getMessage());
+ }
+ }
+
+ // If there is no policy configured for this queue, fallback to the baseline
+ // policy that is configured either in the store or via XML config
+ if (configuration == null) {
+ LOG.info("No policy configured for queue {} in StateStore,"
+ + " fallback to default queue", queue);
+ queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
+ try {
+ configuration = federationFacade.getPolicyConfiguration(queue);
+ } catch (YarnException e) {
+ LOG.warn("No fallback behavior defined in store, defaulting to XML "
+ + "configuration fallback behavior.");
+ }
+ }
+
+ // or from XML conf otherwise.
+ if (configuration == null) {
+ LOG.info("No policy configured for default queue {} in StateStore,"
+ + " fallback to local config", queue);
+
+ String defaultFederationPolicyManager =
+ conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+ YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
+ String defaultPolicyParamString =
+ conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS,
+ YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
+ ByteBuffer defaultPolicyParam = ByteBuffer
+ .wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8));
+
+ configuration = SubClusterPolicyConfiguration.newInstance(queue,
+ defaultFederationPolicyManager, defaultPolicyParam);
+ }
+ return configuration;
+ }
+
+ /**
+ * Get AMRMProxy policy from state store, using default queue and
+ * configuration as fallback.
+ *
+ * @param queue the queue of the application
+ * @param oldPolicy the previous policy instance (can be null)
+ * @param conf the Yarn configuration
+ * @param federationFacade state store facade
+ * @param homeSubClusterId home sub-cluster id
+ * @return FederationAMRMProxyPolicy recreated
+ * @throws FederationPolicyInitializationException if fails
+ */
+ public static FederationAMRMProxyPolicy loadAMRMPolicy(String queue,
+ FederationAMRMProxyPolicy oldPolicy, Configuration conf,
+ FederationStateStoreFacade federationFacade,
+ SubClusterId homeSubClusterId)
+ throws FederationPolicyInitializationException {
+
+ // Local policy and its configuration
+ SubClusterPolicyConfiguration configuration =
+ loadPolicyConfiguration(queue, conf, federationFacade);
+
+ // Instantiate the policyManager and get policy
+ FederationPolicyInitializationContext context =
+ new FederationPolicyInitializationContext(configuration,
+ federationFacade.getSubClusterResolver(), federationFacade,
+ homeSubClusterId);
+
+ LOG.info("Creating policy manager of type: " + configuration.getType());
+ FederationPolicyManager federationPolicyManager =
+ instantiatePolicyManager(configuration.getType());
+ // set queue, reinit policy if required (implementation lazily check
+ // content of conf), and cache it
+ federationPolicyManager.setQueue(configuration.getQueue());
+ return federationPolicyManager.getAMRMPolicy(context, oldPolicy);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3ed6365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
index 8c22623..5e31a08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
@@ -95,7 +95,7 @@ public class RouterPolicyFacade {
new FederationPolicyInitializationContext(configuration,
subClusterResolver, federationFacade, homeSubcluster);
FederationPolicyManager fallbackPolicyManager =
- instantiatePolicyManager(configuration.getType());
+ FederationPolicyUtils.instantiatePolicyManager(configuration.getType());
fallbackPolicyManager.setQueue(defaulKey);
// add to the cache the fallback behavior
@@ -209,7 +209,7 @@ public class RouterPolicyFacade {
FederationRouterPolicy routerPolicy = policyMap.get(queue);
FederationPolicyManager federationPolicyManager =
- instantiatePolicyManager(newType);
+ FederationPolicyUtils.instantiatePolicyManager(newType);
// set queue, reinit policy if required (implementation lazily check
// content of conf), and cache it
federationPolicyManager.setQueue(queue);
@@ -224,23 +224,6 @@ public class RouterPolicyFacade {
}
}
- private static FederationPolicyManager instantiatePolicyManager(
- String newType) throws FederationPolicyInitializationException {
- FederationPolicyManager federationPolicyManager = null;
- try {
- // create policy instance and set queue
- Class c = Class.forName(newType);
- federationPolicyManager = (FederationPolicyManager) c.newInstance();
- } catch (ClassNotFoundException e) {
- throw new FederationPolicyInitializationException(e);
- } catch (InstantiationException e) {
- throw new FederationPolicyInitializationException(e);
- } catch (IllegalAccessException e) {
- throw new FederationPolicyInitializationException(e);
- }
- return federationPolicyManager;
- }
-
/**
* This method flushes all cached configurations and policies. This should be
* invoked if the facade remains active after a very large churn of queues in
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3ed6365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 5f82d69..ffe47f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -24,7 +24,14 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
@@ -38,20 +45,35 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.PreemptionContract;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.apache.hadoop.yarn.util.AsyncCallback;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
/**
* Extends the AbstractRequestInterceptor and provides an implementation for
* federation of YARN RM and scaling an application across multiple YARN
@@ -70,6 +92,27 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private SubClusterId homeSubClusterId;
/**
+ * UAM pool for secondary sub-clusters (ones other than home sub-cluster),
+ * using subClusterId as uamId. One UAM is created per sub-cluster RM except
+ * the home RM.
+ *
+ * Creation and register of UAM in secondary sub-clusters happen on-demand,
+ * when AMRMProxy policy routes resource request to these sub-clusters for the
+ * first time. AM heart beats to them are also handled asynchronously for
+ * performance reasons.
+ */
+ private UnmanagedAMPoolManager uamPool;
+
+ /** Thread pool used for asynchronous operations. */
+ private ExecutorService threadpool;
+
+ /**
+ * Stores the AllocateResponses that are received asynchronously from all the
+ * sub-cluster resource managers except the home RM.
+ */
+ private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
+
+ /**
* Used to keep track of the container Id and the sub cluster RM that created
* the container, so that we know which sub-cluster to forward later requests
* about existing containers to.
@@ -89,7 +132,17 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private RegisterApplicationMasterResponse amRegistrationResponse;
- /** The proxy ugi used to talk to home RM. */
+ private FederationStateStoreFacade federationFacade;
+
+ private SubClusterResolver subClusterResolver;
+
+ /** The policy used to split requests among sub-clusters. */
+ private FederationAMRMProxyPolicy policyInterpreter;
+
+ /**
+ * The proxy ugi used to talk to home RM, loaded with the up-to-date AMRMToken
+ * issued by home RM.
+ */
private UserGroupInformation appOwner;
/**
@@ -97,6 +150,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
public FederationInterceptor() {
this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>();
+ this.asyncResponseSink = new ConcurrentHashMap<>();
+ this.threadpool = Executors.newCachedThreadPool();
+ this.uamPool = createUnmanagedAMPoolManager(this.threadpool);
+ this.amRegistrationRequest = null;
this.amRegistrationResponse = null;
}
@@ -126,6 +183,15 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.homeSubClusterId =
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
this.homeRM = createHomeRMProxy(appContext);
+
+ this.federationFacade = FederationStateStoreFacade.getInstance();
+ this.subClusterResolver = this.federationFacade.getSubClusterResolver();
+
+ // AMRMProxyPolicy will be initialized in registerApplicationMaster
+ this.policyInterpreter = null;
+
+ this.uamPool.init(conf);
+ this.uamPool.start();
}
/**
@@ -202,7 +268,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
if (queue == null) {
LOG.warn("Received null queue for application "
+ getApplicationContext().getApplicationAttemptId().getApplicationId()
- + " from home subcluster. Will use default queue name "
+ + " from home sub-cluster. Will use default queue name "
+ YarnConfiguration.DEFAULT_QUEUE_NAME
+ " for getting AMRMProxyPolicy");
} else {
@@ -211,6 +277,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
+ " belongs to queue " + queue);
}
+ // Initialize the AMRMProxyPolicy
+ try {
+ this.policyInterpreter =
+ FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
+ getConf(), this.federationFacade, this.homeSubClusterId);
+ } catch (FederationPolicyInitializationException e) {
+ throw new YarnRuntimeException(e);
+ }
return this.amRegistrationResponse;
}
@@ -221,6 +295,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnException {
+ Preconditions.checkArgument(this.policyInterpreter != null,
+ "Allocate should be called after registerApplicationMaster");
try {
// Split the heart beat request into multiple requests, one for each
@@ -228,12 +304,28 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
Map<SubClusterId, AllocateRequest> requests =
splitAllocateRequest(request);
+ // Send the requests to the secondary sub-cluster resource managers.
+ // These secondary requests are send asynchronously and the responses will
+ // be collected and merged with the home response. In addition, it also
+ // return the newly registered Unmanaged AMs.
+ Registrations newRegistrations =
+ sendRequestsToSecondaryResourceManagers(requests);
+
// Send the request to the home RM and get the response
AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
requests.get(this.homeSubClusterId), this.homeRM,
this.amRegistrationRequest,
getApplicationContext().getApplicationAttemptId());
+ // Notify policy of home response
+ try {
+ this.policyInterpreter.notifyOfResponse(this.homeSubClusterId,
+ homeResponse);
+ } catch (YarnException e) {
+ LOG.warn("notifyOfResponse for policy failed for home sub-cluster "
+ + this.homeSubClusterId, e);
+ }
+
// If the resource manager sent us a new token, add to the current user
if (homeResponse.getAMRMToken() != null) {
LOG.debug("Received new AMRMToken");
@@ -244,6 +336,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// Merge the responses from home and secondary sub-cluster RMs
homeResponse = mergeAllocateResponses(homeResponse);
+ // Merge the containers and NMTokens from the new registrations into
+ // the homeResponse.
+ if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) {
+ homeResponse = mergeRegistrationResponses(homeResponse,
+ newRegistrations.getSuccessfulRegistrations());
+ }
+
// return the final response to the application master.
return homeResponse;
} catch (IOException ex) {
@@ -261,10 +360,83 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
FinishApplicationMasterRequest request)
throws YarnException, IOException {
+ // TODO: consider adding batchFinishApplicationMaster in UAMPoolManager
+ boolean failedToUnRegister = false;
+ ExecutorCompletionService<FinishApplicationMasterResponseInfo> compSvc =
+ null;
+
+ // Application master is completing operation. Send the finish
+ // application master request to all the registered sub-cluster resource
+ // managers in parallel, wait for the responses and aggregate the results.
+ Set<String> subClusterIds = this.uamPool.getAllUAMIds();
+ if (subClusterIds.size() > 0) {
+ final FinishApplicationMasterRequest finishRequest = request;
+ compSvc =
+ new ExecutorCompletionService<FinishApplicationMasterResponseInfo>(
+ this.threadpool);
+
+ LOG.info("Sending finish application request to {} sub-cluster RMs",
+ subClusterIds.size());
+ for (final String subClusterId : subClusterIds) {
+ compSvc.submit(new Callable<FinishApplicationMasterResponseInfo>() {
+ @Override
+ public FinishApplicationMasterResponseInfo call() throws Exception {
+ LOG.info("Sending finish application request to RM {}",
+ subClusterId);
+ FinishApplicationMasterResponse uamResponse = null;
+ try {
+ uamResponse =
+ uamPool.finishApplicationMaster(subClusterId, finishRequest);
+ } catch (Throwable e) {
+ LOG.warn("Failed to finish unmanaged application master: "
+ + "RM address: " + subClusterId + " ApplicationId: "
+ + getApplicationContext().getApplicationAttemptId(), e);
+ }
+ return new FinishApplicationMasterResponseInfo(uamResponse,
+ subClusterId);
+ }
+ });
+ }
+ }
+
+ // While the finish application request is being processed
+ // asynchronously by other sub-cluster resource managers, send the same
+ // request to the home resource manager on this thread.
FinishApplicationMasterResponse homeResponse =
AMRMClientUtils.finishAMWithReRegister(request, this.homeRM,
this.amRegistrationRequest,
getApplicationContext().getApplicationAttemptId());
+
+ if (subClusterIds.size() > 0) {
+ // Wait for other sub-cluster resource managers to return the
+ // response and merge it with the home response
+ LOG.info(
+ "Waiting for finish application response from {} sub-cluster RMs",
+ subClusterIds.size());
+ for (int i = 0; i < subClusterIds.size(); ++i) {
+ try {
+ Future<FinishApplicationMasterResponseInfo> future = compSvc.take();
+ FinishApplicationMasterResponseInfo uamResponse = future.get();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received finish application response from RM: "
+ + uamResponse.getSubClusterId());
+ }
+ if (uamResponse.getResponse() == null
+ || !uamResponse.getResponse().getIsUnregistered()) {
+ failedToUnRegister = true;
+ }
+ } catch (Throwable e) {
+ failedToUnRegister = true;
+ LOG.warn("Failed to finish unmanaged application master: "
+ + " ApplicationId: "
+ + getApplicationContext().getApplicationAttemptId(), e);
+ }
+ }
+ }
+
+ if (failedToUnRegister) {
+ homeResponse.setIsUnregistered(false);
+ }
return homeResponse;
}
@@ -281,10 +453,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
@Override
public void shutdown() {
+ if (this.uamPool != null) {
+ this.uamPool.stop();
+ }
+ if (threadpool != null) {
+ try {
+ threadpool.shutdown();
+ } catch (Throwable ex) {
+ }
+ threadpool = null;
+ }
super.shutdown();
}
/**
+ * Create the UAM pool manager for secondary sub-clusters. For unit tests to
+ * override.
+ *
+ * @param threadPool the thread pool to use
+ * @return the UAM pool manager instance
+ */
+ @VisibleForTesting
+ protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(
+ ExecutorService threadPool) {
+ return new UnmanagedAMPoolManager(threadPool);
+ }
+
+ /**
* Returns instance of the ApplicationMasterProtocol proxy class that is used
* to connect to the Home resource manager.
*
@@ -302,6 +497,23 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
+ private SubClusterId getSubClusterForNode(String nodeName) {
+ SubClusterId subClusterId = null;
+ try {
+ subClusterId = this.subClusterResolver.getSubClusterForNode(nodeName);
+ } catch (YarnException e) {
+ LOG.error("Failed to resolve sub-cluster for node " + nodeName
+ + ", skipping this node", e);
+ return null;
+ }
+ if (subClusterId == null) {
+ LOG.error("Failed to resolve sub-cluster for node {}, skipping this node",
+ nodeName);
+ return null;
+ }
+ return subClusterId;
+ }
+
/**
* In federation, the heart beat request needs to be sent to all the sub
* clusters from which the AM has requested containers. This method splits the
@@ -317,20 +529,39 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request,
requestMap);
+ // Create heart beat request instances for all other already registered
+ // sub-cluster resource managers
+ Set<String> subClusterIds = this.uamPool.getAllUAMIds();
+ for (String subClusterId : subClusterIds) {
+ findOrCreateAllocateRequestForSubCluster(
+ SubClusterId.newInstance(subClusterId), request, requestMap);
+ }
+
if (!isNullOrEmpty(request.getAskList())) {
- AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
- this.homeSubClusterId, request, requestMap);
- newRequest.getAskList().addAll(request.getAskList());
+ // Ask the federation policy interpreter to split the ask list for
+ // sending it to all the sub-cluster resource managers.
+ Map<SubClusterId, List<ResourceRequest>> asks =
+ splitResourceRequests(request.getAskList());
+
+ // Add the askLists to the corresponding sub-cluster requests.
+ for (Entry<SubClusterId, List<ResourceRequest>> entry : asks.entrySet()) {
+ AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+ entry.getKey(), request, requestMap);
+ newRequest.getAskList().addAll(entry.getValue());
+ }
}
if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
request.getResourceBlacklistRequest().getBlacklistAdditions())) {
for (String resourceName : request.getResourceBlacklistRequest()
.getBlacklistAdditions()) {
- AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
- this.homeSubClusterId, request, requestMap);
- newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
- .add(resourceName);
+ SubClusterId subClusterId = getSubClusterForNode(resourceName);
+ if (subClusterId != null) {
+ AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+ subClusterId, request, requestMap);
+ newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
+ .add(resourceName);
+ }
}
}
@@ -338,10 +569,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
request.getResourceBlacklistRequest().getBlacklistRemovals())) {
for (String resourceName : request.getResourceBlacklistRequest()
.getBlacklistRemovals()) {
- AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
- this.homeSubClusterId, request, requestMap);
- newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
- .add(resourceName);
+ SubClusterId subClusterId = getSubClusterForNode(resourceName);
+ if (subClusterId != null) {
+ AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
+ subClusterId, request, requestMap);
+ newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
+ .add(resourceName);
+ }
}
}
@@ -371,6 +605,174 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
/**
+ * This method sends the specified AllocateRequests to the appropriate
+ * sub-cluster resource managers.
+ *
+ * @param requests contains the heart beat requests to send to the resource
+ * manager keyed by the resource manager address
+ * @return the registration responses from the newly added sub-cluster
+ * resource managers
+ * @throws YarnException
+ * @throws IOException
+ */
+ private Registrations sendRequestsToSecondaryResourceManagers(
+ Map<SubClusterId, AllocateRequest> requests)
+ throws YarnException, IOException {
+
+ // Create new UAM instances for the sub-cluster that we have not seen
+ // before
+ Registrations registrations = registerWithNewSubClusters(requests.keySet());
+
+ // Now that all the registrations are done, send the allocation request
+ // to the sub-cluster RMs using the Unmanaged application masters
+ // asynchronously and don't wait for the response. The responses will
+ // arrive asynchronously and will be added to the response sink. These
+ // responses will be sent to the application master in some future heart
+ // beat response.
+ for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) {
+ final SubClusterId subClusterId = entry.getKey();
+
+ if (subClusterId.equals(this.homeSubClusterId)) {
+ // Skip the request for the home sub-cluster resource manager.
+ // It will be handled separately in the allocate() method
+ continue;
+ }
+
+ if (!this.uamPool.hasUAMId(subClusterId.getId())) {
+ // TODO: This means that the registration for this sub-cluster RM
+ // failed. For now, we ignore the resource requests and continue
+ // but we need to fix this and handle this situation. One way would
+ // be to send the request to another RM by consulting the policy.
+ LOG.warn("Unmanaged AM registration not found for sub-cluster {}",
+ subClusterId);
+ continue;
+ }
+
+ this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
+ new AsyncCallback<AllocateResponse>() {
+ @Override
+ public void callback(AllocateResponse response) {
+ synchronized (asyncResponseSink) {
+ List<AllocateResponse> responses = null;
+ if (asyncResponseSink.containsKey(subClusterId)) {
+ responses = asyncResponseSink.get(subClusterId);
+ } else {
+ responses = new ArrayList<>();
+ asyncResponseSink.put(subClusterId, responses);
+ }
+ responses.add(response);
+ }
+
+ // Notify policy of secondary sub-cluster responses
+ try {
+ policyInterpreter.notifyOfResponse(subClusterId, response);
+ } catch (YarnException e) {
+ LOG.warn(
+ "notifyOfResponse for policy failed for home sub-cluster "
+ + subClusterId,
+ e);
+ }
+ }
+ });
+ }
+
+ return registrations;
+ }
+
+ /**
+ * This method ensures that Unmanaged AMs are created for each of the
+ * sub-clusters specified in the input and registers with the
+ * corresponding resource managers.
+ */
+ private Registrations registerWithNewSubClusters(
+ Set<SubClusterId> subClusterSet) throws IOException {
+
+ List<SubClusterId> failedRegistrations = new ArrayList<>();
+ Map<SubClusterId, RegisterApplicationMasterResponse>
+ successfulRegistrations = new HashMap<>();
+
+ // Check to see if there are any new sub-clusters in this request
+ // list and create and register Unmanaged AM instance for the new ones
+ List<String> newSubClusters = new ArrayList<>();
+ for (SubClusterId subClusterId : subClusterSet) {
+ if (!subClusterId.equals(this.homeSubClusterId)
+ && !this.uamPool.hasUAMId(subClusterId.getId())) {
+ newSubClusters.add(subClusterId.getId());
+ }
+ }
+
+ if (newSubClusters.size() > 0) {
+ final RegisterApplicationMasterRequest registerRequest =
+ this.amRegistrationRequest;
+ final AMRMProxyApplicationContext appContext = getApplicationContext();
+ ExecutorCompletionService<RegisterApplicationMasterResponseInfo>
+ completionService = new ExecutorCompletionService<>(threadpool);
+
+ for (final String subClusterId : newSubClusters) {
+ completionService
+ .submit(new Callable<RegisterApplicationMasterResponseInfo>() {
+ @Override
+ public RegisterApplicationMasterResponseInfo call()
+ throws Exception {
+
+ // Create a config loaded with federation on and subclusterId
+ // for each UAM
+ YarnConfiguration config = new YarnConfiguration(getConf());
+ FederationProxyProviderUtil.updateConfForFederation(config,
+ subClusterId);
+
+ RegisterApplicationMasterResponse uamResponse = null;
+ try {
+ // For appNameSuffix, use subClusterId of the home sub-cluster
+ uamResponse = uamPool.createAndRegisterNewUAM(subClusterId,
+ registerRequest, config,
+ appContext.getApplicationAttemptId().getApplicationId(),
+ amRegistrationResponse.getQueue(), appContext.getUser(),
+ homeSubClusterId.toString());
+ } catch (Throwable e) {
+ LOG.error("Failed to register application master: "
+ + subClusterId + " Application: "
+ + appContext.getApplicationAttemptId(), e);
+ }
+ return new RegisterApplicationMasterResponseInfo(uamResponse,
+ SubClusterId.newInstance(subClusterId));
+ }
+ });
+ }
+
+ // Wait for other sub-cluster resource managers to return the
+ // response and add it to the Map for returning to the caller
+ for (int i = 0; i < newSubClusters.size(); ++i) {
+ try {
+ Future<RegisterApplicationMasterResponseInfo> future =
+ completionService.take();
+ RegisterApplicationMasterResponseInfo uamResponse = future.get();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received register application response from RM: "
+ + uamResponse.getSubClusterId());
+ }
+
+ if (uamResponse.getResponse() == null) {
+ failedRegistrations.add(uamResponse.getSubClusterId());
+ } else {
+ LOG.info("Successfully registered unmanaged application master: "
+ + uamResponse.getSubClusterId() + " ApplicationId: "
+ + getApplicationContext().getApplicationAttemptId());
+ successfulRegistrations.put(uamResponse.getSubClusterId(),
+ uamResponse.getResponse());
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to register unmanaged application master: "
+ + " ApplicationId: "
+ + getApplicationContext().getApplicationAttemptId(), e);
+ }
+ }
+ }
+
+ return new Registrations(successfulRegistrations, failedRegistrations);
+ }
+
+ /**
* Merges the responses from other sub-clusters that we received
* asynchronously with the specified home cluster response and keeps track of
* the containers received from each sub-cluster resource managers.
@@ -388,6 +790,24 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
cacheAllocatedContainers(homeResponse.getAllocatedContainers(),
this.homeSubClusterId);
+ synchronized (this.asyncResponseSink) {
+ for (Entry<SubClusterId, List<AllocateResponse>> entry : asyncResponseSink
+ .entrySet()) {
+ SubClusterId subClusterId = entry.getKey();
+ List<AllocateResponse> responses = entry.getValue();
+ if (responses.size() > 0) {
+ for (AllocateResponse response : responses) {
+ removeFinishedContainersFromCache(
+ response.getCompletedContainersStatuses());
+ cacheAllocatedContainers(response.getAllocatedContainers(),
+ subClusterId);
+ mergeAllocateResponse(homeResponse, response, subClusterId);
+ }
+ responses.clear();
+ }
+ }
+ }
+
return homeResponse;
}
@@ -405,6 +825,130 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
/**
+ * Helper method for merging the responses from the secondary sub cluster RMs
+ * with the home response to return to the AM.
+ */
+ private AllocateResponse mergeRegistrationResponses(
+ AllocateResponse homeResponse,
+ Map<SubClusterId, RegisterApplicationMasterResponse> registrations) {
+
+ // For each secondary sub-cluster UAM registration, fold its
+ // previous-attempt containers and NM tokens into the single response
+ // handed back to the AM.
+ for (Entry<SubClusterId, RegisterApplicationMasterResponse> entry :
+ registrations.entrySet()) {
+ RegisterApplicationMasterResponse registration = entry.getValue();
+
+ if (!isNullOrEmpty(registration.getContainersFromPreviousAttempts())) {
+ List<Container> tempContainers = homeResponse.getAllocatedContainers();
+ if (!isNullOrEmpty(tempContainers)) {
+ tempContainers
+ .addAll(registration.getContainersFromPreviousAttempts());
+ homeResponse.setAllocatedContainers(tempContainers);
+ } else {
+ homeResponse.setAllocatedContainers(
+ registration.getContainersFromPreviousAttempts());
+ }
+ // Remember which sub-cluster owns these containers so later
+ // requests about them are routed back to the right RM.
+ cacheAllocatedContainers(
+ registration.getContainersFromPreviousAttempts(), entry.getKey());
+ }
+
+ if (!isNullOrEmpty(registration.getNMTokensFromPreviousAttempts())) {
+ List<NMToken> tempTokens = homeResponse.getNMTokens();
+ if (!isNullOrEmpty(tempTokens)) {
+ tempTokens.addAll(registration.getNMTokensFromPreviousAttempts());
+ homeResponse.setNMTokens(tempTokens);
+ } else {
+ homeResponse
+ .setNMTokens(registration.getNMTokensFromPreviousAttempts());
+ }
+ }
+ }
+
+ return homeResponse;
+ }
+
+ /**
+ * Merges one allocate response received asynchronously from a secondary
+ * sub-cluster RM into the home response returned to the AM: container and
+ * status lists are concatenated, available resources are summed, and
+ * preemption messages are combined field by field.
+ */
+ private void mergeAllocateResponse(AllocateResponse homeResponse,
+ AllocateResponse otherResponse, SubClusterId otherRMAddress) {
+
+ if (!isNullOrEmpty(otherResponse.getAllocatedContainers())) {
+ if (!isNullOrEmpty(homeResponse.getAllocatedContainers())) {
+ homeResponse.getAllocatedContainers()
+ .addAll(otherResponse.getAllocatedContainers());
+ } else {
+ homeResponse
+ .setAllocatedContainers(otherResponse.getAllocatedContainers());
+ }
+ }
+
+ // Headroom is additive across sub-clusters.
+ if (otherResponse.getAvailableResources() != null) {
+ if (homeResponse.getAvailableResources() != null) {
+ homeResponse.setAvailableResources(
+ Resources.add(homeResponse.getAvailableResources(),
+ otherResponse.getAvailableResources()));
+ } else {
+ homeResponse
+ .setAvailableResources(otherResponse.getAvailableResources());
+ }
+ }
+
+ if (!isNullOrEmpty(otherResponse.getCompletedContainersStatuses())) {
+ if (!isNullOrEmpty(homeResponse.getCompletedContainersStatuses())) {
+ homeResponse.getCompletedContainersStatuses()
+ .addAll(otherResponse.getCompletedContainersStatuses());
+ } else {
+ homeResponse.setCompletedContainersStatuses(
+ otherResponse.getCompletedContainersStatuses());
+ }
+ }
+
+ if (!isNullOrEmpty(otherResponse.getUpdatedNodes())) {
+ if (!isNullOrEmpty(homeResponse.getUpdatedNodes())) {
+ homeResponse.getUpdatedNodes().addAll(otherResponse.getUpdatedNodes());
+ } else {
+ homeResponse.setUpdatedNodes(otherResponse.getUpdatedNodes());
+ }
+ }
+
+ if (!isNullOrEmpty(otherResponse.getNMTokens())) {
+ if (!isNullOrEmpty(homeResponse.getNMTokens())) {
+ homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
+ } else {
+ homeResponse.setNMTokens(otherResponse.getNMTokens());
+ }
+ }
+
+ PreemptionMessage homePreempMessage = homeResponse.getPreemptionMessage();
+ PreemptionMessage otherPreempMessage = otherResponse.getPreemptionMessage();
+
+ if (homePreempMessage == null && otherPreempMessage != null) {
+ homeResponse.setPreemptionMessage(otherPreempMessage);
+ }
+
+ if (homePreempMessage != null && otherPreempMessage != null) {
+ PreemptionContract par1 = homePreempMessage.getContract();
+ PreemptionContract par2 = otherPreempMessage.getContract();
+
+ if (par1 == null && par2 != null) {
+ homePreempMessage.setContract(par2);
+ }
+
+ if (par1 != null && par2 != null) {
+ par1.getResourceRequest().addAll(par2.getResourceRequest());
+ // Bug fix: merge the other contract's containers into the home
+ // contract (par1). The original appended par2's containers onto
+ // par2 itself, duplicating them there and dropping them from the
+ // merged home response.
+ par1.getContainers().addAll(par2.getContainers());
+ }
+
+ StrictPreemptionContract spar1 = homePreempMessage.getStrictContract();
+ StrictPreemptionContract spar2 = otherPreempMessage.getStrictContract();
+
+ if (spar1 == null && spar2 != null) {
+ homePreempMessage.setStrictContract(spar2);
+ }
+
+ if (spar1 != null && spar2 != null) {
+ spar1.getContainers().addAll(spar2.getContainers());
+ }
+ }
+ }
+
+ /**
* Add allocated containers to cache mapping.
*/
private void cacheAllocatedContainers(List<Container> containers,
@@ -418,10 +962,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// container allocation more than once. Just move on in this case.
LOG.warn(
"Duplicate containerID: {} found in the allocated containers"
- + " from same subcluster: {}, so ignoring.",
+ + " from same sub-cluster: {}, so ignoring.",
container.getId(), subClusterId);
} else {
- // The same container allocation from different subclusters,
+ // The same container allocation from different sub-clusters,
// something is wrong.
// TODO: YARN-6667 if some subcluster RM is configured wrong, we
// should not fail the entire heartbeat.
@@ -432,7 +976,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
+ " ApplicationId: "
+ getApplicationContext().getApplicationAttemptId()
+ " From RM: " + subClusterId
- + " . Previous container was from subcluster: "
+ + " . Previous container was from sub-cluster: "
+ existingSubClusterId);
}
}
@@ -498,7 +1042,102 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
/**
- * Utility method to check if the specified Collection is null or empty
+ * Splits the specified request to send it to different sub clusters. The
+ * splitting algorithm is very simple. If the request does not have a node
+ * preference, the policy decides the sub cluster. If the request has a node
+ * preference and if locality is required, then it is sent to the sub cluster
+ * that contains the requested node. If node preference is specified and
+ * locality is not required, then the policy decides the sub cluster.
+ *
+ * @param askList the ask list to split
+ * @return the split asks
+ * @throws YarnException if split fails
+ */
+ protected Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
+ List<ResourceRequest> askList) throws YarnException {
+ // Delegates the split entirely to the configured federation policy.
+ return this.policyInterpreter.splitResourceRequests(askList);
+ }
+
+ @VisibleForTesting
+ public int getUnmanagedAMPoolSize() {
+ // Number of secondary sub-clusters for which a UAM has been created.
+ return this.uamPool.getAllUAMIds().size();
+ }
+
+ /**
+ * Private structure for encapsulating SubClusterId and
+ * RegisterApplicationMasterResponse instances.
+ */
+ private static class RegisterApplicationMasterResponseInfo {
+ // Immutable value holder: both fields are set once in the constructor,
+ // so mark them final (minimize mutability).
+ private final RegisterApplicationMasterResponse response;
+ private final SubClusterId subClusterId;
+
+ RegisterApplicationMasterResponseInfo(
+ RegisterApplicationMasterResponse response, SubClusterId subClusterId) {
+ this.response = response;
+ this.subClusterId = subClusterId;
+ }
+
+ public RegisterApplicationMasterResponse getResponse() {
+ return response;
+ }
+
+ public SubClusterId getSubClusterId() {
+ return subClusterId;
+ }
+ }
+
+ /**
+ * Private structure for encapsulating SubClusterId and
+ * FinishApplicationMasterResponse instances.
+ */
+ private static class FinishApplicationMasterResponseInfo {
+ // Immutable value holder: both fields are set once in the constructor,
+ // so mark them final (minimize mutability).
+ private final FinishApplicationMasterResponse response;
+ private final String subClusterId;
+
+ FinishApplicationMasterResponseInfo(
+ FinishApplicationMasterResponse response, String subClusterId) {
+ this.response = response;
+ this.subClusterId = subClusterId;
+ }
+
+ public FinishApplicationMasterResponse getResponse() {
+ return response;
+ }
+
+ public String getSubClusterId() {
+ return subClusterId;
+ }
+ }
+
+ /**
+ * Private structure for encapsulating successful and failed application
+ * master registration responses.
+ */
+ private static class Registrations {
+ // Immutable value holder: both fields are set once in the constructor,
+ // so mark them final (minimize mutability).
+ private final Map<SubClusterId, RegisterApplicationMasterResponse>
+ successfulRegistrations;
+ private final List<SubClusterId> failedRegistrations;
+
+ Registrations(
+ Map<SubClusterId, RegisterApplicationMasterResponse>
+ successfulRegistrations,
+ List<SubClusterId> failedRegistrations) {
+ this.successfulRegistrations = successfulRegistrations;
+ this.failedRegistrations = failedRegistrations;
+ }
+
+ public Map<SubClusterId, RegisterApplicationMasterResponse>
+ getSuccessfulRegistrations() {
+ return this.successfulRegistrations;
+ }
+
+ public List<SubClusterId> getFailedRegistrations() {
+ return this.failedRegistrations;
+ }
+ }
+
+ /**
+ * Utility method to check if the specified Collection is null or empty.
*
* @param c the collection object
* @param <T> element type of the collection
@@ -507,4 +1146,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
public static <T> boolean isNullOrEmpty(Collection<T> c) {
return (c == null || c.size() == 0);
}
+
+ /**
+ * Utility method to check if the specified Map is null or empty.
+ *
+ * @param c the map object
+ * @param <T1> key type of the map
+ * @param <T2> value type of the map
+ * @return whether it is null or empty
+ */
+ public static <T1, T2> boolean isNullOrEmpty(Map<T1, T2> c) {
+ return (c == null || c.size() == 0);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3ed6365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index e1aedb4..e791b9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -18,12 +18,31 @@
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@@ -43,6 +62,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
public static final String HOME_SC_ID = "SC-home";
private TestableFederationInterceptor interceptor;
+ private MemoryFederationStateStore stateStore;
private int testAppId;
private ApplicationAttemptId attemptId;
@@ -52,6 +72,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
super.setUp();
interceptor = new TestableFederationInterceptor();
+ stateStore = new MemoryFederationStateStore();
+ stateStore.init(getConf());
+ FederationStateStoreFacade.getInstance().reinitialize(stateStore,
+ getConf());
+
testAppId = 1;
attemptId = getApplicationAttemptId(testAppId);
interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(),
@@ -80,11 +105,238 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ "," + TestableFederationInterceptor.class.getName());
+ conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+ UniformBroadcastPolicyManager.class.getName());
+
conf.set(YarnConfiguration.RM_CLUSTER_ID, HOME_SC_ID);
+ // Disable StateStoreFacade cache
+ conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
return conf;
}
+ private void registerSubCluster(SubClusterId subClusterId)
+ throws YarnException {
+ // Register a sub-cluster as SC_RUNNING in the in-memory state store,
+ // using placeholder endpoint addresses.
+ stateStore
+ .registerSubCluster(SubClusterRegisterRequest.newInstance(SubClusterInfo
+ .newInstance(subClusterId, "1.2.3.4:1", "1.2.3.4:2", "1.2.3.4:3",
+ "1.2.3.4:4", SubClusterState.SC_RUNNING, 0, "capacity")));
+ }
+
+ private void deRegisterSubCluster(SubClusterId subClusterId)
+ throws YarnException {
+ // Mark the sub-cluster SC_UNREGISTERED so the policy stops routing to it.
+ stateStore.deregisterSubCluster(SubClusterDeregisterRequest
+ .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
+ }
+
+ /**
+ * Sends one allocate request with the given number of resource requests,
+ * then heartbeats (at most 10 times) until the expected number of
+ * containers has been received, asserting the final count.
+ */
+ private List<Container> getContainersAndAssert(int numberOfResourceRequests,
+ int numberOfAllocationExpected) throws Exception {
+ AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+ allocateRequest.setResponseId(1);
+
+ List<Container> containers =
+ new ArrayList<Container>(numberOfResourceRequests);
+ List<ResourceRequest> askList =
+ new ArrayList<ResourceRequest>(numberOfResourceRequests);
+ for (int id = 0; id < numberOfResourceRequests; id++) {
+ askList.add(createResourceRequest("test-node-" + Integer.toString(id),
+ 6000, 2, id % 5, 1));
+ }
+
+ allocateRequest.setAskList(askList);
+
+ AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
+ Assert.assertNotNull("allocate() returned null response", allocateResponse);
+
+ containers.addAll(allocateResponse.getAllocatedContainers());
+ LOG.info("Number of allocated containers in the original request: "
+ + Integer.toString(allocateResponse.getAllocatedContainers().size()));
+
+ // Send max 10 heart beats to receive all the containers. If not, we will
+ // fail the test
+ int numHeartbeat = 0;
+ while (containers.size() < numberOfAllocationExpected
+ && numHeartbeat++ < 10) {
+ allocateResponse =
+ interceptor.allocate(Records.newRecord(AllocateRequest.class));
+ Assert.assertNotNull("allocate() returned null response",
+ allocateResponse);
+
+ containers.addAll(allocateResponse.getAllocatedContainers());
+
+ LOG.info("Number of allocated containers in this request: "
+ + Integer.toString(allocateResponse.getAllocatedContainers().size()));
+ LOG.info("Total number of allocated containers: "
+ + Integer.toString(containers.size()));
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(numberOfAllocationExpected, containers.size());
+ return containers;
+ }
+
+ /**
+ * Releases all the given containers and heartbeats (at most 10 times)
+ * until every release has been echoed back by the mock RMs, asserting the
+ * final count.
+ */
+ private void releaseContainersAndAssert(List<Container> containers)
+ throws Exception {
+ Assert.assertTrue(containers.size() > 0);
+ AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
+ allocateRequest.setResponseId(1);
+
+ List<ContainerId> relList = new ArrayList<ContainerId>(containers.size());
+ for (Container container : containers) {
+ relList.add(container.getId());
+ }
+
+ allocateRequest.setReleaseList(relList);
+
+ AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
+ Assert.assertNotNull(allocateResponse);
+
+ // The way the mock resource manager is setup, it will return the containers
+ // that were released in the allocated containers. The release request will
+ // be split and handled by the corresponding UAM. The release containers
+ // returned by the mock resource managers will be aggregated and returned
+ // back to us and we can check if total request size and returned size are
+ // the same
+ List<Container> containersForReleasedContainerIds =
+ new ArrayList<Container>();
+ containersForReleasedContainerIds
+ .addAll(allocateResponse.getAllocatedContainers());
+ LOG.info("Number of containers received in the original request: "
+ + Integer.toString(allocateResponse.getAllocatedContainers().size()));
+
+ // Send max 10 heart beats to receive all the containers. If not, we will
+ // fail the test
+ int numHeartbeat = 0;
+ while (containersForReleasedContainerIds.size() < relList.size()
+ && numHeartbeat++ < 10) {
+ allocateResponse =
+ interceptor.allocate(Records.newRecord(AllocateRequest.class));
+ Assert.assertNotNull(allocateResponse);
+ containersForReleasedContainerIds
+ .addAll(allocateResponse.getAllocatedContainers());
+
+ LOG.info("Number of containers received in this request: "
+ + Integer.toString(allocateResponse.getAllocatedContainers().size()));
+ LOG.info("Total number of containers received: "
+ + Integer.toString(containersForReleasedContainerIds.size()));
+ Thread.sleep(10);
+ }
+
+ Assert.assertEquals(relList.size(),
+ containersForReleasedContainerIds.size());
+ }
+
+ @Test
+ public void testMultipleSubClusters() throws Exception {
+
+ // Register the application
+ RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(testAppId);
+ registerReq.setTrackingUrl("");
+
+ RegisterApplicationMasterResponse registerResponse =
+ interceptor.registerApplicationMaster(registerReq);
+ Assert.assertNotNull(registerResponse);
+
+ Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+ // Allocate the first batch of containers, with sc1 and sc2 active
+ registerSubCluster(SubClusterId.newInstance("SC-1"));
+ registerSubCluster(SubClusterId.newInstance("SC-2"));
+
+ int numberOfContainers = 3;
+ List<Container> containers =
+ getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
+ Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
+
+ // Allocate the second batch of containers, with sc1 and sc3 active
+ deRegisterSubCluster(SubClusterId.newInstance("SC-2"));
+ registerSubCluster(SubClusterId.newInstance("SC-3"));
+
+ numberOfContainers = 1;
+ containers.addAll(
+ getContainersAndAssert(numberOfContainers, numberOfContainers * 2));
+ Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
+
+ // Allocate the third batch of containers with only in home sub-cluster
+ // active
+ deRegisterSubCluster(SubClusterId.newInstance("SC-1"));
+ deRegisterSubCluster(SubClusterId.newInstance("SC-3"));
+ registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
+
+ numberOfContainers = 2;
+ containers.addAll(
+ getContainersAndAssert(numberOfContainers, numberOfContainers * 1));
+ Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
+
+ // Release all containers
+ releaseContainersAndAssert(containers);
+
+ // Finish the application
+ FinishApplicationMasterRequest finishReq =
+ Records.newRecord(FinishApplicationMasterRequest.class);
+ finishReq.setDiagnostics("");
+ finishReq.setTrackingUrl("");
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+ // Fixed local variable typo (finshResponse) and use the idiomatic
+ // assertTrue instead of assertEquals(true, ...).
+ FinishApplicationMasterResponse finishResponse =
+ interceptor.finishApplicationMaster(finishReq);
+ Assert.assertNotNull(finishResponse);
+ Assert.assertTrue(finishResponse.getIsUnregistered());
+ }
+
+ /*
+ * Test re-register when RM fails over.
+ */
+ @Test
+ public void testReregister() throws Exception {
+
+ // Register the application
+ RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(testAppId);
+ registerReq.setTrackingUrl("");
+
+ RegisterApplicationMasterResponse registerResponse =
+ interceptor.registerApplicationMaster(registerReq);
+ Assert.assertNotNull(registerResponse);
+
+ Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+ // Allocate the first batch of containers
+ registerSubCluster(SubClusterId.newInstance("SC-1"));
+ registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
+
+ interceptor.setShouldReRegisterNext();
+
+ int numberOfContainers = 3;
+ List<Container> containers =
+ getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
+ Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+ interceptor.setShouldReRegisterNext();
+
+ // Release all containers
+ releaseContainersAndAssert(containers);
+
+ interceptor.setShouldReRegisterNext();
+
+ // Finish the application
+ FinishApplicationMasterRequest finishReq =
+ Records.newRecord(FinishApplicationMasterRequest.class);
+ finishReq.setDiagnostics("");
+ finishReq.setTrackingUrl("");
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+ // Fixed local variable typo (finshResponse) and use the idiomatic
+ // assertTrue instead of assertEquals(true, ...).
+ FinishApplicationMasterResponse finishResponse =
+ interceptor.finishApplicationMaster(finishReq);
+ Assert.assertNotNull(finishResponse);
+ Assert.assertTrue(finishResponse.getIsUnregistered());
+ }
+
@Test
public void testRequestInterceptorChainCreation() throws Exception {
RequestInterceptor root =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3ed6365/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
index 0ca7488..d4b8735 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -45,6 +45,12 @@ public class TestableFederationInterceptor extends FederationInterceptor {
private MockResourceManagerFacade mockRm;
@Override
+ protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(
+ ExecutorService threadPool) {
+ return new TestableUnmanagedAMPoolManager(threadPool);
+ }
+
+ @Override
protected ApplicationMasterProtocol createHomeRMProxy(
AMRMProxyApplicationContext appContext) {
synchronized (this) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org