You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/25 15:49:26 UTC
[09/50] [abbrv] hadoop git commit: YARN-7494. Add multi-node lookup
mechanism and pluggable nodes sorting policies to optimize placement
decision. Contributed by Sunil Govindan.
YARN-7494. Add multi-node lookup mechanism and pluggable nodes sorting policies to optimize placement decision. Contributed by Sunil Govindan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9c3fc3ef
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9c3fc3ef
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9c3fc3ef
Branch: refs/heads/YARN-3409
Commit: 9c3fc3ef2865164aa5f121793ac914cfeb21a181
Parents: 54d0bf8
Author: Weiwei Yang <ww...@apache.org>
Authored: Tue Aug 21 22:42:23 2018 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Tue Aug 21 22:42:28 2018 +0800
----------------------------------------------------------------------
.../resourcemanager/RMActiveServiceContext.java | 16 ++
.../yarn/server/resourcemanager/RMContext.java | 8 +-
.../server/resourcemanager/RMContextImpl.java | 14 +-
.../server/resourcemanager/ResourceManager.java | 12 ++
.../scheduler/AppSchedulingInfo.java | 11 +-
.../scheduler/ClusterNodeTracker.java | 61 +++++++
.../scheduler/activities/ActivitiesLogger.java | 32 ++--
.../scheduler/activities/ActivitiesManager.java | 8 +-
.../scheduler/capacity/AbstractCSQueue.java | 16 +-
.../scheduler/capacity/CSQueue.java | 6 +
.../scheduler/capacity/CapacityScheduler.java | 77 ++++++++-
.../CapacitySchedulerConfiguration.java | 116 +++++++++++++
.../scheduler/capacity/LeafQueue.java | 49 +++---
.../scheduler/capacity/ParentQueue.java | 4 +-
.../allocator/RegularContainerAllocator.java | 35 ++--
.../common/ApplicationSchedulingConfig.java | 4 +
.../scheduler/common/fica/FiCaSchedulerApp.java | 23 +++
.../LocalityAppPlacementAllocator.java | 34 +++-
.../placement/MultiNodeLookupPolicy.java | 67 ++++++++
.../placement/MultiNodePolicySpec.java | 56 +++++++
.../scheduler/placement/MultiNodeSorter.java | 167 +++++++++++++++++++
.../placement/MultiNodeSortingManager.java | 139 +++++++++++++++
.../ResourceUsageMultiNodeLookupPolicy.java | 79 +++++++++
.../reservation/ReservationSystemTestUtil.java | 3 +
.../scheduler/TestAppSchedulingInfo.java | 3 +-
.../capacity/CapacitySchedulerTestBase.java | 13 ++
.../capacity/TestCapacityScheduler.java | 15 --
.../TestCapacitySchedulerMultiNodes.java | 166 ++++++++++++++++++
.../TestCapacitySchedulerNodeLabelUpdate.java | 70 ++++++++
29 files changed, 1211 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 66065e3..8fb0de6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -43,9 +43,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -113,6 +115,7 @@ public class RMActiveServiceContext {
private AllocationTagsManager allocationTagsManager;
private PlacementConstraintManager placementConstraintManager;
private ResourceProfilesManager resourceProfilesManager;
+ private MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager;
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
@@ -443,6 +446,19 @@ public class RMActiveServiceContext {
@Private
@Unstable
+ public MultiNodeSortingManager<SchedulerNode> getMultiNodeSortingManager() {
+ return multiNodeSortingManager;
+ }
+
+ @Private
+ @Unstable
+ public void setMultiNodeSortingManager(
+ MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager) {
+ this.multiNodeSortingManager = multiNodeSortingManager;
+ }
+
+ @Private
+ @Unstable
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
this.schedulerRecoveryStartTime = systemClock.getTime();
this.schedulerRecoveryWaitTime = waitTime;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index eb91a31..a30ff76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -42,10 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -177,4 +178,9 @@ public interface RMContext extends ApplicationMasterServiceContext {
void setPlacementConstraintManager(
PlacementConstraintManager placementConstraintManager);
+
+ MultiNodeSortingManager<SchedulerNode> getMultiNodeSortingManager();
+
+ void setMultiNodeSortingManager(
+ MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 84e0f6f..cb1d56f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -48,10 +48,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -538,6 +539,17 @@ public class RMContextImpl implements RMContext {
delegatedNodeLabelsUpdater);
}
+ @Override
+ public MultiNodeSortingManager<SchedulerNode> getMultiNodeSortingManager() {
+ return activeServiceContext.getMultiNodeSortingManager();
+ }
+
+ @Override
+ public void setMultiNodeSortingManager(
+ MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager) {
+ activeServiceContext.setMultiNodeSortingManager(multiNodeSortingManager);
+ }
+
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index d459f0e..bdda871 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -96,11 +96,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManagerService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
@@ -546,6 +548,10 @@ public class ResourceManager extends CompositeService
return new FederationStateStoreService(rmContext);
}
+ protected MultiNodeSortingManager<SchedulerNode> createMultiNodeSortingManager() {
+ return new MultiNodeSortingManager<SchedulerNode>();
+ }
+
protected SystemMetricsPublisher createSystemMetricsPublisher() {
List<SystemMetricsPublisher> publishers =
new ArrayList<SystemMetricsPublisher>();
@@ -665,6 +671,12 @@ public class ResourceManager extends CompositeService
resourceProfilesManager.init(conf);
rmContext.setResourceProfilesManager(resourceProfilesManager);
+ MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager =
+ createMultiNodeSortingManager();
+ multiNodeSortingManager.setRMContext(rmContext);
+ addService(multiNodeSortingManager);
+ rmContext.setMultiNodeSortingManager(multiNodeSortingManager);
+
RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
createRMDelegatedNodeLabelsUpdater();
if (delegatedNodeLabelsUpdater != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index d63d2b82..ca7d9ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -93,7 +93,7 @@ public class AppSchedulingInfo {
private final ReentrantReadWriteLock.WriteLock writeLock;
public final ContainerUpdateContext updateContext;
- public final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
+ private final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
private final RMContext rmContext;
public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
@@ -782,4 +782,13 @@ public class AppSchedulingInfo {
this.readLock.unlock();
}
}
+
+ /**
+ * Get scheduling envs configured for this application.
+ *
+ * @return a map of applicationSchedulingEnvs
+ */
+ public Map<String, String> getApplicationSchedulingEnvs() {
+ return applicationSchedulingEnvs;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index 66d8810..8c7e447 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -37,6 +37,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -57,6 +58,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
private HashMap<NodeId, N> nodes = new HashMap<>();
private Map<String, N> nodeNameToNodeMap = new HashMap<>();
private Map<String, List<N>> nodesPerRack = new HashMap<>();
+ private Map<String, List<N>> nodesPerLabel = new HashMap<>();
private Resource clusterCapacity = Resources.createResource(0, 0);
private volatile Resource staleClusterCapacity =
@@ -80,6 +82,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
nodes.put(node.getNodeID(), node);
nodeNameToNodeMap.put(node.getNodeName(), node);
+ List<N> nodesPerLabels = nodesPerLabel.get(node.getPartition());
+
+ if (nodesPerLabels == null) {
+ nodesPerLabels = new ArrayList<N>();
+ }
+ nodesPerLabels.add(node);
+
+ // Update new set of nodes for given partition.
+ nodesPerLabel.put(node.getPartition(), nodesPerLabels);
+
// Update nodes per rack as well
String rackName = node.getRackName();
List<N> nodesList = nodesPerRack.get(rackName);
@@ -174,6 +186,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
}
}
+ List<N> nodesPerPartition = nodesPerLabel.get(node.getPartition());
+ nodesPerPartition.remove(node);
+
+ // Update new set of nodes for given partition.
+ if (nodesPerPartition.isEmpty()) {
+ nodesPerLabel.remove(node.getPartition());
+ } else {
+ nodesPerLabel.put(node.getPartition(), nodesPerPartition);
+ }
+
// Update cluster capacity
Resources.subtractFrom(clusterCapacity, node.getTotalResource());
staleClusterCapacity = Resources.clone(clusterCapacity);
@@ -420,4 +442,43 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
}
return retNodes;
}
+
+ /**
+ * update cached nodes per partition on a node label change event.
+ * @param partition nodeLabel
+ * @param nodeIds List of Node IDs
+ */
+ public void updateNodesPerPartition(String partition, Set<NodeId> nodeIds) {
+ writeLock.lock();
+ try {
+ // Clear all entries.
+ nodesPerLabel.remove(partition);
+
+ List<N> nodesPerPartition = new ArrayList<N>();
+ for (NodeId nodeId : nodeIds) {
+ N n = getNode(nodeId);
+ if (n != null) {
+ nodesPerPartition.add(n);
+ }
+ }
+
+ // Update new set of nodes for given partition.
+ nodesPerLabel.put(partition, nodesPerPartition);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public List<N> getNodesPerPartition(String partition) {
+ List<N> nodesPerPartition = null;
+ readLock.lock();
+ try {
+ if (nodesPerLabel.containsKey(partition)) {
+ nodesPerPartition = new ArrayList<N>(nodesPerLabel.get(partition));
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return nodesPerPartition;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
index 0c351b6..8a3ffce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
@@ -21,13 +21,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
/**
* Utility for logging scheduler activities
@@ -63,7 +63,7 @@ public class ActivitiesLogger {
SchedulerApplicationAttempt application, Priority priority,
String diagnostic) {
String type = "app";
- if (activitiesManager == null) {
+ if (node == null || activitiesManager == null) {
return;
}
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
@@ -84,18 +84,18 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, Priority priority,
String diagnostic, ActivityState appState) {
- if (activitiesManager == null) {
+ if (node == null || activitiesManager == null) {
return;
}
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
String type = "container";
// Add application-container activity into specific node allocation.
- activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+ activitiesManager.addSchedulingActivityForNode(node,
application.getApplicationId().toString(), null,
priority.toString(), ActivityState.SKIPPED, diagnostic, type);
type = "app";
// Add queue-application activity into specific node allocation.
- activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+ activitiesManager.addSchedulingActivityForNode(node,
application.getQueueName(),
application.getApplicationId().toString(),
application.getPriority().toString(), ActivityState.SKIPPED,
@@ -121,20 +121,20 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, RMContainer updatedContainer,
ActivityState activityState) {
- if (activitiesManager == null) {
+ if (node == null || activitiesManager == null) {
return;
}
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
String type = "container";
// Add application-container activity into specific node allocation.
- activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+ activitiesManager.addSchedulingActivityForNode(node,
application.getApplicationId().toString(),
updatedContainer.getContainer().toString(),
updatedContainer.getContainer().getPriority().toString(),
activityState, ActivityDiagnosticConstant.EMPTY, type);
type = "app";
// Add queue-application activity into specific node allocation.
- activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+ activitiesManager.addSchedulingActivityForNode(node,
application.getQueueName(),
application.getApplicationId().toString(),
application.getPriority().toString(), ActivityState.ACCEPTED,
@@ -157,13 +157,15 @@ public class ActivitiesLogger {
* update.
*/
public static void startAppAllocationRecording(
- ActivitiesManager activitiesManager, NodeId nodeId, long currentTime,
+ ActivitiesManager activitiesManager, FiCaSchedulerNode node,
+ long currentTime,
SchedulerApplicationAttempt application) {
- if (activitiesManager == null) {
+ if (node == null || activitiesManager == null) {
return;
}
- activitiesManager.startAppAllocationRecording(nodeId, currentTime,
- application);
+ activitiesManager
+ .startAppAllocationRecording(node.getNodeID(), currentTime,
+ application);
}
/*
@@ -208,7 +210,7 @@ public class ActivitiesLogger {
public static void recordQueueActivity(ActivitiesManager activitiesManager,
SchedulerNode node, String parentQueueName, String queueName,
ActivityState state, String diagnostic) {
- if (activitiesManager == null) {
+ if (node == null || activitiesManager == null) {
return;
}
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
@@ -240,7 +242,7 @@ public class ActivitiesLogger {
public static void finishAllocatedNodeAllocation(
ActivitiesManager activitiesManager, SchedulerNode node,
ContainerId containerId, AllocationState containerState) {
- if (activitiesManager == null) {
+ if (node == null || activitiesManager == null) {
return;
}
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
@@ -277,7 +279,7 @@ public class ActivitiesLogger {
SchedulerNode node, String parentName, String childName,
Priority priority, ActivityState state, String diagnostic, String type) {
- activitiesManager.addSchedulingActivityForNode(node.getNodeID(), parentName,
+ activitiesManager.addSchedulingActivityForNode(node, parentName,
childName, priority != null ? priority.toString() : null, state,
diagnostic, type);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
index 8498c40..5d96b17 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.util.SystemClock;
@@ -197,11 +198,12 @@ public class ActivitiesManager extends AbstractService {
}
// Add queue, application or container activity into specific node allocation.
- void addSchedulingActivityForNode(NodeId nodeID, String parentName,
+ void addSchedulingActivityForNode(SchedulerNode node, String parentName,
String childName, String priority, ActivityState state, String diagnostic,
String type) {
- if (shouldRecordThisNode(nodeID)) {
- NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID);
+ if (shouldRecordThisNode(node.getNodeID())) {
+ NodeAllocation nodeAllocation = getCurrentNodeAllocation(
+ node.getNodeID());
nodeAllocation.addAllocationActivity(parentName, childName, priority,
state, diagnostic, type);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 9c3e98f..2c9f9a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -92,7 +92,8 @@ public abstract class AbstractCSQueue implements CSQueue {
Set<String> resourceTypes;
final RMNodeLabelsManager labelManager;
String defaultLabelExpression;
-
+ private String multiNodeSortingPolicyName = null;
+
Map<AccessType, AccessControlList> acls =
new HashMap<AccessType, AccessControlList>();
volatile boolean reservationsContinueLooking;
@@ -414,6 +415,10 @@ public abstract class AbstractCSQueue implements CSQueue {
this.priority = configuration.getQueuePriority(
getQueuePath());
+ // Update multi-node sorting algorithm for scheduling as configured.
+ setMultiNodeSortingPolicyName(
+ configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePath()));
+
this.userWeights = getUserWeightsFromHierarchy(configuration);
} finally {
writeLock.unlock();
@@ -1259,4 +1264,13 @@ public abstract class AbstractCSQueue implements CSQueue {
this.writeLock.unlock();
}
}
+
+ @Override
+ public String getMultiNodeSortingPolicyName() {
+ return this.multiNodeSortingPolicyName;
+ }
+
+ public void setMultiNodeSortingPolicyName(String policyName) {
+ this.multiNodeSortingPolicyName = policyName;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 3963dc0..c0c280e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -430,4 +430,10 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
* @return effective max queue capacity
*/
Resource getEffectiveMaxCapacityDown(String label, Resource factor);
+
+ /**
+ * Get Multi Node scheduling policy name.
+ * @return policy name
+ */
+ String getMultiNodeSortingPolicyName();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 0b7fe92..dec1301 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -251,6 +252,7 @@ public class CapacityScheduler extends
private ResourceCommitterService resourceCommitterService;
private RMNodeLabelsManager labelManager;
private AppPriorityACLsManager appPriorityACLManager;
+ private boolean multiNodePlacementEnabled;
private static boolean printedVerboseLoggingForAsyncScheduling = false;
@@ -391,12 +393,23 @@ public class CapacityScheduler extends
// Setup how many containers we can allocate for each round
offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
+ // Register CS specific multi-node policies to common MultiNodeManager
+ // which will add to a MultiNodeSorter which gives a pre-sorted list of
+ // nodes to scheduler's allocation.
+ multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled();
+ if(rmContext.getMultiNodeSortingManager() != null) {
+ rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames(
+ multiNodePlacementEnabled,
+ this.conf.getMultiNodePlacementPolicies());
+ }
+
LOG.info("Initialized CapacityScheduler with " + "calculator="
+ getResourceCalculator().getClass() + ", " + "minimumAllocation=<"
+ getMinimumResourceCapability() + ">, " + "maximumAllocation=<"
+ getMaximumResourceCapability() + ">, " + "asynchronousScheduling="
+ scheduleAsynchronously + ", " + "asyncScheduleInterval="
- + asyncScheduleInterval + "ms");
+ + asyncScheduleInterval + "ms" + ",multiNodePlacementEnabled="
+ + multiNodePlacementEnabled);
} finally {
writeLock.unlock();
}
@@ -1373,18 +1386,23 @@ public class CapacityScheduler extends
assignment.getAssignmentInformation().getAllocationDetails();
List<AssignmentInformation.AssignmentDetails> reservations =
assignment.getAssignmentInformation().getReservationDetails();
+ // Get nodeId from allocated container if incoming argument is null.
+ NodeId updatedNodeid = (nodeId == null)
+ ? allocations.get(allocations.size() - 1).rmContainer.getNodeId()
+ : nodeId;
+
if (!allocations.isEmpty()) {
ContainerId allocatedContainerId =
allocations.get(allocations.size() - 1).containerId;
String allocatedQueue = allocations.get(allocations.size() - 1).queue;
- schedulerHealth.updateAllocation(now, nodeId, allocatedContainerId,
+ schedulerHealth.updateAllocation(now, updatedNodeid, allocatedContainerId,
allocatedQueue);
}
if (!reservations.isEmpty()) {
ContainerId reservedContainerId =
reservations.get(reservations.size() - 1).containerId;
String reservedQueue = reservations.get(reservations.size() - 1).queue;
- schedulerHealth.updateReservation(now, nodeId, reservedContainerId,
+ schedulerHealth.updateReservation(now, updatedNodeid, reservedContainerId,
reservedQueue);
}
schedulerHealth.updateSchedulerReservationCounts(assignment
@@ -1421,6 +1439,23 @@ public class CapacityScheduler extends
|| assignedContainers < maxAssignPerHeartbeat);
}
+ private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
+ FiCaSchedulerNode node) {
+ CandidateNodeSet<FiCaSchedulerNode> candidates = null;
+ candidates = new SimpleCandidateNodeSet<>(node);
+ if (multiNodePlacementEnabled) {
+ Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
+ List<FiCaSchedulerNode> nodes = nodeTracker
+ .getNodesPerPartition(node.getPartition());
+ if (nodes != null && !nodes.isEmpty()) {
+ nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
+ candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
+ nodesByPartition, node.getPartition());
+ }
+ }
+ return candidates;
+ }
+
/**
 * We need to make sure that the node exists when doing allocation,
 * and we will construct a {@link CandidateNodeSet} before proceeding
@@ -1432,8 +1467,8 @@ public class CapacityScheduler extends
int offswitchCount = 0;
int assignedContainers = 0;
- CandidateNodeSet<FiCaSchedulerNode> candidates =
- new SimpleCandidateNodeSet<>(node);
+ CandidateNodeSet<FiCaSchedulerNode> candidates = getCandidateNodeSet(
+ node);
CSAssignment assignment = allocateContainersToNode(candidates,
withNodeHeartbeat);
// Only check if we can allocate more container on the same node when
@@ -1599,10 +1634,13 @@ public class CapacityScheduler extends
if (Resources.greaterThan(calculator, getClusterResource(),
assignment.getResource(), Resources.none())) {
+ FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
+ NodeId nodeId = null;
+ if (node != null) {
+ nodeId = node.getNodeID();
+ }
if (withNodeHeartbeat) {
- updateSchedulerHealth(lastNodeUpdateTime,
- CandidateNodeSetUtils.getSingleNode(candidates).getNodeID(),
- assignment);
+ updateSchedulerHealth(lastNodeUpdateTime, nodeId, assignment);
}
return assignment;
}
@@ -1681,7 +1719,7 @@ public class CapacityScheduler extends
// We have two different logics to handle allocation on single node / multi
// nodes.
CSAssignment assignment;
- if (null != node) {
+ if (!multiNodePlacementEnabled) {
assignment = allocateContainerOnSingleNode(candidates,
node, withNodeHeartbeat);
} else{
@@ -1869,12 +1907,21 @@ public class CapacityScheduler extends
NodeLabelsUpdateSchedulerEvent labelUpdateEvent) {
try {
writeLock.lock();
+ Set<String> updateLabels = new HashSet<String>();
for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
.getUpdatedNodeToLabels().entrySet()) {
NodeId id = entry.getKey();
Set<String> labels = entry.getValue();
+ FiCaSchedulerNode node = nodeTracker.getNode(id);
+
+ if (node != null) {
+ // Update old partition to list.
+ updateLabels.add(node.getPartition());
+ }
updateLabelsOnNode(id, labels);
+ updateLabels.addAll(labels);
}
+ refreshLabelToNodeCache(updateLabels);
Resource clusterResource = getClusterResource();
getRootQueue().updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
@@ -1883,6 +1930,18 @@ public class CapacityScheduler extends
}
}
+ private void refreshLabelToNodeCache(Set<String> updateLabels) {
+ Map<String, Set<NodeId>> labelMapping = labelManager
+ .getLabelsToNodes(updateLabels);
+ for (String label : updateLabels) {
+ Set<NodeId> nodes = labelMapping.get(label);
+ if (nodes == null) {
+ continue;
+ }
+ nodeTracker.updateNodesPerPartition(label, nodes);
+ }
+ }
+
private void addNode(RMNode nodeManager) {
try {
writeLock.lock();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index e8de096..b937ae7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeLookupPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodePolicySpec;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
@@ -2129,4 +2131,118 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
break;
}
}
+
+ @Private public static final String MULTI_NODE_SORTING_POLICIES =
+ PREFIX + "multi-node-sorting.policy.names";
+
+ @Private public static final String MULTI_NODE_SORTING_POLICY_NAME =
+ PREFIX + "multi-node-sorting.policy";
+
+ /**
+ * resource usage based node sorting algorithm.
+ */
+ public static final String DEFAULT_NODE_SORTING_POLICY = "default";
+ public static final String DEFAULT_NODE_SORTING_POLICY_CLASSNAME
+ = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy";
+ public static final long DEFAULT_MULTI_NODE_SORTING_INTERVAL = 1000L;
+
+ @Private
+ public static final String MULTI_NODE_PLACEMENT_ENABLED = PREFIX
+ + "multi-node-placement-enabled";
+
+ @Private
+ public static final boolean DEFAULT_MULTI_NODE_PLACEMENT_ENABLED = false;
+
+ public String getMultiNodesSortingAlgorithmPolicy(
+ String queue) {
+
+ String policyName = get(
+ getQueuePrefix(queue) + "multi-node-sorting.policy");
+
+ if (policyName == null) {
+ policyName = get(MULTI_NODE_SORTING_POLICY_NAME);
+ }
+
+ // If the node sorting policy is configured neither at the queue level nor
+ // at the cluster level, it is assumed that this queue is not enabled for
+ // multi-node lookup.
+ if (policyName == null || policyName.isEmpty()) {
+ return null;
+ }
+
+ String policyClassName = get(MULTI_NODE_SORTING_POLICY_NAME + DOT
+ + policyName.trim() + DOT + "class");
+
+ if (policyClassName == null || policyClassName.isEmpty()) {
+ throw new YarnRuntimeException(
+ policyName.trim() + " Class is not configured or not an instance of "
+ + MultiNodeLookupPolicy.class.getCanonicalName());
+ }
+
+ return normalizePolicyName(policyClassName.trim());
+ }
+
+ public boolean getMultiNodePlacementEnabled() {
+ return getBoolean(MULTI_NODE_PLACEMENT_ENABLED,
+ DEFAULT_MULTI_NODE_PLACEMENT_ENABLED);
+ }
+
+ public Set<MultiNodePolicySpec> getMultiNodePlacementPolicies() {
+ String[] policies = getTrimmedStrings(MULTI_NODE_SORTING_POLICIES);
+
+ // Build a policy spec for each configured multi-node sorting policy name.
+ Set<MultiNodePolicySpec> set = new HashSet<MultiNodePolicySpec>();
+ for (String str : policies) {
+ if (!str.trim().isEmpty()) {
+ String policyClassName = get(
+ MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim() + DOT + "class");
+ if (str.trim().equals(DEFAULT_NODE_SORTING_POLICY)) {
+ policyClassName = get(
+ MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim() + DOT + "class",
+ DEFAULT_NODE_SORTING_POLICY_CLASSNAME);
+ }
+
+ // This check is needed as default class name is loaded only for
+ // DEFAULT_NODE_SORTING_POLICY.
+ if (policyClassName == null) {
+ throw new YarnRuntimeException(
+ str.trim() + " Class is not configured or not an instance of "
+ + MultiNodeLookupPolicy.class.getCanonicalName());
+ }
+ policyClassName = normalizePolicyName(policyClassName.trim());
+ long policySortingInterval = getLong(
+ MULTI_NODE_SORTING_POLICY_NAME + DOT + str.trim()
+ + DOT + "sorting-interval.ms",
+ DEFAULT_MULTI_NODE_SORTING_INTERVAL);
+ if (policySortingInterval < 0) {
+ throw new YarnRuntimeException(
+ str.trim()
+ + " multi-node policy is configured with invalid"
+ + " sorting-interval:" + policySortingInterval);
+ }
+ set.add(
+ new MultiNodePolicySpec(policyClassName, policySortingInterval));
+ }
+ }
+
+ return Collections.unmodifiableSet(set);
+ }
+
+ private String normalizePolicyName(String policyName) {
+
+ // Ensure that custom node sorting algorithm class is valid.
+ try {
+ Class<?> nodeSortingPolicyClazz = getClassByName(policyName);
+ if (MultiNodeLookupPolicy.class
+ .isAssignableFrom(nodeSortingPolicyClazz)) {
+ return policyName;
+ } else {
+ throw new YarnRuntimeException(
+ "Class: " + policyName + " not instance of "
+ + MultiNodeLookupPolicy.class.getCanonicalName());
+ }
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException(
+ "Could not instantiate " + "NodesSortingPolicy: " + policyName, e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 366bad0..ffe862f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -53,10 +53,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
@@ -1036,23 +1032,24 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment allocateFromReservedContainer(Resource clusterResource,
CandidateNodeSet<FiCaSchedulerNode> candidates,
ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
- FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
- if (null == node) {
- return null;
- }
-
- RMContainer reservedContainer = node.getReservedContainer();
- if (reservedContainer != null) {
- FiCaSchedulerApp application = getApplication(
- reservedContainer.getApplicationAttemptId());
-
- if (null != application) {
- ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
- node.getNodeID(), SystemClock.getInstance().getTime(), application);
- CSAssignment assignment = application.assignContainers(clusterResource,
- candidates, currentResourceLimits, schedulingMode,
- reservedContainer);
- return assignment;
+ // Considering multi-node scheduling, it's better to iterate through
+ // all candidates and stop once we get at least one good node to allocate
+ // where a reservation was made earlier. In the normal case, there is only
+ // one node and hence there won't be any impact from this change.
+ for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
+ RMContainer reservedContainer = node.getReservedContainer();
+ if (reservedContainer != null) {
+ FiCaSchedulerApp application = getApplication(
+ reservedContainer.getApplicationAttemptId());
+
+ if (null != application) {
+ ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+ node, SystemClock.getInstance().getTime(), application);
+ CSAssignment assignment = application.assignContainers(
+ clusterResource, candidates, currentResourceLimits,
+ schedulingMode, reservedContainer);
+ return assignment;
+ }
}
}
@@ -1114,13 +1111,14 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerApp application = assignmentIterator.next();
ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
- node.getNodeID(), SystemClock.getInstance().getTime(), application);
+ node, SystemClock.getInstance().getTime(), application);
// Check queue max-capacity limit
Resource appReserved = application.getCurrentReservation();
if (needAssignToQueueCheck) {
- if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
- currentResourceLimits, appReserved, schedulingMode)) {
+ if (!super.canAssignToThisQueue(clusterResource,
+ candidates.getPartition(), currentResourceLimits, appReserved,
+ schedulingMode)) {
ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
activitiesManager, node, application, application.getPriority(),
ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
@@ -1155,7 +1153,8 @@ public class LeafQueue extends AbstractCSQueue {
userAssignable = false;
} else {
userAssignable = canAssignToUser(clusterResource, application.getUser(),
- userLimit, application, node.getPartition(), currentResourceLimits);
+ userLimit, application, candidates.getPartition(),
+ currentResourceLimits);
if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
cul.canAssign = false;
cul.reservation = appReserved;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 2363b88..80549ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -553,8 +553,8 @@ public class ParentQueue extends AbstractCSQueue {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParentName(), getQueueName(), ActivityState.REJECTED,
- ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
- .getPartition());
+ ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION
+ + candidates.getPartition());
if (rootQueue) {
ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
node);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index a843002..3e337ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -96,11 +96,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
* headroom, etc.
*/
private ContainerAllocation preCheckForNodeCandidateSet(
- Resource clusterResource, CandidateNodeSet<FiCaSchedulerNode> candidates,
+ Resource clusterResource, FiCaSchedulerNode node,
SchedulingMode schedulingMode, ResourceLimits resourceLimits,
SchedulerRequestKey schedulerKey) {
Priority priority = schedulerKey.getPriority();
- FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey,
ResourceRequest.ANY);
@@ -164,7 +163,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
}
if (!checkHeadroom(clusterResource, resourceLimits, required,
- candidates.getPartition())) {
+ node.getPartition())) {
if (LOG.isDebugEnabled()) {
LOG.debug("cannot allocate required resource=" + required
+ " because of headroom");
@@ -801,20 +800,6 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// Do checks before determining which node to allocate
// Directly return if this check fails.
ContainerAllocation result;
- if (reservedContainer == null) {
- result = preCheckForNodeCandidateSet(clusterResource, candidates,
- schedulingMode, resourceLimits, schedulerKey);
- if (null != result) {
- return result;
- }
- } else {
- // pre-check when allocating reserved container
- if (application.getOutstandingAsksCount(schedulerKey) == 0) {
- // Release
- return new ContainerAllocation(reservedContainer, null,
- AllocationState.QUEUE_SKIPPED);
- }
- }
AppPlacementAllocator<FiCaSchedulerNode> schedulingPS =
application.getAppSchedulingInfo().getAppPlacementAllocator(
@@ -833,6 +818,22 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
while (iter.hasNext()) {
FiCaSchedulerNode node = iter.next();
+ if (reservedContainer == null) {
+ result = preCheckForNodeCandidateSet(clusterResource, node,
+ schedulingMode, resourceLimits, schedulerKey);
+ if (null != result) {
+ continue;
+ }
+ } else {
+ // pre-check when allocating reserved container
+ if (application.getOutstandingAsksCount(schedulerKey) == 0) {
+ // Release
+ result = new ContainerAllocation(reservedContainer, null,
+ AllocationState.QUEUE_SKIPPED);
+ continue;
+ }
+ }
+
result = tryAllocateOnNode(clusterResource, node, schedulingMode,
resourceLimits, schedulerKey, reservedContainer);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java
index 1bd3743..06f74de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java
@@ -32,4 +32,8 @@ public class ApplicationSchedulingConfig {
@InterfaceAudience.Private
public static final Class<? extends AppPlacementAllocator>
DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class;
+
+ @InterfaceAudience.Private
+ public static final String ENV_MULTI_NODE_SORTING_POLICY_CLASS =
+ "MULTI_NODE_SORTING_POLICY_CLASS";
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 6a5af81..4bfdae9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCap
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
@@ -170,10 +171,32 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
rc = scheduler.getResourceCalculator();
}
+ // Update multi-node sorting algorithm to scheduler envs
+ updateMultiNodeSortingPolicy(rmApp);
+
containerAllocator = new ContainerAllocator(this, rc, rmContext,
activitiesManager);
}
+ private void updateMultiNodeSortingPolicy(RMApp rmApp) {
+ if (rmApp == null) {
+ return;
+ }
+
+ String queueName = null;
+ if (scheduler instanceof CapacityScheduler) {
+ queueName = getCSLeafQueue().getMultiNodeSortingPolicyName();
+ }
+
+ if (!appSchedulingInfo.getApplicationSchedulingEnvs().containsKey(
+ ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS)
+ && queueName != null) {
+ appSchedulingInfo.getApplicationSchedulingEnvs().put(
+ ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS,
+ queueName);
+ }
+ }
+
public boolean containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event,
String partition) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
index f1df343..9d30e90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -24,11 +24,14 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@@ -55,6 +58,8 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
new ConcurrentHashMap<>();
private volatile String primaryRequestedPartition =
RMNodeLabelsManager.NO_LABEL;
+ private MultiNodeSortingManager<N> multiNodeSortingManager = null;
+ private String multiNodeSortPolicyName;
private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock;
@@ -65,6 +70,26 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
writeLock = lock.writeLock();
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public void initialize(AppSchedulingInfo appSchedulingInfo,
+ SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
+ super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+ multiNodeSortPolicyName = appSchedulingInfo
+ .getApplicationSchedulingEnvs().get(
+ ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS);
+ multiNodeSortingManager = (MultiNodeSortingManager<N>) rmContext
+ .getMultiNodeSortingManager();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "nodeLookupPolicy used for " + appSchedulingInfo
+ .getApplicationId()
+ + " is " + ((multiNodeSortPolicyName != null) ?
+ multiNodeSortPolicyName :
+ ""));
+ }
+ }
+
@Override
@SuppressWarnings("unchecked")
public Iterator<N> getPreferredNodeIterator(
@@ -74,11 +99,16 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
// in.
N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet);
- if (null != singleNode) {
+ if (singleNode != null) {
return IteratorUtils.singletonIterator(singleNode);
}
- return IteratorUtils.emptyIterator();
+ // singleNode is null when multi-node placement lookup is enabled; in
+ // that case, delegate node selection to the configured sorting policy.
+ return multiNodeSortingManager.getMultiNodeSortIterator(
+ candidateNodeSet.getAllNodes().values(),
+ candidateNodeSet.getPartition(),
+ multiNodeSortPolicyName);
}
private boolean hasRequestLabelChanged(ResourceRequest requestOne,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java
new file mode 100644
index 0000000..662e34d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * <p>
+ * This interface has the following functionality:
+ * </p>
+ *
+ * <p>
+ * Provides an interface for a MultiNodeLookupPolicy so that different
+ * placement allocators can choose nodes based on need.
+ * </p>
+ */
+public interface MultiNodeLookupPolicy<N extends SchedulerNode> {
+ /**
+ * Get an iterator of preferred nodes depending on requirement and/or
+ * availability.
+ *
+ * @param nodes
+ * List of Nodes
+ * @param partition
+ * node label
+ *
+ * @return iterator of preferred node
+ */
+ Iterator<N> getPreferredNodeIterator(Collection<N> nodes, String partition);
+
+ /**
+ * Refresh working nodes set for re-ordering based on the algorithm selected.
+ *
+ * @param nodes
+ * a collection of working NodeManagers.
+ */
+ void addAndRefreshNodesSet(Collection<N> nodes, String partition);
+
+ /**
+ * Get sorted nodes per partition.
+ *
+ * @param partition
+ * node label
+ *
+ * @return collection of sorted nodes
+ */
+ Set<N> getNodesPerPartition(String partition);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java
new file mode 100644
index 0000000..8386d78
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+/**
+ * MultiNodePolicySpec contains policyName and timeout.
+ */
+public class MultiNodePolicySpec {
+
+ private String policyName;
+ private long sortingInterval;
+
+ public MultiNodePolicySpec(String policyName, long timeout) {
+ this.setSortingInterval(timeout);
+ this.setPolicyName(policyName);
+ }
+
+ public long getSortingInterval() {
+ return sortingInterval;
+ }
+
+ public void setSortingInterval(long timeout) {
+ this.sortingInterval = timeout;
+ }
+
+ public String getPolicyName() {
+ return policyName;
+ }
+
+ public void setPolicyName(String policyName) {
+ this.policyName = policyName;
+ }
+
+ @Override
+ public String toString() {
+ return "MultiNodePolicySpec {" +
+ "policyName='" + policyName + '\'' +
+ ", sortingInterval=" + sortingInterval +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java
new file mode 100644
index 0000000..7e27c34
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Common node sorting class which will do sorting based on policy spec.
+ * @param <N> extends SchedulerNode.
+ */
+public class MultiNodeSorter<N extends SchedulerNode> extends AbstractService {
+
+ private MultiNodeLookupPolicy<N> multiNodePolicy;
+ private static final Log LOG = LogFactory.getLog(MultiNodeSorter.class);
+
+ // ScheduledExecutorService which schedules the node-sorting task to run
+ // periodically.
+ private ScheduledExecutorService ses;
+ private ScheduledFuture<?> handler;
+ private volatile boolean stopped;
+ private RMContext rmContext;
+ private MultiNodePolicySpec policySpec;
+
+ public MultiNodeSorter(RMContext rmContext,
+ MultiNodePolicySpec policy) {
+ super("MultiNodeLookupPolicy");
+ this.rmContext = rmContext;
+ this.policySpec = policy;
+ }
+
+ @VisibleForTesting
+ public synchronized MultiNodeLookupPolicy<N> getMultiNodeLookupPolicy() {
+ return multiNodePolicy;
+ }
+
+ public void serviceInit(Configuration conf) throws Exception {
+ LOG.info("Initializing MultiNodeSorter=" + policySpec.getPolicyName()
+ + ", with sorting interval=" + policySpec.getSortingInterval());
+ initPolicy(policySpec.getPolicyName());
+ super.serviceInit(conf);
+ }
+
+ @SuppressWarnings("unchecked")
+ void initPolicy(String policyName) throws YarnException {
+ Class<?> policyClass;
+ try {
+ policyClass = Class.forName(policyName);
+ } catch (ClassNotFoundException e) {
+ throw new YarnException(
+ "Invalid policy name:" + policyName + e.getMessage());
+ }
+ this.multiNodePolicy = (MultiNodeLookupPolicy<N>) ReflectionUtils
+ .newInstance(policyClass, null);
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ LOG.info("Starting SchedulingMonitor=" + getName());
+ assert !stopped : "starting when already stopped";
+ ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setName(getName());
+ return t;
+ }
+ });
+
+ // Start the sorter thread only if the sorting interval is non-zero.
+ if(policySpec.getSortingInterval() != 0) {
+ handler = ses.scheduleAtFixedRate(new SortingThread(),
+ 0, policySpec.getSortingInterval(), TimeUnit.MILLISECONDS);
+ }
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ stopped = true;
+ if (handler != null) {
+ LOG.info("Stop " + getName());
+ handler.cancel(true);
+ ses.shutdown();
+ }
+ super.serviceStop();
+ }
+
+ @SuppressWarnings("unchecked")
+ @VisibleForTesting
+ public void reSortClusterNodes() {
+ Set<String> nodeLabels = new HashSet<>();
+ nodeLabels
+ .addAll(rmContext.getNodeLabelManager().getClusterNodeLabelNames());
+ nodeLabels.add(RMNodeLabelsManager.NO_LABEL);
+ for (String label : nodeLabels) {
+ Map<NodeId, SchedulerNode> nodesByPartition = new HashMap<>();
+ List<SchedulerNode> nodes = ((AbstractYarnScheduler) rmContext
+ .getScheduler()).getNodeTracker().getNodesPerPartition(label);
+ if (nodes != null && !nodes.isEmpty()) {
+ nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
+ multiNodePolicy.addAndRefreshNodesSet(
+ (Collection<N>) nodesByPartition.values(), label);
+ }
+ }
+ }
+
+ private class SortingThread implements Runnable {
+ @Override
+ public void run() {
+ try {
+ reSortClusterNodes();
+ } catch (Throwable t) {
+ // The sorting task does not alter structures nor do structures
+ // persist across invocations. Therefore, log, skip, and retry.
+ LOG.error("Exception raised while executing multinode"
+ + " sorter, skip this run..., exception=", t);
+ }
+ }
+ }
+
+ /**
+ * Verify whether sorter thread is running or not.
+ *
+ * @return true if sorter thread is running, false otherwise.
+ */
+ public boolean isSorterThreadRunning() {
+ return (handler != null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java
new file mode 100644
index 0000000..e872317
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+/**
+ * Node Sorting Manager which runs all sorter threads and policies.
+ * @param <N> extends SchedulerNode
+ */
+public class MultiNodeSortingManager<N extends SchedulerNode>
+ extends AbstractService {
+
+ private static final Log LOG = LogFactory
+ .getLog(MultiNodeSortingManager.class);
+
+ private RMContext rmContext;
+ private Map<String, MultiNodeSorter<N>> runningMultiNodeSorters;
+ private Set<MultiNodePolicySpec> policySpecs = new HashSet<MultiNodePolicySpec>();
+ private Configuration conf;
+ private boolean multiNodePlacementEnabled;
+
+ public MultiNodeSortingManager() {
+ super("MultiNodeSortingManager");
+ this.runningMultiNodeSorters = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void serviceInit(Configuration configuration) throws Exception {
+ LOG.info("Initializing NodeSortingService=" + getName());
+ super.serviceInit(configuration);
+ this.conf = configuration;
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ LOG.info("Starting NodeSortingService=" + getName());
+ createAllPolicies();
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ for (MultiNodeSorter<N> sorter : runningMultiNodeSorters.values()) {
+ sorter.stop();
+ }
+ super.serviceStop();
+ }
+
+ private void createAllPolicies() {
+ if (!multiNodePlacementEnabled) {
+ return;
+ }
+ for (MultiNodePolicySpec policy : policySpecs) {
+ MultiNodeSorter<N> mon = new MultiNodeSorter<N>(rmContext, policy);
+ mon.init(conf);
+ mon.start();
+ runningMultiNodeSorters.put(policy.getPolicyName(), mon);
+ }
+ }
+
+ public MultiNodeSorter<N> getMultiNodePolicy(String name) {
+ return runningMultiNodeSorters.get(name);
+ }
+
+ public void setRMContext(RMContext context) {
+ this.rmContext = context;
+ }
+
+ public void registerMultiNodePolicyNames(
+ boolean isMultiNodePlacementEnabled,
+ Set<MultiNodePolicySpec> multiNodePlacementPolicies) {
+ this.policySpecs.addAll(multiNodePlacementPolicies);
+ this.multiNodePlacementEnabled = isMultiNodePlacementEnabled;
+ LOG.info("MultiNode scheduling is '" + multiNodePlacementEnabled +
+ "', and configured policies are " + StringUtils
+ .join(policySpecs.iterator(), ","));
+ }
+
+ public Iterator<N> getMultiNodeSortIterator(Collection<N> nodes,
+ String partition, String policyName) {
+ // nodeLookupPolicy can be null if app is configured with invalid policy.
+ // in such cases, use the first node.
+ if(policyName == null) {
+ LOG.warn("Multi Node scheduling is enabled, however invalid class is"
+ + " configured. Valid sorting policy has to be configured in"
+ + " yarn.scheduler.capacity.<queue>.multi-node-sorting.policy");
+ return IteratorUtils.singletonIterator(
+ nodes.iterator().next());
+ }
+
+ MultiNodeSorter multiNodeSorter = getMultiNodePolicy(policyName);
+ if (multiNodeSorter == null) {
+ LOG.warn(
+ "MultiNode policy '" + policyName + "' is configured, however " +
+ "yarn.scheduler.capacity.multi-node-placement-enabled is false");
+ return IteratorUtils.singletonIterator(
+ nodes.iterator().next());
+ }
+
+ MultiNodeLookupPolicy<N> policy = multiNodeSorter
+ .getMultiNodeLookupPolicy();
+ // If sorter thread is not running, refresh node set.
+ if (!multiNodeSorter.isSorterThreadRunning()) {
+ policy.addAndRefreshNodesSet(nodes, partition);
+ }
+
+ return policy.getPreferredNodeIterator(nodes, partition);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org