You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/09/22 00:58:26 UTC
[05/50] [abbrv] hadoop git commit: YARN-5325. Stateless ARMRMProxy
policies implementation. (Carlo Curino via Subru).
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
index e57709f..5de749f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
@@ -17,8 +17,8 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.util.Map;
+
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@@ -30,34 +30,27 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
-import java.util.Map;
-
/**
* This implements a simple load-balancing policy. The policy "weights" are
* binary 0/1 values that enable/disable each sub-cluster, and the policy peaks
* the sub-cluster with the least load to forward this application.
*/
-public class LoadBasedRouterPolicy
- extends BaseWeightedRouterPolicy {
-
- private static final Log LOG =
- LogFactory.getLog(LoadBasedRouterPolicy.class);
+public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
@Override
- public void reinitialize(FederationPolicyInitializationContext
- federationPolicyContext)
+ public void reinitialize(FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException {
// remember old policyInfo
WeightedPolicyInfo tempPolicy = getPolicyInfo();
- //attempt new initialization
- super.reinitialize(federationPolicyContext);
+ // attempt new initialization
+ super.reinitialize(policyContext);
- //check extra constraints
+ // check extra constraints
for (Float weight : getPolicyInfo().getRouterPolicyWeights().values()) {
if (weight != 0 && weight != 1) {
- //reset to old policyInfo if check fails
+ // reset to old policyInfo if check fails
setPolicyInfo(tempPolicy);
throw new FederationPolicyInitializationException(
this.getClass().getCanonicalName()
@@ -69,18 +62,16 @@ public class LoadBasedRouterPolicy
@Override
public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext)
- throws YarnException {
+ ApplicationSubmissionContext appSubmissionContext) throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
- Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
- .getRouterPolicyWeights();
+ Map<SubClusterIdInfo, Float> weights =
+ getPolicyInfo().getRouterPolicyWeights();
SubClusterIdInfo chosen = null;
long currBestMem = -1;
- for (Map.Entry<SubClusterId, SubClusterInfo> entry :
- activeSubclusters
+ for (Map.Entry<SubClusterId, SubClusterInfo> entry : activeSubclusters
.entrySet()) {
SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey());
if (weights.containsKey(id) && weights.get(id) > 0) {
@@ -95,8 +86,7 @@ public class LoadBasedRouterPolicy
return chosen.toId();
}
- private long getAvailableMemory(SubClusterInfo value)
- throws YarnException {
+ private long getAvailableMemory(SubClusterInfo value) throws YarnException {
try {
long mem = -1;
JSONObject obj = new JSONObject(value.getCapability());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
index a8ac5f7..bc3a1f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
@@ -17,39 +17,32 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.util.Map;
+
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
-import java.util.Map;
-
/**
* This implements a policy that interprets "weights" as a ordered list of
* preferences among sub-clusters. Highest weight among active subclusters is
* chosen.
*/
-public class PriorityRouterPolicy
- extends BaseWeightedRouterPolicy {
-
- private static final Log LOG =
- LogFactory.getLog(PriorityRouterPolicy.class);
+public class PriorityRouterPolicy extends AbstractRouterPolicy {
@Override
public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext)
- throws YarnException {
+ ApplicationSubmissionContext appSubmissionContext) throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
// This finds the sub-cluster with the highest weight among the
// currently active ones.
- Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
- .getRouterPolicyWeights();
+ Map<SubClusterIdInfo, Float> weights =
+ getPolicyInfo().getRouterPolicyWeights();
SubClusterId chosen = null;
Float currentBest = Float.MIN_VALUE;
for (SubClusterId id : activeSubclusters.keySet()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
index 1774961..b8f9cc3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
@@ -17,6 +17,11 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@@ -25,11 +30,6 @@ import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPo
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
/**
* This simple policy picks at uniform random among any of the currently active
* subclusters. This policy is easy to use and good for testing.
@@ -39,7 +39,7 @@ import java.util.Random;
* of the "weights", in which case the {@link UniformRandomRouterPolicy} send
* load to them, while {@code WeightedRandomRouterPolicy} does not.
*/
-public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy {
+public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
private Random rand;
@@ -49,14 +49,14 @@ public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy {
@Override
public void reinitialize(
- FederationPolicyInitializationContext federationPolicyContext)
+ FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException {
FederationPolicyInitializationContextValidator
- .validate(federationPolicyContext, this.getClass().getCanonicalName());
+ .validate(policyContext, this.getClass().getCanonicalName());
- //note: this overrides BaseWeighterRouterPolicy and ignores the weights
+ // note: this overrides AbstractRouterPolicy and ignores the weights
- setPolicyContext(federationPolicyContext);
+ setPolicyContext(policyContext);
}
/**
@@ -64,21 +64,19 @@ public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy {
* depend on the weights in the policy).
*
* @param appSubmissionContext the context for the app being submitted
- * (ignored).
+ * (ignored).
*
* @return a randomly chosen subcluster.
*
* @throws YarnException if there are no active subclusters.
*/
public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext)
- throws YarnException {
+ ApplicationSubmissionContext appSubmissionContext) throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
- List<SubClusterId> list =
- new ArrayList<>(activeSubclusters.keySet());
+ List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
return list.get(rand.nextInt(list.size()));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
index 0777677..ac75ae9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
@@ -18,32 +18,30 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.util.Map;
+import java.util.Random;
+
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
-
-import java.util.Map;
-import java.util.Random;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This policy implements a weighted random sample among currently active
* sub-clusters.
*/
-public class WeightedRandomRouterPolicy
- extends BaseWeightedRouterPolicy {
+public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
- private static final Log LOG =
- LogFactory.getLog(WeightedRandomRouterPolicy.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WeightedRandomRouterPolicy.class);
private Random rand = new Random(System.currentTimeMillis());
@Override
public SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext)
- throws YarnException {
+ ApplicationSubmissionContext appSubmissionContext) throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
@@ -52,13 +50,13 @@ public class WeightedRandomRouterPolicy
// changes dynamically (and this would unfairly spread the load to
// sub-clusters adjacent to an inactive one), hence we need to count/scan
// the list and based on weight pick the next sub-cluster.
- Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
- .getRouterPolicyWeights();
+ Map<SubClusterIdInfo, Float> weights =
+ getPolicyInfo().getRouterPolicyWeights();
float totActiveWeight = 0;
- for(Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()){
- if(entry.getKey()!=null && activeSubclusters.containsKey(entry.getKey()
- .toId())){
+ for (Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()) {
+ if (entry.getKey() != null
+ && activeSubclusters.containsKey(entry.getKey().toId())) {
totActiveWeight += entry.getValue();
}
}
@@ -73,7 +71,7 @@ public class WeightedRandomRouterPolicy
return id;
}
}
- //should never happen
+ // should never happen
return null;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java
index 5d0fcb6..e445ac3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/package-info.java
@@ -17,4 +17,3 @@
*/
/** Router policies. **/
package org.apache.hadoop.yarn.server.federation.policies.router;
-
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
index 8238633..6b4f60c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/resolver/AbstractSubClusterResolver.java
@@ -57,11 +57,11 @@ public abstract class AbstractSubClusterResolver implements SubClusterResolver {
return rackToSubClusters.get(rackname);
}
- protected Map<String, SubClusterId> getNodeToSubCluster() {
+ public Map<String, SubClusterId> getNodeToSubCluster() {
return nodeToSubCluster;
}
- protected Map<String, Set<SubClusterId>> getRackToSubClusters() {
+ public Map<String, Set<SubClusterId>> getRackToSubClusters() {
return rackToSubClusters;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
index 8da92b9..ba897da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
@@ -22,14 +22,17 @@ import static org.mockito.Mockito.mock;
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -49,6 +52,7 @@ public abstract class BaseFederationPoliciesTest {
private ApplicationSubmissionContext applicationSubmissionContext =
mock(ApplicationSubmissionContext.class);
private Random rand = new Random();
+ private SubClusterId homeSubCluster;
@Test
public void testReinitilialize() throws YarnException {
@@ -88,16 +92,22 @@ public abstract class BaseFederationPoliciesTest {
getPolicy().reinitialize(fpc);
}
- @Test(expected = NoActiveSubclustersException.class)
+ @Test(expected = FederationPolicyException.class)
public void testNoSubclusters() throws YarnException {
// empty the activeSubclusters map
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
getPolicyInfo(), new HashMap<>());
- ConfigurableFederationPolicy currentPolicy = getPolicy();
- if (currentPolicy instanceof FederationRouterPolicy) {
- ((FederationRouterPolicy) currentPolicy)
+ ConfigurableFederationPolicy localPolicy = getPolicy();
+ if (localPolicy instanceof FederationRouterPolicy) {
+ ((FederationRouterPolicy) localPolicy)
.getHomeSubcluster(getApplicationSubmissionContext());
+ } else {
+ String[] hosts = new String[] {"host1", "host2" };
+ List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
+ .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
+ ((FederationAMRMProxyPolicy) localPolicy)
+ .splitResourceRequests(resourceRequests);
}
}
@@ -152,4 +162,12 @@ public abstract class BaseFederationPoliciesTest {
this.rand = rand;
}
+ public SubClusterId getHomeSubCluster() {
+ return homeSubCluster;
+ }
+
+ public void setHomeSubCluster(SubClusterId homeSubCluster) {
+ this.homeSubCluster = homeSubCluster;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
index e840b3f..c79fd2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
@@ -16,22 +16,20 @@
* limitations under the License.
*/
-
package org.apache.hadoop.yarn.server.federation.policies;
+import java.nio.ByteBuffer;
+
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.junit.Before;
import org.junit.Test;
-import java.nio.ByteBuffer;
-
/**
* Test class for {@link FederationPolicyInitializationContextValidator}.
*/
@@ -45,11 +43,10 @@ public class TestFederationPolicyInitializationContextValidator {
@Before
public void setUp() throws Exception {
goodFacade = FederationPoliciesTestUtil.initFacade();
- goodConfig =
- new MockPolicyManager().serializeConf();
- goodSR =FederationPoliciesTestUtil.initResolver();
- context = new
- FederationPolicyInitializationContext(goodConfig, goodSR, goodFacade);
+ goodConfig = new MockPolicyManager().serializeConf();
+ goodSR = FederationPoliciesTestUtil.initResolver();
+ context = new FederationPolicyInitializationContext(goodConfig, goodSR,
+ goodFacade);
}
@Test
@@ -100,8 +97,7 @@ public class TestFederationPolicyInitializationContextValidator {
@Override
public FederationAMRMProxyPolicy getAMRMPolicy(
- FederationPolicyInitializationContext
- federationPolicyInitializationContext,
+ FederationPolicyInitializationContext policyContext,
FederationAMRMProxyPolicy oldInstance)
throws FederationPolicyInitializationException {
return null;
@@ -109,8 +105,7 @@ public class TestFederationPolicyInitializationContextValidator {
@Override
public FederationRouterPolicy getRouterPolicy(
- FederationPolicyInitializationContext
- federationPolicyInitializationContext,
+ FederationPolicyInitializationContext policyContext,
FederationRouterPolicy oldInstance)
throws FederationPolicyInitializationException {
return null;
@@ -120,8 +115,8 @@ public class TestFederationPolicyInitializationContextValidator {
public SubClusterPolicyConfiguration serializeConf()
throws FederationPolicyInitializationException {
ByteBuffer buf = ByteBuffer.allocate(0);
- return SubClusterPolicyConfiguration
- .newInstance("queue1", this.getClass().getCanonicalName(), buf);
+ return SubClusterPolicyConfiguration.newInstance("queue1",
+ this.getClass().getCanonicalName(), buf);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
new file mode 100644
index 0000000..a21f53d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Simple test class for the {@link BroadcastAMRMProxyPolicy}.
+ */
+public class TestBroadcastAMRMProxyFederationPolicy
+ extends BaseFederationPoliciesTest {
+
+ @Before
+ public void setUp() throws Exception {
+ setPolicy(new BroadcastAMRMProxyPolicy());
+ // needed for base test to work
+ setPolicyInfo(mock(WeightedPolicyInfo.class));
+
+ for (int i = 1; i <= 2; i++) {
+ SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
+ SubClusterInfo sci = mock(SubClusterInfo.class);
+ when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
+ when(sci.getSubClusterId()).thenReturn(sc.toId());
+ getActiveSubclusters().put(sc.toId(), sci);
+ }
+
+ FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
+ mock(WeightedPolicyInfo.class), getActiveSubclusters());
+
+ }
+
+ @Test
+ public void testSplitAllocateRequest() throws Exception {
+ // verify the request is broadcasted to all subclusters
+ String[] hosts = new String[] {"host1", "host2" };
+ List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
+ .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
+
+ Map<SubClusterId, List<ResourceRequest>> response =
+ ((FederationAMRMProxyPolicy) getPolicy())
+ .splitResourceRequests(resourceRequests);
+ Assert.assertTrue(response.size() == 2);
+ for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response
+ .entrySet()) {
+ Assert.assertTrue(getActiveSubclusters().get(entry.getKey()) != null);
+ for (ResourceRequest r : entry.getValue()) {
+ Assert.assertTrue(resourceRequests.contains(r));
+ }
+ }
+ for (SubClusterId subClusterId : getActiveSubclusters().keySet()) {
+ for (ResourceRequest r : response.get(subClusterId)) {
+ Assert.assertTrue(resourceRequests.contains(r));
+ }
+ }
+ }
+
+ @Test
+ public void testNotifyOfResponse() throws Exception {
+ String[] hosts = new String[] {"host1", "host2" };
+ List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
+ .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
+ Map<SubClusterId, List<ResourceRequest>> response =
+ ((FederationAMRMProxyPolicy) getPolicy())
+ .splitResourceRequests(resourceRequests);
+
+ try {
+ ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
+ SubClusterId.newInstance("sc3"), mock(AllocateResponse.class));
+ Assert.fail();
+ } catch (FederationPolicyException f) {
+ System.out.println("Expected: " + f.getMessage());
+ }
+
+ ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
+ SubClusterId.newInstance("sc1"), mock(AllocateResponse.class));
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
new file mode 100644
index 0000000..2654a06
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple test class for the {@link LocalityMulticastAMRMProxyPolicy}.
+ */
+public class TestLocalityMulticastAMRMProxyPolicy
+ extends BaseFederationPoliciesTest {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestLocalityMulticastAMRMProxyPolicy.class);
+
+ @Before
+ public void setUp() throws Exception {
+ setPolicy(new LocalityMulticastAMRMProxyPolicy());
+ setPolicyInfo(new WeightedPolicyInfo());
+ Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
+ Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
+
+ // simulate 6 subclusters (sub-cluster 3 is inactive, sub-cluster 4 is
+ // given zero weight below)
+ for (int i = 0; i < 6; i++) {
+ SubClusterIdInfo sc = new SubClusterIdInfo("subcluster" + i);
+ // sub-cluster 3 is not active
+ if (i != 3) {
+ SubClusterInfo sci = mock(SubClusterInfo.class);
+ when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
+ when(sci.getSubClusterId()).thenReturn(sc.toId());
+ getActiveSubclusters().put(sc.toId(), sci);
+ }
+
+ float weight = 1 / 10f;
+ routerWeights.put(sc, weight);
+ amrmWeights.put(sc, weight);
+ // sub-cluster 4 is "disabled" in the weights
+ if (i == 4) {
+ routerWeights.put(sc, 0f);
+ amrmWeights.put(sc, 0f);
+ }
+ }
+
+ getPolicyInfo().setRouterPolicyWeights(routerWeights);
+ getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
+ getPolicyInfo().setHeadroomAlpha(0.5f);
+ setHomeSubCluster(SubClusterId.newInstance("homesubcluster"));
+
+ }
+
+ @Test
+ public void testReinitilialize() throws YarnException {
+ initializePolicy();
+ }
+
+ private void initializePolicy() throws YarnException {
+ setFederationPolicyContext(new FederationPolicyInitializationContext());
+ SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver();
+ getFederationPolicyContext().setFederationSubclusterResolver(resolver);
+ ByteBuffer buf = getPolicyInfo().toByteBuffer();
+ getFederationPolicyContext().setSubClusterPolicyConfiguration(
+ SubClusterPolicyConfiguration.newInstance("queue1",
+ getPolicy().getClass().getCanonicalName(), buf));
+ getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster());
+ FederationPoliciesTestUtil.initializePolicyContext(
+ getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
+ getActiveSubclusters());
+ }
+
+ @Test
+ public void testSplitBasedOnHeadroom() throws Exception {
+
+ // Tests how the headroom info are used to split based on the capacity
+ // each RM claims to give us.
+ // Configure policy to be 100% headroom based
+ getPolicyInfo().setHeadroomAlpha(1.0f);
+
+ initializePolicy();
+ List<ResourceRequest> resourceRequests = createSimpleRequest();
+
+ prepPolicyWithHeadroom();
+
+ Map<SubClusterId, List<ResourceRequest>> response =
+ ((FederationAMRMProxyPolicy) getPolicy())
+ .splitResourceRequests(resourceRequests);
+
+ // pretty print requests
+ LOG.info("Initial headroom");
+ prettyPrintRequests(response);
+
+ validateSplit(response, resourceRequests);
+
+ // based on headroom, we expect 75 containers to go to subcluster0,
+ // as it advertises lots of headroom (100); no containers for subcluster1,
+ // as it advertises zero headroom; 1 to subcluster2 (as it advertises little
+ // headroom (1)); and 25 to subcluster5, which has unknown headroom and so
+ // gets 1/4th of the load
+ checkExpectedAllocation(response, "subcluster0", 1, 75);
+ checkExpectedAllocation(response, "subcluster1", 1, -1);
+ checkExpectedAllocation(response, "subcluster2", 1, 1);
+ checkExpectedAllocation(response, "subcluster5", 1, 25);
+
+ // notify a change in headroom and try again
+ AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100);
+ ((FederationAMRMProxyPolicy) getPolicy())
+ .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
+ ((FederationAMRMProxyPolicy) getPolicy())
+ .splitResourceRequests(resourceRequests);
+
+ LOG.info("After headroom update");
+ prettyPrintRequests(response);
+ validateSplit(response, resourceRequests);
+
+ // we simulated a change in headroom for subcluster2, which will now
+ // have the same headroom as subcluster0 and so it splits the requests;
+ // note that the total is still less than or equal to
+ // (userAsk + numSubClusters)
+ checkExpectedAllocation(response, "subcluster0", 1, 38);
+ checkExpectedAllocation(response, "subcluster1", 1, -1);
+ checkExpectedAllocation(response, "subcluster2", 1, 38);
+ checkExpectedAllocation(response, "subcluster5", 1, 25);
+
+ }
+
+ @Test(timeout = 5000)
+ public void testStressPolicy() throws Exception {
+
+ // Tests how the headroom info are used to split based on the capacity
+ // each RM claims to give us.
+ // Configure policy to be 100% headroom based
+ getPolicyInfo().setHeadroomAlpha(1.0f);
+
+ initializePolicy();
+
+ int numRR = 1000;
+ List<ResourceRequest> resourceRequests = createLargeRandomList(numRR);
+
+ prepPolicyWithHeadroom();
+
+ int numIterations = 1000;
+ long tstart = System.currentTimeMillis();
+ for (int i = 0; i < numIterations; i++) {
+ Map<SubClusterId, List<ResourceRequest>> response =
+ ((FederationAMRMProxyPolicy) getPolicy())
+ .splitResourceRequests(resourceRequests);
+ validateSplit(response, resourceRequests);
+ }
+ long tend = System.currentTimeMillis();
+
+ LOG.info("Performed " + numIterations + " policy invocations (and "
+ + "validations) in " + (tend - tstart) + "ms");
+ }
+
+ @Test
+ public void testFWDAllZeroANY() throws Exception {
+
+ // Tests how the headroom info are used to split based on the capacity
+ // each RM claims to give us.
+ // Configure policy to be 100% headroom based
+ getPolicyInfo().setHeadroomAlpha(0.5f);
+
+ initializePolicy();
+ List<ResourceRequest> resourceRequests = createZeroSizedANYRequest();
+
+ // this receives responses from sc0,sc1,sc2
+ prepPolicyWithHeadroom();
+
+ Map<SubClusterId, List<ResourceRequest>> response =
+ ((FederationAMRMProxyPolicy) getPolicy())
+ .splitResourceRequests(resourceRequests);
+
+ // we expect all three to appear for a zero-sized ANY
+
+ // pretty print requests
+ prettyPrintRequests(response);
+
+ validateSplit(response, resourceRequests);
+
+ // we expect the zero-sized request to be sent to the first 3 RMs (due to
+ // the fact that we received responses only from these 3 subclusters)
+ checkExpectedAllocation(response, "subcluster0", 1, 0);
+ checkExpectedAllocation(response, "subcluster1", 1, 0);
+ checkExpectedAllocation(response, "subcluster2", 1, 0);
+ checkExpectedAllocation(response, "subcluster3", -1, -1);
+ checkExpectedAllocation(response, "subcluster4", -1, -1);
+ checkExpectedAllocation(response, "subcluster5", -1, -1);
+ }
+
+ @Test
+ public void testSplitBasedOnHeadroomAndWeights() throws Exception {
+
+ // Tests how the headroom info are used to split based on the capacity
+ // each RM claims to give us.
+
+ // Configure policy to be 50% headroom based and 50% weight based
+ getPolicyInfo().setHeadroomAlpha(0.5f);
+
+ initializePolicy();
+ List<ResourceRequest> resourceRequests = createSimpleRequest();
+
+ prepPolicyWithHeadroom();
+
+ Map<SubClusterId, List<ResourceRequest>> response =
+ ((FederationAMRMProxyPolicy) getPolicy())
+ .splitResourceRequests(resourceRequests);
+
+ // pretty print requests
+ prettyPrintRequests(response);
+
+ validateSplit(response, resourceRequests);
+
+ // in this case the headroom allocates 50 containers, while weights allocate
+ // the rest. Due to weights we have 12.5 (rounded to 13) containers for each
+ // subcluster; the rest is due to headroom.
+ checkExpectedAllocation(response, "subcluster0", 1, 50);
+ checkExpectedAllocation(response, "subcluster1", 1, 13);
+ checkExpectedAllocation(response, "subcluster2", 1, 13);
+ checkExpectedAllocation(response, "subcluster3", -1, -1);
+ checkExpectedAllocation(response, "subcluster4", -1, -1);
+ checkExpectedAllocation(response, "subcluster5", 1, 25);
+
+ }
+
+ private void prepPolicyWithHeadroom() throws YarnException {
+ AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100);
+ ((FederationAMRMProxyPolicy) getPolicy())
+ .notifyOfResponse(SubClusterId.newInstance("subcluster0"), ar);
+
+ ar = getAllocateResponseWithTargetHeadroom(0);
+ ((FederationAMRMProxyPolicy) getPolicy())
+ .notifyOfResponse(SubClusterId.newInstance("subcluster1"), ar);
+
+ ar = getAllocateResponseWithTargetHeadroom(1);
+ ((FederationAMRMProxyPolicy) getPolicy())
+ .notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
+ }
+
+ private AllocateResponse getAllocateResponseWithTargetHeadroom(
+ int numContainers) {
+ return AllocateResponse.newInstance(0, null, null,
+ Collections.<NodeReport> emptyList(),
+ Resource.newInstance(numContainers * 1024, numContainers), null, 10,
+ null, Collections.<NMToken> emptyList());
+ }
+
+ @Test
+ public void testSplitAllocateRequest() throws Exception {
+
+ // Test a complex List<ResourceRequest> is split correctly
+ initializePolicy();
+
+ // modify default initialization to include a "homesubcluster"
+ // which we will use as the default for when nodes or racks are unknown
+ SubClusterInfo sci = mock(SubClusterInfo.class);
+ when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
+ when(sci.getSubClusterId()).thenReturn(getHomeSubCluster());
+ getActiveSubclusters().put(getHomeSubCluster(), sci);
+ SubClusterIdInfo sc = new SubClusterIdInfo(getHomeSubCluster().getId());
+
+ getPolicyInfo().getRouterPolicyWeights().put(sc, 0.1f);
+ getPolicyInfo().getAMRMPolicyWeights().put(sc, 0.1f);
+
+ FederationPoliciesTestUtil.initializePolicyContext(
+ getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
+ getActiveSubclusters());
+
+ List<ResourceRequest> resourceRequests = createComplexRequest();
+
+ Map<SubClusterId, List<ResourceRequest>> response =
+ ((FederationAMRMProxyPolicy) getPolicy())
+ .splitResourceRequests(resourceRequests);
+
+ validateSplit(response, resourceRequests);
+ prettyPrintRequests(response);
+
+ // we expect 4 entries for the home subcluster (3 for request-id 4, and
+ // part of the broadcast of request-id 2)
+ checkExpectedAllocation(response, getHomeSubCluster().getId(), 4, 23);
+
+ // for subcluster0 we expect 3 entries from request-id 0, and 3 from
+ // request-id 3, as well as part of the request-id 2 broadcast
+ checkExpectedAllocation(response, "subcluster0", 7, 26);
+
+ // we expect 5 entries for subcluster1 (4 from request-id 1, and part
+ // of the broadcast of request-id 2)
+ checkExpectedAllocation(response, "subcluster1", 5, 25);
+
+ // sub-cluster 2 should contain 3 entries from request-id 1 and 1 from the
+ // broadcast of request-id 2, and no request-id 0
+ checkExpectedAllocation(response, "subcluster2", 4, 23);
+
+ // subcluster id 3, 4 should not appear (due to weights or active/inactive)
+ checkExpectedAllocation(response, "subcluster3", -1, -1);
+ checkExpectedAllocation(response, "subcluster4", -1, -1);
+
+ // subcluster5 should get only part of the request-id 2 broadcast
+ checkExpectedAllocation(response, "subcluster5", 1, 20);
+
+ // check that the allocations that show up are what expected
+ for (ResourceRequest rr : response.get(getHomeSubCluster())) {
+ Assert.assertTrue(rr.getAllocationRequestId() == 4L
+ || rr.getAllocationRequestId() == 2L);
+ }
+
+ for (ResourceRequest rr : response.get(getHomeSubCluster())) {
+ Assert.assertTrue(rr.getAllocationRequestId() != 1L);
+ }
+
+ List<ResourceRequest> rrs =
+ response.get(SubClusterId.newInstance("subcluster0"));
+ for (ResourceRequest rr : rrs) {
+ Assert.assertTrue(rr.getAllocationRequestId() != 1L);
+ }
+
+ for (ResourceRequest rr : response
+ .get(SubClusterId.newInstance("subcluster2"))) {
+ Assert.assertTrue(rr.getAllocationRequestId() != 0L);
+ }
+
+ for (ResourceRequest rr : response
+ .get(SubClusterId.newInstance("subcluster5"))) {
+ Assert.assertTrue(rr.getAllocationRequestId() >= 2);
+ Assert.assertTrue(rr.getRelaxLocality());
+ }
+ }
+
+ // check that the number of ResourceRequests and the total container count
+ // in the response for this sub-cluster match expectations. -1 indicates
+ // the response should be null
+ private void checkExpectedAllocation(
+ Map<SubClusterId, List<ResourceRequest>> response, String subCluster,
+ long totResourceRequests, long totContainers) {
+ if (totContainers == -1) {
+ Assert.assertNull(response.get(SubClusterId.newInstance(subCluster)));
+ } else {
+ SubClusterId sc = SubClusterId.newInstance(subCluster);
+ Assert.assertEquals(totResourceRequests, response.get(sc).size());
+
+ long actualContCount = 0;
+ for (ResourceRequest rr : response.get(sc)) {
+ actualContCount += rr.getNumContainers();
+ }
+ Assert.assertEquals(totContainers, actualContCount);
+ }
+ }
+
+ private void validateSplit(Map<SubClusterId, List<ResourceRequest>> split,
+ List<ResourceRequest> original) throws YarnException {
+
+ SubClusterResolver resolver =
+ getFederationPolicyContext().getFederationSubclusterResolver();
+
+ // Apply general validation rules
+ int numUsedSubclusters = split.size();
+
+ Set<Long> originalIds = new HashSet<>();
+ Set<Long> splitIds = new HashSet<>();
+
+ int originalContainers = 0;
+ for (ResourceRequest rr : original) {
+ originalContainers += rr.getNumContainers();
+ originalIds.add(rr.getAllocationRequestId());
+ }
+
+ int splitContainers = 0;
+ for (Map.Entry<SubClusterId, List<ResourceRequest>> rrs : split
+ .entrySet()) {
+ for (ResourceRequest rr : rrs.getValue()) {
+ splitContainers += rr.getNumContainers();
+ splitIds.add(rr.getAllocationRequestId());
+ // check node-local asks are sent to right RM (only)
+ SubClusterId fid = null;
+ try {
+ fid = resolver.getSubClusterForNode(rr.getResourceName());
+ } catch (YarnException e) {
+ // ignore code will handle
+ }
+ if (!rrs.getKey().equals(getHomeSubCluster()) && fid != null
+ && !fid.equals(rrs.getKey())) {
+ Assert.fail("A node-local (or resolvable rack-local) RR should not "
+ + "be send to an RM other than what it resolves to.");
+ }
+ }
+ }
+
+ // check we are not inventing Allocation Ids
+ Assert.assertEquals(originalIds, splitIds);
+
+ // check we are not excessively replicating the container asks among
+ // RMs (a little is allowed due to rounding of fractional splits)
+ Assert.assertTrue(
+ " Containers requested (" + splitContainers + ") should "
+ + "not exceed the original count of containers ("
+ + originalContainers + ") by more than the number of subclusters ("
+ + numUsedSubclusters + ")",
+ originalContainers + numUsedSubclusters >= splitContainers);
+
+ // Test target Ids
+ for (SubClusterId targetId : split.keySet()) {
+ Assert.assertTrue("Target subclusters should be in the active set",
+ getActiveSubclusters().containsKey(targetId));
+ Assert.assertTrue(
+ "Target subclusters (" + targetId + ") should have weight >0 in "
+ + "the policy ",
+ getPolicyInfo().getRouterPolicyWeights()
+ .get(new SubClusterIdInfo(targetId)) > 0);
+ }
+ }
+
+ private void prettyPrintRequests(
+ Map<SubClusterId, List<ResourceRequest>> response) {
+ for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response
+ .entrySet()) {
+ String str = "";
+ for (ResourceRequest rr : entry.getValue()) {
+ str += " [id:" + rr.getAllocationRequestId() + " loc:"
+ + rr.getResourceName() + " numCont:" + rr.getNumContainers()
+ + "], ";
+ }
+ LOG.info(entry.getKey() + " --> " + str);
+ }
+ }
+
+ private List<ResourceRequest> createLargeRandomList(int numRR)
+ throws Exception {
+
+ List<ResourceRequest> out = new ArrayList<>();
+ Random rand = new Random(1);
+ DefaultSubClusterResolverImpl resolver =
+ (DefaultSubClusterResolverImpl) getFederationPolicyContext()
+ .getFederationSubclusterResolver();
+
+ List<String> nodes =
+ new ArrayList<>(resolver.getNodeToSubCluster().keySet());
+
+ for (int i = 0; i < numRR; i++) {
+ String nodeName = nodes.get(rand.nextInt(nodes.size()));
+ long allocationId = (long) rand.nextInt(20);
+
+ // create a single-container request with a random allocation id on a
+ // randomly chosen node
+ out.add(FederationPoliciesTestUtil.createResourceRequest(allocationId,
+ nodeName, 1024, 1, 1, rand.nextInt(100), null, rand.nextBoolean()));
+ }
+ return out;
+ }
+
+ private List<ResourceRequest> createSimpleRequest() throws Exception {
+
+ List<ResourceRequest> out = new ArrayList<>();
+
+ // create a single ANY request asking for 100 containers
+ out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
+ ResourceRequest.ANY, 1024, 1, 1, 100, null, true));
+ return out;
+ }
+
+ private List<ResourceRequest> createZeroSizedANYRequest() throws Exception {
+
+ List<ResourceRequest> out = new ArrayList<>();
+
+ // create a single zero-sized ANY request
+ out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
+ ResourceRequest.ANY, 1024, 1, 1, 0, null, true));
+ return out;
+ }
+
+ private List<ResourceRequest> createComplexRequest() throws Exception {
+
+ List<ResourceRequest> out = new ArrayList<>();
+
+ // create a single container request in sc0
+ out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
+ "subcluster0-rack0-host0", 1024, 1, 1, 1, null, false));
+ out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
+ "subcluster0-rack0", 1024, 1, 1, 1, null, false));
+ out.add(FederationPoliciesTestUtil.createResourceRequest(0L,
+ ResourceRequest.ANY, 1024, 1, 1, 1, null, false));
+
+ // create a single container request with 3 alternative hosts across sc1,sc2
+ // where we want 2 containers in sc1 and 1 in sc2
+ out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
+ "subcluster1-rack1-host1", 1024, 1, 1, 1, null, false));
+ out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
+ "subcluster1-rack1-host2", 1024, 1, 1, 1, null, false));
+ out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
+ "subcluster2-rack3-host3", 1024, 1, 1, 1, null, false));
+ out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
+ "subcluster1-rack1", 1024, 1, 1, 2, null, false));
+ out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
+ "subcluster2-rack3", 1024, 1, 1, 1, null, false));
+ out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
+ ResourceRequest.ANY, 1024, 1, 1, 2, null, false));
+
+ // create a non-local ANY request that can span anything
+ out.add(FederationPoliciesTestUtil.createResourceRequest(2L,
+ ResourceRequest.ANY, 1024, 1, 1, 100, null, true));
+
+ // create a single container request in sc0 with relaxed locality
+ out.add(FederationPoliciesTestUtil.createResourceRequest(3L,
+ "subcluster0-rack0-host0", 1024, 1, 1, 1, null, true));
+ out.add(FederationPoliciesTestUtil.createResourceRequest(3L,
+ "subcluster0-rack0", 1024, 1, 1, 1, null, true));
+ out.add(FederationPoliciesTestUtil.createResourceRequest(3L,
+ ResourceRequest.ANY, 1024, 1, 1, 1, null, true));
+
+ // create a request of an unknown node/rack and expect this to show up
+ // in homesubcluster
+ out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "unknownNode",
+ 1024, 1, 1, 1, null, false));
+ out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "unknownRack",
+ 1024, 1, 1, 1, null, false));
+ out.add(FederationPoliciesTestUtil.createResourceRequest(4L,
+ ResourceRequest.ANY, 1024, 1, 1, 1, null, false));
+
+ return out;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
index 9e94f72..906e35f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
@@ -17,6 +17,9 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
@@ -29,12 +32,9 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.HashMap;
-import java.util.Map;
-
/**
- * Simple test class for the {@link LoadBasedRouterPolicy}. Test that the
- * load is properly considered for allocation.
+ * Simple test class for the {@link LoadBasedRouterPolicy}. Test that the load
+ * is properly considered for allocation.
*/
public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest {
@@ -47,12 +47,10 @@ public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest {
// simulate 20 active subclusters
for (int i = 0; i < 20; i++) {
- SubClusterIdInfo sc =
- new SubClusterIdInfo(String.format("sc%02d", i));
+ SubClusterIdInfo sc = new SubClusterIdInfo(String.format("sc%02d", i));
SubClusterInfo federationSubClusterInfo =
SubClusterInfo.newInstance(sc.toId(), null, null, null, null, -1,
- SubClusterState.SC_RUNNING, -1,
- generateClusterMetricsInfo(i));
+ SubClusterState.SC_RUNNING, -1, generateClusterMetricsInfo(i));
getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
float weight = getRand().nextInt(2);
if (i == 5) {
@@ -76,7 +74,7 @@ public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest {
private String generateClusterMetricsInfo(int id) {
long mem = 1024 * getRand().nextInt(277 * 100 - 1);
- //plant a best cluster
+ // plant a best cluster
if (id == 5) {
mem = 1024 * 277 * 100;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
index ff5175d..eefcfd9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
@@ -16,6 +16,12 @@
*/
package org.apache.hadoop.yarn.server.federation.policies.router;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
@@ -28,12 +34,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* Simple test class for the {@link PriorityRouterPolicy}. Tests that the
* weights are correctly used for ordering the choice of sub-clusters.
@@ -72,8 +72,7 @@ public class TestPriorityRouterPolicy extends BaseFederationPoliciesTest {
getPolicyInfo().setRouterPolicyWeights(routerWeights);
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
- getPolicyInfo(),
- getActiveSubclusters());
+ getPolicyInfo(), getActiveSubclusters());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
index a612685..78967d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
@@ -17,6 +17,13 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
@@ -29,13 +36,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* Simple test class for the {@link WeightedRandomRouterPolicy}. Generate large
* number of randomized tests to check we are weighiting correctly even if
@@ -71,8 +71,7 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
- getPolicyInfo(),
- getActiveSubclusters());
+ getPolicyInfo(), getActiveSubclusters());
}
@@ -88,8 +87,8 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
float numberOfDraws = 1000000;
for (float i = 0; i < numberOfDraws; i++) {
- SubClusterId chosenId = ((FederationRouterPolicy) getPolicy()).
- getHomeSubcluster(getApplicationSubmissionContext());
+ SubClusterId chosenId = ((FederationRouterPolicy) getPolicy())
+ .getHomeSubcluster(getApplicationSubmissionContext());
counter.get(chosenId).incrementAndGet();
}
@@ -113,13 +112,15 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
if (getActiveSubclusters().containsKey(counterEntry.getKey())) {
Assert.assertTrue(
"Id " + counterEntry.getKey() + " Actual weight: " + actualWeight
- + " expected weight: " + expectedWeight, expectedWeight == 0 ||
- (actualWeight / expectedWeight) < 1.1
- && (actualWeight / expectedWeight) > 0.9);
+ + " expected weight: " + expectedWeight,
+ expectedWeight == 0 || (actualWeight / expectedWeight) < 1.1
+ && (actualWeight / expectedWeight) > 0.9);
} else {
- Assert.assertTrue(
- "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight
- + " expected weight: " + expectedWeight, actualWeight == 0);
+ Assert
+ .assertTrue(
+ "Id " + counterEntry.getKey() + " Actual weight: "
+ + actualWeight + " expected weight: " + expectedWeight,
+ actualWeight == 0);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
index f901329..87ed8d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.server.federation.utils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
@@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolv
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.*;
+import org.apache.hadoop.yarn.util.Records;
import java.net.URL;
import java.nio.ByteBuffer;
@@ -48,6 +50,68 @@ public final class FederationPoliciesTestUtil {
// disabled.
}
+ private static final String FEDR_NODE_PREFIX = "fedr-test-node-";
+
+
+ public static List<ResourceRequest> createResourceRequests(String[] hosts,
+ int memory, int vCores, int priority, int containers,
+ String labelExpression, boolean relaxLocality) throws YarnException {
+ List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
+ for (String host : hosts) {
+ ResourceRequest hostReq =
+ createResourceRequest(host, memory, vCores, priority, containers,
+ labelExpression, relaxLocality);
+ reqs.add(hostReq);
+ ResourceRequest rackReq =
+ createResourceRequest("/default-rack", memory, vCores, priority,
+ containers, labelExpression, relaxLocality);
+ reqs.add(rackReq);
+ }
+
+ ResourceRequest offRackReq =
+ createResourceRequest(ResourceRequest.ANY, memory, vCores, priority,
+ containers, labelExpression, relaxLocality);
+ reqs.add(offRackReq);
+ return reqs;
+ }
+
+ protected static ResourceRequest createResourceRequest(String resource,
+ int memory, int vCores, int priority, int containers,
+ boolean relaxLocality) throws YarnException {
+ return createResourceRequest(resource, memory, vCores, priority, containers,
+ null, relaxLocality);
+ }
+
+ @SuppressWarnings("checkstyle:parameternumber")
+ public static ResourceRequest createResourceRequest(long id, String resource,
+ int memory, int vCores, int priority, int containers,
+ String labelExpression, boolean relaxLocality) throws YarnException {
+ ResourceRequest out =
+ createResourceRequest(resource, memory, vCores, priority, containers,
+ labelExpression, relaxLocality);
+ out.setAllocationRequestId(id);
+ return out;
+ }
+
+ public static ResourceRequest createResourceRequest(String resource,
+ int memory, int vCores, int priority, int containers,
+ String labelExpression, boolean relaxLocality) throws YarnException {
+ ResourceRequest req = Records.newRecord(ResourceRequest.class);
+ req.setResourceName(resource);
+ req.setNumContainers(containers);
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(priority);
+ req.setPriority(pri);
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemorySize(memory);
+ capability.setVirtualCores(vCores);
+ req.setCapability(capability);
+ if (labelExpression != null) {
+ req.setNodeLabelExpression(labelExpression);
+ }
+ req.setRelaxLocality(relaxLocality);
+ return req;
+ }
public static void initializePolicyContext(
FederationPolicyInitializationContext fpc, ConfigurableFederationPolicy
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c5ab53f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes
index e4d6112..2b7e237 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/nodes
@@ -1,4 +1,8 @@
node1,subcluster1,rack1
node2 , subcluster2, RACK1
noDE3,subcluster3, rack2
-node4, subcluster3, rack2
\ No newline at end of file
+node4, subcluster3, rack2
+subcluster0-rack0-host0,subcluster0, subcluster0-rack0
+Subcluster1-RACK1-HOST1,subcluster1, subCluster1-RACK1
+SUBCLUSTER1-RACK1-HOST2,subcluster1, subCluster1-RACK1
+SubCluster2-RACK3-HOST3,subcluster2, subcluster2-rack3
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org