You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2018/08/26 00:44:47 UTC
[35/50] [abbrv] hadoop git commit: YARN-8015. Support all types of
placement constraint support for Capacity Scheduler. Contributed by Weiwei
Yang.
YARN-8015. Support all types of placement constraint support for Capacity Scheduler. Contributed by Weiwei Yang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1ac01444
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1ac01444
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1ac01444
Branch: refs/heads/HDFS-12943
Commit: 1ac01444a24faee6f74f2e83d9521eb4e0be651b
Parents: b021249
Author: Sunil G <su...@apache.org>
Authored: Thu Aug 23 10:05:43 2018 +0530
Committer: Sunil G <su...@apache.org>
Committed: Thu Aug 23 10:05:43 2018 +0530
----------------------------------------------------------------------
.../SingleConstraintAppPlacementAllocator.java | 175 ++------
.../yarn/server/resourcemanager/MockRM.java | 35 +-
...estSchedulingRequestContainerAllocation.java | 438 ++++++++++++++++++-
...stSingleConstraintAppPlacementAllocator.java | 78 ----
4 files changed, 509 insertions(+), 217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ac01444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
index 914f35d..54e4666 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
@@ -19,18 +19,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -48,12 +45,12 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION;
/**
@@ -70,7 +67,6 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
private SchedulingRequest schedulingRequest = null;
private String targetNodePartition;
- private Set<String> targetAllocationTags;
private AllocationTagsManager allocationTagsManager;
private PlacementConstraintManager placementConstraintManager;
@@ -239,135 +235,55 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
"Only GUARANTEED execution type is supported.");
}
- // Node partition
- String nodePartition = null;
- // Target allocation tags
- Set<String> targetAllocationTags = null;
-
- PlacementConstraint constraint =
- newSchedulingRequest.getPlacementConstraint();
-
- if (constraint != null) {
- // We only accept SingleConstraint
- PlacementConstraint.AbstractConstraint ac = constraint
- .getConstraintExpr();
- if (!(ac instanceof PlacementConstraint.SingleConstraint)) {
- throwExceptionWithMetaInfo("Only accepts "
- + PlacementConstraint.SingleConstraint.class.getName()
- + " as constraint-expression. Rejecting the new added "
- + "constraint-expression.class=" + ac.getClass().getName());
- }
-
- PlacementConstraint.SingleConstraint singleConstraint =
- (PlacementConstraint.SingleConstraint) ac;
-
- // Make sure it is an anti-affinity request (actually this implementation
- // should be able to support both affinity / anti-affinity without much
- // effort. Considering potential test effort required. Limit to
- // anti-affinity to intra-app and scope is node.
- if (!singleConstraint.getScope().equals(PlacementConstraints.NODE)) {
- throwExceptionWithMetaInfo(
- "Only support scope=" + PlacementConstraints.NODE
- + "now. PlacementConstraint=" + singleConstraint);
- }
-
- if (singleConstraint.getMinCardinality() != 0
- || singleConstraint.getMaxCardinality() != 0) {
- throwExceptionWithMetaInfo(
- "Only support anti-affinity, which is: minCardinality=0, "
- + "maxCardinality=1");
- }
-
- Set<PlacementConstraint.TargetExpression> targetExpressionSet =
- singleConstraint.getTargetExpressions();
- if (targetExpressionSet == null || targetExpressionSet.isEmpty()) {
- throwExceptionWithMetaInfo(
- "TargetExpression should not be null or empty");
- }
-
- for (PlacementConstraint.TargetExpression targetExpression :
- targetExpressionSet) {
- // Handle node partition
- if (targetExpression.getTargetType().equals(
- PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE)) {
- // For node attribute target, we only support Partition now. And once
- // YARN-3409 is merged, we will support node attribute.
- if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) {
- throwExceptionWithMetaInfo("When TargetType="
- + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE
- + " only " + NODE_PARTITION + " is accepted as TargetKey.");
- }
-
- if (nodePartition != null) {
- // This means we have duplicated node partition entry
- // inside placement constraint, which might be set by mistake.
- throwExceptionWithMetaInfo(
- "Only one node partition targetExpression is allowed");
- }
-
- Set<String> values = targetExpression.getTargetValues();
- if (values == null || values.isEmpty()) {
- nodePartition = RMNodeLabelsManager.NO_LABEL;
- continue;
- }
+ this.targetNodePartition = validateAndGetTargetNodePartition(
+ newSchedulingRequest.getPlacementConstraint());
+ this.schedulingRequest = new SchedulingRequestPBImpl(
+ ((SchedulingRequestPBImpl) newSchedulingRequest).getProto());
- if (values.size() > 1) {
- throwExceptionWithMetaInfo("Inside one targetExpression, we only "
- + "support affinity to at most one node partition now");
- }
- nodePartition = values.iterator().next();
- } else if (targetExpression.getTargetType().equals(
- PlacementConstraint.TargetExpression.TargetType.ALLOCATION_TAG)) {
- // Handle allocation tags
- if (targetAllocationTags != null) {
- // This means we have duplicated AllocationTag expressions entries
- // inside placement constraint, which might be set by mistake.
- throwExceptionWithMetaInfo(
- "Only one AllocationTag targetExpression is allowed");
- }
+ LOG.info("Successfully added SchedulingRequest to app="
+ + appSchedulingInfo.getApplicationAttemptId()
+ + " placementConstraint=["
+ + schedulingRequest.getPlacementConstraint()
+ + "]. nodePartition=" + targetNodePartition);
+ }
- if (targetExpression.getTargetValues() == null ||
- targetExpression.getTargetValues().isEmpty()) {
- throwExceptionWithMetaInfo("Failed to find allocation tags from "
- + "TargetExpressions or couldn't find self-app target.");
+ // Look for a node-partition target expression in the placement constraint
+ // and return it as the target node-partition (NO_LABEL if none is found).
+ // Currently only a SingleConstraint is handled.
+ private String validateAndGetTargetNodePartition(
+ PlacementConstraint placementConstraint) {
+ String nodePartition = RMNodeLabelsManager.NO_LABEL;
+ if (placementConstraint != null &&
+ placementConstraint.getConstraintExpr() != null) {
+ PlacementConstraint.AbstractConstraint ac =
+ placementConstraint.getConstraintExpr();
+ if (ac != null && ac instanceof PlacementConstraint.SingleConstraint) {
+ PlacementConstraint.SingleConstraint singleConstraint =
+ (PlacementConstraint.SingleConstraint) ac;
+ for (PlacementConstraint.TargetExpression targetExpression :
+ singleConstraint.getTargetExpressions()) {
+ // Handle node partition
+ if (targetExpression.getTargetType().equals(NODE_ATTRIBUTE) &&
+ targetExpression.getTargetKey().equals(NODE_PARTITION)) {
+ Set<String> values = targetExpression.getTargetValues();
+ if (values == null || values.isEmpty()) {
+ continue;
+ }
+ if (values.size() > 1) {
+ throwExceptionWithMetaInfo(
+ "Inside one targetExpression, we only support"
+ + " affinity to at most one node partition now");
+ }
+ nodePartition = values.iterator().next();
+ if (nodePartition != null) {
+ break;
+ }
}
-
- targetAllocationTags = new HashSet<>(
- targetExpression.getTargetValues());
}
}
-
- if (targetAllocationTags == null) {
- // That means we don't have ALLOCATION_TAG specified
- throwExceptionWithMetaInfo(
- "Couldn't find target expression with type == ALLOCATION_TAG,"
- + " it is required to include one and only one target"
- + " expression with type == ALLOCATION_TAG");
- }
- }
-
- // If this scheduling request doesn't contain a placement constraint,
- // we set allocation tags an empty set.
- if (targetAllocationTags == null) {
- targetAllocationTags = ImmutableSet.of();
- }
-
- if (nodePartition == null) {
- nodePartition = RMNodeLabelsManager.NO_LABEL;
}
-
- // Validation is done. set local results:
- this.targetNodePartition = nodePartition;
- this.targetAllocationTags = targetAllocationTags;
-
- this.schedulingRequest = new SchedulingRequestPBImpl(
- ((SchedulingRequestPBImpl) newSchedulingRequest).getProto());
-
- LOG.info("Successfully added SchedulingRequest to app=" + appSchedulingInfo
- .getApplicationAttemptId() + " targetAllocationTags=[" + StringUtils
- .join(",", targetAllocationTags) + "]. nodePartition="
- + targetNodePartition);
+ return nodePartition;
}
@Override
@@ -515,11 +431,6 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
return targetNodePartition;
}
- @VisibleForTesting
- Set<String> getTargetAllocationTags() {
- return targetAllocationTags;
- }
-
@Override
public void initialize(AppSchedulingInfo appSchedulingInfo,
SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ac01444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index eb4c626..2ad4391 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -513,6 +513,19 @@ public class MockRM extends ResourceManager {
return submitApp(masterMemory, false);
}
+ public RMApp submitApp(int masterMemory, Set<String> appTags)
+ throws Exception {
+ Resource resource = Resource.newInstance(masterMemory, 0);
+ ResourceRequest amResourceRequest = ResourceRequest.newInstance(
+ Priority.newInstance(0), ResourceRequest.ANY, resource, 1);
+ return submitApp(Collections.singletonList(amResourceRequest), "",
+ UserGroupInformation.getCurrentUser().getShortUserName(), null, false,
+ null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
+ false, false, null, 0, null, true, Priority.newInstance(0), null,
+ null, null, appTags);
+ }
+
public RMApp submitApp(int masterMemory, Priority priority) throws Exception {
Resource resource = Resource.newInstance(masterMemory, 0);
return submitApp(resource, "", UserGroupInformation.getCurrentUser()
@@ -732,8 +745,23 @@ public class MockRM extends ResourceManager {
LogAggregationContext logAggregationContext,
boolean cancelTokensWhenComplete, Priority priority, String amLabel,
Map<ApplicationTimeoutType, Long> applicationTimeouts,
- ByteBuffer tokensConf)
- throws Exception {
+ ByteBuffer tokensConf) throws Exception {
+ return submitApp(amResourceRequests, name, user, acls, unmanaged, queue,
+ maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
+ isAppIdProvided, applicationId, attemptFailuresValidityInterval,
+ logAggregationContext, cancelTokensWhenComplete, priority, amLabel,
+ applicationTimeouts, tokensConf, null);
+ }
+
+ public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
+ String user, Map<ApplicationAccessType, String> acls, boolean unmanaged,
+ String queue, int maxAppAttempts, Credentials ts, String appType,
+ boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
+ ApplicationId applicationId, long attemptFailuresValidityInterval,
+ LogAggregationContext logAggregationContext,
+ boolean cancelTokensWhenComplete, Priority priority, String amLabel,
+ Map<ApplicationTimeoutType, Long> applicationTimeouts,
+ ByteBuffer tokensConf, Set<String> applicationTags) throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
if (! isAppIdProvided) {
@@ -749,6 +777,9 @@ public class MockRM extends ResourceManager {
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setMaxAppAttempts(maxAppAttempts);
+ if (applicationTags != null) {
+ sub.setApplicationTags(applicationTags);
+ }
if (applicationTimeouts != null && applicationTimeouts.size() > 0) {
sub.setApplicationTimeouts(applicationTimeouts);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ac01444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
index f23fd8f..26c709f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java
@@ -26,12 +26,18 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TargetApplicationsNamespace;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -46,10 +52,24 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTagWithNamespace;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinality;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
public class TestSchedulingRequestContainerAllocation {
- private final int GB = 1024;
+ private static final int GB = 1024;
private YarnConfiguration conf;
@@ -435,8 +455,7 @@ public class TestSchedulingRequestContainerAllocation {
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- PlacementConstraint constraint = PlacementConstraints
- .targetNotIn("node", allocationTag("t1"))
+ PlacementConstraint constraint = targetNotIn("node", allocationTag("t1"))
.build();
SchedulingRequest sc = SchedulingRequest
.newInstance(0, Priority.newInstance(1),
@@ -477,4 +496,413 @@ public class TestSchedulingRequestContainerAllocation {
rm1.close();
}
+
+ private void doNodeHeartbeat(MockNM... nms) throws Exception {
+ for (MockNM nm : nms) {
+ nm.nodeHeartbeat(true);
+ }
+ }
+
+ private List<Container> waitForAllocation(int allocNum, int timeout,
+ MockAM am, MockNM... nms) throws Exception {
+ final List<Container> result = new ArrayList<>();
+ GenericTestUtils.waitFor(() -> {
+ try {
+ AllocateResponse response = am.schedule();
+ List<Container> allocated = response.getAllocatedContainers();
+ System.out.println("Expecting allocation: " + allocNum
+ + ", actual allocation: " + allocated.size());
+ for (Container c : allocated) {
+ System.out.println("Container " + c.getId().toString()
+ + " is allocated on node: " + c.getNodeId().toString()
+ + ", allocation tags: "
+ + String.join(",", c.getAllocationTags()));
+ }
+ result.addAll(allocated);
+ if (result.size() == allocNum) {
+ return true;
+ }
+ doNodeHeartbeat(nms);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return false;
+ }, 500, timeout);
+ return result;
+ }
+
+ private static SchedulingRequest schedulingRequest(int requestId,
+ int containers, int cores, int mem, PlacementConstraint constraint,
+ String... tags) {
+ return schedulingRequest(1, requestId, containers, cores, mem,
+ ExecutionType.GUARANTEED, constraint, tags);
+ }
+
+ private static SchedulingRequest schedulingRequest(
+ int priority, long allocReqId, int containers, int cores, int mem,
+ ExecutionType execType, PlacementConstraint constraint, String... tags) {
+ return SchedulingRequest.newBuilder()
+ .priority(Priority.newInstance(priority))
+ .allocationRequestId(allocReqId)
+ .allocationTags(new HashSet<>(Arrays.asList(tags)))
+ .executionType(ExecutionTypeRequest.newInstance(execType, true))
+ .resourceSizing(
+ ResourceSizing.newInstance(containers,
+ Resource.newInstance(mem, cores)))
+ .placementConstraintExpression(constraint)
+ .build();
+ }
+
+ private int getContainerNodesNum(List<Container> containers) {
+ Set<NodeId> nodes = new HashSet<>();
+ if (containers != null) {
+ containers.forEach(c -> nodes.add(c.getNodeId()));
+ }
+ return nodes.size();
+ }
+
+ @Test(timeout = 30000L)
+ public void testInterAppCompositeConstraints() throws Exception {
+ // This test covers both intra and inter app constraints.
+ // Including simple affinity, anti-affinity, cardinality constraints,
+ // and simple AND composite constraints.
+ YarnConfiguration config = new YarnConfiguration();
+ config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+ config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ MockRM rm = new MockRM(config);
+ try {
+ rm.start();
+
+ MockNM nm1 = rm.registerNode("192.168.0.1:1234", 100*GB, 100);
+ MockNM nm2 = rm.registerNode("192.168.0.2:1234", 100*GB, 100);
+ MockNM nm3 = rm.registerNode("192.168.0.3:1234", 100*GB, 100);
+ MockNM nm4 = rm.registerNode("192.168.0.4:1234", 100*GB, 100);
+ MockNM nm5 = rm.registerNode("192.168.0.5:1234", 100*GB, 100);
+
+ RMApp app1 = rm.submitApp(1*GB, ImmutableSet.of("hbase"));
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+ // App1 (hbase)
+ // h1: hbase-master(1)
+ // h2: hbase-master(1)
+ // h3:
+ // h4:
+ // h5:
+ PlacementConstraint pc = targetNotIn("node",
+ allocationTag("hbase-master")).build();
+ am1.addSchedulingRequest(
+ ImmutableList.of(
+ schedulingRequest(1, 2, 1, 2048, pc, "hbase-master")));
+ List<Container> allocated = waitForAllocation(2, 3000, am1, nm1, nm2);
+
+ // 2 containers allocated
+ Assert.assertEquals(2, allocated.size());
+ // containers should be distributed on 2 different nodes
+ Assert.assertEquals(2, getContainerNodesNum(allocated));
+
+ // App1 (hbase)
+ // h1: hbase-rs(1), hbase-master(1)
+ // h2: hbase-rs(1), hbase-master(1)
+ // h3: hbase-rs(1)
+ // h4: hbase-rs(1)
+ // h5:
+ pc = targetNotIn("node", allocationTag("hbase-rs")).build();
+ am1.addSchedulingRequest(
+ ImmutableList.of(
+ schedulingRequest(2, 4, 1, 1024, pc, "hbase-rs")));
+ allocated = waitForAllocation(4, 3000, am1, nm1, nm2, nm3, nm4, nm5);
+
+ Assert.assertEquals(4, allocated.size());
+ Assert.assertEquals(4, getContainerNodesNum(allocated));
+
+ // App2 (web-server)
+ // The web server has 2 instances; each must be co-allocated with an
+ // hbase-master, and no two of them may share a node.
+ RMApp app2 = rm.submitApp(1*GB, ImmutableSet.of("web-server"));
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+ // App2 (web-server)
+ // h1: hbase-rs(1), hbase-master(1), ws-inst(1)
+ // h2: hbase-rs(1), hbase-master(1), ws-inst(1)
+ // h3: hbase-rs(1)
+ // h4: hbase-rs(1)
+ // h5:
+ pc = and(
+ targetIn("node", allocationTagWithNamespace(
+ new TargetApplicationsNamespace.All().toString(),
+ "hbase-master")),
+ targetNotIn("node", allocationTag("ws-inst"))).build();
+ am2.addSchedulingRequest(
+ ImmutableList.of(
+ schedulingRequest(1, 2, 1, 2048, pc, "ws-inst")));
+ allocated = waitForAllocation(2, 3000, am2, nm1, nm2, nm3, nm4, nm5);
+ Assert.assertEquals(2, allocated.size());
+ Assert.assertEquals(2, getContainerNodesNum(allocated));
+
+ ConcurrentMap<NodeId, RMNode> rmNodes = rm.getRMContext().getRMNodes();
+ for (Container c : allocated) {
+ RMNode rmNode = rmNodes.get(c.getNodeId());
+ Assert.assertNotNull(rmNode);
+ Assert.assertTrue("If ws-inst is allocated to a node,"
+ + " this node should have inherited the ws-inst tag ",
+ rmNode.getAllocationTagsWithCount().get("ws-inst") == 1);
+ Assert.assertTrue("ws-inst should be co-allocated to "
+ + "hbase-master nodes",
+ rmNode.getAllocationTagsWithCount().get("hbase-master") == 1);
+ }
+
+ // App3 (ws-servant)
+ // App3 has multiple instances that must be co-allocated
+ // with app2 server instance, and each node cannot have more than
+ // 3 instances.
+ RMApp app3 = rm.submitApp(1*GB, ImmutableSet.of("ws-servants"));
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm3);
+
+
+ // App3 (ws-servant)
+ // h1: hbase-rs(1), hbase-master(1), ws-inst(1), ws-servant(3)
+ // h2: hbase-rs(1), hbase-master(1), ws-inst(1), ws-servant(3)
+ // h3: hbase-rs(1)
+ // h4: hbase-rs(1)
+ // h5:
+ pc = and(
+ targetIn("node", allocationTagWithNamespace(
+ new TargetApplicationsNamespace.AppTag("web-server").toString(),
+ "ws-inst")),
+ cardinality("node", 0, 2, "ws-servant")).build();
+ am3.addSchedulingRequest(
+ ImmutableList.of(
+ schedulingRequest(1, 10, 1, 512, pc, "ws-servant")));
+ // total 6 containers can be allocated due to cardinality constraint
+ // each round, 2 containers can be allocated
+ allocated = waitForAllocation(6, 10000, am3, nm1, nm2, nm3, nm4, nm5);
+ Assert.assertEquals(6, allocated.size());
+ Assert.assertEquals(2, getContainerNodesNum(allocated));
+
+ for (Container c : allocated) {
+ RMNode rmNode = rmNodes.get(c.getNodeId());
+ Assert.assertNotNull(rmNode);
+ Assert.assertTrue("Node has ws-servant allocated must have 3 instances",
+ rmNode.getAllocationTagsWithCount().get("ws-servant") == 3);
+ Assert.assertTrue("Every ws-servant container should be co-allocated"
+ + " with ws-inst",
+ rmNode.getAllocationTagsWithCount().get("ws-inst") == 1);
+ }
+ } finally {
+ rm.stop();
+ }
+ }
+
+ @Test(timeout = 30000L)
+ public void testMultiAllocationTagsConstraints() throws Exception {
+ // This test simulates to use PC to avoid port conflicts
+ YarnConfiguration config = new YarnConfiguration();
+ config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+ config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ MockRM rm = new MockRM(config);
+ try {
+ rm.start();
+
+ MockNM nm1 = rm.registerNode("192.168.0.1:1234", 10*GB, 10);
+ MockNM nm2 = rm.registerNode("192.168.0.2:1234", 10*GB, 10);
+ MockNM nm3 = rm.registerNode("192.168.0.3:1234", 10*GB, 10);
+ MockNM nm4 = rm.registerNode("192.168.0.4:1234", 10*GB, 10);
+ MockNM nm5 = rm.registerNode("192.168.0.5:1234", 10*GB, 10);
+
+ RMApp app1 = rm.submitApp(1*GB, ImmutableSet.of("server1"));
+ // Allocate AM container on nm1
+ doNodeHeartbeat(nm1);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+ am1.registerAppAttempt();
+
+ // App1 uses ports: 6000, 7000 and 8000
+ String[] server1Ports =
+ new String[] {"port_6000", "port_7000", "port_8000"};
+ PlacementConstraint pc = targetNotIn("node",
+ allocationTagWithNamespace(AllocationTagNamespaceType.ALL.toString(),
+ server1Ports))
+ .build();
+ am1.addSchedulingRequest(
+ ImmutableList.of(
+ schedulingRequest(1, 2, 1, 1024, pc, server1Ports)));
+ List<Container> allocated = waitForAllocation(2, 3000,
+ am1, nm1, nm2, nm3, nm4, nm5);
+
+ // 2 containers allocated
+ Assert.assertEquals(2, allocated.size());
+ // containers should be distributed on 2 different nodes
+ Assert.assertEquals(2, getContainerNodesNum(allocated));
+
+ // App2 uses ports: 6000
+ String[] server2Ports = new String[] {"port_6000"};
+ RMApp app2 = rm.submitApp(1*GB, ImmutableSet.of("server2"));
+ // Allocate AM container on nm2
+ doNodeHeartbeat(nm2);
+ RMAppAttempt app2attempt1 = app2.getCurrentAppAttempt();
+ MockAM am2 = rm.sendAMLaunched(app2attempt1.getAppAttemptId());
+ am2.registerAppAttempt();
+
+ pc = targetNotIn("node",
+ allocationTagWithNamespace(AllocationTagNamespaceType.ALL.toString(),
+ server2Ports))
+ .build();
+ am2.addSchedulingRequest(
+ ImmutableList.of(
+ schedulingRequest(1, 3, 1, 1024, pc, server2Ports)));
+ allocated = waitForAllocation(3, 3000, am2, nm1, nm2, nm3, nm4, nm5);
+ Assert.assertEquals(3, allocated.size());
+ Assert.assertEquals(3, getContainerNodesNum(allocated));
+
+ ConcurrentMap<NodeId, RMNode> rmNodes = rm.getRMContext().getRMNodes();
+ for (Container c : allocated) {
+ RMNode rmNode = rmNodes.get(c.getNodeId());
+ Assert.assertNotNull(rmNode);
+ Assert.assertTrue("server2 should not co-allocate to server1 as"
+ + " they both need to use port 6000",
+ rmNode.getAllocationTagsWithCount().get("port_6000") == 1);
+ Assert.assertFalse(rmNode.getAllocationTagsWithCount()
+ .containsKey("port_7000"));
+ Assert.assertFalse(rmNode.getAllocationTagsWithCount()
+ .containsKey("port_8000"));
+ }
+ } finally {
+ rm.stop();
+ }
+ }
+
+ @Test(timeout = 30000L)
+ public void testInterAppConstraintsWithNamespaces() throws Exception {
+ // This test verifies inter-app constraints with namespaces
+ // not-self/app-id/app-tag
+ YarnConfiguration config = new YarnConfiguration();
+ config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+ YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+ config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ MockRM rm = new MockRM(config);
+ try {
+ rm.start();
+
+ MockNM nm1 = rm.registerNode("192.168.0.1:1234:", 100*GB, 100);
+ MockNM nm2 = rm.registerNode("192.168.0.2:1234", 100*GB, 100);
+ MockNM nm3 = rm.registerNode("192.168.0.3:1234", 100*GB, 100);
+ MockNM nm4 = rm.registerNode("192.168.0.4:1234", 100*GB, 100);
+ MockNM nm5 = rm.registerNode("192.168.0.5:1234", 100*GB, 100);
+
+ ApplicationId app5Id = null;
+ Map<ApplicationId, List<Container>> allocMap = new HashMap<>();
+ // 10 apps and all containers are attached with foo tag
+ for (int i = 0; i<10; i++) {
+ // App1 ~ app5 tag "former5"
+ // App6 ~ app10 tag "latter5"
+ String applicationTag = i<5 ? "former5" : "latter5";
+ RMApp app = rm.submitApp(1*GB, ImmutableSet.of(applicationTag));
+ // Allocate AM container on nm1
+ doNodeHeartbeat(nm1, nm2, nm3, nm4, nm5);
+ RMAppAttempt attempt = app.getCurrentAppAttempt();
+ MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+ am.registerAppAttempt();
+
+ PlacementConstraint pc = targetNotIn("node", allocationTag("foo"))
+ .build();
+ am.addSchedulingRequest(
+ ImmutableList.of(
+ schedulingRequest(1, 3, 1, 1024, pc, "foo")));
+ List<Container> allocated = waitForAllocation(3, 3000,
+ am, nm1, nm2, nm3, nm4, nm5);
+ // Memorize containers that has app5 foo
+ if (i == 5) {
+ app5Id = am.getApplicationAttemptId().getApplicationId();
+ }
+ allocMap.put(am.getApplicationAttemptId().getApplicationId(),
+ allocated);
+ }
+
+ Assert.assertNotNull(app5Id);
+ Assert.assertEquals(3, getContainerNodesNum(allocMap.get(app5Id)));
+
+ // *** app-id
+ // Submit another app, use app-id constraint against app5
+ RMApp app1 = rm.submitApp(1*GB, ImmutableSet.of("xyz"));
+ // Allocate AM container on nm1
+ doNodeHeartbeat(nm1);
+ RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+ MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+ am1.registerAppAttempt();
+
+ PlacementConstraint pc = targetIn("node",
+ allocationTagWithNamespace(
+ new TargetApplicationsNamespace.AppID(app5Id).toString(),
+ "foo"))
+ .build();
+ am1.addSchedulingRequest(
+ ImmutableList.of(
+ schedulingRequest(1, 3, 1, 1024, pc, "foo")));
+ List<Container> allocated = waitForAllocation(3, 3000,
+ am1, nm1, nm2, nm3, nm4, nm5);
+
+ ConcurrentMap<NodeId, RMNode> rmNodes = rm.getRMContext().getRMNodes();
+ List<Container> app5Alloc = allocMap.get(app5Id);
+ for (Container c : allocated) {
+ RMNode rmNode = rmNodes.get(c.getNodeId());
+ Assert.assertNotNull(rmNode);
+ Assert.assertTrue("This app is affinity with app-id/app5/foo "
+ + "containers",
+ app5Alloc.stream().anyMatch(
+ c5 -> c5.getNodeId() == c.getNodeId()));
+ }
+
+ // *** app-tag
+ RMApp app2 = rm.submitApp(1*GB);
+ // Allocate AM container on nm2
+ doNodeHeartbeat(nm2);
+ RMAppAttempt app2attempt1 = app2.getCurrentAppAttempt();
+ MockAM am2 = rm.sendAMLaunched(app2attempt1.getAppAttemptId());
+ am2.registerAppAttempt();
+
+ pc = targetNotIn("node",
+ allocationTagWithNamespace(
+ new TargetApplicationsNamespace.AppTag("xyz").toString(),
+ "foo"))
+ .build();
+ am2.addSchedulingRequest(
+ ImmutableList.of(
+ schedulingRequest(1, 2, 1, 1024, pc, "foo")));
+ allocated = waitForAllocation(2, 3000, am2, nm1, nm2, nm3, nm4, nm5);
+ Assert.assertEquals(2, allocated.size());
+
+ // none of them can be allocated to nodes that has app5 foo containers
+ for (Container c : app5Alloc) {
+ Assert.assertNotEquals(c.getNodeId(),
+ allocated.iterator().next().getNodeId());
+ }
+
+ // *** not-self
+ RMApp app3 = rm.submitApp(1*GB);
+ // Allocate AM container on nm3
+ doNodeHeartbeat(nm3);
+ RMAppAttempt app3attempt1 = app3.getCurrentAppAttempt();
+ MockAM am3 = rm.sendAMLaunched(app3attempt1.getAppAttemptId());
+ am3.registerAppAttempt();
+
+ pc = cardinality("node",
+ new TargetApplicationsNamespace.NotSelf().toString(),
+ 1, 1, "foo").build();
+ am3.addSchedulingRequest(
+ ImmutableList.of(
+ schedulingRequest(1, 1, 1, 1024, pc, "foo")));
+ allocated = waitForAllocation(1, 3000, am3, nm1, nm2, nm3, nm4, nm5);
+ Assert.assertEquals(1, allocated.size());
+ // All 5 containers should be allocated
+ Assert.assertTrue(rmNodes.get(allocated.iterator().next().getNodeId())
+ .getAllocationTagsWithCount().get("foo") == 2);
+ } finally {
+ rm.stop();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ac01444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java
index ccf4281..902c6d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
-import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
@@ -131,8 +130,6 @@ public class TestSingleConstraintAppPlacementAllocator {
.build()).resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build());
- Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
- allocator.getTargetAllocationTags());
Assert.assertEquals("", allocator.getTargetNodePartition());
// Valid (with partition)
@@ -147,8 +144,6 @@ public class TestSingleConstraintAppPlacementAllocator {
.build()).resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build());
- Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
- allocator.getTargetAllocationTags());
Assert.assertEquals("x", allocator.getTargetNodePartition());
// Valid (without specifying node partition)
@@ -162,8 +157,6 @@ public class TestSingleConstraintAppPlacementAllocator {
.resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build());
- Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
- allocator.getTargetAllocationTags());
Assert.assertEquals("", allocator.getTargetNodePartition());
// Valid (with application Id target)
@@ -178,8 +171,6 @@ public class TestSingleConstraintAppPlacementAllocator {
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build());
// Allocation tags should not include application Id
- Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
- allocator.getTargetAllocationTags());
Assert.assertEquals("", allocator.getTargetNodePartition());
// Invalid (without sizing)
@@ -200,75 +191,6 @@ public class TestSingleConstraintAppPlacementAllocator {
.targetNotIn(PlacementConstraints.NODE).build())
.build(), true);
- // Invalid (with multiple allocation tags expression specified)
- assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
- ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
- .allocationRequestId(10L).priority(Priority.newInstance(1))
- .placementConstraintExpression(PlacementConstraints
- .targetNotIn(PlacementConstraints.NODE,
- PlacementConstraints.PlacementTargets
- .allocationTag("mapper"),
- PlacementConstraints.PlacementTargets
- .allocationTag("reducer"),
- PlacementConstraints.PlacementTargets.nodePartition(""))
- .build()).resourceSizing(
- ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
- .build(), true);
-
- // Invalid (with multiple node partition target expression specified)
- assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
- ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
- .allocationRequestId(10L).priority(Priority.newInstance(1))
- .placementConstraintExpression(PlacementConstraints
- .targetNotIn(PlacementConstraints.NODE,
- PlacementConstraints.PlacementTargets
- .allocationTag("mapper"),
- PlacementConstraints.PlacementTargets
- .allocationTag(""),
- PlacementConstraints.PlacementTargets.nodePartition("x"))
- .build()).resourceSizing(
- ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
- .build(), true);
-
- // Invalid (not anti-affinity cardinality)
- assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
- ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
- .allocationRequestId(10L).priority(Priority.newInstance(1))
- .placementConstraintExpression(PlacementConstraints
- .targetCardinality(PlacementConstraints.NODE, 1, 2,
- PlacementConstraints.PlacementTargets
- .allocationTag("mapper"),
- PlacementConstraints.PlacementTargets.nodePartition(""))
- .build()).resourceSizing(
- ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
- .build(), true);
-
- // Invalid (not anti-affinity cardinality)
- assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
- ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
- .allocationRequestId(10L).priority(Priority.newInstance(1))
- .placementConstraintExpression(PlacementConstraints
- .targetCardinality(PlacementConstraints.NODE, 0, 2,
- PlacementConstraints.PlacementTargets
- .allocationTag("mapper"),
- PlacementConstraints.PlacementTargets.nodePartition(""))
- .build()).resourceSizing(
- ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
- .build(), true);
-
- // Invalid (not NODE scope)
- assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
- ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
- .allocationRequestId(10L).priority(Priority.newInstance(1))
- .placementConstraintExpression(PlacementConstraints
- .targetNotIn(PlacementConstraints.RACK,
- PlacementConstraints.PlacementTargets
- .allocationTag("mapper", "reducer"),
- PlacementConstraints.PlacementTargets.nodePartition(""))
- .build()).resourceSizing(
- ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
- .build(), true);
-
// Invalid (not GUARANTEED)
assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC))
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org