You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2018/08/28 23:01:56 UTC
hadoop git commit: YARN-8697. LocalityMulticastAMRMProxyPolicy should
fallback to random sub-cluster when cannot resolve resource. Contributed by
Botong Huang.
Repository: hadoop
Updated Branches:
refs/heads/trunk 3e18b957e -> 7ed458b25
YARN-8697. LocalityMulticastAMRMProxyPolicy should fallback to random sub-cluster when cannot resolve resource. Contributed by Botong Huang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7ed458b2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7ed458b2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7ed458b2
Branch: refs/heads/trunk
Commit: 7ed458b255e492fd5bc2ca36f216ff1b16054db7
Parents: 3e18b95
Author: Giovanni Matteo Fumarola <gi...@apache.org>
Authored: Tue Aug 28 16:01:35 2018 -0700
Committer: Giovanni Matteo Fumarola <gi...@apache.org>
Committed: Tue Aug 28 16:01:35 2018 -0700
----------------------------------------------------------------------
.../LocalityMulticastAMRMProxyPolicy.java | 105 +++++++++++++++----
.../TestLocalityMulticastAMRMProxyPolicy.java | 53 ++++++++--
2 files changed, 125 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ed458b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
index 1ccd61c..e5f26d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
@@ -21,8 +21,11 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -123,6 +126,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
public static final Logger LOG =
LoggerFactory.getLogger(LocalityMulticastAMRMProxyPolicy.class);
+ private static Random rand = new Random();
+
private Map<SubClusterId, Float> weights;
private SubClusterResolver resolver;
@@ -275,26 +280,18 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
}
// Handle node/rack requests that the SubClusterResolver cannot map to
- // any cluster. Defaulting to home subcluster.
+ // any cluster. Pick a random sub-cluster from active and enabled ones.
+ targetId = getSubClusterForUnResolvedRequest(bookkeeper,
+ rr.getAllocationRequestId());
if (LOG.isDebugEnabled()) {
LOG.debug("ERROR resolving sub-cluster for resourceName: "
- + rr.getResourceName() + " we are falling back to homeSubCluster:"
- + homeSubcluster);
+ + rr.getResourceName() + ", picked a random subcluster to forward:"
+ + targetId);
}
-
- // If home-subcluster is not active, ignore node/rack request
- if (bookkeeper.isActiveAndEnabled(homeSubcluster)) {
- if (targetIds != null && targetIds.size() > 0) {
- bookkeeper.addRackRR(homeSubcluster, rr);
- } else {
- bookkeeper.addLocalizedNodeRR(homeSubcluster, rr);
- }
+ if (targetIds != null && targetIds.size() > 0) {
+ bookkeeper.addRackRR(targetId, rr);
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("The homeSubCluster (" + homeSubcluster + ") we are "
- + "defaulting to is not active, the ResourceRequest "
- + "will be ignored.");
- }
+ bookkeeper.addLocalizedNodeRR(targetId, rr);
}
}
@@ -314,6 +311,14 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
}
/**
+ * Delegates the sub-cluster choice for a request the resolver cannot map to the bookkeeper; protected so unit tests can override it.
+ */
+ protected SubClusterId getSubClusterForUnResolvedRequest(
+ AllocationBookkeeper bookKeeper, long allocationId) {
+ return bookKeeper.getSubClusterForUnResolvedRequest(allocationId);
+ }
+
+ /**
* It splits a list of non-localized resource requests among sub-clusters.
*/
private void splitAnyRequests(List<ResourceRequest> originalResourceRequests,
@@ -512,10 +517,11 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
* This helper class is used to book-keep the requests made to each
* subcluster, and maintain useful statistics to split ANY requests.
*/
- private final class AllocationBookkeeper {
+ protected final class AllocationBookkeeper {
// the answer being accumulated
private Map<SubClusterId, List<ResourceRequest>> answer = new TreeMap<>();
+ private Map<SubClusterId, Set<Long>> maskForRackDeletion = new HashMap<>();
// stores how many containers we have allocated in each RM for localized
// asks, used to correctly "spread" the corresponding ANY
@@ -523,6 +529,10 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
new HashMap<>();
private Map<Long, AtomicLong> totNumLocalizedContainers = new HashMap<>();
+ // Store the randomly selected subClusterId for unresolved resource requests
+ // keyed by requestId
+ private Map<Long, SubClusterId> unResolvedRequestLocation = new HashMap<>();
+
private Set<SubClusterId> activeAndEnabledSC = new HashSet<>();
private float totHeadroomMemory = 0;
private int totHeadRoomEnabledRMs = 0;
@@ -538,6 +548,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
// reset data structures
answer.clear();
+ maskForRackDeletion.clear();
countContainersPerRM.clear();
totNumLocalizedContainers.clear();
activeAndEnabledSC.clear();
@@ -628,16 +639,16 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
.addAndGet(rr.getNumContainers());
}
- internalAddToAnswer(targetId, rr);
+ internalAddToAnswer(targetId, rr, false);
}
/**
* Add a rack-local request to the final asnwer.
*/
- public void addRackRR(SubClusterId targetId, ResourceRequest rr) {
+ private void addRackRR(SubClusterId targetId, ResourceRequest rr) {
Preconditions
.checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
- internalAddToAnswer(targetId, rr);
+ internalAddToAnswer(targetId, rr, true);
}
/**
@@ -646,11 +657,18 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
private void addAnyRR(SubClusterId targetId, ResourceRequest rr) {
Preconditions
.checkArgument(ResourceRequest.isAnyLocation(rr.getResourceName()));
- internalAddToAnswer(targetId, rr);
+ internalAddToAnswer(targetId, rr, false);
}
private void internalAddToAnswer(SubClusterId targetId,
- ResourceRequest partialRR) {
+ ResourceRequest partialRR, boolean isRack) {
+ if (!isRack) {
+ if (!maskForRackDeletion.containsKey(targetId)) {
+ maskForRackDeletion.put(targetId, new HashSet<Long>());
+ }
+ maskForRackDeletion.get(targetId)
+ .add(partialRR.getAllocationRequestId());
+ }
if (!answer.containsKey(targetId)) {
answer.put(targetId, new ArrayList<ResourceRequest>());
}
@@ -658,6 +676,27 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
}
/**
+ * For requests whose location cannot be resolved, pick a random active and
+ * enabled sub-cluster, memoized per allocationId. NOTE(review): Random.nextInt throws IllegalArgumentException if activeAndEnabledSC is empty; confirm callers guarantee it is non-empty.
+ */
+ private SubClusterId getSubClusterForUnResolvedRequest(long allocationId) {
+ if (unResolvedRequestLocation.containsKey(allocationId)) {
+ return unResolvedRequestLocation.get(allocationId);
+ }
+ int id = rand.nextInt(activeAndEnabledSC.size());
+ for (SubClusterId subclusterId : activeAndEnabledSC) {
+ if (id == 0) {
+ unResolvedRequestLocation.put(allocationId, subclusterId);
+ return subclusterId;
+ }
+ id--;
+ }
+ throw new RuntimeException(
+ "Should not be here. activeAndEnabledSC size = "
+ + activeAndEnabledSC.size() + " id = " + id);
+ }
+
+ /**
* Return all known subclusters associated with an allocation id.
*
* @param allocationId the allocation id considered
@@ -678,6 +717,28 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
* @return the answer
*/
private Map<SubClusterId, List<ResourceRequest>> getAnswer() {
+ Iterator<Entry<SubClusterId, List<ResourceRequest>>> answerIter =
+ answer.entrySet().iterator();
+ // Drop rack RRs whose allocationId has no non-rack RR in the same sub-cluster; remove sub-clusters with no mask or left empty
+ while (answerIter.hasNext()) {
+ Entry<SubClusterId, List<ResourceRequest>> entry = answerIter.next();
+ SubClusterId scId = entry.getKey();
+ Set<Long> mask = maskForRackDeletion.get(scId);
+ if (mask != null) {
+ Iterator<ResourceRequest> rrIter = entry.getValue().iterator();
+ while (rrIter.hasNext()) {
+ ResourceRequest rr = rrIter.next();
+ if (!mask.contains(rr.getAllocationRequestId())) {
+ rrIter.remove();
+ }
+ }
+ }
+ if (mask == null || entry.getValue().size() == 0) {
+ answerIter.remove();
+ LOG.info("removing {} from output because it has only rack RR",
+ scId);
+ }
+ }
return answer;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7ed458b2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
index cf9ac53..c49ab60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
@@ -69,12 +69,12 @@ public class TestLocalityMulticastAMRMProxyPolicy
@Before
public void setUp() throws Exception {
- setPolicy(new LocalityMulticastAMRMProxyPolicy());
+ setPolicy(new TestableLocalityMulticastAMRMProxyPolicy());
setPolicyInfo(new WeightedPolicyInfo());
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
- // simulate 20 subclusters with a 5% chance of being inactive
+ // Six sub-clusters with one inactive and one disabled
for (int i = 0; i < 6; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo("subcluster" + i);
// sub-cluster 3 is not active
@@ -207,6 +207,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
getPolicyInfo().setHeadroomAlpha(1.0f);
initializePolicy();
+ addHomeSubClusterAsActive();
int numRR = 1000;
List<ResourceRequest> resourceRequests = createLargeRandomList(numRR);
@@ -324,14 +325,11 @@ public class TestLocalityMulticastAMRMProxyPolicy
null, Collections.<NMToken> emptyList());
}
- @Test
- public void testSplitAllocateRequest() throws Exception {
-
- // Test a complex List<ResourceRequest> is split correctly
- initializePolicy();
-
- // modify default initialization to include a "homesubcluster"
- // which we will use as the default for when nodes or racks are unknown
+ /**
+ * modify default initialization to include a "homesubcluster" which we will
+ * use as the default for when nodes or racks are unknown.
+ */
+ private void addHomeSubClusterAsActive() {
SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(getHomeSubCluster());
@@ -340,6 +338,14 @@ public class TestLocalityMulticastAMRMProxyPolicy
getPolicyInfo().getRouterPolicyWeights().put(sc, 0.1f);
getPolicyInfo().getAMRMPolicyWeights().put(sc, 0.1f);
+ }
+
+ @Test
+ public void testSplitAllocateRequest() throws Exception {
+
+ // Test a complex List<ResourceRequest> is split correctly
+ initializePolicy();
+ addHomeSubClusterAsActive();
FederationPoliciesTestUtil.initializePolicyContext(
getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
@@ -502,7 +508,8 @@ public class TestLocalityMulticastAMRMProxyPolicy
// Test target Ids
for (SubClusterId targetId : split.keySet()) {
- Assert.assertTrue("Target subclusters should be in the active set",
+ Assert.assertTrue(
+ "Target subcluster " + targetId + " should be in the active set",
getActiveSubclusters().containsKey(targetId));
Assert.assertTrue(
"Target subclusters (" + targetId + ") should have weight >0 in "
@@ -787,4 +794,28 @@ public class TestLocalityMulticastAMRMProxyPolicy
checkTotalContainerAllocation(response, 100);
}
+ /**
+ * A testable version of LocalityMulticastAMRMProxyPolicy that
+ * deterministically falls back to home sub-cluster for unresolved requests.
+ */
+ private class TestableLocalityMulticastAMRMProxyPolicy
+ extends LocalityMulticastAMRMProxyPolicy {
+ @Override
+ protected SubClusterId getSubClusterForUnResolvedRequest(
+ AllocationBookkeeper bookkeeper, long allocationId) {
+ SubClusterId originalResult =
+ super.getSubClusterForUnResolvedRequest(bookkeeper, allocationId);
+ Map<SubClusterId, SubClusterInfo> activeClusters = null;
+ try {
+ activeClusters = getActiveSubclusters();
+ } catch (YarnException e) {
+ throw new RuntimeException(e);
+ }
+ // The randomly selected sub-cluster should at least be active
+ Assert.assertTrue(activeClusters.containsKey(originalResult));
+
+ // Always use the home sub-cluster so that the unit test is deterministic
+ return getHomeSubCluster();
+ }
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org