You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/08/02 00:25:22 UTC
[40/50] [abbrv] hadoop git commit: YARN-3659. Federation: routing
client invocations transparently to multiple RMs. (Giovanni Matteo Fumarola
via Subru).
YARN-3659. Federation: routing client invocations transparently to multiple RMs. (Giovanni Matteo Fumarola via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52daa6d9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52daa6d9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52daa6d9
Branch: refs/heads/YARN-2915
Commit: 52daa6d971ae408d121b3737ea8c0575e7e8516d
Parents: f8e5de5
Author: Subru Krishnan <su...@apache.org>
Authored: Mon Jun 26 13:27:26 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Tue Aug 1 17:22:12 2017 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 8 +
.../yarn/conf/TestYarnConfigurationFields.java | 5 +
.../hadoop/yarn/conf/TestYarnConfiguration.java | 1 -
...ionPolicyInitializationContextValidator.java | 2 +-
.../policies/FederationPolicyUtils.java | 35 +
.../federation/policies/RouterPolicyFacade.java | 3 +-
.../policies/router/HashBasedRouterPolicy.java | 5 +
.../policies/router/LoadBasedRouterPolicy.java | 5 +
.../policies/router/PriorityRouterPolicy.java | 5 +
.../router/UniformRandomRouterPolicy.java | 4 +
.../router/WeightedRandomRouterPolicy.java | 5 +
.../yarn/server/MockResourceManagerFacade.java | 165 ++++-
.../policies/router/BaseRouterPoliciesTest.java | 28 +
.../utils/FederationStateStoreTestUtil.java | 15 +-
.../hadoop-yarn-server-router/pom.xml | 5 +
.../yarn/server/router/RouterServerUtil.java | 63 ++
.../AbstractClientRequestInterceptor.java | 41 +-
.../DefaultClientRequestInterceptor.java | 31 +-
.../clientrm/FederationClientInterceptor.java | 684 +++++++++++++++++++
.../router/clientrm/BaseRouterClientRMTest.java | 22 +-
.../TestFederationClientInterceptor.java | 403 +++++++++++
.../TestFederationClientInterceptorRetry.java | 295 ++++++++
.../TestableFederationClientInterceptor.java | 75 ++
23 files changed, 1866 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1432867..0c0dd11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2674,6 +2674,14 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.router.rmadmin."
+ "DefaultRMAdminRequestInterceptor";
+ /**
+ * The number of retries for GetNewApplication and SubmitApplication in
+ * {@code FederationClientInterceptor}.
+ */
+ public static final String ROUTER_CLIENTRM_SUBMIT_RETRY =
+ ROUTER_PREFIX + "submit.retry";
+ public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3;
+
////////////////////////////////
// Other Configs
////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index c3cb78d..a439cd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -136,6 +136,11 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
+ // Ignore all Router Federation variables
+
+ configurationPrefixToSkipCompare
+ .add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY);
+
// Set by container-executor.cfg
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java
index 7389423..a053fdb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.junit.Test;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
index 3c44e7e..da63bc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
@@ -51,7 +51,7 @@ public final class FederationPolicyInitializationContextValidator {
if (policyContext.getFederationSubclusterResolver() == null) {
throw new FederationPolicyInitializationException(
- "The FederationStateStoreFacase provided is null. Cannot"
+ "The FederationSubclusterResolver provided is null. Cannot"
+ " reinitalize successfully.");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
index 37ce942..97e4848 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
@@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.server.federation.policies;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -41,6 +43,9 @@ public final class FederationPolicyUtils {
private static final Logger LOG =
LoggerFactory.getLogger(FederationPolicyUtils.class);
+ public static final String NO_ACTIVE_SUBCLUSTER_AVAILABLE =
+ "No active SubCluster available to submit the request.";
+
/** Disable constructor. */
private FederationPolicyUtils() {
}
@@ -165,4 +170,34 @@ public final class FederationPolicyUtils {
return federationPolicyManager.getAMRMPolicy(context, oldPolicy);
}
+ /**
+ * Validate if there is any active subcluster that is not blacklisted, it will
+ * throw an exception if there are no usable subclusters.
+ *
+ * @param activeSubClusters the list of subClusters as identified by
+ * {@link SubClusterId} currently active.
+ * @param blackListSubClusters the list of subClusters as identified by
+ * {@link SubClusterId} to blackList from the selection of the home
+ * subCluster.
+ * @throws FederationPolicyException if there are no usable subclusters.
+ */
+ public static void validateSubClusterAvailability(
+ List<SubClusterId> activeSubClusters,
+ List<SubClusterId> blackListSubClusters)
+ throws FederationPolicyException {
+ if (activeSubClusters != null && !activeSubClusters.isEmpty()) {
+ if (blackListSubClusters == null) {
+ return;
+ }
+ for (SubClusterId scId : activeSubClusters) {
+ if (!blackListSubClusters.contains(scId)) {
+ // There is at least one active subcluster
+ return;
+ }
+ }
+ }
+ throw new FederationPolicyException(
+ FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
index 44c1b10..52c2905 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -56,7 +57,7 @@ public class RouterPolicyFacade {
@VisibleForTesting
Map<String, FederationRouterPolicy> globalPolicyMap;
- public RouterPolicyFacade(YarnConfiguration conf,
+ public RouterPolicyFacade(Configuration conf,
FederationStateStoreFacade facade, SubClusterResolver resolver,
SubClusterId homeSubcluster)
throws FederationPolicyInitializationException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
index 257a9fe..cc11880 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -76,6 +77,10 @@ public class HashBasedRouterPolicy extends AbstractRouterPolicy {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
+ FederationPolicyUtils.validateSubClusterAvailability(
+ new ArrayList<SubClusterId>(activeSubclusters.keySet()),
+ blackListSubClusters);
+
if (blackListSubClusters != null) {
// Remove from the active SubClusters from StateStore the blacklisted ones
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
index c124001..06e445b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
@@ -17,12 +17,14 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -72,6 +74,9 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
+ FederationPolicyUtils.validateSubClusterAvailability(
+ new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
+
Map<SubClusterIdInfo, Float> weights =
getPolicyInfo().getRouterPolicyWeights();
SubClusterIdInfo chosen = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
index 59f8767..a1f7666 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
@@ -17,11 +17,13 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -44,6 +46,9 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
+ FederationPolicyUtils.validateSubClusterAvailability(
+ new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
+
// This finds the sub-cluster with the highest weight among the
// currently active ones.
Map<SubClusterIdInfo, Float> weights =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
index bc729b7..7a8be91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -86,6 +87,9 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
+ FederationPolicyUtils.validateSubClusterAvailability(list,
+ blackListSubClusters);
+
if (blackListSubClusters != null) {
// Remove from the active SubClusters from StateStore the blacklisted ones
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
index 7f230a7..aec7576 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
@@ -18,12 +18,14 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -51,6 +53,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
+ FederationPolicyUtils.validateSubClusterAvailability(
+ new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
+
// note: we cannot pre-compute the weights, as the set of activeSubcluster
// changes dynamically (and this would unfairly spread the load to
// sub-clusters adjacent to an inactive one), hence we need to count/scan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 65c12c6..68c55ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server;
import java.io.IOException;
+import java.net.ConnectException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -154,6 +155,7 @@ import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import com.google.common.base.Strings;
/**
@@ -175,6 +177,13 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
new HashMap<ContainerId, Container>();
private AtomicInteger containerIndex = new AtomicInteger(0);
private Configuration conf;
+ private int subClusterId;
+ final private AtomicInteger applicationCounter = new AtomicInteger(0);
+
+ // True if the Mock RM is running, false otherwise.
+ // This property allows us to write tests for specific scenario as Yarn RM
+ // down e.g. network issue, failover.
+ private boolean isRunning;
private boolean shouldReRegisterNext = false;
@@ -187,14 +196,25 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public MockResourceManagerFacade(Configuration conf,
int startContainerIndex) {
+ this(conf, startContainerIndex, 0, true);
+ }
+
+ public MockResourceManagerFacade(Configuration conf, int startContainerIndex,
+ int subClusterId, boolean isRunning) {
this.conf = conf;
this.containerIndex.set(startContainerIndex);
+ this.subClusterId = subClusterId;
+ this.isRunning = isRunning;
}
public void setShouldReRegisterNext() {
shouldReRegisterNext = true;
}
+ public void setRunningMode(boolean mode) {
+ this.isRunning = mode;
+ }
+
private static String getAppIdentifier() throws IOException {
AMRMTokenIdentifier result = null;
UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
@@ -208,10 +228,19 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
return result != null ? result.getApplicationAttemptId().toString() : "";
}
+ private void validateRunning() throws ConnectException {
+ if (!isRunning) {
+ throw new ConnectException("RM is stopped");
+ }
+ }
+
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
+
+ validateRunning();
+
String amrmToken = getAppIdentifier();
LOG.info("Registering application attempt: " + amrmToken);
@@ -248,6 +277,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request)
throws YarnException, IOException {
+
+ validateRunning();
+
String amrmToken = getAppIdentifier();
LOG.info("Finishing application attempt: " + amrmToken);
@@ -284,6 +316,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
+
+ validateRunning();
+
if (request.getAskList() != null && request.getAskList().size() > 0
&& request.getReleaseList() != null
&& request.getReleaseList().size() > 0) {
@@ -391,6 +426,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request) throws YarnException, IOException {
+ validateRunning();
+
GetApplicationReportResponse response =
Records.newRecord(GetApplicationReportResponse.class);
ApplicationReport report = Records.newRecord(ApplicationReport.class);
@@ -407,6 +444,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
GetApplicationAttemptReportRequest request)
throws YarnException, IOException {
+ validateRunning();
+
GetApplicationAttemptReportResponse response =
Records.newRecord(GetApplicationAttemptReportResponse.class);
ApplicationAttemptReport report =
@@ -420,12 +459,19 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
@Override
public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request) throws YarnException, IOException {
- return GetNewApplicationResponse.newInstance(null, null, null);
+
+ validateRunning();
+
+ return GetNewApplicationResponse.newInstance(ApplicationId.newInstance(
+ subClusterId, applicationCounter.incrementAndGet()), null, null);
}
@Override
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
ApplicationId appId = null;
if (request.getApplicationSubmissionContext() != null) {
appId = request.getApplicationSubmissionContext().getApplicationId();
@@ -438,6 +484,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
@Override
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
ApplicationId appId = null;
if (request.getApplicationId() != null) {
appId = request.getApplicationId();
@@ -453,48 +502,72 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
@Override
public GetClusterMetricsResponse getClusterMetrics(
GetClusterMetricsRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return GetClusterMetricsResponse.newInstance(null);
}
@Override
public GetApplicationsResponse getApplications(GetApplicationsRequest request)
throws YarnException, IOException {
+
+ validateRunning();
+
return GetApplicationsResponse.newInstance(null);
}
@Override
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
throws YarnException, IOException {
+
+ validateRunning();
+
return GetClusterNodesResponse.newInstance(null);
}
@Override
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
throws YarnException, IOException {
+
+ validateRunning();
+
return GetQueueInfoResponse.newInstance(null);
}
@Override
public GetQueueUserAclsInfoResponse getQueueUserAcls(
GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return GetQueueUserAclsInfoResponse.newInstance(null);
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return GetDelegationTokenResponse.newInstance(null);
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return RenewDelegationTokenResponse.newInstance(0);
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return CancelDelegationTokenResponse.newInstance();
}
@@ -502,36 +575,54 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
MoveApplicationAcrossQueuesRequest request)
throws YarnException, IOException {
+
+ validateRunning();
+
return MoveApplicationAcrossQueuesResponse.newInstance();
}
@Override
public GetApplicationAttemptsResponse getApplicationAttempts(
GetApplicationAttemptsRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return GetApplicationAttemptsResponse.newInstance(null);
}
@Override
public GetContainerReportResponse getContainerReport(
GetContainerReportRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return GetContainerReportResponse.newInstance(null);
}
@Override
public GetContainersResponse getContainers(GetContainersRequest request)
throws YarnException, IOException {
+
+ validateRunning();
+
return GetContainersResponse.newInstance(null);
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return ReservationSubmissionResponse.newInstance();
}
@Override
public ReservationListResponse listReservations(
ReservationListRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return ReservationListResponse
.newInstance(new ArrayList<ReservationAllocationState>());
}
@@ -539,18 +630,27 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
@Override
public ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return ReservationUpdateResponse.newInstance();
}
@Override
public ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return ReservationDeleteResponse.newInstance();
}
@Override
public GetNodesToLabelsResponse getNodeToLabels(
GetNodesToLabelsRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return GetNodesToLabelsResponse
.newInstance(new HashMap<NodeId, Set<String>>());
}
@@ -558,18 +658,27 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
@Override
public GetClusterNodeLabelsResponse getClusterNodeLabels(
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return GetClusterNodeLabelsResponse.newInstance(new ArrayList<NodeLabel>());
}
@Override
public GetLabelsToNodesResponse getLabelsToNodes(
GetLabelsToNodesRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return GetLabelsToNodesResponse.newInstance(null);
}
@Override
public GetNewReservationResponse getNewReservation(
GetNewReservationRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return GetNewReservationResponse
.newInstance(ReservationId.newInstance(0, 0));
}
@@ -577,6 +686,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
@Override
public FailApplicationAttemptResponse failApplicationAttempt(
FailApplicationAttemptRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return FailApplicationAttemptResponse.newInstance();
}
@@ -584,12 +696,18 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public UpdateApplicationPriorityResponse updateApplicationPriority(
UpdateApplicationPriorityRequest request)
throws YarnException, IOException {
+
+ validateRunning();
+
return UpdateApplicationPriorityResponse.newInstance(null);
}
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return new SignalContainerResponsePBImpl();
}
@@ -597,18 +715,27 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
UpdateApplicationTimeoutsRequest request)
throws YarnException, IOException {
+
+ validateRunning();
+
return UpdateApplicationTimeoutsResponse.newInstance();
}
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws StandbyException, YarnException, IOException {
+
+ validateRunning();
+
return RefreshQueuesResponse.newInstance();
}
@Override
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws StandbyException, YarnException, IOException {
+
+ validateRunning();
+
return RefreshNodesResponse.newInstance();
}
@@ -616,6 +743,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
throws StandbyException, YarnException, IOException {
+
+ validateRunning();
+
return RefreshSuperUserGroupsConfigurationResponse.newInstance();
}
@@ -623,36 +753,54 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request)
throws StandbyException, YarnException, IOException {
+
+ validateRunning();
+
return RefreshUserToGroupsMappingsResponse.newInstance();
}
@Override
public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return RefreshAdminAclsResponse.newInstance();
}
@Override
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return RefreshServiceAclsResponse.newInstance();
}
@Override
public UpdateNodeResourceResponse updateNodeResource(
UpdateNodeResourceRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return UpdateNodeResourceResponse.newInstance();
}
@Override
public RefreshNodesResourcesResponse refreshNodesResources(
RefreshNodesResourcesRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return RefreshNodesResourcesResponse.newInstance();
}
@Override
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return AddToClusterNodeLabelsResponse.newInstance();
}
@@ -660,12 +808,18 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
RemoveFromClusterNodeLabelsRequest request)
throws YarnException, IOException {
+
+ validateRunning();
+
return RemoveFromClusterNodeLabelsResponse.newInstance();
}
@Override
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
+
+ validateRunning();
+
return ReplaceLabelsOnNodeResponse.newInstance();
}
@@ -673,6 +827,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
throws YarnException, IOException {
+
+ validateRunning();
+
return CheckForDecommissioningNodesResponse.newInstance(null);
}
@@ -680,11 +837,17 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
RefreshClusterMaxPriorityRequest request)
throws YarnException, IOException {
+
+ validateRunning();
+
return RefreshClusterMaxPriorityResponse.newInstance();
}
@Override
public String[] getGroupsForUser(String user) throws IOException {
+
+ validateRunning();
+
return new String[0];
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
index c7a7767..d09ba75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
@@ -26,6 +26,7 @@ import java.util.Random;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@@ -87,4 +88,31 @@ public abstract class BaseRouterPoliciesTest
Assert.assertEquals(removed, chosen);
}
}
+
+ /**
+ * This test validates the correctness of blacklist logic in case the cluster
+ * has no active subclusters.
+ */
+ @Test
+ public void testAllBlacklistSubcluster() throws YarnException {
+ FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy();
+ ApplicationSubmissionContext applicationSubmissionContext =
+ ApplicationSubmissionContext.newInstance(null, null, null, null, null,
+ false, false, 0, Resources.none(), null, false, null, null);
+ Map<SubClusterId, SubClusterInfo> activeSubClusters =
+ getActiveSubclusters();
+ if (activeSubClusters != null && activeSubClusters.size() > 1
+ && !(localPolicy instanceof RejectRouterPolicy)) {
+ List<SubClusterId> blacklistSubclusters =
+ new ArrayList<SubClusterId>(activeSubClusters.keySet());
+ try {
+ localPolicy.getHomeSubcluster(applicationSubmissionContext,
+ blacklistSubclusters);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage()
+ .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
index 649a61b..423bf86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolic
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
@@ -69,7 +70,7 @@ public class FederationStateStoreTestUtil {
SubClusterState.SC_RUNNING, CLOCK.getTime(), "capability");
}
- private void registerSubCluster(SubClusterId subClusterId)
+ public void registerSubCluster(SubClusterId subClusterId)
throws YarnException {
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
@@ -164,4 +165,16 @@ public class FederationStateStoreTestUtil {
return result.getPolicyConfiguration();
}
+ public void deregisterAllSubClusters() throws YarnException {
+ for (SubClusterId sc : getAllSubClusterIds(true)) {
+ deRegisterSubCluster(sc);
+ }
+ }
+
+ private void deRegisterSubCluster(SubClusterId subClusterId)
+ throws YarnException {
+ stateStore.deregisterSubCluster(SubClusterDeregisterRequest
+ .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
index f9169e1..f27b2b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
@@ -58,6 +58,11 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
new file mode 100644
index 0000000..cc96da6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common utility methods used by the Router server.
+ *
+ */
+@Private
+@Unstable
+public final class RouterServerUtil {
+
+ /** Disable constructor. */
+ private RouterServerUtil() {
+ }
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(RouterServerUtil.class);
+
+ /**
+ * Throws an exception due to an error.
+ *
+ * @param errMsg the error message
+ * @param t the throwable raised in the called class.
+ * @throws YarnException on failure
+ */
+ @Public
+ @Unstable
+ public static void logAndThrowException(String errMsg, Throwable t)
+ throws YarnException {
+ if (t != null) {
+ LOG.error(errMsg, t);
+ throw new YarnException(errMsg, t);
+ } else {
+ LOG.error(errMsg);
+ throw new YarnException(errMsg);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
index 5980b03..01ba3bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
@@ -18,7 +18,13 @@
package org.apache.hadoop.yarn.server.router.clientrm;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implements the {@link ClientRequestInterceptor} interface and provides common
@@ -28,9 +34,16 @@ import org.apache.hadoop.conf.Configuration;
*/
public abstract class AbstractClientRequestInterceptor
implements ClientRequestInterceptor {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractClientRequestInterceptor.class);
+
private Configuration conf;
private ClientRequestInterceptor nextInterceptor;
+ @SuppressWarnings("checkstyle:visibilitymodifier")
+ protected UserGroupInformation user = null;
+
/**
* Sets the {@link ClientRequestInterceptor} in the chain.
*/
@@ -63,9 +76,10 @@ public abstract class AbstractClientRequestInterceptor
* Initializes the {@link ClientRequestInterceptor}.
*/
@Override
- public void init(String user) {
+ public void init(String userName) {
+ setupUser(userName);
if (this.nextInterceptor != null) {
- this.nextInterceptor.init(user);
+ this.nextInterceptor.init(userName);
}
}
@@ -87,4 +101,27 @@ public abstract class AbstractClientRequestInterceptor
return this.nextInterceptor;
}
+ private void setupUser(String userName) {
+
+ try {
+ // Do not create a proxy user if user name matches the user name on
+ // current UGI
+ if (userName.equalsIgnoreCase(
+ UserGroupInformation.getCurrentUser().getUserName())) {
+ user = UserGroupInformation.getCurrentUser();
+ } else {
+ user = UserGroupInformation.createProxyUser(userName,
+ UserGroupInformation.getCurrentUser());
+ }
+ } catch (IOException e) {
+ String message = "Error while creating Router ClientRM Service for user:";
+ if (user != null) {
+ message += ", user: " + user;
+ }
+
+ LOG.info(message);
+ throw new YarnRuntimeException(message, e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
index 9e2bfed..71de6b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
@@ -85,8 +84,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRespo
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
@@ -98,27 +95,14 @@ import com.google.common.annotations.VisibleForTesting;
*/
public class DefaultClientRequestInterceptor
extends AbstractClientRequestInterceptor {
- private static final Logger LOG =
- LoggerFactory.getLogger(DefaultClientRequestInterceptor.class);
private ApplicationClientProtocol clientRMProxy;
- private UserGroupInformation user = null;
@Override
public void init(String userName) {
super.init(userName);
- try {
- // Do not create a proxy user if user name matches the user name on
- // current UGI
- if (userName.equalsIgnoreCase(
- UserGroupInformation.getCurrentUser().getUserName())) {
- user = UserGroupInformation.getCurrentUser();
- } else {
- user = UserGroupInformation.createProxyUser(userName,
- UserGroupInformation.getCurrentUser());
- }
-
- final Configuration conf = this.getConf();
+ final Configuration conf = this.getConf();
+ try {
clientRMProxy =
user.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
@Override
@@ -127,16 +111,9 @@ public class DefaultClientRequestInterceptor
ApplicationClientProtocol.class);
}
});
- } catch (IOException e) {
- String message = "Error while creating Router ClientRM Service for user:";
- if (user != null) {
- message += ", user: " + user;
- }
-
- LOG.info(message);
- throw new YarnRuntimeException(message, e);
} catch (Exception e) {
- throw new YarnRuntimeException(e);
+ throw new YarnRuntimeException(
+ "Unable to create the interface to reach the YarnRM", e);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
new file mode 100644
index 0000000..ecf53ac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -0,0 +1,684 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Extends the {@code AbstractRequestInterceptorClient} class and provides an
+ * implementation for federation of YARN RM and scaling an application across
+ * multiple YARN SubClusters. All the federation specific implementation is
+ * encapsulated in this class. This is always the last interceptor in the chain.
+ */
+public class FederationClientInterceptor
+ extends AbstractClientRequestInterceptor {
+
+ /*
+ * TODO YARN-6740 Federation Router (hiding multiple RMs for
+ * ApplicationClientProtocol) phase 2.
+ *
+ * The current implementation finalized the main 4 calls (getNewApplication,
+ * submitApplication, forceKillApplication and getApplicationReport). Those
+ * allow us to execute applications E2E.
+ */
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FederationClientInterceptor.class);
+
+ private int numSubmitRetries;
+ private Map<SubClusterId, ApplicationClientProtocol> clientRMProxies;
+ private FederationStateStoreFacade federationFacade;
+ private Random rand;
+ private RouterPolicyFacade policyFacade;
+
+  /**
+   * Initializes the interceptor: resolves the federation state-store facade,
+   * builds the routing policy facade, reads the submit-retry count and
+   * creates the per-SubCluster proxy cache.
+   *
+   * @param userName the user on whose behalf client calls will be proxied
+   */
+  @Override
+  public void init(String userName) {
+    super.init(userName);
+
+    federationFacade = FederationStateStoreFacade.getInstance();
+    // Time-based seed only diversifies routing across Router restarts;
+    // no security property is required from this generator.
+    rand = new Random(System.currentTimeMillis());
+
+    final Configuration conf = this.getConf();
+
+    try {
+      policyFacade = new RouterPolicyFacade(conf, federationFacade,
+          this.federationFacade.getSubClusterResolver(), null);
+    } catch (FederationPolicyInitializationException e) {
+      // Fail fast instead of only logging: without a policy facade every
+      // submitApplication call would NPE later. Surfacing the
+      // misconfiguration at startup makes the root cause visible.
+      LOG.error("Failed to initialize the RouterPolicyFacade", e);
+      throw new YarnRuntimeException(e);
+    }
+
+    numSubmitRetries =
+        conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
+            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
+
+    clientRMProxies =
+        new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
+  }
+
+ @Override
+ public void setNextInterceptor(ClientRequestInterceptor next) {
+ throw new YarnRuntimeException("setNextInterceptor is being called on "
+ + "FederationClientRequestInterceptor, which should be the last one "
+ + "in the chain. Check if the interceptor pipeline configuration "
+ + "is correct");
+ }
+
+  /**
+   * Returns (creating and caching on first use) the
+   * {@link ApplicationClientProtocol} proxy used to reach the given
+   * SubCluster. The proxy is created under the interceptor's proxy user via
+   * {@code doAs}.
+   *
+   * NOTE(review): the proxy is built from the Router's own {@code getConf()}
+   * without any visible per-SubCluster RM address override — presumably the
+   * configuration is specialized elsewhere; verify, otherwise every proxy
+   * would point at the same RM. TODO confirm.
+   *
+   * @param subClusterId the target SubCluster
+   * @return the cached or newly created client protocol proxy
+   * @throws YarnException if the proxy cannot be created
+   */
+  @VisibleForTesting
+  protected ApplicationClientProtocol getClientRMProxyForSubCluster(
+      SubClusterId subClusterId) throws YarnException {
+
+    // Fast path: reuse a previously created proxy.
+    // NOTE(review): containsKey/get/put is not atomic even on a
+    // ConcurrentHashMap, so two threads may build duplicate proxies; the
+    // duplicate appears benign (last write wins) but is worth confirming.
+    if (clientRMProxies.containsKey(subClusterId)) {
+      return clientRMProxies.get(subClusterId);
+    }
+
+    ApplicationClientProtocol clientRMProxy = null;
+    try {
+      // Create the proxy as the configured user so RPC authentication is
+      // performed with that user's credentials.
+      clientRMProxy =
+          user.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
+            @Override
+            public ApplicationClientProtocol run() throws Exception {
+              return ClientRMProxy.createRMProxy(getConf(),
+                  ApplicationClientProtocol.class);
+            }
+          });
+    } catch (Exception e) {
+      // logAndThrowException rethrows as YarnException with the cause kept.
+      RouterServerUtil.logAndThrowException(
+          "Unable to create the interface to reach the SubCluster "
+              + subClusterId,
+          e);
+    }
+
+    clientRMProxies.put(subClusterId, clientRMProxy);
+    return clientRMProxy;
+  }
+
+  /**
+   * Picks one SubCluster uniformly at random among the currently active ones.
+   *
+   * @param activeSubclusters map of active SubClusters, keyed by id
+   * @return a randomly chosen active SubCluster id
+   * @throws YarnException when no active SubCluster is available
+   */
+  private SubClusterId getRandomActiveSubCluster(
+      Map<SubClusterId, SubClusterInfo> activeSubclusters)
+      throws YarnException {
+
+    // Fail fast when there is nothing to choose from.
+    if (activeSubclusters == null || activeSubclusters.isEmpty()) {
+      RouterServerUtil.logAndThrowException(
+          FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
+    }
+
+    List<SubClusterId> candidates =
+        new ArrayList<>(activeSubclusters.keySet());
+    int pick = rand.nextInt(candidates.size());
+    return candidates.get(pick);
+  }
+
+  /**
+   * Yarn Router forwards every getNewApplication requests to any RM. During
+   * this operation there will be no communication with the State Store. The
+   * Router will forward the requests to any SubCluster. The Router will retry
+   * to submit the request on #numSubmitRetries different SubClusters. The
+   * SubClusters are randomly chosen from the active ones.
+   *
+   * Possible failures and behaviors:
+   *
+   * Client: identical behavior as {@code ClientRMService}.
+   *
+   * Router: the Client will timeout and resubmit.
+   *
+   * ResourceManager: the Router will timeout and contacts another RM.
+   *
+   * StateStore: not in the execution.
+   */
+  @Override
+  public GetNewApplicationResponse getNewApplication(
+      GetNewApplicationRequest request) throws YarnException, IOException {
+    Map<SubClusterId, SubClusterInfo> subClustersActive =
+        federationFacade.getSubClusters(true);
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+      LOG.debug(
+          "getNewApplication try #" + i + " on SubCluster " + subClusterId);
+      ApplicationClientProtocol clientRMProxy =
+          getClientRMProxyForSubCluster(subClusterId);
+      GetNewApplicationResponse response = null;
+      try {
+        // Best-effort: a failure here only blacklists this SubCluster for
+        // the current request, it does not fail the call yet.
+        response = clientRMProxy.getNewApplication(request);
+      } catch (Exception e) {
+        LOG.warn("Unable to create a new ApplicationId in SubCluster "
+            + subClusterId.getId(), e);
+      }
+
+      if (response != null) {
+        return response;
+      } else {
+        // Empty response from the ResourceManager.
+        // Blacklist this subcluster for this request.
+        // NOTE(review): this mutates the map returned by
+        // federationFacade.getSubClusters(true); if the facade hands out a
+        // shared/cached map, the removal would leak beyond this request —
+        // TODO confirm the facade returns a private copy.
+        subClustersActive.remove(subClusterId);
+      }
+
+    }
+
+    // All retries exhausted: surface a hard failure to the client.
+    String errMsg = "Fail to create a new application.";
+    LOG.error(errMsg);
+    throw new YarnException(errMsg);
+  }
+
+  /**
+   * Today, in YARN there are no checks of any applicationId submitted.
+   *
+   * Base scenarios:
+   *
+   * The Client submits an application to the Router. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into
+   * StateStore with the selected SubCluster (e.g. SC1) and the appId. • The
+   * State Store replies with the selected SubCluster (e.g. SC1). • The Router
+   * submits the request to the selected SubCluster.
+   *
+   * In case of State Store failure:
+   *
+   * The client submits an application to the Router. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • Due to the
+   * State Store down the Router times out and it will retry depending on the
+   * FederationFacade settings. • The Router replies to the client with an
+   * error message.
+   *
+   * If State Store fails after inserting the tuple: identical behavior as
+   * {@code ClientRMService}.
+   *
+   * In case of Router failure:
+   *
+   * Scenario 1 – Crash before submission to the ResourceManager
+   *
+   * The Client submits an application to the Router. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • The Router
+   * crashes. • The Client timeouts and resubmits the application. • The Router
+   * selects one SubCluster to forward the request. • The Router inserts a
+   * tuple into State Store with the selected SubCluster (e.g. SC2) and the
+   * appId. • Because the tuple is already inserted in the State Store, it
+   * returns the previous selected SubCluster (e.g. SC1). • The Router submits
+   * the request to the selected SubCluster (e.g. SC1).
+   *
+   * Scenario 2 – Crash after submission to the ResourceManager
+   *
+   * • The Client submits an application to the Router. • The Router selects
+   * one SubCluster to forward the request. • The Router inserts a tuple into
+   * State Store with the selected SubCluster (e.g. SC1) and the appId. • The
+   * Router submits the request to the selected SubCluster. • The Router
+   * crashes. • The Client timeouts and resubmit the application. • The Router
+   * selects one SubCluster to forward the request. • The Router inserts a
+   * tuple into State Store with the selected SubCluster (e.g. SC2) and the
+   * appId. • The State Store replies with the selected SubCluster (e.g. SC1).
+   * • The Router submits the request to the selected SubCluster (e.g. SC1).
+   * When a client re-submits the same application to the same RM, it does not
+   * raise an exception and replies with operation successful message.
+   *
+   * In case of Client failure: identical behavior as {@code ClientRMService}.
+   *
+   * In case of ResourceManager failure:
+   *
+   * The Client submits an application to the Router. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • The Router
+   * submits the request to the selected SubCluster. • The entire SubCluster
+   * is down – all the RMs in HA or the master RM is not reachable. • The
+   * Router times out. • The Router selects a new SubCluster to forward the
+   * request. • The Router update a tuple into State Store with the selected
+   * SubCluster (e.g. SC2) and the appId. • The State Store replies with OK
+   * answer. • The Router submits the request to the selected SubCluster
+   * (e.g. SC2).
+   */
+  @Override
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request) throws YarnException, IOException {
+    if (request == null || request.getApplicationSubmissionContext() == null
+        || request.getApplicationSubmissionContext()
+            .getApplicationId() == null) {
+      RouterServerUtil
+          .logAndThrowException("Missing submitApplication request or "
+              + "applicationSubmissionContext information.", null);
+    }
+
+    ApplicationId applicationId =
+        request.getApplicationSubmissionContext().getApplicationId();
+
+    List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+
+      SubClusterId subClusterId = policyFacade.getHomeSubcluster(
+          request.getApplicationSubmissionContext(), blacklist);
+      LOG.info("submitApplication appId " + applicationId + " try #" + i
+          + " on SubCluster " + subClusterId);
+
+      ApplicationHomeSubCluster appHomeSubCluster =
+          ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
+
+      if (i == 0) {
+        try {
+          // Persist the mapping of applicationId and the subClusterId which
+          // has been selected as its home.
+          subClusterId =
+              federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
+        } catch (YarnException e) {
+          String message = "Unable to insert the ApplicationId " + applicationId
+              + " into the FederationStateStore";
+          RouterServerUtil.logAndThrowException(message, e);
+        }
+      } else {
+        try {
+          // Update the mapping of applicationId and the home subClusterId to
+          // the new subClusterId we have selected.
+          federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
+        } catch (YarnException e) {
+          String message = "Unable to update the ApplicationId " + applicationId
+              + " into the FederationStateStore";
+          SubClusterId subClusterIdInStateStore =
+              federationFacade.getApplicationHomeSubCluster(applicationId);
+          // BUGFIX: compare by value, not by reference. The SubClusterId
+          // returned by the StateStore is a distinct object, so "==" would
+          // (almost) never match and a benign re-submission would be
+          // reported to the client as a StateStore error.
+          if (subClusterId.equals(subClusterIdInStateStore)) {
+            LOG.info("Application " + applicationId
+                + " already submitted on SubCluster " + subClusterId);
+          } else {
+            RouterServerUtil.logAndThrowException(message, e);
+          }
+        }
+      }
+
+      ApplicationClientProtocol clientRMProxy =
+          getClientRMProxyForSubCluster(subClusterId);
+
+      SubmitApplicationResponse response = null;
+      try {
+        response = clientRMProxy.submitApplication(request);
+      } catch (Exception e) {
+        LOG.warn("Unable to submit the application " + applicationId
+            + " to SubCluster " + subClusterId.getId(), e);
+      }
+
+      if (response != null) {
+        LOG.info("Application "
+            + request.getApplicationSubmissionContext().getApplicationName()
+            + " with appId " + applicationId + " submitted on " + subClusterId);
+        return response;
+      } else {
+        // Empty response from the ResourceManager.
+        // Blacklist this subcluster for this request.
+        blacklist.add(subClusterId);
+      }
+    }
+
+    // All retries exhausted: report a hard failure to the client.
+    String errMsg = "Application "
+        + request.getApplicationSubmissionContext().getApplicationName()
+        + " with appId " + applicationId + " failed to be submitted.";
+    LOG.error(errMsg);
+    throw new YarnException(errMsg);
+  }
+
+  /**
+   * The Yarn Router will forward to the respective Yarn RM in which the AM is
+   * running.
+   *
+   * Possible failures and behaviors:
+   *
+   * Client: identical behavior as {@code ClientRMService}.
+   *
+   * Router: the Client will timeout and resubmit the request.
+   *
+   * ResourceManager: the Router will timeout and the call will fail.
+   *
+   * State Store: the Router will timeout and it will retry depending on the
+   * FederationFacade settings - if the failure happened before the select
+   * operation.
+   */
+  @Override
+  public KillApplicationResponse forceKillApplication(
+      KillApplicationRequest request) throws YarnException, IOException {
+
+    if (request == null || request.getApplicationId() == null) {
+      RouterServerUtil.logAndThrowException(
+          "Missing forceKillApplication request or ApplicationId.", null);
+    }
+    ApplicationId applicationId = request.getApplicationId();
+    SubClusterId subClusterId = null;
+
+    try {
+      // The home SubCluster recorded at submit time tells us which RM owns
+      // this application.
+      subClusterId = federationFacade
+          .getApplicationHomeSubCluster(request.getApplicationId());
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowException("Application " + applicationId
+          + " does not exist in FederationStateStore", e);
+    }
+
+    ApplicationClientProtocol clientRMProxy =
+        getClientRMProxyForSubCluster(subClusterId);
+
+    KillApplicationResponse response = null;
+    try {
+      LOG.info("forceKillApplication " + applicationId + " on SubCluster "
+          + subClusterId);
+      response = clientRMProxy.forceKillApplication(request);
+    } catch (Exception e) {
+      // BUGFIX(log): the message used to read "Unable to kill the
+      // application report for Xto SubCluster" — wrong wording and a
+      // missing space.
+      LOG.error("Unable to kill the application "
+          + request.getApplicationId() + " in SubCluster "
+          + subClusterId.getId(), e);
+      throw e;
+    }
+
+    if (response == null) {
+      LOG.error("No response when attempting to kill the application "
+          + applicationId + " in SubCluster " + subClusterId.getId());
+    }
+
+    return response;
+  }
+
+  /**
+   * The Yarn Router will forward to the respective Yarn RM in which the AM is
+   * running.
+   *
+   * Possible failure:
+   *
+   * Client: identical behavior as {@code ClientRMService}.
+   *
+   * Router: the Client will timeout and resubmit the request.
+   *
+   * ResourceManager: the Router will timeout and the call will fail.
+   *
+   * State Store: the Router will timeout and it will retry depending on the
+   * FederationFacade settings - if the failure happened before the select
+   * operation.
+   */
+  @Override
+  public GetApplicationReportResponse getApplicationReport(
+      GetApplicationReportRequest request) throws YarnException, IOException {
+
+    if (request == null || request.getApplicationId() == null) {
+      RouterServerUtil.logAndThrowException(
+          "Missing getApplicationReport request or applicationId information.",
+          null);
+    }
+
+    SubClusterId subClusterId = null;
+
+    try {
+      // Resolve the home SubCluster recorded at submit time.
+      subClusterId = federationFacade
+          .getApplicationHomeSubCluster(request.getApplicationId());
+    } catch (YarnException e) {
+      RouterServerUtil
+          .logAndThrowException("Application " + request.getApplicationId()
+              + " does not exist in FederationStateStore", e);
+    }
+
+    ApplicationClientProtocol clientRMProxy =
+        getClientRMProxyForSubCluster(subClusterId);
+
+    GetApplicationReportResponse response = null;
+    try {
+      response = clientRMProxy.getApplicationReport(request);
+    } catch (Exception e) {
+      // BUGFIX(log): added the missing space before "SubCluster" so the
+      // message no longer reads "...Xto SubCluster...".
+      LOG.error("Unable to get the application report for "
+          + request.getApplicationId() + " in SubCluster "
+          + subClusterId.getId(), e);
+      throw e;
+    }
+
+    if (response == null) {
+      LOG.error("No response when attempting to retrieve the report of "
+          + "the application " + request.getApplicationId() + " in SubCluster "
+          + subClusterId.getId());
+    }
+
+    return response;
+  }
+
+  // All of the remaining ApplicationClientProtocol calls are not yet routed
+  // by the Router. They will be implemented as part of phase 2 (see the
+  // TODO YARN-6740 note at the top of the class); until then they fail fast
+  // with NotImplementedException.
+
+  @Override
+  public GetApplicationsResponse getApplications(GetApplicationsRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterMetricsResponse getClusterMetrics(
+      GetClusterMetricsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetQueueUserAclsInfoResponse getQueueUserAcls(
+      GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+      MoveApplicationAcrossQueuesRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetNewReservationResponse getNewReservation(
+      GetNewReservationRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationListResponse listReservations(
+      ReservationListRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetNodesToLabelsResponse getNodeToLabels(
+      GetNodesToLabelsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetLabelsToNodesResponse getLabelsToNodes(
+      GetLabelsToNodesRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterNodeLabelsResponse getClusterNodeLabels(
+      GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+      GetApplicationAttemptReportRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetApplicationAttemptsResponse getApplicationAttempts(
+      GetApplicationAttemptsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetContainerReportResponse getContainerReport(
+      GetContainerReportRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetContainersResponse getContainers(GetContainersRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public FailApplicationAttemptResponse failApplicationAttempt(
+      FailApplicationAttemptRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public UpdateApplicationPriorityResponse updateApplicationPriority(
+      UpdateApplicationPriorityRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public SignalContainerResponse signalToContainer(
+      SignalContainerRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+      UpdateApplicationTimeoutsRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
index 7e15084..7fc4719 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
@@ -119,29 +119,41 @@ public abstract class BaseRouterClientRMTest {
return this.clientrmService;
}
- @Before
- public void setUp() {
- this.conf = new YarnConfiguration();
+ protected YarnConfiguration createConfiguration() {
+ YarnConfiguration config = new YarnConfiguration();
String mockPassThroughInterceptorClass =
PassThroughClientRequestInterceptor.class.getName();
// Create a request interceptor pipeline for testing. The last one in the
// chain will call the mock resource manager. The others in the chain will
// simply forward it to the next one in the chain.
- this.conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+ config.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ "," + mockPassThroughInterceptorClass + ","
+ MockClientRequestInterceptor.class.getName());
- this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+ config.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
TEST_MAX_CACHE_SIZE);
+ return config;
+ }
+ @Before
+ public void setUp() {
+ this.conf = createConfiguration();
this.dispatcher = new AsyncDispatcher();
this.dispatcher.init(conf);
this.dispatcher.start();
this.clientrmService = createAndStartRouterClientRMService();
}
+ public void setUpConfig() {
+ this.conf = createConfiguration();
+ }
+
+ protected Configuration getConf() {
+ return this.conf;
+ }
+
@After
public void tearDown() {
if (clientrmService != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org