You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/09/22 00:58:38 UTC
[17/50] [abbrv] hadoop git commit: YARN-6190. Validation and
synchronization fixes in LocalityMulticastAMRMProxyPolicy. (Botong Huang via
curino)
YARN-6190. Validation and synchronization fixes in LocalityMulticastAMRMProxyPolicy. (Botong Huang via curino)
(cherry picked from commit 5c486961cd3a175b122ef86275c99b72964f2c50)
(cherry picked from commit 8623644f4599f51d34ba79c4c1453b3997205d8f)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6191fac9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6191fac9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6191fac9
Branch: refs/heads/branch-2
Commit: 6191fac914d35db18e0ef7a4364abe07a156c7e2
Parents: 2dca88b
Author: Carlo Curino <cu...@apache.org>
Authored: Tue Feb 28 17:04:20 2017 -0800
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 16:25:08 2017 -0700
----------------------------------------------------------------------
.../LocalityMulticastAMRMProxyPolicy.java | 63 +++++++++++++-------
.../TestLocalityMulticastAMRMProxyPolicy.java | 21 ++++++-
.../policies/manager/BasePolicyManagerTest.java | 3 -
.../resolver/TestDefaultSubClusterResolver.java | 9 ++-
.../utils/FederationPoliciesTestUtil.java | 6 +-
5 files changed, 73 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6191fac9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
index 283f89e..6f97a51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
@@ -143,10 +144,9 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
Map<SubClusterId, Float> newWeightsConverted = new HashMap<>();
boolean allInactive = true;
WeightedPolicyInfo policy = getPolicyInfo();
- if (policy.getAMRMPolicyWeights() == null
- || policy.getAMRMPolicyWeights().size() == 0) {
- allInactive = false;
- } else {
+
+ if (policy.getAMRMPolicyWeights() != null
+ && policy.getAMRMPolicyWeights().size() > 0) {
for (Map.Entry<SubClusterIdInfo, Float> e : policy.getAMRMPolicyWeights()
.entrySet()) {
if (e.getValue() > 0) {
@@ -180,7 +180,6 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
this.federationFacade =
policyContext.getFederationStateStoreFacade();
- this.bookkeeper = new AllocationBookkeeper();
this.homeSubcluster = policyContext.getHomeSubcluster();
}
@@ -197,7 +196,9 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
List<ResourceRequest> resourceRequests) throws YarnException {
// object used to accumulate statistics about the answer, initialize with
- // active subclusters.
+ // active subclusters. Create a new instance per call because this method
+ // can be called concurrently.
+ bookkeeper = new AllocationBookkeeper();
bookkeeper.reinitialize(federationFacade.getSubClusters(true));
List<ResourceRequest> nonLocalizedRequests =
@@ -238,12 +239,16 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
// we log altogether later
}
if (targetIds != null && targetIds.size() > 0) {
+ boolean hasActive = false;
for (SubClusterId tid : targetIds) {
if (bookkeeper.isActiveAndEnabled(tid)) {
bookkeeper.addRackRR(tid, rr);
+ hasActive = true;
}
}
- continue;
+ if (hasActive) {
+ continue;
+ }
}
// Handle node/rack requests that the SubClusterResolver cannot map to
@@ -347,7 +352,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
originalResourceRequest.getExecutionTypeRequest());
out.setAllocationRequestId(allocationId);
out.setNumContainers((int) Math.ceil(numContainer));
- if (out.isAnyLocation(out.getResourceName())) {
+ if (ResourceRequest.isAnyLocation(out.getResourceName())) {
allocationBookkeeper.addAnyRR(targetId, out);
} else {
allocationBookkeeper.addRackRR(targetId, out);
@@ -362,7 +367,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
*/
private float getLocalityBasedWeighting(long reqId, SubClusterId targetId,
AllocationBookkeeper allocationBookkeeper) {
- float totWeight = allocationBookkeeper.getTotNumLocalizedContainers();
+ float totWeight = allocationBookkeeper.getTotNumLocalizedContainers(reqId);
float localWeight =
allocationBookkeeper.getNumLocalizedContainers(reqId, targetId);
return totWeight > 0 ? localWeight / totWeight : 0;
@@ -375,7 +380,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
private float getPolicyConfigWeighting(SubClusterId targetId,
AllocationBookkeeper allocationBookkeeper) {
float totWeight = allocationBookkeeper.totPolicyWeight;
- Float localWeight = weights.get(targetId);
+ Float localWeight = allocationBookkeeper.policyWeights.get(targetId);
return (localWeight != null && totWeight > 0) ? localWeight / totWeight : 0;
}
@@ -424,29 +429,36 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
// asks, used to correctly "spread" the corresponding ANY
private Map<Long, Map<SubClusterId, AtomicLong>> countContainersPerRM =
new HashMap<>();
+ private Map<Long, AtomicLong> totNumLocalizedContainers = new HashMap<>();
private Set<SubClusterId> activeAndEnabledSC = new HashSet<>();
- private long totNumLocalizedContainers = 0;
private float totHeadroomMemory = 0;
private int totHeadRoomEnabledRMs = 0;
+ private Map<SubClusterId, Float> policyWeights;
private float totPolicyWeight = 0;
private void reinitialize(
Map<SubClusterId, SubClusterInfo> activeSubclusters)
throws YarnException {
+ if (activeSubclusters == null) {
+ throw new YarnRuntimeException("null activeSubclusters received");
+ }
// reset data structures
answer.clear();
countContainersPerRM.clear();
+ totNumLocalizedContainers.clear();
activeAndEnabledSC.clear();
- totNumLocalizedContainers = 0;
totHeadroomMemory = 0;
totHeadRoomEnabledRMs = 0;
+ // save the reference locally in case the weights get reinitialized
+ // concurrently
+ policyWeights = weights;
totPolicyWeight = 0;
// pre-compute the set of subclusters that are both active and enabled by
// the policy weights, and accumulate their total weight
- for (Map.Entry<SubClusterId, Float> entry : weights.entrySet()) {
+ for (Map.Entry<SubClusterId, Float> entry : policyWeights.entrySet()) {
if (entry.getValue() > 0
&& activeSubclusters.containsKey(entry.getKey())) {
activeAndEnabledSC.add(entry.getKey());
@@ -467,7 +479,6 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
totHeadRoomEnabledRMs++;
}
}
-
}
/**
@@ -475,7 +486,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
* on a per-allocation-id and per-subcluster basis.
*/
private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) {
- Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName()));
+ Preconditions
+ .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) {
countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap<>());
@@ -488,7 +500,12 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId)
.addAndGet(rr.getNumContainers());
- totNumLocalizedContainers += rr.getNumContainers();
+ if (!totNumLocalizedContainers.containsKey(rr.getAllocationRequestId())) {
+ totNumLocalizedContainers.put(rr.getAllocationRequestId(),
+ new AtomicLong(0));
+ }
+ totNumLocalizedContainers.get(rr.getAllocationRequestId())
+ .addAndGet(rr.getNumContainers());
internalAddToAnswer(targetId, rr);
}
@@ -497,7 +514,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
* Add a rack-local request to the final answer.
*/
public void addRackRR(SubClusterId targetId, ResourceRequest rr) {
- Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName()));
+ Preconditions
+ .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
internalAddToAnswer(targetId, rr);
}
@@ -505,7 +523,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
* Add an ANY request to the final answer.
*/
private void addAnyRR(SubClusterId targetId, ResourceRequest rr) {
- Preconditions.checkArgument(rr.isAnyLocation(rr.getResourceName()));
+ Preconditions
+ .checkArgument(ResourceRequest.isAnyLocation(rr.getResourceName()));
internalAddToAnswer(targetId, rr);
}
@@ -552,10 +571,12 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
}
/**
- * Return the total number of container coming from localized requests.
+ * Return the total number of containers coming from localized requests
+ * matching an allocation Id.
*/
- private long getTotNumLocalizedContainers() {
- return totNumLocalizedContainers;
+ private long getTotNumLocalizedContainers(long allocationId) {
+ AtomicLong c = totNumLocalizedContainers.get(allocationId);
+ return c == null ? 0 : c.get();
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6191fac9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
index 2654a06..5b3cf74 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -117,6 +119,21 @@ public class TestLocalityMulticastAMRMProxyPolicy
getActiveSubclusters());
}
+ @Test(expected = FederationPolicyInitializationException.class)
+ public void testNullWeights() throws Exception {
+ getPolicyInfo().setAMRMPolicyWeights(null);
+ initializePolicy();
+ fail();
+ }
+
+ @Test(expected = FederationPolicyInitializationException.class)
+ public void testEmptyWeights() throws Exception {
+ getPolicyInfo()
+ .setAMRMPolicyWeights(new HashMap<SubClusterIdInfo, Float>());
+ initializePolicy();
+ fail();
+ }
+
@Test
public void testSplitBasedOnHeadroom() throws Exception {
@@ -154,7 +171,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100);
((FederationAMRMProxyPolicy) getPolicy())
.notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
- ((FederationAMRMProxyPolicy) getPolicy())
+ response = ((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
LOG.info("After headroom update");
@@ -332,7 +349,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
// we expect 5 entries for subcluster1 (4 from request-id 1, and part
// of the broadcast of request-id 2)
- checkExpectedAllocation(response, "subcluster1", 5, 25);
+ checkExpectedAllocation(response, "subcluster1", 5, 26);
// sub-cluster 2 should contain 3 entry from request-id 1 and 1 from the
// broadcast of request-id 2, and no request-id 0
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6191fac9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java
index 3cf73b6..bd99cb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java
@@ -89,9 +89,6 @@ public abstract class BasePolicyManagerTest {
FederationAMRMProxyPolicy federationAMRMProxyPolicy =
wfp2.getAMRMPolicy(context, null);
- // needed only for tests (getARMRMPolicy change the "type" in conf)
- fpc.setType(wfp.getClass().getCanonicalName());
-
FederationRouterPolicy federationRouterPolicy =
wfp2.getRouterPolicy(context, null);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6191fac9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java
index 7396942..25d246e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/resolver/TestDefaultSubClusterResolver.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.resolver;
+import java.io.File;
import java.net.URL;
import java.util.HashSet;
import java.util.Set;
@@ -46,8 +47,10 @@ public class TestDefaultSubClusterResolver {
throw new RuntimeException(
"Could not find 'nodes' dummy file in classpath");
}
+ // This will get rid of the beginning '/' in the url in Windows env
+ File file = new File(url.getPath());
- conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
+ conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, file.getPath());
resolver.setConf(conf);
resolver.load();
}
@@ -62,8 +65,10 @@ public class TestDefaultSubClusterResolver {
throw new RuntimeException(
"Could not find 'nodes-malformed' dummy file in classpath");
}
+ // This will get rid of the beginning '/' in the url in Windows env
+ File file = new File(url.getPath());
- conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
+ conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, file.getPath());
resolver.setConf(conf);
resolver.load();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6191fac9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
index 85fdc96..acc14dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.*;
import org.apache.hadoop.yarn.util.Records;
+import java.io.File;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -162,7 +163,10 @@ public final class FederationPoliciesTestUtil {
throw new RuntimeException(
"Could not find 'nodes' dummy file in classpath");
}
- conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
+ // This will get rid of the beginning '/' in the url in Windows env
+ File file = new File(url.getPath());
+
+ conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, file.getPath());
resolver.setConf(conf);
resolver.load();
return resolver;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org