You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2018/08/20 21:34:02 UTC
hadoop git commit: YARN-8581. [AMRMProxy] Add sub-cluster timeout in
LocalityMulticastAMRMProxyPolicy. Contributed by Botong Huang.
Repository: hadoop
Updated Branches:
refs/heads/trunk 8736fc39a -> e0f6ffdba
YARN-8581. [AMRMProxy] Add sub-cluster timeout in LocalityMulticastAMRMProxyPolicy. Contributed by Botong Huang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e0f6ffdb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e0f6ffdb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e0f6ffdb
Branch: refs/heads/trunk
Commit: e0f6ffdbad6f43fd43ec57fb68ebf5275b8b9ba0
Parents: 8736fc3
Author: Giovanni Matteo Fumarola <gi...@apache.com>
Authored: Mon Aug 20 14:33:16 2018 -0700
Committer: Giovanni Matteo Fumarola <gi...@apache.com>
Committed: Mon Aug 20 14:33:16 2018 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 8 +-
.../yarn/conf/TestYarnConfigurationFields.java | 2 +
.../LocalityMulticastAMRMProxyPolicy.java | 64 ++++++++++++--
.../utils/FederationStateStoreFacade.java | 9 ++
.../TestLocalityMulticastAMRMProxyPolicy.java | 91 +++++++++++++++++---
.../utils/FederationPoliciesTestUtil.java | 7 +-
6 files changed, 162 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0f6ffdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 78e28f7..148edb9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3209,8 +3209,14 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.federation.resolver."
+ "DefaultSubClusterResolverImpl";
- public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
+ // AMRMProxy split-merge timeout for active sub-clusters. We will not route
+ // new asks to expired sub-clusters.
+ public static final String FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =
+ FEDERATION_PREFIX + "amrmproxy.subcluster.timeout.ms";
+ public static final long DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =
+ 60000; // one minute
+ public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX
+ "policy-manager";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0f6ffdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 9249ed4..d63933c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -105,6 +105,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
+ configurationPropsToSkipCompare
+ .add(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
// Federation StateStore ZK implementation configs to be ignored
configurationPropsToSkipCompare.add(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0f6ffdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
index 1481f34..1ccd61c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@@ -126,6 +127,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
private SubClusterResolver resolver;
private Map<SubClusterId, Resource> headroom;
+ private Map<SubClusterId, Long> lastHeartbeatTimeStamp;
+ private long subClusterTimeOut;
private float hrAlpha;
private FederationStateStoreFacade federationFacade;
private AllocationBookkeeper bookkeeper;
@@ -178,6 +181,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
if (headroom == null) {
headroom = new ConcurrentHashMap<>();
+ lastHeartbeatTimeStamp = new ConcurrentHashMap<>();
}
hrAlpha = policy.getHeadroomAlpha();
@@ -185,13 +189,29 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
policyContext.getFederationStateStoreFacade();
this.homeSubcluster = policyContext.getHomeSubcluster();
+ this.subClusterTimeOut = this.federationFacade.getConf().getLong(
+ YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
+ YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
+ if (this.subClusterTimeOut <= 0) {
+ LOG.info(
+ "{} configured to be {}, should be positive. Using default of {}.",
+ YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
+ this.subClusterTimeOut,
+ YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
+ this.subClusterTimeOut =
+ YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
+ }
}
@Override
public void notifyOfResponse(SubClusterId subClusterId,
AllocateResponse response) throws YarnException {
- // stateless policy does not care about responses except tracking headroom
- headroom.put(subClusterId, response.getAvailableResources());
+ if (response.getAvailableResources() != null) {
+ headroom.put(subClusterId, response.getAvailableResources());
+ LOG.info("Subcluster {} updated with {} memory headroom", subClusterId,
+ response.getAvailableResources().getMemorySize());
+ }
+ lastHeartbeatTimeStamp.put(subClusterId, System.currentTimeMillis());
}
@Override
@@ -281,6 +301,15 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
// handle all non-localized requests (ANY)
splitAnyRequests(nonLocalizedRequests, bookkeeper);
+ for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : bookkeeper
+ .getAnswer().entrySet()) {
+ // A new-cluster here will trigger new UAM luanch, which might take a long
+ // time. We don't want too many requests stuck in this UAM before it is
+ // ready and starts heartbeating
+ if (!lastHeartbeatTimeStamp.containsKey(entry.getKey())) {
+ lastHeartbeatTimeStamp.put(entry.getKey(), System.currentTimeMillis());
+ }
+ }
return bookkeeper.getAnswer();
}
@@ -519,13 +548,10 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
policyWeights = weights;
totPolicyWeight = 0;
- // pre-compute the set of subclusters that are both active and enabled by
- // the policy weights, and accumulate their total weight
for (Map.Entry<SubClusterId, Float> entry : policyWeights.entrySet()) {
if (entry.getValue() > 0
&& activeSubclusters.containsKey(entry.getKey())) {
activeAndEnabledSC.add(entry.getKey());
- totPolicyWeight += entry.getValue();
}
}
@@ -535,6 +561,34 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
+ "currently active we cannot forward the ResourceRequest(s)");
}
+ Set<SubClusterId> tmpSCSet = new HashSet<>(activeAndEnabledSC);
+ for (Map.Entry<SubClusterId, Long> entry : lastHeartbeatTimeStamp
+ .entrySet()) {
+ long duration = System.currentTimeMillis() - entry.getValue();
+ if (duration > subClusterTimeOut) {
+ LOG.warn(
+ "Subcluster {} does not have a success heartbeat for {}s, "
+ + "skip routing asks there for this request",
+ entry.getKey(), (double) duration / 1000);
+ tmpSCSet.remove(entry.getKey());
+ }
+ }
+ if (tmpSCSet.size() < 1) {
+ LOG.warn("All active and enabled subclusters have expired last "
+ + "heartbeat time. Ignore the expiry check for this request");
+ } else {
+ activeAndEnabledSC = tmpSCSet;
+ }
+
+ LOG.info("{} subcluster active, {} subclusters active and enabled",
+ activeSubclusters.size(), activeAndEnabledSC.size());
+
+ // pre-compute the set of subclusters that are both active and enabled by
+ // the policy weights, and accumulate their total weight
+ for (SubClusterId sc : activeAndEnabledSC) {
+ totPolicyWeight += policyWeights.get(sc);
+ }
+
// pre-compute headroom-based weights for active/enabled subclusters
for (Map.Entry<SubClusterId, Resource> r : headroom.entrySet()) {
if (activeAndEnabledSC.contains(r.getKey())) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0f6ffdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 1bcb0f4..5d9702f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -392,6 +392,15 @@ public final class FederationStateStoreFacade {
}
/**
+ * Get the configuration.
+ *
+ * @return configuration object
+ */
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ /**
* Helper method to create instances of Object using the class name defined in
* the configuration object. The instances creates {@link RetryProxy} using
* the specific {@link RetryPolicy}.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0f6ffdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
index f66bbb6..cf9ac53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
@@ -32,11 +32,13 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@@ -106,6 +108,10 @@ public class TestLocalityMulticastAMRMProxyPolicy
}
private void initializePolicy() throws YarnException {
+ initializePolicy(new YarnConfiguration());
+ }
+
+ private void initializePolicy(Configuration conf) throws YarnException {
setFederationPolicyContext(new FederationPolicyInitializationContext());
SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver();
getFederationPolicyContext().setFederationSubclusterResolver(resolver);
@@ -116,7 +122,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster());
FederationPoliciesTestUtil.initializePolicyContext(
getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
- getActiveSubclusters());
+ getActiveSubclusters(), conf);
}
@Test(expected = FederationPolicyInitializationException.class)
@@ -145,7 +151,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
initializePolicy();
List<ResourceRequest> resourceRequests = createSimpleRequest();
- prepPolicyWithHeadroom();
+ prepPolicyWithHeadroom(true);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
@@ -205,7 +211,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
int numRR = 1000;
List<ResourceRequest> resourceRequests = createLargeRandomList(numRR);
- prepPolicyWithHeadroom();
+ prepPolicyWithHeadroom(true);
int numIterations = 1000;
long tstart = System.currentTimeMillis();
@@ -233,7 +239,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
List<ResourceRequest> resourceRequests = createZeroSizedANYRequest();
// this receives responses from sc0,sc1,sc2
- prepPolicyWithHeadroom();
+ prepPolicyWithHeadroom(true);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
@@ -269,7 +275,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
initializePolicy();
List<ResourceRequest> resourceRequests = createSimpleRequest();
- prepPolicyWithHeadroom();
+ prepPolicyWithHeadroom(true);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
@@ -292,10 +298,14 @@ public class TestLocalityMulticastAMRMProxyPolicy
checkTotalContainerAllocation(response, 100);
}
- private void prepPolicyWithHeadroom() throws YarnException {
+ private void prepPolicyWithHeadroom(boolean setSubCluster0)
+ throws YarnException {
AllocateResponse ar = getAllocateResponseWithTargetHeadroom(40);
- ((FederationAMRMProxyPolicy) getPolicy())
- .notifyOfResponse(SubClusterId.newInstance("subcluster0"), ar);
+
+ if (setSubCluster0) {
+ ((FederationAMRMProxyPolicy) getPolicy())
+ .notifyOfResponse(SubClusterId.newInstance("subcluster0"), ar);
+ }
ar = getAllocateResponseWithTargetHeadroom(0);
((FederationAMRMProxyPolicy) getPolicy())
@@ -333,7 +343,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
FederationPoliciesTestUtil.initializePolicyContext(
getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
- getActiveSubclusters());
+ getActiveSubclusters(), new Configuration());
List<ResourceRequest> resourceRequests = createComplexRequest();
@@ -669,7 +679,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
List<ResourceRequest> resourceRequests = new ArrayList<>();
// Initialize the headroom map
- prepPolicyWithHeadroom();
+ prepPolicyWithHeadroom(true);
// Cancel at ANY level only
resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
@@ -716,4 +726,65 @@ public class TestLocalityMulticastAMRMProxyPolicy
checkExpectedAllocation(response, "subcluster5", 1, 25);
checkTotalContainerAllocation(response, 100);
}
+
+ @Test
+ public void testSubClusterExpiry() throws Exception {
+
+ // Tests how the headroom info are used to split based on the capacity
+ // each RM claims to give us.
+ // Configure policy to be 100% headroom based
+ getPolicyInfo().setHeadroomAlpha(1.0f);
+
+ YarnConfiguration conf = new YarnConfiguration();
+ // Set expiry to 500ms
+ conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
+ 500);
+
+ initializePolicy(conf);
+ List<ResourceRequest> resourceRequests = createSimpleRequest();
+
+ // Update the response timestamp for the first time
+ prepPolicyWithHeadroom(true);
+
+ Map<SubClusterId, List<ResourceRequest>> response =
+ ((FederationAMRMProxyPolicy) getPolicy())
+ .splitResourceRequests(resourceRequests);
+
+ // pretty print requests
+ prettyPrintRequests(response);
+
+ validateSplit(response, resourceRequests);
+
+ /*
+ * based on headroom, we expect 75 containers to got to subcluster0 (60) and
+ * subcluster2 (15) according to the advertised headroom (40 and 10), no
+ * containers for sublcuster1 as it advertise zero headroom, and 25 to
+ * subcluster5 which has unknown headroom, and so it gets 1/4th of the load
+ */
+ checkExpectedAllocation(response, "subcluster0", 1, 60);
+ checkExpectedAllocation(response, "subcluster1", 1, -1);
+ checkExpectedAllocation(response, "subcluster2", 1, 15);
+ checkExpectedAllocation(response, "subcluster5", 1, 25);
+ checkTotalContainerAllocation(response, 100);
+
+ Thread.sleep(800);
+
+ // Update the response timestamp for the second time, skipping sc0 and sc5
+ prepPolicyWithHeadroom(false);
+
+ response = ((FederationAMRMProxyPolicy) getPolicy())
+ .splitResourceRequests(resourceRequests);
+
+ // pretty print requests
+ prettyPrintRequests(response);
+
+ validateSplit(response, resourceRequests);
+
+ checkExpectedAllocation(response, "subcluster0", 1, -1);
+ checkExpectedAllocation(response, "subcluster1", 1, -1);
+ checkExpectedAllocation(response, "subcluster2", 1, 100);
+ checkExpectedAllocation(response, "subcluster5", 1, -1);
+ checkTotalContainerAllocation(response, 100);
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0f6ffdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
index 24399cb..9cc6d06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
@@ -117,7 +117,7 @@ public final class FederationPoliciesTestUtil {
public static void initializePolicyContext(
FederationPolicyInitializationContext fpc, ConfigurableFederationPolicy
policy, WeightedPolicyInfo policyInfo,
- Map<SubClusterId, SubClusterInfo> activeSubclusters)
+ Map<SubClusterId, SubClusterInfo> activeSubclusters, Configuration conf)
throws YarnException {
ByteBuffer buf = policyInfo.toByteBuffer();
fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration
@@ -133,7 +133,7 @@ public final class FederationPoliciesTestUtil {
.newInstance(new ArrayList<SubClusterInfo>(activeSubclusters.values()));
when(fss.getSubClusters(any())).thenReturn(response);
- facade.reinitialize(fss, new Configuration());
+ facade.reinitialize(fss, conf);
fpc.setFederationStateStoreFacade(facade);
policy.reinitialize(fpc);
}
@@ -155,7 +155,8 @@ public final class FederationPoliciesTestUtil {
FederationPolicyInitializationContext context =
new FederationPolicyInitializationContext(null, initResolver(),
initFacade(), SubClusterId.newInstance(subclusterId));
- initializePolicyContext(context, policy, policyInfo, activeSubclusters);
+ initializePolicyContext(context, policy, policyInfo, activeSubclusters,
+ new Configuration());
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org