You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/31 15:57:40 UTC
[16/32] hadoop git commit: YARN-7613. Implement Basic algorithm for
constraint based placement. (Panagiotis Garefalakis via asuresh)
YARN-7613. Implement Basic algorithm for constraint based placement. (Panagiotis Garefalakis via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a52d11fb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a52d11fb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a52d11fb
Branch: refs/heads/trunk
Commit: a52d11fb8c103f14e42692600a058ba3b56e2ecf
Parents: f9af15d
Author: Arun Suresh <as...@apache.org>
Authored: Wed Dec 27 22:59:22 2017 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 31 01:30:17 2018 -0800
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 4 +
.../src/main/resources/yarn-default.xml | 8 +-
.../rmcontainer/RMContainerImpl.java | 10 +-
.../constraint/AllocationTagsManager.java | 121 ++++++++++---
.../algorithm/DefaultPlacementAlgorithm.java | 172 +++++++++++++++++++
.../iterators/PopularTagsIterator.java | 71 ++++++++
.../algorithm/iterators/SerialIterator.java | 53 ++++++
.../algorithm/iterators/package-info.java | 29 ++++
.../constraint/algorithm/package-info.java | 29 ++++
.../constraint/processor/BatchedRequests.java | 45 ++++-
.../processor/PlacementProcessor.java | 32 ++--
.../processor/SamplePlacementAlgorithm.java | 144 ----------------
.../constraint/TestAllocationTagsManager.java | 156 ++++++++++++-----
.../TestBatchedRequestsIterators.java | 82 +++++++++
.../constraint/TestPlacementProcessor.java | 4 +-
15 files changed, 721 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8fb3c2e..367b1ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -536,6 +536,10 @@ public class YarnConfiguration extends Configuration {
public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_CLASS =
RM_PREFIX + "placement-constraints.algorithm.class";
+ /** Request iterator used by DefaultPlacementAlgorithm - default SERIAL. **/
+ public static final String RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR =
+ RM_PREFIX + "placement-constraints.algorithm.iterator";
+
public static final String RM_PLACEMENT_CONSTRAINTS_ENABLED =
RM_PREFIX + "placement-constraints.enabled";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 6d52ace..509a040 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -145,7 +145,13 @@
<property>
<description>Constraint Placement Algorithm to be used.</description>
<name>yarn.resourcemanager.placement-constraints.algorithm.class</name>
- <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.SamplePlacementAlgorithm</value>
+ <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm</value>
+ </property>
+
+ <property>
+ <description>Placement Algorithm Requests Iterator to be used.</description>
+ <name>yarn.resourcemanager.placement-constraints.algorithm.iterator</name>
+ <value>SERIAL</value>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index c873509..2c4ef7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -579,9 +579,8 @@ public class RMContainerImpl implements RMContainer {
public void transition(RMContainerImpl container, RMContainerEvent event) {
// Notify placementManager
container.rmContext.getAllocationTagsManager().addContainer(
- container.getNodeId(),
- container.getApplicationAttemptId().getApplicationId(),
- container.getContainerId(), container.getAllocationTags());
+ container.getNodeId(), container.getContainerId(),
+ container.getAllocationTags());
container.eventHandler.handle(new RMAppAttemptEvent(
container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED));
@@ -696,9 +695,8 @@ public class RMContainerImpl implements RMContainer {
public void transition(RMContainerImpl container, RMContainerEvent event) {
// Notify placementManager
container.rmContext.getAllocationTagsManager().removeContainer(
- container.getNodeId(),
- container.getApplicationAttemptId().getApplicationId(),
- container.getContainerId(), container.getAllocationTags());
+ container.getNodeId(), container.getContainerId(),
+ container.getAllocationTags());
RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
index 7b0b959..4bb3e79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -54,24 +55,27 @@ public class AllocationTagsManager {
private final RMContext rmContext;
// Application's tags to Node
- private Map<ApplicationId, NodeToCountedTags> perAppNodeMappings =
+ private Map<ApplicationId, TypeToCountedTags> perAppNodeMappings =
new HashMap<>();
// Application's tags to Rack
- private Map<ApplicationId, NodeToCountedTags> perAppRackMappings =
+ private Map<ApplicationId, TypeToCountedTags> perAppRackMappings =
new HashMap<>();
+ // Application's Temporary containers mapping
+ private Map<ApplicationId, Map<NodeId, Map<ContainerId, Set<String>>>>
+ appTempMappings = new HashMap<>();
// Global tags to node mapping (used to fast return aggregated tags
// cardinality across apps)
- private NodeToCountedTags<NodeId> globalNodeMapping = new NodeToCountedTags();
+ private TypeToCountedTags<NodeId> globalNodeMapping = new TypeToCountedTags();
// Global tags to Rack mapping
- private NodeToCountedTags<String> globalRackMapping = new NodeToCountedTags();
+ private TypeToCountedTags<String> globalRackMapping = new TypeToCountedTags();
/**
* Generic store mapping type <T> to counted tags.
* Currently used both for NodeId to Tag, Count and Rack to Tag, Count
*/
@VisibleForTesting
- static class NodeToCountedTags<T> {
+ static class TypeToCountedTags<T> {
// Map<Type, Map<Tag, Count>>
private Map<T, Map<String, Long>> typeToTagsWithCount = new HashMap<>();
@@ -209,25 +213,31 @@ public class AllocationTagsManager {
}
@VisibleForTesting
- Map<ApplicationId, NodeToCountedTags> getPerAppNodeMappings() {
+ Map<ApplicationId, TypeToCountedTags> getPerAppNodeMappings() {
return perAppNodeMappings;
}
@VisibleForTesting
- Map<ApplicationId, NodeToCountedTags> getPerAppRackMappings() {
+ Map<ApplicationId, TypeToCountedTags> getPerAppRackMappings() {
return perAppRackMappings;
}
@VisibleForTesting
- NodeToCountedTags getGlobalNodeMapping() {
+ TypeToCountedTags getGlobalNodeMapping() {
return globalNodeMapping;
}
@VisibleForTesting
- NodeToCountedTags getGlobalRackMapping() {
+ TypeToCountedTags getGlobalRackMapping() {
return globalRackMapping;
}
+ @VisibleForTesting
+ public Map<NodeId, Map<ContainerId, Set<String>>> getAppTempMappings(
+ ApplicationId applicationId) {
+ return appTempMappings.get(applicationId);
+ }
+
public AllocationTagsManager(RMContext context) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
@@ -235,18 +245,52 @@ public class AllocationTagsManager {
rmContext = context;
}
+ //
+
+  /**
+   * Registers a temporary fake-container together with its tags on a node.
+   * Used by the constrained placement algorithm to keep track of containers
+   * that are currently placed on nodes but are not yet allocated.
+   * The fake container id is built from attempt 1 of the application and
+   * the current {@link System#nanoTime()} value; it is remembered in the
+   * per-application temporary mapping and then passed to
+   * {@link #addContainer(NodeId, ContainerId, Set)} like a real container.
+   *
+   * @param nodeId node the temporary container is recorded against.
+   * @param applicationId application owning the temporary container.
+   * @param allocationTags allocation tags attached to the temporary
+   *          container.
+   */
+  public void addTempContainer(NodeId nodeId, ApplicationId applicationId,
+      Set<String> allocationTags) {
+    ApplicationAttemptId fakeAttemptId =
+        ApplicationAttemptId.newInstance(applicationId, 1);
+    ContainerId tmpContainer =
+        ContainerId.newContainerId(fakeAttemptId, System.nanoTime());
+
+    writeLock.lock();
+    try {
+      // app -> node -> container -> tags, creating missing levels on demand
+      appTempMappings
+          .computeIfAbsent(applicationId, k -> new HashMap<>())
+          .computeIfAbsent(nodeId, k -> new HashMap<>())
+          .put(tmpContainer, allocationTags);
+      if (LOG.isDebugEnabled()) {
+        String joinedTags = StringUtils.join(allocationTags, ",");
+        LOG.debug("Added TEMP container=" + tmpContainer + " with tags=["
+            + joinedTags + "]");
+      }
+    } finally {
+      writeLock.unlock();
+    }
+
+    // Count the tags exactly as for a real container
+    addContainer(nodeId, tmpContainer, allocationTags);
+  }
+
/**
* Notify container allocated on a node.
*
* @param nodeId allocated node.
- * @param applicationId applicationId
* @param containerId container id.
* @param allocationTags allocation tags, see
* {@link SchedulingRequest#getAllocationTags()}
* application_id will be added to allocationTags.
*/
- public void addContainer(NodeId nodeId, ApplicationId applicationId,
- ContainerId containerId, Set<String> allocationTags) {
+ public void addContainer(NodeId nodeId, ContainerId containerId,
+ Set<String> allocationTags) {
+ ApplicationId applicationId =
+ containerId.getApplicationAttemptId().getApplicationId();
String applicationIdTag =
AllocationTagsNamespaces.APP_ID + applicationId.toString();
@@ -260,10 +304,10 @@ public class AllocationTagsManager {
writeLock.lock();
try {
- NodeToCountedTags perAppTagsMapping = perAppNodeMappings
- .computeIfAbsent(applicationId, k -> new NodeToCountedTags());
- NodeToCountedTags perAppRackTagsMapping = perAppRackMappings
- .computeIfAbsent(applicationId, k -> new NodeToCountedTags());
+ TypeToCountedTags perAppTagsMapping = perAppNodeMappings
+ .computeIfAbsent(applicationId, k -> new TypeToCountedTags());
+ TypeToCountedTags perAppRackTagsMapping = perAppRackMappings
+ .computeIfAbsent(applicationId, k -> new TypeToCountedTags());
// Covering test-cases where context is mocked
String nodeRack = (rmContext.getRMNodes() != null
&& rmContext.getRMNodes().get(nodeId) != null)
@@ -294,12 +338,13 @@ public class AllocationTagsManager {
* Notify container removed.
*
* @param nodeId nodeId
- * @param applicationId applicationId
* @param containerId containerId.
* @param allocationTags allocation tags for given container
*/
- public void removeContainer(NodeId nodeId, ApplicationId applicationId,
+ public void removeContainer(NodeId nodeId,
ContainerId containerId, Set<String> allocationTags) {
+ ApplicationId applicationId =
+ containerId.getApplicationAttemptId().getApplicationId();
String applicationIdTag =
AllocationTagsNamespaces.APP_ID + applicationId.toString();
boolean useSet = false;
@@ -313,9 +358,9 @@ public class AllocationTagsManager {
writeLock.lock();
try {
- NodeToCountedTags perAppTagsMapping =
+ TypeToCountedTags perAppTagsMapping =
perAppNodeMappings.get(applicationId);
- NodeToCountedTags perAppRackTagsMapping =
+ TypeToCountedTags perAppRackTagsMapping =
perAppRackMappings.get(applicationId);
if (perAppTagsMapping == null) {
return;
@@ -354,6 +399,34 @@ public class AllocationTagsManager {
}
/**
+ * Method removes temporary containers associated with an application
+ * Used by the placement algorithm to clean temporary tags at the end of
+ * a placement cycle.
+ * @param applicationId Application Id.
+ */
+ public void cleanTempContainers(ApplicationId applicationId) {
+
+ if (!appTempMappings.get(applicationId).isEmpty()) {
+ appTempMappings.get(applicationId).entrySet().stream().forEach(nodeE -> {
+ nodeE.getValue().entrySet().stream().forEach(containerE -> {
+ removeContainer(nodeE.getKey(), containerE.getKey(),
+ containerE.getValue());
+ });
+ });
+ writeLock.lock();
+ try {
+ appTempMappings.remove(applicationId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removed TEMP containers of app=" + applicationId);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+ }
+
+
+ /**
* Get Node cardinality for a specific tag.
* When applicationId is null, method returns aggregated cardinality
*
@@ -378,7 +451,7 @@ public class AllocationTagsManager {
"Must specify nodeId/tag to query cardinality");
}
- NodeToCountedTags mapping;
+ TypeToCountedTags mapping;
if (applicationId != null) {
mapping = perAppNodeMappings.get(applicationId);
} else {
@@ -419,7 +492,7 @@ public class AllocationTagsManager {
"Must specify rack/tag to query cardinality");
}
- NodeToCountedTags mapping;
+ TypeToCountedTags mapping;
if (applicationId != null) {
mapping = perAppRackMappings.get(applicationId);
} else {
@@ -492,7 +565,7 @@ public class AllocationTagsManager {
"Must specify nodeId/tags/op to query cardinality");
}
- NodeToCountedTags mapping;
+ TypeToCountedTags mapping;
if (applicationId != null) {
mapping = perAppNodeMappings.get(applicationId);
} else {
@@ -540,7 +613,7 @@ public class AllocationTagsManager {
"Must specify rack/tags/op to query cardinality");
}
- NodeToCountedTags mapping;
+ TypeToCountedTags mapping;
if (applicationId != null) {
mapping = perAppRackMappings.get(applicationId);
} else {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
new file mode 100644
index 0000000..395c156
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.BatchedRequests;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.NodeCandidateSelector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Basic placement algorithm.
+ * Supports different Iterators at SchedulingRequest level including:
+ * Serial, PopularTags
+ */
+public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DefaultPlacementAlgorithm.class);
+
+ private AllocationTagsManager tagsManager;
+ private PlacementConstraintManager constraintManager;
+ private NodeCandidateSelector nodeSelector;
+
+ @Override
+ public void init(RMContext rmContext) {
+ this.tagsManager = rmContext.getAllocationTagsManager();
+ this.constraintManager = rmContext.getPlacementConstraintManager();
+ this.nodeSelector =
+ filter -> ((AbstractYarnScheduler) (rmContext).getScheduler())
+ .getNodes(filter);
+ }
+
+ /**
+ * TODO: Method will be moved to PlacementConstraintsUtil class (YARN-7682)
+ * @param applicationId
+ * @param allocationTags
+ * @param nodeId
+ * @param tagsManager
+ * @return boolean
+ * @throws InvalidAllocationTagsQueryException
+ */
+ public boolean canAssign(ApplicationId applicationId,
+ Set<String> allocationTags, NodeId nodeId,
+ AllocationTagsManager tagsManager)
+ throws InvalidAllocationTagsQueryException {
+ PlacementConstraint constraint =
+ constraintManager.getConstraint(applicationId, allocationTags);
+ if (constraint == null) {
+ return true;
+ }
+ // TODO: proper transformations
+ // Currently works only for simple anti-affinity
+ // NODE scope target expressions
+ PlacementConstraintTransformations.SpecializedConstraintTransformer transformer =
+ new PlacementConstraintTransformations.SpecializedConstraintTransformer(
+ constraint);
+ PlacementConstraint transform = transformer.transform();
+ PlacementConstraint.TargetConstraint targetConstraint =
+ (PlacementConstraint.TargetConstraint) transform.getConstraintExpr();
+ // Assume a single target expression tag;
+ // The Sample Algorithm assumes a constraint will always be a simple
+ // Target Constraint with a single entry in the target set.
+ // As mentioned in the class javadoc - This algorithm should be
+ // used mostly for testing and validating end-2-end workflow.
+ String targetTag = targetConstraint.getTargetExpressions().iterator().next()
+ .getTargetValues().iterator().next();
+ // TODO: Assuming anti-affinity constraint
+ long nodeCardinality =
+ tagsManager.getNodeCardinality(nodeId, applicationId, targetTag);
+ if (nodeCardinality != 0) {
+ return false;
+ }
+ // return true if it is a valid placement
+ return true;
+ }
+
+ public boolean attemptPlacementOnNode(ApplicationId appId,
+ SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
+ throws InvalidAllocationTagsQueryException {
+ int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
+ if (numAllocs > 0) {
+ if (canAssign(appId,
+ schedulingRequest.getAllocationTags(), schedulerNode.getNodeID(),
+ tagsManager)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
+ @Override
+ public void place(ConstraintPlacementAlgorithmInput input,
+ ConstraintPlacementAlgorithmOutputCollector collector) {
+ BatchedRequests requests = (BatchedRequests) input;
+ ConstraintPlacementAlgorithmOutput resp =
+ new ConstraintPlacementAlgorithmOutput(requests.getApplicationId());
+ List<SchedulerNode> allNodes = nodeSelector.selectNodes(null);
+
+ Iterator<SchedulingRequest> requestIterator = requests.iterator();
+ while (requestIterator.hasNext()) {
+ SchedulingRequest schedulingRequest = requestIterator.next();
+ Iterator<SchedulerNode> nodeIter = allNodes.iterator();
+ int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
+ while (nodeIter.hasNext() && numAllocs > 0) {
+ SchedulerNode node = nodeIter.next();
+ try {
+ if (attemptPlacementOnNode(requests.getApplicationId(),
+ schedulingRequest, node)) {
+ schedulingRequest.getResourceSizing()
+ .setNumAllocations(--numAllocs);
+ PlacedSchedulingRequest placedReq =
+ new PlacedSchedulingRequest(schedulingRequest);
+ placedReq.setPlacementAttempt(requests.getPlacementAttempt());
+ placedReq.getNodes().add(node);
+ resp.getPlacedRequests().add(placedReq);
+ numAllocs =
+ schedulingRequest.getResourceSizing().getNumAllocations();
+ // Add temp-container tags for current placement cycle
+ this.tagsManager.addTempContainer(node.getNodeID(),
+ requests.getApplicationId(),
+ schedulingRequest.getAllocationTags());
+ }
+ } catch (InvalidAllocationTagsQueryException e) {
+ LOG.warn("Got exception from TagManager !", e);
+ }
+ }
+ }
+ // Add all requests whose numAllocations still > 0 to rejected list.
+ requests.getSchedulingRequests().stream()
+ .filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0)
+ .forEach(rejReq -> resp.getRejectedRequests().add(rejReq));
+ collector.collect(resp);
+ // Clean current temp-container tags
+ this.tagsManager.cleanTempContainers(requests.getApplicationId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/PopularTagsIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/PopularTagsIterator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/PopularTagsIterator.java
new file mode 100644
index 0000000..ca3e351
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/PopularTagsIterator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+
+/**
+ * Traverse Scheduling requests with the most popular tags (count) first.
+ * Currently the count is per Batch but could use TagManager for global count.
+ */
+public class PopularTagsIterator implements Iterator<SchedulingRequest> {
+
+ private final List<SchedulingRequest> schedulingRequestList;
+ private int cursor;
+
+ public PopularTagsIterator(Collection<SchedulingRequest> schedulingRequests) {
+ this.schedulingRequestList = new ArrayList<>(schedulingRequests);
+ // Most popular First
+ Collections.sort(schedulingRequestList,
+ (o1, o2) -> (int) getTagPopularity(o2) - (int) getTagPopularity(o1));
+
+ this.cursor = 0;
+ }
+
+ private long getTagPopularity(SchedulingRequest o1) {
+ long max = 0;
+ for (String tag : o1.getAllocationTags()) {
+ long count = schedulingRequestList.stream()
+ .filter(req -> req.getAllocationTags().contains(tag)).count();
+ if (count > max) {
+ max = count;
+ }
+ }
+ return max;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return (cursor < schedulingRequestList.size());
+ }
+
+ @Override
+ public SchedulingRequest next() {
+ if (hasNext()) {
+ return schedulingRequestList.get(cursor++);
+ }
+ throw new NoSuchElementException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/SerialIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/SerialIterator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/SerialIterator.java
new file mode 100644
index 0000000..68733a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/SerialIterator.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+
+/**
+ * Traverse Scheduling Requests in the same order as they arrive
+ */
+public class SerialIterator implements Iterator<SchedulingRequest> {
+
+ private final List<SchedulingRequest> schedulingRequestList;
+ private int cursor;
+
+ public SerialIterator(Collection<SchedulingRequest> schedulingRequests) {
+ this.schedulingRequestList = new ArrayList<>(schedulingRequests);
+ this.cursor = 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return (cursor < schedulingRequestList.size());
+ }
+
+ @Override
+ public SchedulingRequest next() {
+ if (hasNext()) {
+ return schedulingRequestList.get(cursor++);
+ }
+ throw new NoSuchElementException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/package-info.java
new file mode 100644
index 0000000..c84671e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/iterators/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package
+ * org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators
+ * contains iterators used to traverse scheduling requests during placement.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/package-info.java
new file mode 100644
index 0000000..bb82077
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm
+ * contains classes related to scheduling containers using placement
+ * constraints.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
index fe92d2f..8b04860 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java
@@ -21,12 +21,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators.PopularTagsIterator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.iterators.SerialIterator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -35,7 +38,8 @@ import java.util.Set;
* to place as a batch. The placement algorithm tends to give more optimal
* placements if more requests are batched together.
*/
-class BatchedRequests implements ConstraintPlacementAlgorithmInput {
+public class BatchedRequests
+ implements ConstraintPlacementAlgorithmInput, Iterable<SchedulingRequest> {
// PlacementAlgorithmOutput attempt - the number of times the requests in this
// batch has been placed but was rejected by the scheduler.
@@ -44,19 +48,46 @@ class BatchedRequests implements ConstraintPlacementAlgorithmInput {
private final ApplicationId applicationId;
private final Collection<SchedulingRequest> requests;
private final Map<String, Set<NodeId>> blacklist = new HashMap<>();
+ private IteratorType iteratorType;
- BatchedRequests(ApplicationId applicationId,
+ /**
+ * Iterator Type: selects the iterator used to traverse the batched requests.
+ */
+ public enum IteratorType {
+ SERIAL,
+ POPULAR_TAGS
+ }
+
+ public BatchedRequests(IteratorType type, ApplicationId applicationId,
Collection<SchedulingRequest> requests, int attempt) {
+ this.iteratorType = type;
this.applicationId = applicationId;
this.requests = requests;
this.placementAttempt = attempt;
}
/**
+ * Exposes SchedulingRequest Iterator interface which can be used
+ * to traverse requests using different heuristics i.e. Tag Popularity
+ * @return SchedulingRequest Iterator.
+ */
+ @Override
+ public Iterator<SchedulingRequest> iterator() {
+ switch (this.iteratorType) {
+ case SERIAL:
+ return new SerialIterator(requests);
+ case POPULAR_TAGS:
+ return new PopularTagsIterator(requests);
+ default:
+ return null;
+ }
+ }
+
+ /**
* Get Application Id.
* @return Application Id.
*/
- ApplicationId getApplicationId() {
+ public ApplicationId getApplicationId() {
return applicationId;
}
@@ -73,11 +104,11 @@ class BatchedRequests implements ConstraintPlacementAlgorithmInput {
* Add a Scheduling request to the batch.
* @param req Scheduling Request.
*/
- void addToBatch(SchedulingRequest req) {
+ public void addToBatch(SchedulingRequest req) {
requests.add(req);
}
- void addToBlacklist(Set<String> tags, SchedulerNode node) {
+ public void addToBlacklist(Set<String> tags, SchedulerNode node) {
if (tags != null && !tags.isEmpty()) {
// We are currently assuming a single allocation tag
// per scheduler request currently.
@@ -90,7 +121,7 @@ class BatchedRequests implements ConstraintPlacementAlgorithmInput {
* Get placement attempt.
* @return PlacementAlgorithmOutput placement Attempt.
*/
- int getPlacementAttempt() {
+ public int getPlacementAttempt() {
return placementAttempt;
}
@@ -99,7 +130,7 @@ class BatchedRequests implements ConstraintPlacementAlgorithmInput {
* @param tag Tag.
* @return Set of blacklisted Nodes.
*/
- Set<NodeId> getBlacklist(String tag) {
+ public Set<NodeId> getBlacklist(String tag) {
return blacklist.getOrDefault(tag, Collections.EMPTY_SET);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
index d613d4e..8e9c79c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
@@ -35,8 +35,10 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
@@ -98,6 +100,7 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
private Map<ApplicationId, List<SchedulingRequest>> requestsToReject =
new ConcurrentHashMap<>();
+ private BatchedRequests.IteratorType iteratorType;
private PlacementDispatcher placementDispatcher;
@@ -122,9 +125,20 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
if (instances != null && !instances.isEmpty()) {
algorithm = instances.get(0);
} else {
- algorithm = new SamplePlacementAlgorithm();
+ algorithm = new DefaultPlacementAlgorithm();
+ }
+ LOG.info("Placement Algorithm [{}]", algorithm.getClass().getName());
+
+ String iteratorName = ((RMContextImpl) amsContext).getYarnConfiguration()
+ .get(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_ITERATOR,
+ BatchedRequests.IteratorType.SERIAL.name());
+ LOG.info("Placement Algorithm Iterator[{}]", iteratorName);
+ try {
+ iteratorType = BatchedRequests.IteratorType.valueOf(iteratorName);
+ } catch (IllegalArgumentException e) {
+ throw new YarnRuntimeException(
+ "Could not instantiate Placement Algorithm Iterator: ", e);
}
- LOG.info("Planning Algorithm [{}]", algorithm.getClass().getName());
int algoPSize = ((RMContextImpl) amsContext).getYarnConfiguration().getInt(
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ALGORITHM_POOL_SIZE,
@@ -188,9 +202,8 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId,
List<SchedulingRequest> schedulingRequests) {
if (schedulingRequests != null && !schedulingRequests.isEmpty()) {
- this.placementDispatcher.dispatch(
- new BatchedRequests(appAttemptId.getApplicationId(),
- schedulingRequests, 1));
+ this.placementDispatcher.dispatch(new BatchedRequests(iteratorType,
+ appAttemptId.getApplicationId(), schedulingRequests, 1));
}
}
@@ -329,11 +342,10 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
}
}
if (!isAdded) {
- BatchedRequests br =
- new BatchedRequests(schedulerResponse.getApplicationId(),
- Collections.singleton(
- schedulerResponse.getSchedulingRequest()),
- placementAttempt + 1);
+ BatchedRequests br = new BatchedRequests(iteratorType,
+ schedulerResponse.getApplicationId(),
+ Collections.singleton(schedulerResponse.getSchedulingRequest()),
+ placementAttempt + 1);
reqsToRetry.add(br);
br.addToBlacklist(
schedulerResponse.getSchedulingRequest().getAllocationTags(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java
deleted file mode 100644
index 8d49801..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/SamplePlacementAlgorithm.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
-
-import org.apache.hadoop.yarn.api.records.SchedulingRequest;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetConstraint;
-import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SpecializedConstraintTransformer;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmInput;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Sample Test algorithm. Assumes anti-affinity always
- * It also assumes the numAllocations in resource sizing is always = 1
- *
- * NOTE: This is just a sample implementation. Not be actually used
- */
-public class SamplePlacementAlgorithm implements ConstraintPlacementAlgorithm {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(SamplePlacementAlgorithm.class);
-
- private AllocationTagsManager tagsManager;
- private PlacementConstraintManager constraintManager;
- private NodeCandidateSelector nodeSelector;
-
- @Override
- public void init(RMContext rmContext) {
- this.tagsManager = rmContext.getAllocationTagsManager();
- this.constraintManager = rmContext.getPlacementConstraintManager();
- this.nodeSelector =
- filter -> ((AbstractYarnScheduler)(rmContext)
- .getScheduler()).getNodes(filter);
- }
-
- @Override
- public void place(ConstraintPlacementAlgorithmInput input,
- ConstraintPlacementAlgorithmOutputCollector collector) {
- BatchedRequests requests = (BatchedRequests)input;
- ConstraintPlacementAlgorithmOutput resp =
- new ConstraintPlacementAlgorithmOutput(requests.getApplicationId());
- List<SchedulerNode> allNodes = nodeSelector.selectNodes(null);
- Map<String, List<SchedulingRequest>> tagIndexedRequests = new HashMap<>();
- requests.getSchedulingRequests()
- .stream()
- .filter(r -> r.getAllocationTags() != null)
- .forEach(
- req -> req.getAllocationTags().forEach(
- tag -> tagIndexedRequests.computeIfAbsent(tag,
- k -> new ArrayList<>()).add(req))
- );
- for (Map.Entry<String, List<SchedulingRequest>> entry :
- tagIndexedRequests.entrySet()) {
- String tag = entry.getKey();
- PlacementConstraint constraint =
- constraintManager.getConstraint(requests.getApplicationId(),
- Collections.singleton(tag));
- if (constraint != null) {
- // Currently works only for simple anti-affinity
- // NODE scope target expressions
- SpecializedConstraintTransformer transformer =
- new SpecializedConstraintTransformer(constraint);
- PlacementConstraint transform = transformer.transform();
- TargetConstraint targetConstraint =
- (TargetConstraint) transform.getConstraintExpr();
- // Assume a single target expression tag;
- // The Sample Algorithm assumes a constraint will always be a simple
- // Target Constraint with a single entry in the target set.
- // As mentioned in the class javadoc - This algorithm should be
- // used mostly for testing and validating end-2-end workflow.
- String targetTag =
- targetConstraint.getTargetExpressions().iterator().next()
- .getTargetValues().iterator().next();
- // iterate over all nodes
- Iterator<SchedulerNode> nodeIter = allNodes.iterator();
- List<SchedulingRequest> schedulingRequests = entry.getValue();
- Iterator<SchedulingRequest> reqIter = schedulingRequests.iterator();
- while (reqIter.hasNext()) {
- SchedulingRequest sReq = reqIter.next();
- int numAllocs = sReq.getResourceSizing().getNumAllocations();
- while (numAllocs > 0 && nodeIter.hasNext()) {
- SchedulerNode node = nodeIter.next();
- long nodeCardinality = 0;
- try {
- nodeCardinality = tagsManager.getNodeCardinality(
- node.getNodeID(), requests.getApplicationId(),
- targetTag);
- if (nodeCardinality == 0 &&
- !requests.getBlacklist(tag).contains(node.getNodeID())) {
- numAllocs--;
- sReq.getResourceSizing().setNumAllocations(numAllocs);
- PlacedSchedulingRequest placedReq =
- new PlacedSchedulingRequest(sReq);
- placedReq.setPlacementAttempt(requests.getPlacementAttempt());
- placedReq.getNodes().add(node);
- resp.getPlacedRequests().add(placedReq);
- }
- } catch (InvalidAllocationTagsQueryException e) {
- LOG.warn("Got exception from TagManager !", e);
- }
- }
- }
- }
- }
- // Add all requests whose numAllocations still > 0 to rejected list.
- requests.getSchedulingRequests().stream()
- .filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0)
- .forEach(rejReq -> resp.getRejectedRequests().add(rejReq));
- collector.collect(resp);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
index 0ce1614..f1d5663 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java
@@ -75,24 +75,24 @@ public class TestAllocationTagsManager {
// 3 Containers from app1
atm.addContainer(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+ TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+ TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
+ TestUtils.getMockContainerId(1, 3),
ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
+ TestUtils.getMockContainerId(1, 4),
ImmutableSet.of("reducer"));
// 1 Container from app2
atm.addContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
+ TestUtils.getMockContainerId(2, 3),
ImmutableSet.of("service"));
// Get Node Cardinality of app1 on node1, with tag "mapper"
@@ -170,24 +170,21 @@ public class TestAllocationTagsManager {
// Finish all containers:
atm.removeContainer(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+ TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+ TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
- ImmutableSet.of("service"));
+ TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
atm.removeContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
- ImmutableSet.of("reducer"));
+ TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer"));
atm.removeContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
- ImmutableSet.of("service"));
+ TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
// Expect all cardinality to be 0
// Get Cardinality of app1 on node1, with tag "mapper"
@@ -270,25 +267,22 @@ public class TestAllocationTagsManager {
// 3 Containers from app1
atm.addContainer(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+ TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 2),
+ TestUtils.getMockContainerId(2, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 4),
- ImmutableSet.of("reducer"));
+ TestUtils.getMockContainerId(2, 4), ImmutableSet.of("reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
- ImmutableSet.of("service"));
+ TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
// 1 Container from app2
atm.addContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
- ImmutableSet.of("service"));
+ TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
// Get Rack Cardinality of app1 on rack0, with tag "mapper"
Assert.assertEquals(1, atm.getRackCardinality("rack0",
@@ -325,45 +319,39 @@ public class TestAllocationTagsManager {
// Add a bunch of containers
atm.addContainer(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+ TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+ TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
- ImmutableSet.of("service"));
+ TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
- ImmutableSet.of("reducer"));
+ TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
- ImmutableSet.of("service"));
+ TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
// Remove all these containers
atm.removeContainer(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+ TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+ TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.removeContainer(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
- ImmutableSet.of("service"));
+ TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
atm.removeContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
- ImmutableSet.of("reducer"));
+ TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer"));
atm.removeContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
- ImmutableSet.of("service"));
+ TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
// Check internal data structure
Assert.assertEquals(0,
@@ -375,6 +363,87 @@ public class TestAllocationTagsManager {
}
@Test
+ public void testTempContainerAllocations()
+ throws InvalidAllocationTagsQueryException {
+ /**
+ * Construct both TEMP and normal containers.
+ * Node1: TEMP container_1_1 (mapper/reducer/app_1),
+ * container_1_2 (service/app_1).
+ * Node2: container_1_3 (reducer/app_1), TEMP container_2_1 (service/app_2).
+ */
+
+ AllocationTagsManager atm = new AllocationTagsManager(rmContext);
+
+ // 3 Containers from app1
+ atm.addTempContainer(NodeId.fromString("host1:123"),
+ TestUtils.getMockApplicationId(1),
+ ImmutableSet.of("mapper", "reducer"));
+
+ atm.addContainer(NodeId.fromString("host1:123"),
+ TestUtils.getMockContainerId(1, 2), ImmutableSet.of("service"));
+
+ atm.addContainer(NodeId.fromString("host2:123"),
+ TestUtils.getMockContainerId(1, 3), ImmutableSet.of("reducer"));
+
+ // 1 Container from app2
+ atm.addTempContainer(NodeId.fromString("host2:123"),
+ TestUtils.getMockApplicationId(2), ImmutableSet.of("service"));
+
+ // Expect tag mappings to be present including temp Tags
+ Assert.assertEquals(1,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+ Long::sum));
+
+ Assert.assertEquals(1,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
+ Long::sum));
+
+ Assert.assertEquals(1,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+ TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+ Long::sum));
+
+ // Do a temp Tag cleanup on app2
+ atm.cleanTempContainers(TestUtils.getMockApplicationId(2));
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+ TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+ Long::sum));
+ // Expect app1 to be unaffected
+ Assert.assertEquals(1,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+ Long::sum));
+ // Do a cleanup on app1 as well
+ atm.cleanTempContainers(TestUtils.getMockApplicationId(1));
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
+ Long::sum));
+
+ // Non temp-tags should be unaffected
+ Assert.assertEquals(1,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
+ TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
+ Long::sum));
+
+ Assert.assertEquals(0,
+ atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
+ TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
+ Long::sum));
+
+ // Expect app2 with no containers, and app1 with 2 containers across 2 nodes
+ Assert.assertEquals(2,
+ atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(1))
+ .getTypeToTagsWithCount().size());
+
+ Assert.assertNull(
+ atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(2)));
+ }
+
+ @Test
public void testQueryCardinalityWithIllegalParameters()
throws InvalidAllocationTagsQueryException {
/**
@@ -385,24 +454,21 @@ public class TestAllocationTagsManager {
// Add a bunch of containers
atm.addContainer(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 1),
+ TestUtils.getMockContainerId(1, 1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 2),
+ TestUtils.getMockContainerId(1, 2),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host1:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 3),
- ImmutableSet.of("service"));
+ TestUtils.getMockContainerId(1, 3), ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(1), TestUtils.getMockContainerId(1, 4),
- ImmutableSet.of("reducer"));
+ TestUtils.getMockContainerId(1, 4), ImmutableSet.of("reducer"));
atm.addContainer(NodeId.fromString("host2:123"),
- TestUtils.getMockApplicationId(2), TestUtils.getMockContainerId(2, 3),
- ImmutableSet.of("service"));
+ TestUtils.getMockContainerId(2, 3), ImmutableSet.of("service"));
// No node-id
boolean caughtException = false;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestBatchedRequestsIterators.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestBatchedRequestsIterators.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestBatchedRequestsIterators.java
new file mode 100644
index 0000000..0e7b715
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestBatchedRequestsIterators.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TestPlacementProcessor.schedulingRequest;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.BatchedRequests;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the SERIAL and POPULAR_TAGS request iterators of BatchedRequests.
+ */
+public class TestBatchedRequestsIterators {
+ // SERIAL iterator: allocation request ids must come back strictly increasing.
+ @Test
+ public void testSerialIterator() throws Exception {
+ List<SchedulingRequest> schedulingRequestList =
+ Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"),
+ schedulingRequest(1, 2, 1, 512, "foo"),
+ schedulingRequest(1, 3, 1, 512, "foo"),
+ schedulingRequest(1, 4, 1, 512, "foo"));
+
+ BatchedRequests batchedRequests = new BatchedRequests(
+ BatchedRequests.IteratorType.SERIAL, null, schedulingRequestList, 1);
+ // The requests above use allocReqId 1..4 in list order (second argument).
+ Iterator<SchedulingRequest> requestIterator = batchedRequests.iterator();
+ long prevAllocId = 0; // below the smallest allocReqId used above
+ while (requestIterator.hasNext()) {
+ SchedulingRequest request = requestIterator.next();
+ Assert.assertTrue(request.getAllocationRequestId() > prevAllocId);
+ prevAllocId = request.getAllocationRequestId();
+ }
+ }
+ // POPULAR_TAGS iterator: the first three requests returned must carry "pri".
+ @Test
+ public void testPopularTagsIterator() throws Exception {
+ List<SchedulingRequest> schedulingRequestList =
+ Arrays.asList(schedulingRequest(1, 1, 1, 512, "pri", "foo"),
+ schedulingRequest(1, 2, 1, 512, "bar"),
+ schedulingRequest(1, 3, 1, 512, "foo", "pri"),
+ schedulingRequest(1, 4, 1, 512, "test"),
+ schedulingRequest(1, 5, 1, 512, "pri", "bar"));
+
+ BatchedRequests batchedRequests =
+ new BatchedRequests(BatchedRequests.IteratorType.POPULAR_TAGS, null,
+ schedulingRequestList, 1);
+ // "pri" is on 3 of the 5 requests; the other two carry "bar" or "test".
+ Iterator<SchedulingRequest> requestIterator = batchedRequests.iterator();
+ long recCcount = 0; // number of requests consumed so far
+ while (requestIterator.hasNext()) {
+ SchedulingRequest request = requestIterator.next();
+ if (recCcount < 3) {
+ Assert.assertTrue(request.getAllocationTags().contains("pri"));
+ } else {
+ Assert.assertTrue(request.getAllocationTags().contains("bar")
+ || request.getAllocationTags().contains("test"));
+ }
+ recCcount++;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a52d11fb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index db8ae15..87dd5b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -373,13 +373,13 @@ public class TestPlacementProcessor {
rej.getReason());
}
- private static SchedulingRequest schedulingRequest(
+ protected static SchedulingRequest schedulingRequest(
int priority, long allocReqId, int cores, int mem, String... tags) {
return schedulingRequest(priority, allocReqId, cores, mem,
ExecutionType.GUARANTEED, tags);
}
- private static SchedulingRequest schedulingRequest(
+ protected static SchedulingRequest schedulingRequest(
int priority, long allocReqId, int cores, int mem,
ExecutionType execType, String... tags) {
return SchedulingRequest.newBuilder()
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org