You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/31 15:55:40 UTC
[10/37] hadoop git commit: YARN-7783. Add validation step to ensure
constraints are not violated due to order in which a request is processed.
(asuresh)
YARN-7783. Add validation step to ensure constraints are not violated due to order in which a request is processed. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a4c539fc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a4c539fc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a4c539fc
Branch: refs/heads/YARN-6592
Commit: a4c539fcdba817e313b2375abf2c4c9a1d13a4fd
Parents: 9b81cb0
Author: Arun Suresh <as...@apache.org>
Authored: Tue Jan 23 08:15:58 2018 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 31 01:30:17 2018 -0800
----------------------------------------------------------------------
.../algorithm/DefaultPlacementAlgorithm.java | 119 +++++++++++++++++--
.../constraint/TestPlacementProcessor.java | 49 ++++++++
2 files changed, 155 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4c539fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
index 9887749..4e6473f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@@ -69,13 +70,9 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
public boolean attemptPlacementOnNode(ApplicationId appId,
SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
throws InvalidAllocationTagsQueryException {
- int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
- if (numAllocs > 0) {
- if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
- schedulingRequest, schedulerNode,
- constraintManager, tagsManager)) {
- return true;
- }
+ if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
+ schedulingRequest, schedulerNode, constraintManager, tagsManager)) {
+ return true;
}
return false;
}
@@ -93,6 +90,9 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
int rePlacementCount = RE_ATTEMPT_COUNT;
while (rePlacementCount > 0) {
doPlacement(requests, resp, allNodes, rejectedRequests);
+ // Double check if placement constraints are really satisfied
+ validatePlacement(requests.getApplicationId(), resp,
+ rejectedRequests);
if (rejectedRequests.size() == 0 || rePlacementCount == 1) {
break;
}
@@ -122,9 +122,14 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
break;
}
SchedulingRequest schedulingRequest = requestIterator.next();
+ PlacedSchedulingRequest placedReq =
+ new PlacedSchedulingRequest(schedulingRequest);
+ placedReq.setPlacementAttempt(requests.getPlacementAttempt());
+ resp.getPlacedRequests().add(placedReq);
CircularIterator<SchedulerNode> nodeIter =
new CircularIterator(lastSatisfiedNode, nIter, allNodes);
- int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
+ int numAllocs =
+ schedulingRequest.getResourceSizing().getNumAllocations();
while (nodeIter.hasNext() && numAllocs > 0) {
SchedulerNode node = nodeIter.next();
try {
@@ -135,11 +140,7 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
requests.getApplicationId(), schedulingRequest, node)) {
schedulingRequest.getResourceSizing()
.setNumAllocations(--numAllocs);
- PlacedSchedulingRequest placedReq =
- new PlacedSchedulingRequest(schedulingRequest);
- placedReq.setPlacementAttempt(requests.getPlacementAttempt());
placedReq.getNodes().add(node);
- resp.getPlacedRequests().add(placedReq);
numAllocs =
schedulingRequest.getResourceSizing().getNumAllocations();
// Add temp-container tags for current placement cycle
@@ -156,6 +157,98 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
// Add all requests whose numAllocations still > 0 to rejected list.
requests.getSchedulingRequests().stream()
.filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0)
- .forEach(rejReq -> rejectedRequests.add(rejReq));
+ .forEach(rejReq -> rejectedRequests.add(cloneReq(rejReq)));
}
+
+ /**
+ * During the placement phase, allocation tags are added to the node if the
+ * constraint is satisfied. But depending on the order in which the
+ * algorithm sees the request, it is possible that a constraint that happened
+ * to be valid during placement of an earlier-seen request, might not be
+ * valid after all subsequent requests have been placed.
+ *
+ * For eg:
+ * Assume nodes n1, n2, n3, n4 and n5
+ *
+ * Consider the 2 constraints:
+ * 1) "foo", anti-affinity with "foo"
+ * 2) "bar", anti-affinity with "foo"
+ *
+ * And 2 requests
+ * req1: NumAllocations = 4, allocTags = [foo]
+ * req2: NumAllocations = 1, allocTags = [bar]
+ *
+ * If "req1" is seen first, the algorithm can place the 4 containers in
+ * n1, n2, n3 and n4. And when it gets to "req2", it will see that 4 nodes
+ * already have the "foo" tag and will place "bar" on n5.
+ * But if "req2" is seen first, then "bar" will be placed on any node,
+ * since no node currently has "foo", and when it gets to "req1", since
+ * "foo" has no anti-affinity with "bar", the algorithm can end up placing
+ * "foo" on a node with "bar", violating the second constraint.
+ *
+ * To prevent the above, we need a validation step: after the placements for a
+ * batch of requests are made, for each req, we remove its tags from the node
+ * and try to see if constraints are still satisfied if the tag were to be
+ * added back on the node.
+ *
+ * When applied to the example above, after "req2" and "req1" are placed,
+ * we remove the "bar" tag from the node and try to add it back on the node.
+ * This time, constraint satisfaction will fail, since there is now a "foo"
+ * tag on the node and "bar" cannot be added. The algorithm will then
+ * retry placing "req2" on another node.
+ *
+ * @param applicationId
+ * @param resp
+ * @param rejectedRequests
+ */
+ private void validatePlacement(ApplicationId applicationId,
+ ConstraintPlacementAlgorithmOutput resp,
+ List<SchedulingRequest> rejectedRequests) {
+ Iterator<PlacedSchedulingRequest> pReqIter =
+ resp.getPlacedRequests().iterator();
+ while (pReqIter.hasNext()) {
+ PlacedSchedulingRequest pReq = pReqIter.next();
+ Iterator<SchedulerNode> nodeIter = pReq.getNodes().iterator();
+ // Assuming all reqs were satisfied.
+ int num = 0;
+ while (nodeIter.hasNext()) {
+ SchedulerNode node = nodeIter.next();
+ try {
+ // Remove just the tags for this placement.
+ this.tagsManager.removeTempTags(node.getNodeID(),
+ applicationId, pReq.getSchedulingRequest().getAllocationTags());
+ if (!attemptPlacementOnNode(
+ applicationId, pReq.getSchedulingRequest(), node)) {
+ nodeIter.remove();
+ num++;
+ } else {
+ // Add back the tags if everything is fine.
+ this.tagsManager.addTempTags(node.getNodeID(),
+ applicationId, pReq.getSchedulingRequest().getAllocationTags());
+ }
+ } catch (InvalidAllocationTagsQueryException e) {
+ LOG.warn("Got exception from TagManager !", e);
+ }
+ }
+ if (num > 0) {
+ SchedulingRequest sReq = cloneReq(pReq.getSchedulingRequest());
+ sReq.getResourceSizing().setNumAllocations(num);
+ rejectedRequests.add(sReq);
+ }
+ if (pReq.getNodes().isEmpty()) {
+ pReqIter.remove();
+ }
+ }
+ }
+
+ private static SchedulingRequest cloneReq(SchedulingRequest sReq) {
+ return SchedulingRequest.newInstance(
+ sReq.getAllocationRequestId(), sReq.getPriority(),
+ sReq.getExecutionType(), sReq.getAllocationTags(),
+ ResourceSizing.newInstance(
+ sReq.getResourceSizing().getNumAllocations(),
+ sReq.getResourceSizing().getResources()),
+ sReq.getPlacementConstraint());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4c539fc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index 65daeb8..8426b20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -151,6 +151,55 @@ public class TestPlacementProcessor {
}
@Test(timeout = 300000)
+ public void testMutualAntiAffinityPlacement() throws Exception {
+ HashMap<NodeId, MockNM> nodes = new HashMap<>();
+ MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm1.getNodeId(), nm1);
+ MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm2.getNodeId(), nm2);
+ MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm3.getNodeId(), nm3);
+ MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm4.getNodeId(), nm4);
+ MockNM nm5 = new MockNM("h5:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm5.getNodeId(), nm5);
+ nm1.registerNode();
+ nm2.registerNode();
+ nm3.registerNode();
+ nm4.registerNode();
+ nm5.registerNode();
+
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+ // Containers with allocationTag 'foo' are restricted to 1 per NODE
+ Map<Set<String>, PlacementConstraint> pcMap = new HashMap<>();
+ pcMap.put(Collections.singleton("foo"),
+ PlacementConstraints.build(
+ PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
+ pcMap.put(Collections.singleton("bar"),
+ PlacementConstraints.build(
+ PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, pcMap);
+ am1.addSchedulingRequest(
+ Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
+ schedulingRequest(1, 2, 1, 512, "foo"),
+ schedulingRequest(1, 3, 1, 512, "foo"),
+ schedulingRequest(1, 4, 1, 512, "foo"),
+ schedulingRequest(1, 5, 1, 512, "foo")));
+ AllocateResponse allocResponse = am1.schedule(); // send the request
+ List<Container> allocatedContainers = new ArrayList<>();
+ allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+ // kick the scheduler
+ waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5);
+
+ Assert.assertEquals(5, allocatedContainers.size());
+ Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
+ .collect(Collectors.toSet());
+ // Ensure unique nodes (antiaffinity)
+ Assert.assertEquals(5, nodeIds.size());
+ }
+
+ @Test(timeout = 300000)
public void testCardinalityPlacement() throws Exception {
HashMap<NodeId, MockNM> nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 8192, rm.getResourceTrackerService());
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org