You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/08/02 00:32:32 UTC
[38/50] [abbrv] hadoop git commit: YARN-6724. Add ability to
blacklist sub-clusters when invoking Routing policies. (Giovanni Matteo
Fumarola via Subru).
YARN-6724. Add ability to blacklist sub-clusters when invoking Routing policies. (Giovanni Matteo Fumarola via Subru).
(cherry picked from commit f8e5de59697cb78686f0e605dc7e93628b5f3297)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4cfec943
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4cfec943
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4cfec943
Branch: refs/heads/trunk
Commit: 4cfec943b177e2123a935e70d39776521883c2bc
Parents: 433ee44
Author: Subru Krishnan <su...@apache.org>
Authored: Wed Jun 21 19:08:47 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Tue Aug 1 17:28:26 2017 -0700
----------------------------------------------------------------------
.../federation/policies/RouterPolicyFacade.java | 15 +++++--
.../policies/router/FederationRouterPolicy.java | 18 +++++---
.../policies/router/HashBasedRouterPolicy.java | 22 ++++++++--
.../policies/router/LoadBasedRouterPolicy.java | 7 +++-
.../policies/router/PriorityRouterPolicy.java | 7 +++-
.../policies/router/RejectRouterPolicy.java | 26 ++++++++----
.../router/UniformRandomRouterPolicy.java | 23 +++++++++--
.../router/WeightedRandomRouterPolicy.java | 11 ++++-
.../policies/BaseFederationPoliciesTest.java | 2 +-
.../policies/TestRouterPolicyFacade.java | 12 +++---
.../policies/router/BaseRouterPoliciesTest.java | 43 +++++++++++++++++++-
.../router/TestHashBasedRouterPolicy.java | 2 +-
.../router/TestLoadBasedRouterPolicy.java | 2 +-
.../router/TestPriorityRouterPolicy.java | 2 +-
.../policies/router/TestRejectRouterPolicy.java | 4 +-
.../router/TestUniformRandomRouterPolicy.java | 2 +-
.../router/TestWeightedRandomRouterPolicy.java | 2 +-
17 files changed, 157 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
index 5e31a08..44c1b10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.federation.policies;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -110,16 +111,22 @@ public class RouterPolicyFacade {
* This method provides a wrapper of all policy functionalities for routing .
* Internally it manages configuration changes, and policy init/reinit.
*
- * @param appSubmissionContext the application to route.
+ * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
+ * has to be routed to an appropriate subCluster for execution.
*
- * @return the id of the subcluster that will be the "home" for this
+ * @param blackListSubClusters the list of subClusters as identified by
+ * {@link SubClusterId} to blackList from the selection of the home
+ * subCluster.
+ *
+ * @return the {@link SubClusterId} that will be the "home" for this
* application.
*
* @throws YarnException if there are issues initializing policies, or no
* valid sub-cluster id could be found for this app.
*/
public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+ ApplicationSubmissionContext appSubmissionContext,
+ List<SubClusterId> blackListSubClusters) throws YarnException {
// the maps are concurrent, but we need to protect from reset()
// reinitialization mid-execution by creating a new reference local to this
@@ -186,7 +193,7 @@ public class RouterPolicyFacade {
+ "and no default specified.");
}
- return policy.getHomeSubcluster(appSubmissionContext);
+ return policy.getHomeSubcluster(appSubmissionContext, blackListSubClusters);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
index 90ea0a8..9325bd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
+import java.util.List;
+
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
@@ -29,16 +31,22 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
public interface FederationRouterPolicy extends ConfigurableFederationPolicy {
/**
- * Determines the sub-cluster that the user application submision should be
+ * Determines the sub-cluster that the user application submission should be
* routed to.
*
- * @param appSubmissionContext the context for the app being submitted.
+ * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
+ * has to be routed to an appropriate subCluster for execution.
+ *
+ * @param blackListSubClusters the list of subClusters as identified by
+ * {@link SubClusterId} to blackList from the selection of the home
+ * subCluster.
*
- * @return the sub-cluster as identified by {@link SubClusterId} to route the
- * request to.
+ * @return the {@link SubClusterId} that will be the "home" for this
+ * application.
*
* @throws YarnException if the policy cannot determine a viable subcluster.
*/
SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext) throws YarnException;
+ ApplicationSubmissionContext appSubmissionContext,
+ List<SubClusterId> blackListSubClusters) throws YarnException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
index e40e87e..257a9fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
@@ -55,19 +55,35 @@ public class HashBasedRouterPolicy extends AbstractRouterPolicy {
* sub-cluster, as far as the number of active sub-cluster and their names
* remain the same.
*
- * @param appSubmissionContext the context for the app being submitted.
+ * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
+ * has to be routed to an appropriate subCluster for execution.
*
- * @return a hash-based chosen subcluster.
+ * @param blackListSubClusters the list of subClusters as identified by
+ * {@link SubClusterId} to blackList from the selection of the home
+ * subCluster.
+ *
+ * @return a hash-based chosen {@link SubClusterId} that will be the "home"
+ * for this application.
*
* @throws YarnException if there are no active subclusters.
*/
+ @Override
public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+ ApplicationSubmissionContext appSubmissionContext,
+ List<SubClusterId> blackListSubClusters) throws YarnException {
// throws if no active subclusters available
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
+ if (blackListSubClusters != null) {
+
+ // Remove the blacklisted SubClusters from the active ones obtained from the StateStore
+ for (SubClusterId scId : blackListSubClusters) {
+ activeSubclusters.remove(scId);
+ }
+ }
+
validate(appSubmissionContext);
int chosenPosition = Math.abs(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
index 2ca15bf..c124001 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -62,7 +63,8 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
@Override
public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+ ApplicationSubmissionContext appSubmissionContext,
+ List<SubClusterId> blacklist) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
@@ -76,6 +78,9 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
long currBestMem = -1;
for (Map.Entry<SubClusterId, SubClusterInfo> entry : activeSubclusters
.entrySet()) {
+ if (blacklist != null && blacklist.contains(entry.getKey())) {
+ continue;
+ }
SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey());
if (weights.containsKey(id) && weights.get(id) > 0) {
long availableMemory = getAvailableMemory(entry.getValue());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
index 13d9140..59f8767 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -34,7 +35,8 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
@Override
public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+ ApplicationSubmissionContext appSubmissionContext,
+ List<SubClusterId> blacklist) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
@@ -50,6 +52,9 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
Float currentBest = Float.MIN_VALUE;
for (SubClusterId id : activeSubclusters.keySet()) {
SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
+ if (blacklist != null && blacklist.contains(id)) {
+ continue;
+ }
if (weights.containsKey(idInfo) && weights.get(idInfo) > currentBest) {
currentBest = weights.get(idInfo);
chosen = id;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java
index faf3279..b4c0192 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
+import java.util.List;
+
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@@ -27,8 +29,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
/**
* This {@link FederationRouterPolicy} simply rejects all incoming requests.
- * This is useful to prevent applications running in a queue to be run
- * anywhere in the federated cluster.
+ * This is useful to prevent applications running in a queue to be run anywhere
+ * in the federated cluster.
*/
public class RejectRouterPolicy extends AbstractRouterPolicy {
@@ -44,23 +46,31 @@ public class RejectRouterPolicy extends AbstractRouterPolicy {
/**
* The policy always reject requests.
*
- * @param appSubmissionContext the context for the app being submitted.
+ * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
+ * has to be routed to an appropriate subCluster for execution.
+ *
+ * @param blackListSubClusters the list of subClusters as identified by
+ * {@link SubClusterId} to blackList from the selection of the home
+ * subCluster.
*
* @return (never).
*
- * @throws YarnException (always) to prevent applications in this queue to
- * be run anywhere in the federated cluster.
+ * @throws YarnException (always) to prevent applications in this queue to be
+ * run anywhere in the federated cluster.
*/
+ @Override
public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+ ApplicationSubmissionContext appSubmissionContext,
+ List<SubClusterId> blackListSubClusters) throws YarnException {
// run standard validation, as error might differ
validate(appSubmissionContext);
throw new FederationPolicyException("The policy configured for this queue"
+ " (" + appSubmissionContext.getQueue() + ") reject all routing "
- + "requests by construction. Application " + appSubmissionContext
- .getApplicationId() + " cannot be routed to any RM.");
+ + "requests by construction. Application "
+ + appSubmissionContext.getApplicationId()
+ + " cannot be routed to any RM.");
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
index d820449..bc729b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
@@ -59,18 +59,24 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
}
/**
- * Simply picks a random active subcluster to start the AM (this does NOT
+ * Simply picks a random active subCluster to start the AM (this does NOT
* depend on the weights in the policy).
*
- * @param appSubmissionContext the context for the app being submitted
- * (ignored).
+ * @param appSubmissionContext the {@link ApplicationSubmissionContext} that
+ * has to be routed to an appropriate subCluster for execution.
+ *
+ * @param blackListSubClusters the list of subClusters as identified by
+ * {@link SubClusterId} to blackList from the selection of the home
+ * subCluster.
*
* @return a randomly chosen subcluster.
*
* @throws YarnException if there are no active subclusters.
*/
+ @Override
public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+ ApplicationSubmissionContext appSubmissionContext,
+ List<SubClusterId> blackListSubClusters) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
@@ -79,6 +85,15 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
getActiveSubclusters();
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
+
+ if (blackListSubClusters != null) {
+
+ // Remove the blacklisted SubClusters from the active ones obtained from the StateStore
+ for (SubClusterId scId : blackListSubClusters) {
+ list.remove(scId);
+ }
+ }
+
return list.get(rand.nextInt(list.size()));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
index 5727134..7f230a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
+import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -41,7 +42,8 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
@Override
public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+ ApplicationSubmissionContext appSubmissionContext,
+ List<SubClusterId> blacklist) throws YarnException {
// null checks and default-queue behavior
validate(appSubmissionContext);
@@ -58,6 +60,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
float totActiveWeight = 0;
for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
+ if (blacklist != null && blacklist.contains(entry.getKey().toId())) {
+ continue;
+ }
if (entry.getKey() != null
&& activeSubclusters.containsKey(entry.getKey().toId())) {
totActiveWeight += entry.getValue();
@@ -66,6 +71,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
float lookupValue = rand.nextFloat() * totActiveWeight;
for (SubClusterId id : activeSubclusters.keySet()) {
+ if (blacklist != null && blacklist.contains(id)) {
+ continue;
+ }
SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
if (weights.containsKey(idInfo)) {
lookupValue -= weights.get(idInfo);
@@ -77,4 +85,5 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
// should never happen
return null;
}
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
index 6bd8bf0..23978ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
@@ -104,7 +104,7 @@ public abstract class BaseFederationPoliciesTest {
ConfigurableFederationPolicy localPolicy = getPolicy();
if (localPolicy instanceof FederationRouterPolicy) {
((FederationRouterPolicy) localPolicy)
- .getHomeSubcluster(getApplicationSubmissionContext());
+ .getHomeSubcluster(getApplicationSubmissionContext(), null);
} else {
String[] hosts = new String[] {"host1", "host2"};
List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
index 5fa02d6..d0e2dec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
@@ -95,7 +95,7 @@ public class TestRouterPolicyFacade {
// first call runs using standard UniformRandomRouterPolicy
SubClusterId chosen =
- routerFacade.getHomeSubcluster(applicationSubmissionContext);
+ routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
Assert.assertTrue(subClusterIds.contains(chosen));
Assert.assertTrue(routerFacade.globalPolicyMap
.get(queue1) instanceof UniformRandomRouterPolicy);
@@ -107,7 +107,7 @@ public class TestRouterPolicyFacade {
.newInstance(getPriorityPolicy(queue1)));
// second call is routed by new policy PriorityRouterPolicy
- chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
+ chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
Assert.assertTrue(chosen.equals(subClusterIds.get(0)));
Assert.assertTrue(routerFacade.globalPolicyMap
.get(queue1) instanceof PriorityRouterPolicy);
@@ -126,7 +126,7 @@ public class TestRouterPolicyFacade {
// when invoked it returns the expected SubClusterId.
SubClusterId chosen =
- routerFacade.getHomeSubcluster(applicationSubmissionContext);
+ routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
Assert.assertTrue(subClusterIds.contains(chosen));
// now the caching of policies must have added an entry for this queue
@@ -160,19 +160,19 @@ public class TestRouterPolicyFacade {
String uninitQueue = "non-initialized-queue";
when(applicationSubmissionContext.getQueue()).thenReturn(uninitQueue);
SubClusterId chosen =
- routerFacade.getHomeSubcluster(applicationSubmissionContext);
+ routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
Assert.assertTrue(subClusterIds.contains(chosen));
Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
// empty string
when(applicationSubmissionContext.getQueue()).thenReturn("");
- chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
+ chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
Assert.assertTrue(subClusterIds.contains(chosen));
Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
// null queue also falls back to default
when(applicationSubmissionContext.getQueue()).thenReturn(null);
- chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext);
+ chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null);
Assert.assertTrue(subClusterIds.contains(chosen));
Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
index 2e7a0af..c7a7767 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
@@ -18,11 +18,19 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
@@ -40,12 +48,43 @@ public abstract class BaseRouterPoliciesTest
ApplicationSubmissionContext.newInstance(null, null, null, null, null,
false, false, 0, Resources.none(), null, false, null, null);
SubClusterId chosen =
- localPolicy.getHomeSubcluster(applicationSubmissionContext);
+ localPolicy.getHomeSubcluster(applicationSubmissionContext, null);
Assert.assertNotNull(chosen);
}
@Test(expected = FederationPolicyException.class)
public void testNullAppContext() throws YarnException {
- ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(null);
+ ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(null, null);
+ }
+
+ @Test
+ public void testBlacklistSubcluster() throws YarnException {
+ FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy();
+ ApplicationSubmissionContext applicationSubmissionContext =
+ ApplicationSubmissionContext.newInstance(null, null, null, null, null,
+ false, false, 0, Resources.none(), null, false, null, null);
+ Map<SubClusterId, SubClusterInfo> activeSubClusters =
+ getActiveSubclusters();
+ if (activeSubClusters != null && activeSubClusters.size() > 1
+ && !(localPolicy instanceof RejectRouterPolicy)) {
+ // blacklist all the active subclusters but one.
+ Random random = new Random();
+ List<SubClusterId> blacklistSubclusters =
+ new ArrayList<SubClusterId>(activeSubClusters.keySet());
+ SubClusterId removed = blacklistSubclusters
+ .remove(random.nextInt(blacklistSubclusters.size()));
+ // give the non-blacklisted sub-cluster a positive weight so LoadBasedRouterPolicy can select it
+ getPolicyInfo().getRouterPolicyWeights()
+ .put(new SubClusterIdInfo(removed), 1.0f);
+ FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
+ getPolicyInfo(), getActiveSubclusters());
+
+ SubClusterId chosen = localPolicy.getHomeSubcluster(
+ applicationSubmissionContext, blacklistSubclusters);
+
+ // check that the selected sub-cluster is the only one not blacklisted
+ Assert.assertNotNull(chosen);
+ Assert.assertEquals(removed, chosen);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java
index af7fe43..ee3e09d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java
@@ -70,7 +70,7 @@ public class TestHashBasedRouterPolicy extends BaseRouterPoliciesTest {
for (int i = 0; i < jobPerSub * numSubclusters; i++) {
when(applicationSubmissionContext.getQueue()).thenReturn("queue" + i);
chosen = ((FederationRouterPolicy) getPolicy())
- .getHomeSubcluster(applicationSubmissionContext);
+ .getHomeSubcluster(applicationSubmissionContext, null);
counter.get(chosen).addAndGet(1);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
index b70b4aa..dc8f99b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
@@ -97,7 +97,7 @@ public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest {
public void testLoadIsRespected() throws YarnException {
SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
- .getHomeSubcluster(getApplicationSubmissionContext());
+ .getHomeSubcluster(getApplicationSubmissionContext(), null);
// check the "planted" best cluster is chosen
Assert.assertEquals("sc05", chosen.getId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
index 42d919d..3c036c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
@@ -78,7 +78,7 @@ public class TestPriorityRouterPolicy extends BaseRouterPoliciesTest {
@Test
public void testPickLowestWeight() throws YarnException {
SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
- .getHomeSubcluster(getApplicationSubmissionContext());
+ .getHomeSubcluster(getApplicationSubmissionContext(), null);
Assert.assertEquals("sc5", chosen.getId());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java
index 049ebbf..1747f73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java
@@ -47,7 +47,7 @@ public class TestRejectRouterPolicy extends BaseRouterPoliciesTest {
@Test(expected = FederationPolicyException.class)
public void testNoClusterIsChosen() throws YarnException {
((FederationRouterPolicy) getPolicy())
- .getHomeSubcluster(getApplicationSubmissionContext());
+ .getHomeSubcluster(getApplicationSubmissionContext(), null);
}
@Override
@@ -57,7 +57,7 @@ public class TestRejectRouterPolicy extends BaseRouterPoliciesTest {
ApplicationSubmissionContext applicationSubmissionContext =
ApplicationSubmissionContext.newInstance(null, null, null, null, null,
false, false, 0, Resources.none(), null, false, null, null);
- localPolicy.getHomeSubcluster(applicationSubmissionContext);
+ localPolicy.getHomeSubcluster(applicationSubmissionContext, null);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
index b45aa2a..05490ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
@@ -57,7 +57,7 @@ public class TestUniformRandomRouterPolicy extends BaseRouterPoliciesTest {
@Test
public void testOneSubclusterIsChosen() throws YarnException {
SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
- .getHomeSubcluster(getApplicationSubmissionContext());
+ .getHomeSubcluster(getApplicationSubmissionContext(), null);
Assert.assertTrue(getActiveSubclusters().keySet().contains(chosen));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cfec943/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
index 09173e6..c969a30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
@@ -98,7 +98,7 @@ public class TestWeightedRandomRouterPolicy extends BaseRouterPoliciesTest {
for (float i = 0; i < numberOfDraws; i++) {
SubClusterId chosenId = ((FederationRouterPolicy) getPolicy())
- .getHomeSubcluster(getApplicationSubmissionContext());
+ .getHomeSubcluster(getApplicationSubmissionContext(), null);
counter.get(chosenId).incrementAndGet();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org