You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this text.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/02/02 18:29:08 UTC
hadoop git commit: YARN-7839. Modify PlacementAlgorithm to Check node
capacity before placing request on node. (Panagiotis Garefalakis via asuresh)
Repository: hadoop
Updated Branches:
refs/heads/trunk 460d77bd6 -> 6e5ba9366
YARN-7839. Modify PlacementAlgorithm to Check node capacity before placing request on node. (Panagiotis Garefalakis via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6e5ba936
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6e5ba936
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6e5ba936
Branch: refs/heads/trunk
Commit: 6e5ba9366fc05719906ff2789b1a0fd26001182b
Parents: 460d77b
Author: Arun Suresh <as...@apache.org>
Authored: Fri Feb 2 10:28:22 2018 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Fri Feb 2 10:28:22 2018 -0800
----------------------------------------------------------------------
.../scheduler/capacity/CapacityScheduler.java | 4 -
.../algorithm/DefaultPlacementAlgorithm.java | 61 ++++++++++----
.../api/ConstraintPlacementAlgorithmOutput.java | 5 +-
.../SchedulingRequestWithPlacementAttempt.java | 52 ++++++++++++
.../constraint/processor/BatchedRequests.java | 2 +-
.../processor/PlacementDispatcher.java | 12 +--
.../processor/PlacementProcessor.java | 28 +++++--
.../constraint/TestPlacementProcessor.java | 87 +++++++++++++++++++-
8 files changed, 215 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index cb01351..d3aa5cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2609,10 +2609,6 @@ public class CapacityScheduler extends
" but only 1 will be attempted !!");
}
if (!appAttempt.isStopped()) {
- Resource resource =
- schedulingRequest.getResourceSizing().getResources();
- schedulingRequest.getResourceSizing().setResources(
- getNormalizedResource(resource));
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>
resourceCommitRequest = createResourceCommitRequest(
appAttempt, schedulingRequest, schedulerNode);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
index 4e6473f..710e6c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -18,10 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -35,8 +40,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.Co
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.BatchedRequests;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.NodeCandidateSelector;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,25 +64,31 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
private LocalAllocationTagsManager tagsManager;
private PlacementConstraintManager constraintManager;
private NodeCandidateSelector nodeSelector;
+ private ResourceCalculator resourceCalculator;
@Override
public void init(RMContext rmContext) {
this.tagsManager = new LocalAllocationTagsManager(
rmContext.getAllocationTagsManager());
this.constraintManager = rmContext.getPlacementConstraintManager();
+ this.resourceCalculator = rmContext.getScheduler().getResourceCalculator();
this.nodeSelector =
filter -> ((AbstractYarnScheduler) (rmContext).getScheduler())
.getNodes(filter);
}
- public boolean attemptPlacementOnNode(ApplicationId appId,
- SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
+ boolean attemptPlacementOnNode(ApplicationId appId,
+ Resource availableResources, SchedulingRequest schedulingRequest,
+ SchedulerNode schedulerNode, boolean ignoreResourceCheck)
throws InvalidAllocationTagsQueryException {
- if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
- schedulingRequest, schedulerNode, constraintManager, tagsManager)) {
- return true;
- }
- return false;
+ boolean fitsInNode = ignoreResourceCheck ||
+ Resources.fitsIn(resourceCalculator,
+ schedulingRequest.getResourceSizing().getResources(),
+ availableResources);
+ boolean constraintsSatisfied =
+ PlacementConstraintsUtil.canSatisfyConstraints(appId,
+ schedulingRequest, schedulerNode, constraintManager, tagsManager);
+ return fitsInNode && constraintsSatisfied;
}
@@ -82,17 +96,19 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
public void place(ConstraintPlacementAlgorithmInput input,
ConstraintPlacementAlgorithmOutputCollector collector) {
BatchedRequests requests = (BatchedRequests) input;
+ int placementAttempt = requests.getPlacementAttempt();
ConstraintPlacementAlgorithmOutput resp =
new ConstraintPlacementAlgorithmOutput(requests.getApplicationId());
List<SchedulerNode> allNodes = nodeSelector.selectNodes(null);
List<SchedulingRequest> rejectedRequests = new ArrayList<>();
+ Map<NodeId, Resource> availResources = new HashMap<>();
int rePlacementCount = RE_ATTEMPT_COUNT;
while (rePlacementCount > 0) {
- doPlacement(requests, resp, allNodes, rejectedRequests);
+ doPlacement(requests, resp, allNodes, rejectedRequests, availResources);
// Double check if placement constraints are really satisfied
validatePlacement(requests.getApplicationId(), resp,
- rejectedRequests);
+ rejectedRequests, availResources);
if (rejectedRequests.size() == 0 || rePlacementCount == 1) {
break;
}
@@ -103,7 +119,10 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
rePlacementCount--;
}
- resp.getRejectedRequests().addAll(rejectedRequests);
+ resp.getRejectedRequests().addAll(
+ rejectedRequests.stream().map(
+ x -> new SchedulingRequestWithPlacementAttempt(
+ placementAttempt, x)).collect(Collectors.toList()));
collector.collect(resp);
// Clean current temp-container tags
this.tagsManager.cleanTempContainers(requests.getApplicationId());
@@ -112,7 +131,8 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
private void doPlacement(BatchedRequests requests,
ConstraintPlacementAlgorithmOutput resp,
List<SchedulerNode> allNodes,
- List<SchedulingRequest> rejectedRequests) {
+ List<SchedulingRequest> rejectedRequests,
+ Map<NodeId, Resource> availableResources) {
Iterator<SchedulingRequest> requestIterator = requests.iterator();
Iterator<SchedulerNode> nIter = allNodes.iterator();
SchedulerNode lastSatisfiedNode = null;
@@ -135,11 +155,17 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
try {
String tag = schedulingRequest.getAllocationTags() == null ? "" :
schedulingRequest.getAllocationTags().iterator().next();
+ Resource unallocatedResource =
+ availableResources.computeIfAbsent(node.getNodeID(),
+ x -> Resource.newInstance(node.getUnallocatedResource()));
if (!requests.getBlacklist(tag).contains(node.getNodeID()) &&
attemptPlacementOnNode(
- requests.getApplicationId(), schedulingRequest, node)) {
+ requests.getApplicationId(), unallocatedResource,
+ schedulingRequest, node, false)) {
schedulingRequest.getResourceSizing()
.setNumAllocations(--numAllocs);
+ Resources.addTo(unallocatedResource,
+ schedulingRequest.getResourceSizing().getResources());
placedReq.getNodes().add(node);
numAllocs =
schedulingRequest.getResourceSizing().getNumAllocations();
@@ -200,10 +226,12 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
* @param applicationId
* @param resp
* @param rejectedRequests
+ * @param availableResources
*/
private void validatePlacement(ApplicationId applicationId,
ConstraintPlacementAlgorithmOutput resp,
- List<SchedulingRequest> rejectedRequests) {
+ List<SchedulingRequest> rejectedRequests,
+ Map<NodeId, Resource> availableResources) {
Iterator<PlacedSchedulingRequest> pReqIter =
resp.getPlacedRequests().iterator();
while (pReqIter.hasNext()) {
@@ -217,10 +245,13 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
// Remove just the tags for this placement.
this.tagsManager.removeTempTags(node.getNodeID(),
applicationId, pReq.getSchedulingRequest().getAllocationTags());
- if (!attemptPlacementOnNode(
- applicationId, pReq.getSchedulingRequest(), node)) {
+ Resource availOnNode = availableResources.get(node.getNodeID());
+ if (!attemptPlacementOnNode(applicationId, availOnNode,
+ pReq.getSchedulingRequest(), node, true)) {
nodeIter.remove();
num++;
+ Resources.subtractFrom(availOnNode,
+ pReq.getSchedulingRequest().getResourceSizing().getResources());
} else {
// Add back the tags if everything is fine.
this.tagsManager.addTempTags(node.getNodeID(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmOutput.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmOutput.java
index 9571f0e..952bfbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmOutput.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmOutput.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import java.util.ArrayList;
import java.util.List;
@@ -41,14 +40,14 @@ public class ConstraintPlacementAlgorithmOutput {
private final List<PlacedSchedulingRequest> placedRequests =
new ArrayList<>();
- private final List<SchedulingRequest> rejectedRequests =
+ private final List<SchedulingRequestWithPlacementAttempt> rejectedRequests =
new ArrayList<>();
public List<PlacedSchedulingRequest> getPlacedRequests() {
return placedRequests;
}
- public List<SchedulingRequest> getRejectedRequests() {
+ public List<SchedulingRequestWithPlacementAttempt> getRejectedRequests() {
return rejectedRequests;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/SchedulingRequestWithPlacementAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/SchedulingRequestWithPlacementAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/SchedulingRequestWithPlacementAttempt.java
new file mode 100644
index 0000000..e14d235
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/SchedulingRequestWithPlacementAttempt.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api;
+
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+
+/**
+ * Simple holder class encapsulating a SchedulingRequest
+ * with a placement attempt.
+ */
+public class SchedulingRequestWithPlacementAttempt {
+
+ private final int placementAttempt;
+ private final SchedulingRequest schedulingRequest;
+
+ public SchedulingRequestWithPlacementAttempt(int placementAttempt,
+ SchedulingRequest schedulingRequest) {
+ this.placementAttempt = placementAttempt;
+ this.schedulingRequest = schedulingRequest;
+ }
+
+ public int getPlacementAttempt() {
+ return placementAttempt;
+ }
+
+ public SchedulingRequest getSchedulingRequest() {
+ return schedulingRequest;
+ }
+
+ @Override
+ public String toString() {
+ return "SchedulingRequestWithPlacementAttempt{" +
+ "placementAttempt=" + placementAttempt +
+ ", schedulingRequest=" + schedulingRequest +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
index 8e39b63..6badfee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
@@ -109,7 +109,7 @@ public class BatchedRequests
}
public void addToBlacklist(Set<String> tags, SchedulerNode node) {
- if (tags != null && !tags.isEmpty()) {
+ if (tags != null && !tags.isEmpty() && node != null) {
// We are currently assuming a single allocation tag
// per scheduler request currently.
blacklist.computeIfAbsent(tags.iterator().next(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java
index 6a00ba8..849eb21 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java
@@ -18,12 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +49,7 @@ class PlacementDispatcher implements
private Map<ApplicationId, List<PlacedSchedulingRequest>>
placedRequests = new ConcurrentHashMap<>();
- private Map<ApplicationId, List<SchedulingRequest>>
+ private Map<ApplicationId, List<SchedulingRequestWithPlacementAttempt>>
rejectedRequests = new ConcurrentHashMap<>();
public void init(RMContext rmContext,
@@ -90,12 +90,12 @@ class PlacementDispatcher implements
return Collections.EMPTY_LIST;
}
- public List<SchedulingRequest> pullRejectedRequests(
+ public List<SchedulingRequestWithPlacementAttempt> pullRejectedRequests(
ApplicationId applicationId) {
- List<SchedulingRequest> rejectedReqs =
+ List<SchedulingRequestWithPlacementAttempt> rejectedReqs =
this.rejectedRequests.get(applicationId);
if (rejectedReqs != null && !rejectedReqs.isEmpty()) {
- List<SchedulingRequest> retList = new ArrayList<>();
+ List<SchedulingRequestWithPlacementAttempt> retList = new ArrayList<>();
synchronized (rejectedReqs) {
if (rejectedReqs.size() > 0) {
retList.addAll(rejectedReqs);
@@ -130,7 +130,7 @@ class PlacementDispatcher implements
}
}
if (!placement.getRejectedRequests().isEmpty()) {
- List<SchedulingRequest> rejected =
+ List<SchedulingRequestWithPlacementAttempt> rejected =
rejectedRequests.computeIfAbsent(
placement.getApplicationId(), k -> new ArrayList());
LOG.warn(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
index 2a6b889..9ce38f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.RejectionReason;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.Placem
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -208,6 +210,12 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId,
List<SchedulingRequest> schedulingRequests) {
if (schedulingRequests != null && !schedulingRequests.isEmpty()) {
+ // Normalize the Requests before dispatching
+ schedulingRequests.forEach(req -> {
+ Resource reqResource = req.getResourceSizing().getResources();
+ req.getResourceSizing()
+ .setResources(this.scheduler.getNormalizedResource(reqResource));
+ });
this.placementDispatcher.dispatch(new BatchedRequests(iteratorType,
appAttemptId.getApplicationId(), schedulingRequests, 1));
}
@@ -261,20 +269,28 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
private void handleRejectedRequests(ApplicationAttemptId appAttemptId,
AllocateResponse response) {
- List<SchedulingRequest> rejectedRequests =
+ List<SchedulingRequestWithPlacementAttempt> rejectedAlgoRequests =
this.placementDispatcher.pullRejectedRequests(
appAttemptId.getApplicationId());
- if (rejectedRequests != null && !rejectedRequests.isEmpty()) {
+ if (rejectedAlgoRequests != null && !rejectedAlgoRequests.isEmpty()) {
LOG.warn("Following requests of [{}] were rejected by" +
" the PlacementAlgorithmOutput Algorithm: {}",
- appAttemptId.getApplicationId(), rejectedRequests);
+ appAttemptId.getApplicationId(), rejectedAlgoRequests);
+ rejectedAlgoRequests.stream()
+ .filter(req -> req.getPlacementAttempt() < retryAttempts)
+ .forEach(req -> handleSchedulingResponse(
+ new Response(false, appAttemptId.getApplicationId(),
+ req.getSchedulingRequest(), req.getPlacementAttempt(),
+ null)));
ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response,
- rejectedRequests.stream()
+ rejectedAlgoRequests.stream()
+ .filter(req -> req.getPlacementAttempt() >= retryAttempts)
.map(sr -> RejectedSchedulingRequest.newInstance(
- RejectionReason.COULD_NOT_PLACE_ON_NODE, sr))
+ RejectionReason.COULD_NOT_PLACE_ON_NODE,
+ sr.getSchedulingRequest()))
.collect(Collectors.toList()));
}
- rejectedRequests =
+ List<SchedulingRequest> rejectedRequests =
this.requestsToReject.get(appAttemptId.getApplicationId());
if (rejectedRequests != null && !rejectedRequests.isEmpty()) {
synchronized (rejectedRequests) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e5ba936/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index a530230..c4c0b5d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -371,6 +371,91 @@ public class TestPlacementProcessor {
@Test(timeout = 300000)
public void testSchedulerRejection() throws Exception {
+ stopRM();
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+ new String[] {"a", "b"});
+ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 15.0f);
+ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 85.0f);
+ YarnConfiguration conf = new YarnConfiguration(csConf);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(
+ YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+ startRM(conf);
+
+ HashMap<NodeId, MockNM> nodes = new HashMap<>();
+ MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm1.getNodeId(), nm1);
+ MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm2.getNodeId(), nm2);
+ MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm3.getNodeId(), nm3);
+ MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm4.getNodeId(), nm4);
+ nm1.registerNode();
+ nm2.registerNode();
+ nm3.registerNode();
+ nm4.registerNode();
+
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a");
+ // Containers with allocationTag 'foo' are restricted to 1 per NODE
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+ Collections.singletonMap(
+ Collections.singleton("foo"),
+ PlacementConstraints.build(
+ PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
+ ));
+ am1.addSchedulingRequest(
+ Arrays.asList(
+ schedulingRequest(1, 1, 1, 512, "foo"),
+ schedulingRequest(1, 2, 1, 512, "foo"),
+ schedulingRequest(1, 3, 1, 512, "foo"),
+ // Ask for a container larger than the node
+ schedulingRequest(1, 4, 1, 512, "foo"))
+ );
+ AllocateResponse allocResponse = am1.schedule(); // send the request
+ List<Container> allocatedContainers = new ArrayList<>();
+ List<RejectedSchedulingRequest> rejectedReqs = new ArrayList<>();
+ int allocCount = 1;
+ allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+ rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+
+ // kick the scheduler
+ while (allocCount < 11) {
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(true);
+ nm3.nodeHeartbeat(true);
+ nm4.nodeHeartbeat(true);
+ LOG.info("Waiting for containers to be created for app 1...");
+ sleep(1000);
+ allocResponse = am1.schedule();
+ allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+ rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+ allocCount++;
+ if (rejectedReqs.size() > 0 && allocatedContainers.size() > 2) {
+ break;
+ }
+ }
+
+ Assert.assertEquals(3, allocatedContainers.size());
+ Set<NodeId> nodeIds = allocatedContainers.stream()
+ .map(x -> x.getNodeId()).collect(Collectors.toSet());
+ // Ensure unique nodes
+ Assert.assertEquals(3, nodeIds.size());
+ RejectedSchedulingRequest rej = rejectedReqs.get(0);
+ Assert.assertEquals(4, rej.getRequest().getAllocationRequestId());
+ Assert.assertEquals(RejectionReason.COULD_NOT_SCHEDULE_ON_NODE,
+ rej.getReason());
+
+ QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
+ // Verify Metrics
+ verifyMetrics(metrics, 12288, 12, 4096, 4, 4);
+ }
+
+ @Test(timeout = 300000)
+ public void testNodeCapacityRejection() throws Exception {
HashMap<NodeId, MockNM> nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
@@ -432,7 +517,7 @@ public class TestPlacementProcessor {
Assert.assertEquals(3, nodeIds.size());
RejectedSchedulingRequest rej = rejectedReqs.get(0);
Assert.assertEquals(4, rej.getRequest().getAllocationRequestId());
- Assert.assertEquals(RejectionReason.COULD_NOT_SCHEDULE_ON_NODE,
+ Assert.assertEquals(RejectionReason.COULD_NOT_PLACE_ON_NODE,
rej.getReason());
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org