You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/30 18:10:05 UTC
[23/50] [abbrv] hadoop git commit: YARN-6599. Support anti-affinity
constraint via AppPlacementAllocator. (Wangda Tan via asuresh)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index 73b4f9e..24c5a5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import java.util.Iterator;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -30,9 +32,12 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION;
+
/**
* This class contains various static methods used by the Placement Algorithms
* to simplify constrained placement.
@@ -41,16 +46,20 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algori
@Public
@Unstable
public final class PlacementConstraintsUtil {
+ private static final Log LOG =
+ LogFactory.getLog(PlacementConstraintsUtil.class);
// Suppresses default constructor, ensuring non-instantiability.
private PlacementConstraintsUtil() {
}
/**
- * Returns true if **single** application constraint with associated
+ * Returns true if **single** placement constraint with associated
* allocationTags and scope is satisfied by a specific scheduler Node.
*
- * @param appId the application id
+ * @param targetApplicationId the application id, which could be overridden by
+ * target application id specified inside allocation
+ * tags.
* @param sc the placement constraint
* @param te the target expression
* @param node the scheduler node
@@ -59,32 +68,123 @@ public final class PlacementConstraintsUtil {
* @throws InvalidAllocationTagsQueryException
*/
private static boolean canSatisfySingleConstraintExpression(
- ApplicationId appId, SingleConstraint sc, TargetExpression te,
- SchedulerNode node, AllocationTagsManager tm)
+ ApplicationId targetApplicationId, SingleConstraint sc,
+ TargetExpression te, SchedulerNode node, AllocationTagsManager tm)
throws InvalidAllocationTagsQueryException {
long minScopeCardinality = 0;
long maxScopeCardinality = 0;
+
+ // Optimizations to only check cardinality if necessary.
+ int desiredMinCardinality = sc.getMinCardinality();
+ int desiredMaxCardinality = sc.getMaxCardinality();
+ boolean checkMinCardinality = desiredMinCardinality > 0;
+ boolean checkMaxCardinality = desiredMaxCardinality < Integer.MAX_VALUE;
+
if (sc.getScope().equals(PlacementConstraints.NODE)) {
- minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
- te.getTargetValues(), Long::max);
- maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
- te.getTargetValues(), Long::min);
+ if (checkMinCardinality) {
+ minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
+ targetApplicationId, te.getTargetValues(), Long::max);
+ }
+ if (checkMaxCardinality) {
+ maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
+ targetApplicationId, te.getTargetValues(), Long::min);
+ }
} else if (sc.getScope().equals(PlacementConstraints.RACK)) {
- minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
- te.getTargetValues(), Long::max);
- maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
- te.getTargetValues(), Long::min);
+ if (checkMinCardinality) {
+ minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
+ targetApplicationId, te.getTargetValues(), Long::max);
+ }
+ if (checkMaxCardinality) {
+ maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
+ targetApplicationId, te.getTargetValues(), Long::min);
+ }
}
// Make sure Anti-affinity satisfies hard upper limit
- maxScopeCardinality = sc.getMaxCardinality() == 0 ? maxScopeCardinality - 1
+ maxScopeCardinality = desiredMaxCardinality == 0 ? maxScopeCardinality - 1
: maxScopeCardinality;
- return (minScopeCardinality >= sc.getMinCardinality()
- && maxScopeCardinality < sc.getMaxCardinality());
+ return (desiredMinCardinality <= 0
+ || minScopeCardinality >= desiredMinCardinality) && (
+ desiredMaxCardinality == Integer.MAX_VALUE
+ || maxScopeCardinality < desiredMaxCardinality);
+ }
+
+  /**
+   * Checks a node partition target expression against the partition of a
+   * scheduler node.
+   *
+   * When the expression carries no target values, the node must be in the
+   * default partition ({@code RMNodeLabelsManager.NO_LABEL}). Otherwise only
+   * the first target value returned by the set's iterator is compared with
+   * the node's partition.
+   *
+   * @param targetExpression the node partition target expression
+   * @param schedulerNode the scheduler node to check
+   * @return true if the node's partition matches the expression
+   */
+  private static boolean canSatisfyNodePartitionConstraintExpresssion(
+      TargetExpression targetExpression, SchedulerNode schedulerNode) {
+    Set<String> requestedPartitions = targetExpression.getTargetValues();
+
+    // Guard clause: no explicit partition means "default partition only".
+    if (requestedPartitions == null || requestedPartitions.isEmpty()) {
+      return schedulerNode.getPartition().equals(
+          RMNodeLabelsManager.NO_LABEL);
+    }
+
+    String requested = requestedPartitions.iterator().next();
+    return requested.equals(schedulerNode.getPartition());
+  }
+
+  /**
+   * Checks the target expressions of one {@link SingleConstraint} against a
+   * scheduler node.
+   *
+   * Allocation tag expressions are evaluated by
+   * {@code canSatisfySingleConstraintExpression}; node attribute expressions
+   * keyed by {@code NODE_PARTITION} are evaluated by
+   * {@code canSatisfyNodePartitionConstraintExpresssion}. Target expressions
+   * of any other kind are skipped.
+   *
+   * @param applicationId application id passed to the allocation tag checks
+   * @param singleConstraint the constraint whose target expressions are
+   *                         checked
+   * @param schedulerNode the candidate scheduler node
+   * @param tagsManager the allocation tags store
+   * @return false as soon as one checked expression is not satisfied by the
+   *         node, true otherwise
+   * @throws InvalidAllocationTagsQueryException
+   */
+  private static boolean canSatisfySingleConstraint(ApplicationId applicationId,
+      SingleConstraint singleConstraint, SchedulerNode schedulerNode,
+      AllocationTagsManager tagsManager)
+      throws InvalidAllocationTagsQueryException {
+    // Iterate through TargetExpressions
+    Iterator<TargetExpression> expIt =
+        singleConstraint.getTargetExpressions().iterator();
+    while (expIt.hasNext()) {
+      TargetExpression currentExp = expIt.next();
+      if (currentExp.getTargetType().equals(TargetType.ALLOCATION_TAG)) {
+        // Allocation tag expression: check cardinality within the scope.
+        if (!canSatisfySingleConstraintExpression(applicationId,
+            singleConstraint, currentExp, schedulerNode, tagsManager)) {
+          return false;
+        }
+      } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)
+          && currentExp.getTargetKey().equals(NODE_PARTITION)) {
+        // Node partition expression: the result must be honored, otherwise
+        // a node in the wrong partition would still be accepted.
+        if (!canSatisfyNodePartitionConstraintExpresssion(currentExp,
+            schedulerNode)) {
+          return false;
+        }
+      }
+    }
+    // return true if all checked targetExpressions are satisfied
+    return true;
+  }
+
+  /**
+   * Returns true if the given placement constraint is **currently** satisfied
+   * by a specific scheduler node.
+   *
+   * A null constraint is treated as satisfied. Otherwise the constraint is
+   * transformed with {@link SingleConstraintTransformer}, its constraint
+   * expression is cast to {@link SingleConstraint}, and the target
+   * expressions of that constraint are checked against the node.
+   *
+   * @param applicationId the application id
+   * @param placementConstraint the placement constraint, may be null
+   * @param node the scheduler node
+   * @param tagsManager the allocation tags store
+   * @return true if the constraint is null or is satisfied by the node
+   * @throws InvalidAllocationTagsQueryException
+   */
+  public static boolean canSatisfySingleConstraint(ApplicationId applicationId,
+      PlacementConstraint placementConstraint, SchedulerNode node,
+      AllocationTagsManager tagsManager)
+      throws InvalidAllocationTagsQueryException {
+    // Nothing to enforce when no constraint is given.
+    if (placementConstraint == null) {
+      return true;
+    }
+
+    // Transform to SimpleConstraint and take its constraint expression.
+    PlacementConstraint simplified =
+        new SingleConstraintTransformer(placementConstraint).transform();
+    AbstractConstraint simplifiedExpr = simplified.getConstraintExpr();
+
+    return canSatisfySingleConstraint(applicationId,
+        (SingleConstraint) simplifiedExpr, node, tagsManager);
   }
/**
- * Returns true if all application constraints with associated allocationTags
+ * Returns true if all placement constraints with associated allocationTags
* are **currently** satisfied by a specific scheduler Node.
* To do so the method retrieves and goes through all application constraint
* expressions and checks if the specific allocation is between the allowed
@@ -98,41 +198,12 @@ public final class PlacementConstraintsUtil {
* @return true if all application constraints are satisfied by node
* @throws InvalidAllocationTagsQueryException
*/
- public static boolean canSatisfyConstraints(ApplicationId appId,
+ public static boolean canSatisfySingleConstraint(ApplicationId appId,
Set<String> allocationTags, SchedulerNode node,
PlacementConstraintManager pcm, AllocationTagsManager tagsManager)
throws InvalidAllocationTagsQueryException {
PlacementConstraint constraint = pcm.getConstraint(appId, allocationTags);
- if (constraint == null) {
- return true;
- }
- // Transform to SimpleConstraint
- SingleConstraintTransformer singleTransformer =
- new SingleConstraintTransformer(constraint);
- constraint = singleTransformer.transform();
- AbstractConstraint sConstraintExpr = constraint.getConstraintExpr();
- SingleConstraint single = (SingleConstraint) sConstraintExpr;
- // Iterate through TargetExpressions
- Iterator<TargetExpression> expIt = single.getTargetExpressions().iterator();
- while (expIt.hasNext()) {
- TargetExpression currentExp = expIt.next();
- // Supporting AllocationTag Expressions for now
- if (currentExp.getTargetType().equals(TargetType.ALLOCATION_TAG)) {
- // If source and tag allocation tags are the same, we do not enforce
- // constraints with minimum cardinality.
- if (currentExp.getTargetValues().equals(allocationTags)
- && single.getMinCardinality() > 0) {
- return true;
- }
- // Check if conditions are met
- if (!canSatisfySingleConstraintExpression(appId, single, currentExp,
- node, tagsManager)) {
- return false;
- }
- }
- }
- // return true if all targetExpressions are satisfied
- return true;
+ return canSatisfySingleConstraint(appId, constraint, node, tagsManager);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
index 9ed9ab1..eb3fe88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -67,7 +67,7 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
throws InvalidAllocationTagsQueryException {
int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
if (numAllocs > 0) {
- if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
+ if (PlacementConstraintsUtil.canSatisfySingleConstraint(appId,
schedulingRequest.getAllocationTags(), schedulerNode,
constraintManager, tagsManager)) {
return true;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
index 8e9c79c..2a6b889 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
@@ -188,12 +188,18 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
@Override
public void allocate(ApplicationAttemptId appAttemptId,
AllocateRequest request, AllocateResponse response) throws YarnException {
+ // Copy the scheduling request since we will clear it later after sending
+ // to dispatcher
List<SchedulingRequest> schedulingRequests =
- request.getSchedulingRequests();
+ new ArrayList<>(request.getSchedulingRequests());
dispatchRequestsForPlacement(appAttemptId, schedulingRequests);
reDispatchRetryableRequests(appAttemptId);
schedulePlacedRequests(appAttemptId);
+ // Remove SchedulingRequest from AllocateRequest to avoid SchedulingRequest
+ // being added to scheduler.
+ request.setSchedulingRequests(Collections.emptyList());
+
nextAMSProcessor.allocate(appAttemptId, request, response);
handleRejectedRequests(appAttemptId, response);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index e2a62ec..1f85814 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -830,9 +831,9 @@ public class FairScheduler extends
@Override
public Allocation allocate(ApplicationAttemptId appAttemptId,
- List<ResourceRequest> ask, List<ContainerId> release,
- List<String> blacklistAdditions, List<String> blacklistRemovals,
- ContainerUpdates updateRequests) {
+ List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
+ List<ContainerId> release, List<String> blacklistAdditions,
+ List<String> blacklistRemovals, ContainerUpdates updateRequests) {
// Make sure this application exists
FSAppAttempt application = getSchedulerApp(appAttemptId);
@@ -857,7 +858,9 @@ public class FairScheduler extends
handleContainerUpdates(application, updateRequests);
// Sanity check
- normalizeRequests(ask);
+ normalizeResourceRequests(ask);
+
+ // TODO, normalize SchedulingRequest
// Record container allocation start time
application.recordContainerRequestTime(getClock().getTime());
@@ -879,6 +882,7 @@ public class FairScheduler extends
// Update application requests
application.updateResourceRequests(ask);
+ // TODO, handle SchedulingRequest
application.showRequests();
}
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 59b9608..7ac9027 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -320,8 +321,8 @@ public class FifoScheduler extends
@Override
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
- List<ResourceRequest> ask, List<ContainerId> release,
- List<String> blacklistAdditions, List<String> blacklistRemovals,
+ List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
+ List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals,
ContainerUpdates updateRequests) {
FifoAppAttempt application = getApplicationAttempt(applicationAttemptId);
if (application == null) {
@@ -342,7 +343,7 @@ public class FifoScheduler extends
}
// Sanity check
- normalizeRequests(ask);
+ normalizeResourceRequests(ask);
// Release containers
releaseContainers(release, application);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
index 5c49450..72a6c4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -29,7 +31,6 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import java.util.Collection;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
/**
@@ -50,13 +51,18 @@ import java.util.Map;
* requests.
* </p>
*/
-public interface AppPlacementAllocator<N extends SchedulerNode> {
+public abstract class AppPlacementAllocator<N extends SchedulerNode> {
+ protected AppSchedulingInfo appSchedulingInfo;
+ protected SchedulerRequestKey schedulerRequestKey;
+ protected RMContext rmContext;
+
/**
* Get iterator of preferred node depends on requirement and/or availability
* @param candidateNodeSet input CandidateNodeSet
* @return iterator of preferred node
*/
- Iterator<N> getPreferredNodeIterator(CandidateNodeSet<N> candidateNodeSet);
+ public abstract Iterator<N> getPreferredNodeIterator(
+ CandidateNodeSet<N> candidateNodeSet);
/**
* Replace existing pending asks by the new requests
@@ -66,15 +72,29 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
* requests for preempted container
* @return true if total pending resource changed
*/
- PendingAskUpdateResult updatePendingAsk(
+ public abstract PendingAskUpdateResult updatePendingAsk(
Collection<ResourceRequest> requests,
boolean recoverPreemptedRequestForAContainer);
/**
+ * Replace existing pending asks by the new SchedulingRequest
+ *
+ * @param schedulerRequestKey scheduler request key
+ * @param schedulingRequest new asks
+ * @param recoverPreemptedRequestForAContainer if we're recovering resource
+ * requests for preempted container
+ * @return true if total pending resource changed
+ */
+ public abstract PendingAskUpdateResult updatePendingAsk(
+ SchedulerRequestKey schedulerRequestKey,
+ SchedulingRequest schedulingRequest,
+ boolean recoverPreemptedRequestForAContainer);
+
+ /**
* Get pending ResourceRequests by given schedulerRequestKey
* @return Map of resourceName to ResourceRequest
*/
- Map<String, ResourceRequest> getResourceRequests();
+ public abstract Map<String, ResourceRequest> getResourceRequests();
/**
* Get pending ask for given resourceName. If there's no such pendingAsk,
@@ -83,7 +103,7 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
* @param resourceName resourceName
* @return PendingAsk
*/
- PendingAsk getPendingAsk(String resourceName);
+ public abstract PendingAsk getPendingAsk(String resourceName);
/**
* Get #pending-allocations for given resourceName. If there's no such
@@ -92,7 +112,7 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
* @param resourceName resourceName
* @return #pending-allocations
*/
- int getOutstandingAsksCount(String resourceName);
+ public abstract int getOutstandingAsksCount(String resourceName);
/**
* Notify container allocated.
@@ -103,7 +123,7 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
* the container. This will be used by scheduler to recover requests.
* Please refer to {@link ContainerRequest} for more details.
*/
- ContainerRequest allocate(SchedulerRequestKey schedulerKey,
+ public abstract ContainerRequest allocate(SchedulerRequestKey schedulerKey,
NodeType type, SchedulerNode node);
/**
@@ -112,7 +132,7 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
* @param node which node we will allocate on
* @return true if we has pending requirement
*/
- boolean canAllocate(NodeType type, SchedulerNode node);
+ public abstract boolean canAllocate(NodeType type, SchedulerNode node);
/**
* Can delay to give locality?
@@ -123,16 +143,16 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
* @param resourceName resourceName
* @return can/cannot
*/
- boolean canDelayTo(String resourceName);
+ public abstract boolean canDelayTo(String resourceName);
/**
- * Does this {@link AppPlacementAllocator} accept resources on nodePartition?
+ * Does this {@link AppPlacementAllocator} accept resources on given node?
*
- * @param nodePartition nodePartition
+ * @param schedulerNode schedulerNode
* @param schedulingMode schedulingMode
* @return accepted/not
*/
- boolean acceptNodePartition(String nodePartition,
+ public abstract boolean precheckNode(SchedulerNode schedulerNode,
SchedulingMode schedulingMode);
/**
@@ -142,7 +162,7 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
*
* @return primary requested node partition
*/
- String getPrimaryRequestedNodePartition();
+ public abstract String getPrimaryRequestedNodePartition();
/**
* @return number of unique location asks with #pending greater than 0,
@@ -152,18 +172,24 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
* and should belong to specific delay scheduling policy impl.
* See YARN-7457 for more details.
*/
- int getUniqueLocationAsks();
+ public abstract int getUniqueLocationAsks();
/**
* Print human-readable requests to LOG debug.
*/
- void showRequests();
+ public abstract void showRequests();
/**
- * Set app scheduling info.
+ * Initialize this allocator. This will be called by Factory automatically.
*
- * @param appSchedulingInfo
- * app info object.
+ * @param appSchedulingInfo appSchedulingInfo
+ * @param schedulerRequestKey schedulerRequestKey
+ * @param rmContext rmContext
*/
- void setAppSchedulingInfo(AppSchedulingInfo appSchedulingInfo);
+  public void initialize(AppSchedulingInfo appSchedulingInfo,
+      SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
+    // Store the three arguments in the protected fields of this allocator.
+    this.rmContext = rmContext;
+    this.schedulerRequestKey = schedulerRequestKey;
+    this.appSchedulingInfo = appSchedulingInfo;
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
index be1c1cc..a0358b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -22,8 +22,9 @@ import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -46,26 +47,18 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* containers.
*/
public class LocalityAppPlacementAllocator <N extends SchedulerNode>
- implements AppPlacementAllocator<N> {
+ extends AppPlacementAllocator<N> {
private static final Log LOG =
LogFactory.getLog(LocalityAppPlacementAllocator.class);
private final Map<String, ResourceRequest> resourceRequestMap =
new ConcurrentHashMap<>();
- private AppSchedulingInfo appSchedulingInfo;
private volatile String primaryRequestedPartition =
RMNodeLabelsManager.NO_LABEL;
private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock;
- public LocalityAppPlacementAllocator(AppSchedulingInfo info) {
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- readLock = lock.readLock();
- writeLock = lock.writeLock();
- this.appSchedulingInfo = info;
- }
-
public LocalityAppPlacementAllocator() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
@@ -182,6 +175,19 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
}
@Override
+  public PendingAskUpdateResult updatePendingAsk(
+      SchedulerRequestKey schedulerRequestKey,
+      SchedulingRequest schedulingRequest,
+      boolean recoverPreemptedRequestForAContainer)
+      throws SchedulerInvalidResoureRequestException {
+    // A SchedulingRequest is never accepted here: this method always throws,
+    // naming the concrete allocator class and the scheduler key involved.
+    String allocatorName = this.getClass().getName();
+    String reason = allocatorName
+        + " not be able to handle SchedulingRequest, there exists a "
+        + "ResourceRequest with the same scheduler key=" + schedulerRequestKey
+        + ", please send SchedulingRequest with a different allocationId and "
+        + "priority";
+    throw new SchedulerInvalidResoureRequestException(reason);
+  }
+
+ @Override
public Map<String, ResourceRequest> getResourceRequests() {
return resourceRequestMap;
}
@@ -362,13 +368,13 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
}
@Override
- public boolean acceptNodePartition(String nodePartition,
+ public boolean precheckNode(SchedulerNode schedulerNode,
SchedulingMode schedulingMode) {
// We will only look at node label = nodeLabelToLookAt according to
// schedulingMode and partition of node.
String nodePartitionToLookAt;
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
- nodePartitionToLookAt = nodePartition;
+ nodePartitionToLookAt = schedulerNode.getPartition();
} else {
nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL;
}
@@ -425,9 +431,4 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
writeLock.unlock();
}
}
-
- @Override
- public void setAppSchedulingInfo(AppSchedulingInfo appSchedulingInfo) {
- this.appSchedulingInfo = appSchedulingInfo;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
new file mode 100644
index 0000000..f8f758c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
@@ -0,0 +1,531 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.APPLICATION_LABEL_INTRA_APPLICATION;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION;
+
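+/**
+ * This is a simple implementation to do affinity or anti-affinity for
+ * inter/intra apps. As of now, validateAndSetSchedulingRequest only accepts
+ * intra-application anti-affinity at node scope (minCardinality=0,
+ * maxCardinality=1); any other constraint is rejected.
+ */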
+public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
+ extends AppPlacementAllocator<N> {
+ private static final Log LOG =
+ LogFactory.getLog(SingleConstraintAppPlacementAllocator.class);
+
+ private ReentrantReadWriteLock.ReadLock readLock;
+ private ReentrantReadWriteLock.WriteLock writeLock;
+
+ private SchedulingRequest schedulingRequest = null;
+ private String targetNodePartition;
+ private Set<String> targetAllocationTags;
+ private AllocationTagsManager allocationTagsManager;
+
+ public SingleConstraintAppPlacementAllocator() {
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ readLock = lock.readLock();
+ writeLock = lock.writeLock();
+ }
+
+ /**
+  * Returns an iterator over the nodes to try for this request.
+  *
+  * @param candidateNodeSet candidate nodes offered by the scheduler
+  * @return a one-element iterator when a single node can be extracted from
+  *         the candidate set, otherwise an empty iterator
+  */
+ @Override
+ @SuppressWarnings("unchecked")
+ public Iterator<N> getPreferredNodeIterator(
+     CandidateNodeSet<N> candidateNodeSet) {
+   // Now only handle the case that single node in the candidateNodeSet
+   // TODO, Add support to multi-hosts inside candidateNodeSet which is passed
+   // in.
+   final N onlyNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet);
+   if (onlyNode == null) {
+     return IteratorUtils.emptyIterator();
+   }
+
+   return IteratorUtils.singletonIterator(onlyNode);
+ }
+
+ /**
+  * ResourceRequest-based updates are not supported by this allocator.
+  *
+  * @param requests resource requests sent for this scheduler key
+  * @param recoverPreemptedRequestForAContainer unused by this method
+  * @return always null when {@code requests} is null or empty
+  * @throws SchedulerInvalidResoureRequestException when {@code requests}
+  *         contains at least one element
+  */
+ @Override
+ public PendingAskUpdateResult updatePendingAsk(
+     Collection<ResourceRequest> requests,
+     boolean recoverPreemptedRequestForAContainer) {
+   // Nothing was asked for: do nothing.
+   if (requests == null || requests.isEmpty()) {
+     return null;
+   }
+
+   final SchedulerRequestKey conflictingKey =
+       SchedulerRequestKey.create(requests.iterator().next());
+   throw new SchedulerInvalidResoureRequestException(
+       this.getClass().getName()
+           + " not be able to handle ResourceRequest, there exists a "
+           + "SchedulingRequest with the same scheduler key="
+           + conflictingKey
+           + ", please send ResourceRequest with a different allocationId and "
+           + "priority");
+ }
+
+ /**
+  * Applies a new or updated {@link SchedulingRequest} to this allocator.
+  * The visible caller, updatePendingAsk(SchedulerRequestKey, ...), invokes
+  * this method while holding the write lock.
+  *
+  * <p>If no request is stored yet, the new request is validated and stored.
+  * Otherwise the new request must equal the stored one in everything except
+  * numAllocations, and only the stored numAllocations is updated.
+  *
+  * @param newSchedulingRequest the request received from the application
+  * @param recoverContainer when true, the new pending count is the existing
+  *        count plus one instead of the count carried by the new request
+  * @return the old and new pending asks, or null when the pending count is
+  *         unchanged
+  * @throws SchedulerInvalidResoureRequestException if recovery is requested
+  *         without a stored request, if any field other than numAllocations
+  *         differs, or if the new numAllocations is negative
+  */
+ private PendingAskUpdateResult internalUpdatePendingAsk(
+     SchedulingRequest newSchedulingRequest, boolean recoverContainer) {
+   // When it is a recover container, there must exists an schedulingRequest.
+   if (recoverContainer && schedulingRequest == null) {
+     throw new SchedulerInvalidResoureRequestException("Trying to recover a "
+         + "container request=" + newSchedulingRequest.toString() + ", however "
+         + "there's no existing scheduling request, this should not happen.");
+   }
+
+   if (schedulingRequest != null) {
+     // If we have an old scheduling request, we will make sure that no changes
+     // made except sizing.
+     // To avoid unnecessary copy of the data structure, we do this by
+     // replacing numAllocations with old numAllocations in the
+     // newSchedulingRequest#getResourceSizing, and compare the two objects.
+     ResourceSizing sizing = newSchedulingRequest.getResourceSizing();
+     int existingNumAllocations =
+         schedulingRequest.getResourceSizing().getNumAllocations();
+
+     // When it is a recovered container request, just set
+     // #newAllocations = #existingAllocations + 1;
+     int newNumAllocations;
+     if (recoverContainer) {
+       newNumAllocations = existingNumAllocations + 1;
+     } else {
+       newNumAllocations = sizing.getNumAllocations();
+     }
+     sizing.setNumAllocations(existingNumAllocations);
+
+     // Compare two objects
+     if (!schedulingRequest.equals(newSchedulingRequest)) {
+       // Rollback #numAllocations
+       sizing.setNumAllocations(newNumAllocations);
+       throw new SchedulerInvalidResoureRequestException(
+           "Invalid updated SchedulingRequest added to scheduler, "
+               + "we only allows changing numAllocations for the updated "
+               + "SchedulingRequest. Old=" + schedulingRequest.toString()
+               + " new=" + newSchedulingRequest.toString()
+               + ", if any fields need to be updated, please cancel the "
+               + "old request (by setting numAllocations to 0) and send a "
+               + "SchedulingRequest with different combination of "
+               + "priority/allocationId");
+     } else {
+       if (newNumAllocations == existingNumAllocations) {
+         // No update on pending asks, return null.
+         return null;
+       }
+     }
+
+     // Rollback #numAllocations
+     sizing.setNumAllocations(newNumAllocations);
+
+     // Basic sanity check
+     if (newNumAllocations < 0) {
+       throw new SchedulerInvalidResoureRequestException(
+           "numAllocation in ResourceSizing field must be >= 0, "
+               + "updating schedulingRequest failed.");
+     }
+
+     // Capture the old pending ask before the stored request is modified.
+     PendingAskUpdateResult updateResult = new PendingAskUpdateResult(
+         new PendingAsk(schedulingRequest.getResourceSizing()),
+         new PendingAsk(newSchedulingRequest.getResourceSizing()),
+         targetNodePartition, targetNodePartition);
+
+     // Ok, now everything is same except numAllocation, update numAllocation.
+     this.schedulingRequest.getResourceSizing().setNumAllocations(
+         newNumAllocations);
+     LOG.info(
+         "Update numAllocation from old=" + existingNumAllocations + " to new="
+             + newNumAllocations);
+
+     return updateResult;
+   }
+
+   // For a new schedulingRequest, we need to validate if we support its asks.
+   // This will update internal partitions, etc. after the SchedulingRequest is
+   // valid.
+   validateAndSetSchedulingRequest(newSchedulingRequest);
+
+   return new PendingAskUpdateResult(null,
+       new PendingAsk(newSchedulingRequest.getResourceSizing()), null,
+       targetNodePartition);
+ }
+
+ /**
+  * Updates the pending ask from a {@link SchedulingRequest} while holding
+  * the write lock; the work itself is done by internalUpdatePendingAsk.
+  *
+  * @param schedulerRequestKey not read by this method
+  * @param newSchedulingRequest the new or updated scheduling request
+  * @param recoverPreemptedRequestForAContainer forwarded as the recover flag
+  * @return whatever internalUpdatePendingAsk returns
+  */
+ @Override
+ public PendingAskUpdateResult updatePendingAsk(
+     SchedulerRequestKey schedulerRequestKey,
+     SchedulingRequest newSchedulingRequest,
+     boolean recoverPreemptedRequestForAContainer) {
+   writeLock.lock();
+   try {
+     final PendingAskUpdateResult outcome = internalUpdatePendingAsk(
+         newSchedulingRequest, recoverPreemptedRequestForAContainer);
+     return outcome;
+   } finally {
+     writeLock.unlock();
+   }
+ }
+
+ private String throwExceptionWithMetaInfo(String message) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("AppId=").append(appSchedulingInfo.getApplicationId()).append(
+ " Key=").append(this.schedulerRequestKey).append(". Exception message:")
+ .append(message);
+ throw new SchedulerInvalidResoureRequestException(sb.toString());
+ }
+
+ /**
+  * Validates a brand-new {@link SchedulingRequest} and, when it is accepted,
+  * stores a copy of it together with the target node partition and the
+  * target allocation tags.
+  *
+  * <p>A request is accepted only if all of the following hold:
+  * <ul>
+  *   <li>it carries a ResourceSizing with resources;</li>
+  *   <li>its execution type is absent or GUARANTEED;</li>
+  *   <li>its placement constraint is a SingleConstraint with scope NODE,
+  *       minCardinality=0 and maxCardinality=1;</li>
+  *   <li>it has exactly one ALLOCATION_TAG target expression, non-empty and
+  *       keyed by the intra-application label;</li>
+  *   <li>it has at most one NODE_ATTRIBUTE target expression, keyed by the
+  *       node partition and naming at most one partition.</li>
+  * </ul>
+  * When no partition is given, the partition defaults to
+  * {@link RMNodeLabelsManager#NO_LABEL}.
+  *
+  * @param newSchedulingRequest the request to validate and store
+  * @throws SchedulerInvalidResoureRequestException if any check fails; no
+  *         state of this allocator is modified in that case
+  */
+ private void validateAndSetSchedulingRequest(
+     SchedulingRequest newSchedulingRequest)
+     throws SchedulerInvalidResoureRequestException {
+   // Check sizing exists
+   if (newSchedulingRequest.getResourceSizing() == null
+       || newSchedulingRequest.getResourceSizing().getResources() == null) {
+     throwExceptionWithMetaInfo(
+         "No ResourceSizing found in the scheduling request, please double "
+             + "check");
+   }
+
+   // Check execution type == GUARANTEED
+   if (newSchedulingRequest.getExecutionType() != null
+       && newSchedulingRequest.getExecutionType().getExecutionType()
+       != ExecutionType.GUARANTEED) {
+     throwExceptionWithMetaInfo(
+         "Only GUARANTEED execution type is supported.");
+   }
+
+   PlacementConstraint constraint =
+       newSchedulingRequest.getPlacementConstraint();
+
+   // Reject a missing constraint explicitly instead of failing with a
+   // NullPointerException below.
+   if (constraint == null) {
+     throwExceptionWithMetaInfo(
+         "No PlacementConstraint found in the scheduling request, please "
+             + "double check");
+   }
+
+   // We only accept SingleConstraint
+   PlacementConstraint.AbstractConstraint ac = constraint.getConstraintExpr();
+   if (!(ac instanceof PlacementConstraint.SingleConstraint)) {
+     throwExceptionWithMetaInfo(
+         "Only accepts " + PlacementConstraint.SingleConstraint.class.getName()
+             + " as constraint-expression. Rejecting the new added "
+             + "constraint-expression.class="
+             + (ac == null ? "null" : ac.getClass().getName()));
+   }
+
+   PlacementConstraint.SingleConstraint singleConstraint =
+       (PlacementConstraint.SingleConstraint) ac;
+
+   // Make sure it is an anti-affinity request. (Actually this implementation
+   // should be able to support both affinity / anti-affinity without much
+   // effort. Considering potential test effort required, limit to
+   // anti-affinity to intra-app and scope is node.)
+   if (!PlacementConstraints.NODE.equals(singleConstraint.getScope())) {
+     throwExceptionWithMetaInfo(
+         "Only support scope=" + PlacementConstraints.NODE
+             + " now. PlacementConstraint=" + singleConstraint);
+   }
+
+   if (singleConstraint.getMinCardinality() != 0
+       || singleConstraint.getMaxCardinality() != 1) {
+     throwExceptionWithMetaInfo(
+         "Only support anti-affinity, which is: minCardinality=0, "
+             + "maxCardinality=1");
+   }
+
+   Set<PlacementConstraint.TargetExpression> targetExpressionSet =
+       singleConstraint.getTargetExpressions();
+   if (targetExpressionSet == null || targetExpressionSet.isEmpty()) {
+     throwExceptionWithMetaInfo(
+         "TargetExpression should not be null or empty");
+   }
+
+   // Node partition found in the target expressions, if any.
+   String nodePartition = null;
+
+   // Allocation tags found in the target expressions, if any. Named
+   // differently from the field so the field is not shadowed.
+   Set<String> allocationTags = null;
+
+   for (PlacementConstraint.TargetExpression targetExpression
+       : targetExpressionSet) {
+     // Handle node partition
+     if (targetExpression.getTargetType().equals(
+         PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE)) {
+       // For node attribute target, we only support Partition now. And once
+       // YARN-3409 is merged, we will support node attribute.
+       if (!NODE_PARTITION.equals(targetExpression.getTargetKey())) {
+         throwExceptionWithMetaInfo("When TargetType="
+             + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE
+             + " only " + NODE_PARTITION + " is accepted as TargetKey.");
+       }
+
+       if (nodePartition != null) {
+         // This means we have duplicated node partition entry inside placement
+         // constraint, which might be set by mistake.
+         throwExceptionWithMetaInfo(
+             "Only one node partition targetExpression is allowed");
+       }
+
+       Set<String> values = targetExpression.getTargetValues();
+       if (values == null || values.isEmpty()) {
+         nodePartition = RMNodeLabelsManager.NO_LABEL;
+         continue;
+       }
+
+       if (values.size() > 1) {
+         throwExceptionWithMetaInfo("Inside one targetExpression, we only "
+             + "support affinity to at most one node partition now");
+       }
+
+       nodePartition = values.iterator().next();
+     } else if (targetExpression.getTargetType().equals(
+         PlacementConstraint.TargetExpression.TargetType.ALLOCATION_TAG)) {
+       // Handle allocation tags
+       if (allocationTags != null) {
+         // This means we have duplicated AllocationTag expressions entries
+         // inside placement constraint, which might be set by mistake.
+         throwExceptionWithMetaInfo(
+             "Only one AllocationTag targetExpression is allowed");
+       }
+
+       if (targetExpression.getTargetValues() == null || targetExpression
+           .getTargetValues().isEmpty()) {
+         throwExceptionWithMetaInfo("Failed to find allocation tags from "
+             + "TargetExpressions or couldn't find self-app target.");
+       }
+
+       allocationTags = new HashSet<>(targetExpression.getTargetValues());
+
+       if (targetExpression.getTargetKey() == null || !targetExpression
+           .getTargetKey().equals(APPLICATION_LABEL_INTRA_APPLICATION)) {
+         throwExceptionWithMetaInfo(
+             "As of now, the only accepted target key for targetKey of "
+                 + "allocation_tag target expression is: ["
+                 + APPLICATION_LABEL_INTRA_APPLICATION
+                 + "]. Please make changes to placement constraints "
+                 + "accordingly.");
+       }
+     }
+   }
+
+   if (allocationTags == null) {
+     // That means we don't have ALLOCATION_TAG specified
+     throwExceptionWithMetaInfo(
+         "Couldn't find target expression with type == ALLOCATION_TAG, it is "
+             + "required to include one and only one target expression with "
+             + "type == ALLOCATION_TAG");
+   }
+
+   if (nodePartition == null) {
+     nodePartition = RMNodeLabelsManager.NO_LABEL;
+   }
+
+   // Validation is done. set local results:
+   this.targetNodePartition = nodePartition;
+   this.targetAllocationTags = allocationTags;
+
+   // Keep a private copy built from the request's proto.
+   this.schedulingRequest = new SchedulingRequestPBImpl(
+       ((SchedulingRequestPBImpl) newSchedulingRequest).getProto());
+
+   LOG.info("Successfully added SchedulingRequest to app=" + appSchedulingInfo
+       .getApplicationAttemptId() + " targetAllocationTags=[" + StringUtils
+       .join(",", allocationTags) + "]. nodePartition="
+       + targetNodePartition);
+ }
+
+ /**
+  * This allocator does not keep any ResourceRequest.
+  *
+  * @return an empty, immutable map
+  */
+ @Override
+ public Map<String, ResourceRequest> getResourceRequests() {
+   // Type-safe replacement for the raw Collections.EMPTY_MAP constant.
+   return Collections.emptyMap();
+ }
+
+ /**
+  * Returns the pending ask for a resource name, read under the read lock.
+  *
+  * @param resourceName the resource name being queried
+  * @return a PendingAsk built from the stored request's sizing when the name
+  *         is "*" and a request is stored, otherwise {@code PendingAsk.ZERO}
+  */
+ @Override
+ public PendingAsk getPendingAsk(String resourceName) {
+   readLock.lock();
+   try {
+     final boolean hasAsk =
+         resourceName.equals("*") && schedulingRequest != null;
+     return hasAsk
+         ? new PendingAsk(schedulingRequest.getResourceSizing())
+         : PendingAsk.ZERO;
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * Returns the number of outstanding allocations for a resource name, read
+  * under the read lock.
+  *
+  * @param resourceName the resource name being queried
+  * @return the stored request's numAllocations when the name is "*" and a
+  *         request is stored, otherwise 0
+  */
+ @Override
+ public int getOutstandingAsksCount(String resourceName) {
+   readLock.lock();
+   try {
+     if (!resourceName.equals("*") || schedulingRequest == null) {
+       return 0;
+     }
+     final ResourceSizing pendingSizing =
+         schedulingRequest.getResourceSizing();
+     return pendingSizing.getNumAllocations();
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ private void decreasePendingNumAllocation() {
+ // Deduct pending #allocations by 1
+ ResourceSizing sizing = schedulingRequest.getResourceSizing();
+ sizing.setNumAllocations(sizing.getNumAllocations() - 1);
+ }
+
+ /**
+  * Records one allocation against the stored scheduling request while
+  * holding the write lock.
+  *
+  * @param schedulerKey not read by this method
+  * @param type not read by this method
+  * @param node not read by this method
+  * @return a ContainerRequest wrapping a copy of the stored request whose
+  *         numAllocations is set to 1
+  */
+ @Override
+ public ContainerRequest allocate(SchedulerRequestKey schedulerKey,
+     NodeType type, SchedulerNode node) {
+   writeLock.lock();
+   try {
+     // Copy the stored request via its proto, then narrow the copy to a
+     // single allocation.
+     final SchedulingRequestPBImpl storedImpl =
+         (SchedulingRequestPBImpl) schedulingRequest;
+     final SchedulingRequest singleContainerRequest =
+         new SchedulingRequestPBImpl(storedImpl.getProto());
+     singleContainerRequest.getResourceSizing().setNumAllocations(1);
+
+     // One fewer allocation is pending on the stored request.
+     decreasePendingNumAllocation();
+
+     return new ContainerRequest(singleContainerRequest);
+   } finally {
+     writeLock.unlock();
+   }
+ }
+
+ private boolean checkCardinalityAndPending(SchedulerNode node) {
+ // Do we still have pending resource?
+ if (schedulingRequest.getResourceSizing().getNumAllocations() <= 0) {
+ return false;
+ }
+
+ // node type will be ignored.
+ try {
+ return PlacementConstraintsUtil.canSatisfySingleConstraint(
+ appSchedulingInfo.getApplicationId(),
+ this.schedulingRequest.getPlacementConstraint(), node,
+ allocationTagsManager);
+ } catch (InvalidAllocationTagsQueryException e) {
+ LOG.warn("Failed to query node cardinality:", e);
+ return false;
+ }
+ }
+
+ /**
+  * Tells whether a container of this request can be placed on the node.
+  *
+  * @param type not read by this method
+  * @param node the node being considered
+  * @return the result of checkCardinalityAndPending for the node, evaluated
+  *         under the read lock
+  */
+ @Override
+ public boolean canAllocate(NodeType type, SchedulerNode node) {
+   // Acquire the lock before entering the try block, so the finally clause
+   // never unlocks a lock that was not obtained.
+   readLock.lock();
+   try {
+     return checkCardinalityAndPending(node);
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * Always reports {@code true}, whatever resource name is given.
+  */
+ @Override
+ public boolean canDelayTo(String resourceName) {
+ return true;
+ }
+
+ /**
+  * Quick check of whether the node is worth considering for this request.
+  *
+  * @param schedulerNode the node being considered
+  * @param schedulingMode decides which partition is compared: the node's own
+  *        partition when exclusivity is respected, NO_LABEL otherwise
+  * @return true when the target node partition equals the partition to look
+  *         at and checkCardinalityAndPending accepts the node
+  */
+ @Override
+ public boolean precheckNode(SchedulerNode schedulerNode,
+     SchedulingMode schedulingMode) {
+   // We will only look at node label = nodeLabelToLookAt according to
+   // schedulingMode and partition of node.
+   final String nodePartitionToLookAt =
+       (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+           ? schedulerNode.getPartition()
+           : RMNodeLabelsManager.NO_LABEL;
+
+   readLock.lock();
+   try {
+     // Check node partition as well as cardinality/pending resources.
+     if (!this.targetNodePartition.equals(nodePartitionToLookAt)) {
+       return false;
+     }
+     return checkCardinalityAndPending(schedulerNode);
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * Returns the target node partition field. The field is read without
+  * taking the read lock.
+  */
+ @Override
+ public String getPrimaryRequestedNodePartition() {
+ return targetNodePartition;
+ }
+
+ /**
+  * Always reports 1.
+  */
+ @Override
+ public int getUniqueLocationAsks() {
+ return 1;
+ }
+
+ /**
+  * Logs the stored scheduling request at INFO level, if there is one. The
+  * request is read under the read lock.
+  */
+ @Override
+ public void showRequests() {
+   // Acquire the lock before entering the try block, so the finally clause
+   // never unlocks a lock that was not obtained.
+   readLock.lock();
+   try {
+     if (schedulingRequest != null) {
+       LOG.info(schedulingRequest.toString());
+     }
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * Test-only accessor for the stored scheduling request; may be null when
+  * no request has been stored. The field is read without the read lock.
+  */
+ @VisibleForTesting
+ SchedulingRequest getSchedulingRequest() {
+ return schedulingRequest;
+ }
+
+ /**
+  * Test-only accessor for the target node partition field. The field is
+  * read without the read lock.
+  */
+ @VisibleForTesting
+ String getTargetNodePartition() {
+ return targetNodePartition;
+ }
+
+ /**
+  * Test-only accessor for the target allocation tags field. The internal
+  * set itself is returned, not a copy, and it is read without the read lock.
+  */
+ @VisibleForTesting
+ Set<String> getTargetAllocationTags() {
+ return targetAllocationTags;
+ }
+
+ /**
+  * Delegates to the superclass initialize with the same arguments, then
+  * keeps a reference to the AllocationTagsManager obtained from rmContext.
+  */
+ @Override
+ public void initialize(AppSchedulingInfo appSchedulingInfo,
+ SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
+ super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+ this.allocationTagsManager = rmContext.getAllocationTagsManager();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
index fbde681..7d1140d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
@@ -331,8 +331,7 @@ public class Application {
// Get resources from the ResourceManager
Allocation allocation = resourceManager.getResourceScheduler().allocate(
- applicationAttemptId, new ArrayList<ResourceRequest>(ask),
- new ArrayList<ContainerId>(), null, null,
+ applicationAttemptId, new ArrayList<ResourceRequest>(ask), null, new ArrayList<ContainerId>(), null, null,
new ContainerUpdates());
if (LOG.isInfoEnabled()) {
@@ -431,7 +430,7 @@ public class Application {
if (type == NodeType.NODE_LOCAL) {
for (String host : task.getHosts()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("updatePendingAsk:" + " application=" + applicationId
+ LOG.debug("updateResourceDemands:" + " application=" + applicationId
+ " type=" + type + " host=" + host
+ " request=" + ((requests == null) ? "null" : requests.get(host)));
}
@@ -442,7 +441,7 @@ public class Application {
if (type == NodeType.NODE_LOCAL || type == NodeType.RACK_LOCAL) {
for (String rack : task.getRacks()) {
if(LOG.isDebugEnabled()) {
- LOG.debug("updatePendingAsk:" + " application=" + applicationId
+ LOG.debug("updateResourceDemands:" + " application=" + applicationId
+ " type=" + type + " rack=" + rack
+ " request=" + ((requests == null) ? "null" : requests.get(rack)));
}
@@ -453,7 +452,7 @@ public class Application {
updateResourceRequest(requests.get(ResourceRequest.ANY));
if(LOG.isDebugEnabled()) {
- LOG.debug("updatePendingAsk:" + " application=" + applicationId
+ LOG.debug("updateResourceDemands:" + " application=" + applicationId
+ " #asks=" + ask.size());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 975abe6..9fa2c40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -37,14 +38,17 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -281,6 +285,53 @@ public class MockAM {
}
return allocate(req);
}
+
+ /**
+  * Sends an allocate call carrying resource requests, releases and any
+  * queued scheduling requests.
+  *
+  * @param resourceRequest resource requests to send
+  * @param newSchedulingRequests scheduling requests to queue before sending;
+  *        ignored when null
+  * @param releases containers to release
+  * @return the response of the allocate call
+  */
+ public AllocateResponse allocate(List<ResourceRequest> resourceRequest,
+     List<SchedulingRequest> newSchedulingRequests, List<ContainerId> releases)
+     throws Exception {
+   final AllocateRequest allocateRequest =
+       AllocateRequest.newInstance(0, 0F, resourceRequest, releases, null);
+   if (newSchedulingRequests != null) {
+     addSchedulingRequest(newSchedulingRequests);
+   }
+   // Attach everything queued so far, then empty the queue.
+   if (!schedulingRequests.isEmpty()) {
+     allocateRequest.setSchedulingRequests(schedulingRequests);
+     schedulingRequests.clear();
+   }
+   return allocate(allocateRequest);
+ }
+
+ public AllocateResponse allocateIntraAppAntiAffinity(
+ ResourceSizing resourceSizing, Priority priority, long allocationId,
+ Set<String> allocationTags, String... targetTags) throws Exception {
+ return this.allocate(null,
+ Arrays.asList(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(allocationId).priority(priority)
+ .allocationTags(allocationTags).placementConstraintExpression(
+ PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp(targetTags)).build())
+ .resourceSizing(resourceSizing).build()), null);
+ }
+
+ public AllocateResponse allocateIntraAppAntiAffinity(
+ String nodePartition, ResourceSizing resourceSizing, Priority priority,
+ long allocationId, String... tags) throws Exception {
+ return this.allocate(null,
+ Arrays.asList(SchedulingRequest.newBuilder().executionType(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+ .allocationRequestId(allocationId).priority(priority)
+ .placementConstraintExpression(PlacementConstraints
+ .targetCardinality(PlacementConstraints.NODE, 0, 1,
+ PlacementConstraints.PlacementTargets
+ .allocationTagToIntraApp(tags),
+ PlacementConstraints.PlacementTargets
+ .nodePartition(nodePartition)).build())
+ .resourceSizing(resourceSizing).build()), null);
+ }
public AllocateResponse sendContainerResizingRequest(
List<UpdateContainerRequest> updateRequests) throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index 0e4f308..4a5c671 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -474,7 +474,7 @@ public class TestRMAppAttemptTransitions {
assertEquals(expectedState, applicationAttempt.getAppAttemptState());
verify(scheduler, times(expectedAllocateCount)).allocate(
- any(ApplicationAttemptId.class), any(List.class), any(List.class),
+ any(ApplicationAttemptId.class), any(List.class), eq(null), any(List.class),
any(List.class), any(List.class), any(ContainerUpdates.class));
assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
@@ -495,7 +495,7 @@ public class TestRMAppAttemptTransitions {
// Check events
verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
verify(scheduler, times(2)).allocate(any(ApplicationAttemptId.class),
- any(List.class), any(List.class), any(List.class), any(List.class),
+ any(List.class), any(List.class), any(List.class), any(List.class), any(List.class),
any(ContainerUpdates.class));
verify(nmTokenManager).clearNodeSetForAttempt(
applicationAttempt.getAppAttemptId());
@@ -643,7 +643,7 @@ public class TestRMAppAttemptTransitions {
when(allocation.getContainers()).
thenReturn(Collections.singletonList(container));
when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
- any(List.class), any(List.class), any(List.class),
+ any(List.class), any(List.class), any(List.class), any(List.class),
any(ContainerUpdates.class))).
thenReturn(allocation);
RMContainer rmContainer = mock(RMContainerImpl.class);
@@ -1161,7 +1161,7 @@ public class TestRMAppAttemptTransitions {
when(allocation.getContainers()).
thenReturn(Collections.singletonList(amContainer));
when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
- any(List.class), any(List.class), any(List.class),
+ any(List.class), any(List.class), any(List.class), any(List.class),
any(ContainerUpdates.class)))
.thenReturn(allocation);
RMContainer rmContainer = mock(RMContainerImpl.class);
@@ -1636,7 +1636,7 @@ public class TestRMAppAttemptTransitions {
public void testScheduleTransitionReplaceAMContainerRequestWithDefaults() {
YarnScheduler mockScheduler = mock(YarnScheduler.class);
when(mockScheduler.allocate(any(ApplicationAttemptId.class),
- any(List.class), any(List.class), any(List.class), any(List.class),
+ any(List.class), any(List.class), any(List.class), any(List.class), any(List.class),
any(ContainerUpdates.class)))
.thenAnswer(new Answer<Allocation>() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index b927870..2bf6a21 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -420,9 +421,10 @@ public class TestRMContainerImpl {
when(rmContext.getYarnConfiguration()).thenReturn(conf);
/* First container: ALLOCATED -> KILLED */
- RMContainer rmContainer = new RMContainerImpl(container,
+ RMContainerImpl rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), appAttemptId,
nodeId, "user", rmContext);
+ rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
Assert.assertEquals(0,
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
@@ -448,6 +450,7 @@ public class TestRMContainerImpl {
Assert.assertEquals(0,
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+ rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
rmContainer.handle(new RMContainerEvent(containerId,
RMContainerEventType.START));
@@ -468,6 +471,7 @@ public class TestRMContainerImpl {
rmContainer = new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), appAttemptId,
nodeId, "user", rmContext);
+ rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
Assert.assertEquals(0,
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
index 3692b29..b7b0eb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -46,7 +46,7 @@ public class TestAppSchedulingInfo {
doReturn("test").when(queue).getQueueName();
AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(appAttemptId,
"test", queue, null, 0, new ResourceUsage(),
- new HashMap<String, String>());
+ new HashMap<String, String>(), null);
appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(),
new ArrayList<String>());
@@ -118,7 +118,7 @@ public class TestAppSchedulingInfo {
doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
AppSchedulingInfo info = new AppSchedulingInfo(
appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
- new ResourceUsage(), new HashMap<String, String>());
+ new ResourceUsage(), new HashMap<>(), null);
Assert.assertEquals(0, info.getSchedulerKeys().size());
Priority pri1 = Priority.newInstance(1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
new file mode 100644
index 0000000..5cea3a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.junit.Assert;
+
+import java.util.Set;
+
+public class CapacitySchedulerTestBase { // Shared constants and assertion helpers for CapacityScheduler tests.
+ protected final int GB = 1024; // Multiplier for memory arguments (e.g. 1 * GB); presumably MB per GB -- confirm.
+
+ protected static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; // Queue paths directly under ROOT.
+ protected static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ protected static final String A1 = A + ".a1"; // Child queue paths of A.
+ protected static final String A2 = A + ".a2";
+ protected static final String B1 = B + ".b1"; // Child queue paths of B.
+ protected static final String B2 = B + ".b2";
+ protected static final String B3 = B + ".b3";
+ protected static float A_CAPACITY = 10.5f; // Presumably percent capacities of a and b (sum to 100) -- confirm at use sites.
+ protected static float B_CAPACITY = 89.5f;
+ protected static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1"; // Further parent queue paths under ROOT.
+ protected static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2";
+ protected static final String X1 = P1 + ".x1"; // Child queue paths of P1.
+ protected static final String X2 = P1 + ".x2";
+ protected static final String Y1 = P2 + ".y1"; // Child queue paths of P2.
+ protected static final String Y2 = P2 + ".y2";
+ protected static float A1_CAPACITY = 30; // Presumably percent capacities of a's children (sum to 100) -- confirm.
+ protected static float A2_CAPACITY = 70;
+ protected static float B1_CAPACITY = 79.2f; // Presumably percent capacities of b's children (sum to 100) -- confirm.
+ protected static float B2_CAPACITY = 0.8f;
+ protected static float B3_CAPACITY = 20;
+
+ // Returns a new HashSet (built with Sets.newHashSet) containing the given elements.
+ @SuppressWarnings("unchecked") // Silences the unchecked warning raised by the generic varargs parameter.
+ protected <E> Set<E> toSet(E... elements) {
+ Set<E> set = Sets.newHashSet(elements);
+ return set;
+ }
+ // Asserts that the named queue's pending memory for the given label equals the expected memory value.
+ protected void checkPendingResource(MockRM rm, String queueName, int memory,
+ String label) {
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); // Cast assumes rm runs a CapacityScheduler.
+ CSQueue queue = cs.getQueue(queueName);
+ Assert.assertEquals(
+ memory,
+ queue.getQueueResourceUsage()
+ .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label) // A null label falls back to NO_LABEL.
+ .getMemorySize());
+ }
+
+ // Asserts that the named queue's pending memory for the given label is greater than zero.
+ protected void checkPendingResourceGreaterThanZero(MockRM rm, String queueName,
+ String label) {
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); // Cast assumes rm runs a CapacityScheduler.
+ CSQueue queue = cs.getQueue(queueName);
+ Assert.assertTrue(queue.getQueueResourceUsage()
+ .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label) // A null label falls back to NO_LABEL.
+ .getMemorySize() > 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 7628312..79898bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -103,7 +103,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -167,33 +166,10 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
-public class TestCapacityScheduler {
+public class TestCapacityScheduler extends CapacitySchedulerTestBase {
private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
- private final int GB = 1024;
private final static ContainerUpdates NULL_UPDATE_REQUESTS =
new ContainerUpdates();
-
- private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
- private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
- private static final String A1 = A + ".a1";
- private static final String A2 = A + ".a2";
- private static final String B1 = B + ".b1";
- private static final String B2 = B + ".b2";
- private static final String B3 = B + ".b3";
- private static float A_CAPACITY = 10.5f;
- private static float B_CAPACITY = 89.5f;
- private static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1";
- private static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2";
- private static final String X1 = P1 + ".x1";
- private static final String X2 = P1 + ".x2";
- private static final String Y1 = P2 + ".y1";
- private static final String Y2 = P2 + ".y2";
- private static float A1_CAPACITY = 30;
- private static float A2_CAPACITY = 70;
- private static float B1_CAPACITY = 79.2f;
- private static float B2_CAPACITY = 0.8f;
- private static float B3_CAPACITY = 20;
-
private ResourceManager resourceManager = null;
private RMContext mockContext;
@@ -1116,12 +1092,12 @@ public class TestCapacityScheduler {
cs.handle(addAttemptEvent);
// Verify the blacklist can be updated independent of requesting containers
- cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
+ cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
Assert.assertTrue(cs.getApplicationAttempt(appAttemptId)
.isPlaceBlacklisted(host));
- cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
+ cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host), NULL_UPDATE_REQUESTS);
Assert.assertFalse(cs.getApplicationAttempt(appAttemptId)
@@ -1217,8 +1193,7 @@ public class TestCapacityScheduler {
//This will allocate for app1
cs.allocate(appAttemptId1,
- Collections.<ResourceRequest>singletonList(r1),
- Collections.<ContainerId>emptyList(),
+ Collections.<ResourceRequest>singletonList(r1), null, Collections.<ContainerId>emptyList(),
null, null, NULL_UPDATE_REQUESTS);
//And this will result in container assignment for app1
@@ -1234,8 +1209,7 @@ public class TestCapacityScheduler {
//Now, allocate for app2 (this would be the first/AM allocation)
ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory);
cs.allocate(appAttemptId2,
- Collections.<ResourceRequest>singletonList(r2),
- Collections.<ContainerId>emptyList(),
+ Collections.<ResourceRequest>singletonList(r2), null, Collections.<ContainerId>emptyList(),
null, null, NULL_UPDATE_REQUESTS);
//In this case we do not perform container assignment because we want to
@@ -3481,12 +3455,6 @@ public class TestCapacityScheduler {
+ "queue-a's max capacity will be violated if container allocated");
}
- @SuppressWarnings("unchecked")
- private <E> Set<E> toSet(E... elements) {
- Set<E> set = Sets.newHashSet(elements);
- return set;
- }
-
@Test
public void testQueueHierarchyPendingResourceUpdate() throws Exception {
Configuration conf =
@@ -3618,26 +3586,6 @@ public class TestCapacityScheduler {
checkPendingResource(rm, "root", 0 * GB, "x");
}
- private void checkPendingResource(MockRM rm, String queueName, int memory,
- String label) {
- CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
- CSQueue queue = cs.getQueue(queueName);
- Assert.assertEquals(
- memory,
- queue.getQueueResourceUsage()
- .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
- .getMemorySize());
- }
-
- private void checkPendingResourceGreaterThanZero(MockRM rm, String queueName,
- String label) {
- CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
- CSQueue queue = cs.getQueue(queueName);
- Assert.assertTrue(queue.getQueueResourceUsage()
- .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
- .getMemorySize() > 0);
- }
-
// Test verifies AM Used resource for LeafQueue when AM ResourceRequest is
// lesser than minimumAllocation
@Test(timeout = 30000)
@@ -3707,7 +3655,7 @@ public class TestCapacityScheduler {
Allocation allocate =
cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
- Collections.<ContainerId> emptyList(), null, null,
+ null, Collections.<ContainerId> emptyList(), null, null,
NULL_UPDATE_REQUESTS);
Assert.assertNotNull(attempt);
@@ -3724,7 +3672,7 @@ public class TestCapacityScheduler {
allocate =
cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
- Collections.<ContainerId> emptyList(), null, null,
+ null, Collections.<ContainerId> emptyList(), null, null,
NULL_UPDATE_REQUESTS);
// All resources should be sent as headroom
@@ -4250,8 +4198,7 @@ public class TestCapacityScheduler {
y1Req = TestUtils.createResourceRequest(
ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
cs.allocate(appAttemptId3,
- Collections.<ResourceRequest>singletonList(y1Req),
- Collections.<ContainerId>emptyList(),
+ Collections.<ResourceRequest>singletonList(y1Req), null, Collections.<ContainerId>emptyList(),
null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
}
@@ -4264,8 +4211,7 @@ public class TestCapacityScheduler {
x1Req = TestUtils.createResourceRequest(
ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
cs.allocate(appAttemptId1,
- Collections.<ResourceRequest>singletonList(x1Req),
- Collections.<ContainerId>emptyList(),
+ Collections.<ResourceRequest>singletonList(x1Req), null, Collections.<ContainerId>emptyList(),
null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
}
@@ -4277,8 +4223,7 @@ public class TestCapacityScheduler {
x2Req = TestUtils.createResourceRequest(
ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
cs.allocate(appAttemptId2,
- Collections.<ResourceRequest>singletonList(x2Req),
- Collections.<ContainerId>emptyList(),
+ Collections.<ResourceRequest>singletonList(x2Req), null, Collections.<ContainerId>emptyList(),
null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
assertEquals("X2 Used Resource should be 0", 0,
@@ -4289,8 +4234,7 @@ public class TestCapacityScheduler {
x1Req = TestUtils.createResourceRequest(
ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
cs.allocate(appAttemptId1,
- Collections.<ResourceRequest>singletonList(x1Req),
- Collections.<ContainerId>emptyList(),
+ Collections.<ResourceRequest>singletonList(x1Req), null, Collections.<ContainerId>emptyList(),
null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
@@ -4303,8 +4247,7 @@ public class TestCapacityScheduler {
y1Req = TestUtils.createResourceRequest(
ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
cs.allocate(appAttemptId3,
- Collections.<ResourceRequest>singletonList(y1Req),
- Collections.<ContainerId>emptyList(),
+ Collections.<ResourceRequest>singletonList(y1Req), null, Collections.<ContainerId>emptyList(),
null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
}
@@ -4363,7 +4306,7 @@ public class TestCapacityScheduler {
ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
//This will allocate for app1
cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1),
- Collections.<ContainerId>emptyList(),
+ null, Collections.<ContainerId>emptyList(),
null, null, NULL_UPDATE_REQUESTS).getContainers().size();
CapacityScheduler.schedule(cs);
ResourceRequest r2 = null;
@@ -4371,8 +4314,7 @@ public class TestCapacityScheduler {
r2 = TestUtils.createResourceRequest(
ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
cs.allocate(appAttemptId2,
- Collections.<ResourceRequest>singletonList(r2),
- Collections.<ContainerId>emptyList(),
+ Collections.<ResourceRequest>singletonList(r2), null, Collections.<ContainerId>emptyList(),
null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
}
@@ -4385,12 +4327,12 @@ public class TestCapacityScheduler {
r2 = TestUtils.createResourceRequest(
ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1),
- Collections.<ContainerId>emptyList(),
+ null, Collections.<ContainerId>emptyList(),
null, null, NULL_UPDATE_REQUESTS).getContainers().size();
CapacityScheduler.schedule(cs);
cs.allocate(appAttemptId2, Collections.<ResourceRequest>singletonList(r2),
- Collections.<ContainerId>emptyList(), null, null, NULL_UPDATE_REQUESTS);
+ null, Collections.<ContainerId>emptyList(), null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
//Check blocked Resource
assertEquals("A Used Resource should be 2 GB", 2 * GB,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b9dffa1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index eddf8c8..18cd942 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -106,7 +106,7 @@ public class TestCapacitySchedulerAsyncScheduling {
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
numThreads);
conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
- + ".scheduling-interval-ms", 100);
+ + ".scheduling-interval-ms", 0);
final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org