You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/17 21:59:00 UTC
[06/11] hadoop git commit: YARN-7612. Add Processor Framework for
Rich Placement Constraints. (asuresh)
YARN-7612. Add Processor Framework for Rich Placement Constraints. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e802a6c0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e802a6c0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e802a6c0
Branch: refs/heads/YARN-6592
Commit: e802a6c06cb46462ad84b77e4df663d075c02316
Parents: f650d39
Author: Arun Suresh <as...@apache.org>
Authored: Fri Dec 22 15:51:20 2017 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 17 13:51:28 2018 -0800
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 26 ++
.../src/main/resources/yarn-default.xml | 30 ++
.../ApplicationMasterService.java | 15 +
.../rmcontainer/RMContainerImpl.java | 7 +-
.../scheduler/capacity/CapacityScheduler.java | 2 +
.../constraint/processor/BatchedRequests.java | 105 +++++
.../processor/NodeCandidateSelector.java | 38 ++
.../processor/PlacementDispatcher.java | 145 +++++++
.../processor/PlacementProcessor.java | 343 ++++++++++++++++
.../processor/SamplePlacementAlgorithm.java | 144 +++++++
.../constraint/processor/package-info.java | 29 ++
.../yarn/server/resourcemanager/MockAM.java | 26 ++
.../yarn/server/resourcemanager/MockRM.java | 14 +
.../constraint/TestPlacementProcessor.java | 394 +++++++++++++++++++
14 files changed, 1316 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1b6bd0e..03c24d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -529,6 +529,32 @@ public class YarnConfiguration extends Configuration {
/** The class to use as the resource scheduler.*/
public static final String RM_SCHEDULER =
RM_PREFIX + "scheduler.class";
+
+ /** Placement Algorithm. */
+ public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS =
+ RM_PREFIX + "placement-constraints.algorithm.class";
+
+ public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED =
+ RM_PREFIX + "placement-constraints.enabled";
+
+ public static final boolean DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED = true;
+
+ public static final String RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS =
+ RM_PREFIX + "placement-constraints.retry-attempts";
+
+ public static final int DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS = 3;
+
+ public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE =
+ RM_PREFIX + "placement-constraints.algorithm.pool-size";
+
+ public static final int DEFAULT_RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE =
+ 1;
+
+ public static final String RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE =
+ RM_PREFIX + "placement-constraints.scheduler.pool-size";
+
+ public static final int DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE =
+ 1;
public static final String DEFAULT_RM_SCHEDULER =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index d450eca..0285069 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -131,6 +131,36 @@
</property>
<property>
+ <description>Enable Constraint Placement.</description>
+ <name>yarn.resourcemanager.placement-constraints.enabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <description>Number of times to retry placing rejected SchedulingRequests.</description>
+ <name>yarn.resourcemanager.placement-constraints.retry-attempts</name>
+ <value>3</value>
+ </property>
+
+ <property>
+ <description>Constraint Placement Algorithm to be used.</description>
+ <name>yarn.resourcemanager.placement-constraints.algorithm.class</name>
+ <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.SamplePlacementAlgorithm</value>
+ </property>
+
+ <property>
+ <description>Threadpool size for the Algorithm used for placement constraint processing.</description>
+ <name>yarn.resourcemanager.placement-constraints.algorithm.pool-size</name>
+ <value>1</value>
+ </property>
+
+ <property>
+ <description>Threadpool size for the Scheduler invocation phase of placement constraint processing.</description>
+ <name>yarn.resourcemanager.placement-constraints.scheduler.pool-size</name>
+ <value>1</value>
+ </property>
+
+ <property>
<description>
Comma separated class names of ApplicationMasterServiceProcessor
implementations. The processors will be applied in the order
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 90c42be..aa1177d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.PlacementProcessor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
@@ -114,11 +115,25 @@ public class ApplicationMasterService extends AbstractService implements
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+ initializeProcessingChain(conf);
+ }
+
+ private void initializeProcessingChain(Configuration conf) {
amsProcessingChain.init(rmContext, null);
+ boolean enablePlacementConstraints = conf.getBoolean(
+ YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED,
+ YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ENABLED);
+ if (enablePlacementConstraints) {
+ amsProcessingChain.addProcessor(new PlacementProcessor());
+ }
List<ApplicationMasterServiceProcessor> processors = getProcessorList(conf);
if (processors != null) {
Collections.reverse(processors);
for (ApplicationMasterServiceProcessor p : processors) {
+ // Ensure only single instance of PlacementProcessor is included
+ if (enablePlacementConstraints && p instanceof PlacementProcessor) {
+ continue;
+ }
this.amsProcessingChain.addProcessor(p);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 184cdfc..c873509 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -190,8 +190,7 @@ public class RMContainerImpl implements RMContainer {
private boolean isExternallyAllocated;
private SchedulerRequestKey allocatedSchedulerKey;
- // TODO, set it when container allocated by scheduler (From SchedulingRequest)
- private Set<String> allocationTags = null;
+ private volatile Set<String> allocationTags = null;
public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -510,6 +509,10 @@ public class RMContainerImpl implements RMContainer {
return allocationTags;
}
+ public void setAllocationTags(Set<String> tags) {
+ this.allocationTags = tags;
+ }
+
private static class BaseTransition implements
SingleArcTransition<RMContainerImpl, RMContainerEvent> {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index d92ce58..f03d7d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2539,6 +2539,8 @@ public class CapacityScheduler extends
SchedulerRequestKey.extractFrom(container),
appAttempt.getApplicationAttemptId(), container.getNodeId(),
appAttempt.getUser(), rmContext, false);
+ ((RMContainerImpl)rmContainer).setAllocationTags(
+ new HashSet<>(schedulingRequest.getAllocationTags()));
allocated = new ContainerAllocationProposal<>(
getSchedulerContainer(rmContainer, true),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
new file mode 100644
index 0000000..fe92d2f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A grouping of Scheduling Requests which are sent to the PlacementAlgorithm
+ * to place as a batch. The placement algorithm tends to give more optimal
+ * placements if more requests are batched together.
+ */
+class BatchedRequests implements ConstraintPlacementAlgorithmInput {
+
+ // PlacementAlgorithmOutput attempt - the number of times the requests in this
+ // batch has been placed but was rejected by the scheduler.
+ private final int placementAttempt;
+
+ private final ApplicationId applicationId;
+ private final Collection<SchedulingRequest> requests;
+ private final Map<String, Set<NodeId>> blacklist = new HashMap<>();
+
+ BatchedRequests(ApplicationId applicationId,
+ Collection<SchedulingRequest> requests, int attempt) {
+ this.applicationId = applicationId;
+ this.requests = requests;
+ this.placementAttempt = attempt;
+ }
+
+ /**
+ * Get Application Id.
+ * @return Application Id.
+ */
+ ApplicationId getApplicationId() {
+ return applicationId;
+ }
+
+ /**
+ * Get Collection of SchedulingRequests in this batch.
+ * @return Collection of Scheduling Requests.
+ */
+ @Override
+ public Collection<SchedulingRequest> getSchedulingRequests() {
+ return requests;
+ }
+
+ /**
+ * Add a Scheduling request to the batch.
+ * @param req Scheduling Request.
+ */
+ void addToBatch(SchedulingRequest req) {
+ requests.add(req);
+ }
+
+ void addToBlacklist(Set<String> tags, SchedulerNode node) {
+ if (tags != null && !tags.isEmpty()) {
+ // We are currently assuming a single allocation tag
+ // per scheduler request currently.
+ blacklist.computeIfAbsent(tags.iterator().next(),
+ k -> new HashSet<>()).add(node.getNodeID());
+ }
+ }
+
+ /**
+ * Get placement attempt.
+ * @return PlacementAlgorithmOutput placement Attempt.
+ */
+ int getPlacementAttempt() {
+ return placementAttempt;
+ }
+
+ /**
+ * Get any blacklisted nodes associated with tag.
+ * @param tag Tag.
+ * @return Set of blacklisted Nodes.
+ */
+ Set<NodeId> getBlacklist(String tag) {
+ return blacklist.getOrDefault(tag, Collections.EMPTY_SET);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NodeCandidateSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NodeCandidateSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NodeCandidateSelector.java
new file mode 100644
index 0000000..4299050
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/NodeCandidateSelector.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeFilter;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import java.util.List;
+
+/**
+ * A read only implementation of the ClusterNodeTracker which exposes a method
+ * to simply return a filtered list of nodes.
+ */
+public interface NodeCandidateSelector {
+
+ /**
+ * Select a list of nodes given a filter.
+ * @param filter a NodeFilter.
+ * @return List of SchedulerNodes.
+ */
+ List<SchedulerNode> selectNodes(NodeFilter filter);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java
new file mode 100644
index 0000000..6a00ba8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * This class initializes the Constraint Placement Algorithm. It dispatches
+ * input to the algorithm and collects output from it.
+ */
+class PlacementDispatcher implements
+ ConstraintPlacementAlgorithmOutputCollector {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PlacementDispatcher.class);
+ private ConstraintPlacementAlgorithm algorithm;
+ private ExecutorService algorithmThreadPool;
+
+ private Map<ApplicationId, List<PlacedSchedulingRequest>>
+ placedRequests = new ConcurrentHashMap<>();
+ private Map<ApplicationId, List<SchedulingRequest>>
+ rejectedRequests = new ConcurrentHashMap<>();
+
+ public void init(RMContext rmContext,
+ ConstraintPlacementAlgorithm placementAlgorithm, int poolSize) {
+ LOG.info("Initializing Constraint Placement Planner:");
+ this.algorithm = placementAlgorithm;
+ this.algorithm.init(rmContext);
+ this.algorithmThreadPool = Executors.newFixedThreadPool(poolSize);
+ }
+
+ void dispatch(final BatchedRequests batchedRequests) {
+ final ConstraintPlacementAlgorithmOutputCollector collector = this;
+ Runnable placingTask = () -> {
+ LOG.debug("Got [{}] requests to place from application [{}].. " +
+ "Attempt count [{}]",
+ batchedRequests.getSchedulingRequests().size(),
+ batchedRequests.getApplicationId(),
+ batchedRequests.getPlacementAttempt());
+ algorithm.place(batchedRequests, collector);
+ };
+ this.algorithmThreadPool.submit(placingTask);
+ }
+
+ public List<PlacedSchedulingRequest> pullPlacedRequests(
+ ApplicationId applicationId) {
+ List<PlacedSchedulingRequest> placedReqs =
+ this.placedRequests.get(applicationId);
+ if (placedReqs != null && !placedReqs.isEmpty()) {
+ List<PlacedSchedulingRequest> retList = new ArrayList<>();
+ synchronized (placedReqs) {
+ if (placedReqs.size() > 0) {
+ retList.addAll(placedReqs);
+ placedReqs.clear();
+ }
+ }
+ return retList;
+ }
+ return Collections.EMPTY_LIST;
+ }
+
+ public List<SchedulingRequest> pullRejectedRequests(
+ ApplicationId applicationId) {
+ List<SchedulingRequest> rejectedReqs =
+ this.rejectedRequests.get(applicationId);
+ if (rejectedReqs != null && !rejectedReqs.isEmpty()) {
+ List<SchedulingRequest> retList = new ArrayList<>();
+ synchronized (rejectedReqs) {
+ if (rejectedReqs.size() > 0) {
+ retList.addAll(rejectedReqs);
+ rejectedReqs.clear();
+ }
+ }
+ return retList;
+ }
+ return Collections.EMPTY_LIST;
+ }
+
+ void clearApplicationState(ApplicationId applicationId) {
+ placedRequests.remove(applicationId);
+ rejectedRequests.remove(applicationId);
+ }
+
+ @Override
+ public void collect(ConstraintPlacementAlgorithmOutput placement) {
+ if (!placement.getPlacedRequests().isEmpty()) {
+ List<PlacedSchedulingRequest> processed =
+ placedRequests.computeIfAbsent(
+ placement.getApplicationId(), k -> new ArrayList<>());
+ synchronized (processed) {
+ LOG.debug(
+ "Planning Algorithm has placed for application [{}]" +
+ " the following [{}]", placement.getApplicationId(),
+ placement.getPlacedRequests());
+ for (PlacedSchedulingRequest esr :
+ placement.getPlacedRequests()) {
+ processed.add(esr);
+ }
+ }
+ }
+ if (!placement.getRejectedRequests().isEmpty()) {
+ List<SchedulingRequest> rejected =
+ rejectedRequests.computeIfAbsent(
+ placement.getApplicationId(), k -> new ArrayList());
+ LOG.warn(
+ "Planning Algorithm has rejected for application [{}]" +
+ " the following [{}]", placement.getApplicationId(),
+ placement.getRejectedRequests());
+ synchronized (rejected) {
+ rejected.addAll(placement.getRejectedRequests());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
new file mode 100644
index 0000000..d613d4e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
+import org.apache.hadoop.yarn.api.records.RejectionReason;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+/**
+ * An ApplicationMasterService Processor that performs Constrained placement of
+ * Scheduling Requests. It does the following:
+ * 1. Initializes the placement algorithm, the dispatcher and the scheduling
+ * thread pools.
+ * 2. Intercepts placement constraints from the register call and adds them to
+ * the placement constraint manager.
+ * 3. Dispatches Scheduling Requests to the Planner.
+ */
+public class PlacementProcessor implements ApplicationMasterServiceProcessor {
+
+ /**
+ * Wrapper over the SchedulingResponse that wires in the placement attempt
+ * and last attempted Node.
+ */
+ static final class Response extends SchedulingResponse {
+
+ private final int placementAttempt;
+ private final SchedulerNode attemptedNode;
+
+ private Response(boolean isSuccess, ApplicationId applicationId,
+ SchedulingRequest schedulingRequest, int placementAttempt,
+ SchedulerNode attemptedNode) {
+ super(isSuccess, applicationId, schedulingRequest);
+ this.placementAttempt = placementAttempt;
+ this.attemptedNode = attemptedNode;
+ }
+ }
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PlacementProcessor.class);
+ private PlacementConstraintManager constraintManager;
+ private ApplicationMasterServiceProcessor nextAMSProcessor;
+
+ private AbstractYarnScheduler scheduler;
+ private ExecutorService schedulingThreadPool;
+ private int retryAttempts;
+ // Per-application queues of batches awaiting re-dispatch and of requests
+ // that exhausted their retries; the inner lists are guarded by
+ // synchronized blocks at each access site.
+ private Map<ApplicationId, List<BatchedRequests>> requestsToRetry =
+ new ConcurrentHashMap<>();
+ private Map<ApplicationId, List<SchedulingRequest>> requestsToReject =
+ new ConcurrentHashMap<>();
+
+ private PlacementDispatcher placementDispatcher;
+
+
+ /**
+ * Wires up the processor: resolves the placement algorithm from
+ * configuration (falling back to SamplePlacementAlgorithm), creates the
+ * placement dispatcher and the scheduling thread pool, and reads the
+ * retry limit.
+ */
+ @Override
+ public void init(ApplicationMasterServiceContext amsContext,
+ ApplicationMasterServiceProcessor nextProcessor) {
+ LOG.info("Initializing Constraint Placement Processor:");
+ this.nextAMSProcessor = nextProcessor;
+ this.constraintManager =
+ ((RMContextImpl)amsContext).getPlacementConstraintManager();
+
+ this.scheduler =
+ (AbstractYarnScheduler)((RMContextImpl)amsContext).getScheduler();
+ // Only the first class is considered - even if a comma separated
+ // list is provided. (This is for simplicity, since getInstances does a
+ // lot of good things by handling things correctly)
+ List<ConstraintPlacementAlgorithm> instances =
+ ((RMContextImpl) amsContext).getYarnConfiguration().getInstances(
+ YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS,
+ ConstraintPlacementAlgorithm.class);
+ ConstraintPlacementAlgorithm algorithm = null;
+ if (instances != null && !instances.isEmpty()) {
+ algorithm = instances.get(0);
+ } else {
+ algorithm = new SamplePlacementAlgorithm();
+ }
+ LOG.info("Planning Algorithm [{}]", algorithm.getClass().getName());
+
+ int algoPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
+ YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE,
+ YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE);
+ this.placementDispatcher = new PlacementDispatcher();
+ this.placementDispatcher.init(
+ ((RMContextImpl)amsContext), algorithm, algoPSize);
+ LOG.info("Planning Algorithm pool size [{}]", algoPSize);
+
+ int schedPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
+ YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE,
+ YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_SCHEDULER_POOL_SIZE);
+ this.schedulingThreadPool = Executors.newFixedThreadPool(schedPSize);
+ LOG.info("Scheduler pool size [{}]", schedPSize);
+
+ // Number of times a request that is not satisfied by the scheduler
+ // can be retried.
+ this.retryAttempts =
+ ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
+ YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS);
+ LOG.info("Num retry attempts [{}]", this.retryAttempts);
+ }
+
+ /**
+ * Registers any placement constraints carried in the AM register call
+ * with the constraint manager, then delegates to the next processor in
+ * the chain.
+ */
+ @Override
+ public void registerApplicationMaster(ApplicationAttemptId appAttemptId,
+ RegisterApplicationMasterRequest request,
+ RegisterApplicationMasterResponse response)
+ throws IOException, YarnException {
+ Map<Set<String>, PlacementConstraint> appPlacementConstraints =
+ request.getPlacementConstraints();
+ processPlacementConstraints(
+ appAttemptId.getApplicationId(), appPlacementConstraints);
+ nextAMSProcessor.registerApplicationMaster(appAttemptId, request, response);
+ }
+
+ // Registers the app's constraint map with the constraint manager; no-op
+ // when the map is null or empty.
+ private void processPlacementConstraints(ApplicationId applicationId,
+ Map<Set<String>, PlacementConstraint> appPlacementConstraints) {
+ if (appPlacementConstraints != null && !appPlacementConstraints.isEmpty()) {
+ LOG.info("Constraints added for application [{}] against tags [{}]",
+ applicationId, appPlacementConstraints);
+ constraintManager.registerApplication(
+ applicationId, appPlacementConstraints);
+ }
+ }
+
+ /**
+ * Allocate pipeline: dispatch new scheduling requests to the placement
+ * algorithm, re-dispatch previously failed batches, attempt scheduling of
+ * already-placed requests, delegate the regular allocate, and finally
+ * surface rejections in the response.
+ */
+ @Override
+ public void allocate(ApplicationAttemptId appAttemptId,
+ AllocateRequest request, AllocateResponse response) throws YarnException {
+ List<SchedulingRequest> schedulingRequests =
+ request.getSchedulingRequests();
+ dispatchRequestsForPlacement(appAttemptId, schedulingRequests);
+ reDispatchRetryableRequests(appAttemptId);
+ schedulePlacedRequests(appAttemptId);
+
+ nextAMSProcessor.allocate(appAttemptId, request, response);
+
+ handleRejectedRequests(appAttemptId, response);
+ }
+
+ // Batches the incoming requests (placement attempt 1) and hands them to
+ // the dispatcher.
+ private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId,
+ List<SchedulingRequest> schedulingRequests) {
+ if (schedulingRequests != null && !schedulingRequests.isEmpty()) {
+ this.placementDispatcher.dispatch(
+ new BatchedRequests(appAttemptId.getApplicationId(),
+ schedulingRequests, 1));
+ }
+ }
+
+ // Re-dispatches batches that failed scheduling earlier; the retry list is
+ // drained under its own lock.
+ private void reDispatchRetryableRequests(ApplicationAttemptId appAttId) {
+ List<BatchedRequests> reqsToRetry =
+ this.requestsToRetry.get(appAttId.getApplicationId());
+ if (reqsToRetry != null && !reqsToRetry.isEmpty()) {
+ synchronized (reqsToRetry) {
+ for (BatchedRequests bReq: reqsToRetry) {
+ this.placementDispatcher.dispatch(bReq);
+ }
+ reqsToRetry.clear();
+ }
+ }
+ }
+
+ /**
+ * Pulls requests the algorithm has placed and submits one allocation
+ * attempt per (request, node) pair to the scheduling thread pool.
+ */
+ private void schedulePlacedRequests(ApplicationAttemptId appAttemptId) {
+ ApplicationId applicationId = appAttemptId.getApplicationId();
+ List<PlacedSchedulingRequest> placedSchedulingRequests =
+ this.placementDispatcher.pullPlacedRequests(applicationId);
+ for (PlacedSchedulingRequest placedReq : placedSchedulingRequests) {
+ SchedulingRequest sReq = placedReq.getSchedulingRequest();
+ for (SchedulerNode node : placedReq.getNodes()) {
+ // Clone the request so each async node attempt works on its own copy.
+ final SchedulingRequest sReqClone =
+ SchedulingRequest.newInstance(sReq.getAllocationRequestId(),
+ sReq.getPriority(), sReq.getExecutionType(),
+ sReq.getAllocationTags(),
+ ResourceSizing.newInstance(
+ sReq.getResourceSizing().getResources()),
+ sReq.getPlacementConstraint());
+ SchedulerApplicationAttempt applicationAttempt =
+ this.scheduler.getApplicationAttempt(appAttemptId);
+ Runnable task = () -> {
+ boolean success =
+ scheduler.attemptAllocationOnNode(
+ applicationAttempt, sReqClone, node);
+ if (!success) {
+ LOG.warn("Unsuccessful allocation attempt [{}] for [{}]",
+ placedReq.getPlacementAttempt(), sReqClone);
+ }
+ handleSchedulingResponse(
+ new Response(success, applicationId, sReqClone,
+ placedReq.getPlacementAttempt(), node));
+ };
+ this.schedulingThreadPool.submit(task);
+ }
+ }
+ }
+
+ /**
+ * Adds two kinds of rejections to the allocate response: requests the
+ * algorithm could not place (COULD_NOT_PLACE_ON_NODE) and requests that
+ * exhausted their scheduling retries (COULD_NOT_SCHEDULE_ON_NODE).
+ */
+ private void handleRejectedRequests(ApplicationAttemptId appAttemptId,
+ AllocateResponse response) {
+ List<SchedulingRequest> rejectedRequests =
+ this.placementDispatcher.pullRejectedRequests(
+ appAttemptId.getApplicationId());
+ if (rejectedRequests != null && !rejectedRequests.isEmpty()) {
+ LOG.warn("Following requests of [{}] were rejected by" +
+ " the PlacementAlgorithmOutput Algorithm: {}",
+ appAttemptId.getApplicationId(), rejectedRequests);
+ ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response,
+ rejectedRequests.stream()
+ .map(sr -> RejectedSchedulingRequest.newInstance(
+ RejectionReason.COULD_NOT_PLACE_ON_NODE, sr))
+ .collect(Collectors.toList()));
+ }
+ rejectedRequests =
+ this.requestsToReject.get(appAttemptId.getApplicationId());
+ if (rejectedRequests != null && !rejectedRequests.isEmpty()) {
+ synchronized (rejectedRequests) {
+ LOG.warn("Following requests of [{}] exhausted all retry attempts " +
+ "trying to schedule on placed node: {}",
+ appAttemptId.getApplicationId(), rejectedRequests);
+ ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response,
+ rejectedRequests.stream()
+ .map(sr -> RejectedSchedulingRequest.newInstance(
+ RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, sr))
+ .collect(Collectors.toList()));
+ rejectedRequests.clear();
+ }
+ }
+ }
+
+ /**
+ * Clears all per-application state (constraints, dispatcher queues, retry
+ * and reject lists) before delegating the finish call.
+ */
+ @Override
+ public void finishApplicationMaster(ApplicationAttemptId appAttemptId,
+ FinishApplicationMasterRequest request,
+ FinishApplicationMasterResponse response) {
+ constraintManager.unregisterApplication(appAttemptId.getApplicationId());
+ placementDispatcher.clearApplicationState(appAttemptId.getApplicationId());
+ requestsToReject.remove(appAttemptId.getApplicationId());
+ requestsToRetry.remove(appAttemptId.getApplicationId());
+ nextAMSProcessor.finishApplicationMaster(appAttemptId, request, response);
+ }
+
+ // Called from scheduling-pool threads: failed attempts below the retry
+ // limit are queued for re-dispatch; others past the limit go to the
+ // reject list. Successful responses need no action.
+ private void handleSchedulingResponse(SchedulingResponse schedulerResponse) {
+ int placementAttempt = ((Response)schedulerResponse).placementAttempt;
+ // Retry this placement as it is not successful and we are still
+ // under max retry. The req is batched with other unsuccessful
+ // requests from the same app
+ if (!schedulerResponse.isSuccess() && placementAttempt < retryAttempts) {
+ List<BatchedRequests> reqsToRetry =
+ requestsToRetry.computeIfAbsent(
+ schedulerResponse.getApplicationId(),
+ k -> new ArrayList<>());
+ synchronized (reqsToRetry) {
+ addToRetryList(schedulerResponse, placementAttempt, reqsToRetry);
+ }
+ LOG.warn("Going to retry request for application [{}] after [{}]" +
+ " attempts: [{}]", schedulerResponse.getApplicationId(),
+ placementAttempt, schedulerResponse.getSchedulingRequest());
+ } else {
+ if (!schedulerResponse.isSuccess()) {
+ LOG.warn("Not retrying request for application [{}] after [{}]" +
+ " attempts: [{}]", schedulerResponse.getApplicationId(),
+ placementAttempt, schedulerResponse.getSchedulingRequest());
+ List<SchedulingRequest> reqsToReject =
+ requestsToReject.computeIfAbsent(
+ schedulerResponse.getApplicationId(),
+ k -> new ArrayList<>());
+ synchronized (reqsToReject) {
+ reqsToReject.add(schedulerResponse.getSchedulingRequest());
+ }
+ }
+ }
+ }
+
+ // Merges the failed request into the batch for the next attempt number
+ // (creating one if absent) and blacklists the node it just failed on for
+ // its allocation tags. Caller holds the lock on reqsToRetry.
+ private void addToRetryList(SchedulingResponse schedulerResponse,
+ int placementAttempt, List<BatchedRequests> reqsToRetry) {
+ boolean isAdded = false;
+ for (BatchedRequests br : reqsToRetry) {
+ if (br.getPlacementAttempt() == placementAttempt + 1) {
+ br.addToBatch(schedulerResponse.getSchedulingRequest());
+ br.addToBlacklist(
+ schedulerResponse.getSchedulingRequest().getAllocationTags(),
+ ((Response) schedulerResponse).attemptedNode);
+ isAdded = true;
+ break;
+ }
+ }
+ if (!isAdded) {
+ BatchedRequests br =
+ new BatchedRequests(schedulerResponse.getApplicationId(),
+ Collections.singleton(
+ schedulerResponse.getSchedulingRequest()),
+ placementAttempt + 1);
+ reqsToRetry.add(br);
+ br.addToBlacklist(
+ schedulerResponse.getSchedulingRequest().getAllocationTags(),
+ ((Response) schedulerResponse).attemptedNode);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java
new file mode 100644
index 0000000..8d49801
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SpecializedConstraintTransformer;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Sample Test algorithm. It always assumes anti-affinity.
+ * It also assumes the numAllocations in resource sizing is always = 1.
+ *
+ * NOTE: This is just a sample implementation. It is not meant to be
+ * actually used.
+ */
+public class SamplePlacementAlgorithm implements ConstraintPlacementAlgorithm {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SamplePlacementAlgorithm.class);
+
+ private AllocationTagsManager tagsManager;
+ private PlacementConstraintManager constraintManager;
+ private NodeCandidateSelector nodeSelector;
+
+ /**
+ * Captures the tag and constraint managers from the RM context and builds
+ * a node selector backed by the scheduler's node list.
+ */
+ @Override
+ public void init(RMContext rmContext) {
+ this.tagsManager = rmContext.getAllocationTagsManager();
+ this.constraintManager = rmContext.getPlacementConstraintManager();
+ this.nodeSelector =
+ filter -> ((AbstractYarnScheduler)(rmContext)
+ .getScheduler()).getNodes(filter);
+ }
+
+ /**
+ * Places each request on the first non-blacklisted node whose cardinality
+ * for the target tag is zero (simple anti-affinity). Requests are indexed
+ * by allocation tag; anything left with numAllocations &gt; 0 at the end
+ * is reported as rejected via the collector.
+ */
+ @Override
+ public void place(ConstraintPlacementAlgorithmInput input,
+ ConstraintPlacementAlgorithmOutputCollector collector) {
+ // Input is assumed to be a BatchedRequests instance (produced by the
+ // PlacementProcessor) — an unchecked cast by design for this sample.
+ BatchedRequests requests = (BatchedRequests)input;
+ ConstraintPlacementAlgorithmOutput resp =
+ new ConstraintPlacementAlgorithmOutput(requests.getApplicationId());
+ // A null filter selects all nodes known to the scheduler.
+ List<SchedulerNode> allNodes = nodeSelector.selectNodes(null);
+ // Index requests by each of their allocation tags; a request carrying
+ // several tags appears under each tag.
+ Map<String, List<SchedulingRequest>> tagIndexedRequests = new HashMap<>();
+ requests.getSchedulingRequests()
+ .stream()
+ .filter(r -> r.getAllocationTags() != null)
+ .forEach(
+ req -> req.getAllocationTags().forEach(
+ tag -> tagIndexedRequests.computeIfAbsent(tag,
+ k -> new ArrayList<>()).add(req))
+ );
+ for (Map.Entry<String, List<SchedulingRequest>> entry :
+ tagIndexedRequests.entrySet()) {
+ String tag = entry.getKey();
+ PlacementConstraint constraint =
+ constraintManager.getConstraint(requests.getApplicationId(),
+ Collections.singleton(tag));
+ if (constraint != null) {
+ // Currently works only for simple anti-affinity
+ // NODE scope target expressions
+ SpecializedConstraintTransformer transformer =
+ new SpecializedConstraintTransformer(constraint);
+ PlacementConstraint transform = transformer.transform();
+ TargetConstraint targetConstraint =
+ (TargetConstraint) transform.getConstraintExpr();
+ // Assume a single target expression tag;
+ // The Sample Algorithm assumes a constraint will always be a simple
+ // Target Constraint with a single entry in the target set.
+ // As mentioned in the class javadoc - This algorithm should be
+ // used mostly for testing and validating end-2-end workflow.
+ String targetTag =
+ targetConstraint.getTargetExpressions().iterator().next()
+ .getTargetValues().iterator().next();
+ // iterate over all nodes
+ // NOTE(review): the node iterator is shared across all requests for
+ // this tag, so each node is considered at most once per tag.
+ Iterator<SchedulerNode> nodeIter = allNodes.iterator();
+ List<SchedulingRequest> schedulingRequests = entry.getValue();
+ Iterator<SchedulingRequest> reqIter = schedulingRequests.iterator();
+ while (reqIter.hasNext()) {
+ SchedulingRequest sReq = reqIter.next();
+ int numAllocs = sReq.getResourceSizing().getNumAllocations();
+ while (numAllocs > 0 && nodeIter.hasNext()) {
+ SchedulerNode node = nodeIter.next();
+ long nodeCardinality = 0;
+ try {
+ nodeCardinality = tagsManager.getNodeCardinality(
+ node.getNodeID(), requests.getApplicationId(),
+ targetTag);
+ // Anti-affinity: place only on nodes with no existing
+ // allocation of the target tag that are not blacklisted for
+ // this tag in the current batch.
+ if (nodeCardinality == 0 &&
+ !requests.getBlacklist(tag).contains(node.getNodeID())) {
+ numAllocs--;
+ // The remaining count is written back on the shared request
+ // so the rejection pass below can see what was not placed.
+ sReq.getResourceSizing().setNumAllocations(numAllocs);
+ PlacedSchedulingRequest placedReq =
+ new PlacedSchedulingRequest(sReq);
+ placedReq.setPlacementAttempt(requests.getPlacementAttempt());
+ placedReq.getNodes().add(node);
+ resp.getPlacedRequests().add(placedReq);
+ }
+ } catch (InvalidAllocationTagsQueryException e) {
+ LOG.warn("Got exception from TagManager !", e);
+ }
+ }
+ }
+ }
+ }
+ // Add all requests whose numAllocations still > 0 to rejected list.
+ requests.getSchedulingRequests().stream()
+ .filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0)
+ .forEach(rejReq -> resp.getRejectedRequests().add(rejReq));
+ collector.collect(resp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/package-info.java
new file mode 100644
index 0000000..7090154
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package o.a.h.yarn.server.resourcemanager.scheduler.constraint.processor
+ * contains classes related to scheduling containers using placement
+ * processor.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 12dfe18..975abe6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -21,7 +21,10 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -39,7 +42,9 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -57,6 +62,9 @@ public class MockAM {
private ApplicationMasterProtocol amRMProtocol;
private UserGroupInformation ugi;
private volatile AllocateResponse lastResponse;
+ private Map<Set<String>, PlacementConstraint> placementConstraints =
+ new HashMap<>();
+ private List<SchedulingRequest> schedulingRequests = new ArrayList<>();
private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
private final List<ContainerId> releases = new ArrayList<ContainerId>();
@@ -93,6 +101,16 @@ public class MockAM {
return registerAppAttempt(true);
}
+ /**
+ * Records a placement constraint for the given tags; all recorded
+ * constraints are sent with the AM register call.
+ */
+ public void addPlacementConstraint(Set<String> tags,
+ PlacementConstraint constraint) {
+ placementConstraints.put(tags, constraint);
+ }
+
+ /**
+ * Queues scheduling requests to be piggy-backed on the next allocate call.
+ *
+ * @return this MockAM, for call chaining.
+ */
+ public MockAM addSchedulingRequest(List<SchedulingRequest> reqs) {
+ schedulingRequests.addAll(reqs);
+ return this;
+ }
+
public RegisterApplicationMasterResponse registerAppAttempt(boolean wait)
throws Exception {
if (wait) {
@@ -104,6 +122,9 @@ public class MockAM {
req.setHost("");
req.setRpcPort(1);
req.setTrackingUrl("");
+ if (!placementConstraints.isEmpty()) {
+ req.setPlacementConstraints(this.placementConstraints);
+ }
if (ugi == null) {
ugi = UserGroupInformation.createRemoteUser(
attemptId.toString());
@@ -247,12 +268,17 @@ public class MockAM {
}
+
public AllocateResponse allocate(
List<ResourceRequest> resourceRequest, List<ContainerId> releases)
throws Exception {
final AllocateRequest req =
AllocateRequest.newInstance(0, 0F, resourceRequest,
releases, null);
+ // Attach any queued scheduling requests to this allocate call; the queue
+ // is cleared so each request is sent at most once.
+ if (!schedulingRequests.isEmpty()) {
+ req.setSchedulingRequests(schedulingRequests);
+ schedulingRequests.clear();
+ }
return allocate(req);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 302f5b3..f0e4213 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -1240,6 +1242,18 @@ public class MockRM extends ResourceManager {
return am;
}
+ /**
+ * Launches an AM for the given app, records the supplied placement
+ * constraints on it (to be sent with register), registers the attempt and
+ * waits for the application to reach RUNNING.
+ */
+ public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm,
+ Map<Set<String>, PlacementConstraint> constraints) throws Exception {
+ MockAM am = launchAM(app, rm, nm);
+ for (Map.Entry<Set<String>, PlacementConstraint> e :
+ constraints.entrySet()) {
+ am.addPlacementConstraint(e.getKey(), e.getValue());
+ }
+ am.registerAppAttempt();
+ rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+ return am;
+ }
+
public ApplicationReport getApplicationReport(ApplicationId appId)
throws YarnException, IOException {
ApplicationClientProtocol client = getClientRMService();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e802a6c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
new file mode 100644
index 0000000..db8ae15
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
+import org.apache.hadoop.yarn.api.records.RejectionReason;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+
+/**
+ * This tests end2end workflow of the constraint placement framework.
+ */
+public class TestPlacementProcessor {
+
+  // Memory unit: 1 GB expressed in MB, as expected by the resource APIs.
+  private static final int GB = 1024;
+
+  private static final Log LOG =
+      LogFactory.getLog(TestPlacementProcessor.class);
+  // ResourceManager under test; created fresh for every test case.
+  private MockRM rm;
+  // Drainable dispatcher so the RM processes events deterministically.
+  private DrainDispatcher dispatcher;
+
+  /**
+   * Starts a CapacityScheduler-backed RM with the placement-constraint
+   * processor enabled and a single placement retry attempt.
+   */
+  @Before
+  public void createAndStartRM() {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+    conf.setInt(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 1);
+    startRM(conf);
+  }
+
+  /**
+   * Boots a MockRM wired with a DrainDispatcher for the given configuration.
+   *
+   * @param conf the YARN configuration to start the RM with
+   */
+  private void startRM(final YarnConfiguration conf) {
+    dispatcher = new DrainDispatcher();
+    rm = new MockRM(conf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+  }
+
+  /** Stops the RM, if one was started, after each test. */
+  @After
+  public void stopRM() {
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  /**
+   * Verifies basic anti-affinity placement: four requests tagged "foo" with a
+   * NOT-IN(node, foo) constraint must be allocated on four distinct nodes.
+   */
+  @Test(timeout = 300000)
+  public void testPlacement() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    // Anti-affinity to self: no two "foo"-tagged containers on the same node.
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+        Collections.singletonMap(
+            Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
+        ));
+    am1.addSchedulingRequest(
+        Arrays.asList(
+            schedulingRequest(1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 5, 1, 512, "foo"))
+    );
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+    // kick the scheduler
+
+    // Heartbeat all nodes and re-poll the AM until all 4 containers arrive.
+    while (allocatedContainers.size() < 4) {
+      nm1.nodeHeartbeat(true);
+      nm2.nodeHeartbeat(true);
+      nm3.nodeHeartbeat(true);
+      nm4.nodeHeartbeat(true);
+      LOG.info("Waiting for containers to be created for app 1...");
+      sleep(1000);
+      allocResponse = am1.schedule();
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+    }
+
+    Assert.assertEquals(4, allocatedContainers.size());
+    Set<NodeId> nodeIds = allocatedContainers.stream()
+        .map(x -> x.getNodeId()).collect(Collectors.toSet());
+    // Ensure unique nodes
+    Assert.assertEquals(4, nodeIds.size());
+  }
+
+  /**
+   * Verifies scheduler-side rejection: a request whose container is larger
+   * than any node must come back as a RejectedSchedulingRequest with reason
+   * COULD_NOT_SCHEDULE_ON_NODE, while the remaining three requests are
+   * allocated on three distinct nodes.
+   */
+  @Test(timeout = 300000)
+  public void testSchedulerRejection() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+        Collections.singletonMap(
+            Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
+        ));
+    am1.addSchedulingRequest(
+        Arrays.asList(
+            schedulingRequest(1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            // Ask for a container larger than the node
+            schedulingRequest(1, 4, 1, 5120, "foo"))
+    );
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    List<RejectedSchedulingRequest> rejectedReqs = new ArrayList<>();
+    int allocCount = 1;
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+    rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+
+    // kick the scheduler
+
+    // Poll up to 10 more rounds, or until we see both the rejection and at
+    // least 3 allocations.
+    while (allocCount < 11) {
+      nm1.nodeHeartbeat(true);
+      nm2.nodeHeartbeat(true);
+      nm3.nodeHeartbeat(true);
+      nm4.nodeHeartbeat(true);
+      LOG.info("Waiting for containers to be created for app 1...");
+      sleep(1000);
+      allocResponse = am1.schedule();
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+      rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+      allocCount++;
+      if (rejectedReqs.size() > 0 && allocatedContainers.size() > 2) {
+        break;
+      }
+    }
+
+    Assert.assertEquals(3, allocatedContainers.size());
+    Set<NodeId> nodeIds = allocatedContainers.stream()
+        .map(x -> x.getNodeId()).collect(Collectors.toSet());
+    // Ensure unique nodes
+    Assert.assertEquals(3, nodeIds.size());
+    RejectedSchedulingRequest rej = rejectedReqs.get(0);
+    Assert.assertEquals(4, rej.getRequest().getAllocationRequestId());
+    Assert.assertEquals(RejectionReason.COULD_NOT_SCHEDULE_ON_NODE,
+        rej.getReason());
+  }
+
+  /**
+   * Verifies re-placement after a scheduler rejection: with retry attempts
+   * set to 2, the oversize request initially fails on the small nodes but is
+   * retried and succeeds once the larger nm5 (registered after the first
+   * allocate) is available, so all 4 containers end up on 4 distinct nodes.
+   */
+  @Test(timeout = 300000)
+  public void testRePlacementAfterSchedulerRejection() throws Exception {
+    // Restart the RM with 2 retry attempts instead of the default 1 used by
+    // createAndStartRM().
+    stopRM();
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+    conf.setInt(
+        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 2);
+    startRM(conf);
+
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    // nm5 is the only node big enough (8192) for the 5120 MB request below.
+    MockNM nm5 = new MockNM("h5:1234", 8192, rm.getResourceTrackerService());
+    nodes.put(nm5.getNodeId(), nm5);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+    // Do not register nm5 yet; it is registered after the first allocate.
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+        Collections.singletonMap(
+            Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
+        ));
+    am1.addSchedulingRequest(
+        Arrays.asList(
+            schedulingRequest(1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            // Ask for a container larger than the node
+            schedulingRequest(1, 4, 1, 5120, "foo"))
+    );
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    List<RejectedSchedulingRequest> rejectedReqs = new ArrayList<>();
+    int allocCount = 1;
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+    rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+
+    // Register node5 only after first allocate - so the initial placement
+    // for the large schedReq goes to some other node..
+    nm5.registerNode();
+
+    // kick the scheduler
+    // Poll up to 10 more rounds, or until all 4 containers are allocated.
+    while (allocCount < 11) {
+      nm1.nodeHeartbeat(true);
+      nm2.nodeHeartbeat(true);
+      nm3.nodeHeartbeat(true);
+      nm4.nodeHeartbeat(true);
+      nm5.nodeHeartbeat(true);
+      LOG.info("Waiting for containers to be created for app 1...");
+      sleep(1000);
+      allocResponse = am1.schedule();
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+      rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+      allocCount++;
+      if (allocatedContainers.size() > 3) {
+        break;
+      }
+    }
+
+    Assert.assertEquals(4, allocatedContainers.size());
+    Set<NodeId> nodeIds = allocatedContainers.stream()
+        .map(x -> x.getNodeId()).collect(Collectors.toSet());
+    // Ensure unique nodes
+    Assert.assertEquals(4, nodeIds.size());
+  }
+
+  /**
+   * Verifies placement-algorithm rejection: with 5 anti-affine requests but
+   * only 4 nodes, one request cannot be placed and must be rejected with
+   * reason COULD_NOT_PLACE_ON_NODE; the other 4 land on 4 distinct nodes.
+   */
+  @Test(timeout = 300000)
+  public void testPlacementRejection() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
+        Collections.singletonMap(
+            Collections.singleton("foo"),
+            PlacementConstraints.build(
+                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
+        ));
+    am1.addSchedulingRequest(
+        Arrays.asList(
+            schedulingRequest(1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 4, 1, 512, "foo"),
+            // Ask for more containers than nodes
+            schedulingRequest(1, 5, 1, 512, "foo"))
+    );
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    List<RejectedSchedulingRequest> rejectedReqs = new ArrayList<>();
+    int allocCount = 1;
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+    rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+
+    // kick the scheduler
+
+    // Poll up to 10 more rounds, or until we see both the rejection and at
+    // least 4 allocations.
+    while (allocCount < 11) {
+      nm1.nodeHeartbeat(true);
+      nm2.nodeHeartbeat(true);
+      nm3.nodeHeartbeat(true);
+      nm4.nodeHeartbeat(true);
+      LOG.info("Waiting for containers to be created for app 1...");
+      sleep(1000);
+      allocResponse = am1.schedule();
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+      rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
+      allocCount++;
+      if (rejectedReqs.size() > 0 && allocatedContainers.size() > 3) {
+        break;
+      }
+    }
+
+    Assert.assertEquals(4, allocatedContainers.size());
+    Set<NodeId> nodeIds = allocatedContainers.stream()
+        .map(x -> x.getNodeId()).collect(Collectors.toSet());
+    // Ensure unique nodes
+    Assert.assertEquals(4, nodeIds.size());
+    RejectedSchedulingRequest rej = rejectedReqs.get(0);
+    Assert.assertEquals(RejectionReason.COULD_NOT_PLACE_ON_NODE,
+        rej.getReason());
+  }
+
+  /**
+   * Convenience overload building a GUARANTEED SchedulingRequest.
+   *
+   * @param priority request priority
+   * @param allocReqId allocation request id echoed back in responses
+   * @param cores vcores per container
+   * @param mem memory (MB) per container
+   * @param tags allocation tags attached to the request
+   */
+  private static SchedulingRequest schedulingRequest(
+      int priority, long allocReqId, int cores, int mem, String... tags) {
+    return schedulingRequest(priority, allocReqId, cores, mem,
+        ExecutionType.GUARANTEED, tags);
+  }
+
+  /**
+   * Builds a single-container SchedulingRequest with the given priority,
+   * allocation request id, resources, execution type, and allocation tags.
+   */
+  private static SchedulingRequest schedulingRequest(
+      int priority, long allocReqId, int cores, int mem,
+      ExecutionType execType, String... tags) {
+    return SchedulingRequest.newBuilder()
+        .priority(Priority.newInstance(priority))
+        .allocationRequestId(allocReqId)
+        .allocationTags(new HashSet<>(Arrays.asList(tags)))
+        .executionType(ExecutionTypeRequest.newInstance(execType, true))
+        .resourceSizing(
+            ResourceSizing.newInstance(1, Resource.newInstance(mem, cores)))
+        .build();
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org