You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2018/08/21 14:51:35 UTC

[2/2] hadoop git commit: YARN-7494. Add multi-node lookup mechanism and pluggable nodes sorting policies to optimize placement decision. Contributed by Sunil Govindan.

YARN-7494. Add multi-node lookup mechanism and pluggable node sorting policies to optimize placement decisions. Contributed by Sunil Govindan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9c3fc3ef
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9c3fc3ef
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9c3fc3ef

Branch: refs/heads/trunk
Commit: 9c3fc3ef2865164aa5f121793ac914cfeb21a181
Parents: 54d0bf8
Author: Weiwei Yang <ww...@apache.org>
Authored: Tue Aug 21 22:42:23 2018 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Tue Aug 21 22:42:28 2018 +0800

----------------------------------------------------------------------
 .../resourcemanager/RMActiveServiceContext.java |  16 ++
 .../yarn/server/resourcemanager/RMContext.java  |   8 +-
 .../server/resourcemanager/RMContextImpl.java   |  14 +-
 .../server/resourcemanager/ResourceManager.java |  12 ++
 .../scheduler/AppSchedulingInfo.java            |  11 +-
 .../scheduler/ClusterNodeTracker.java           |  61 +++++++
 .../scheduler/activities/ActivitiesLogger.java  |  32 ++--
 .../scheduler/activities/ActivitiesManager.java |   8 +-
 .../scheduler/capacity/AbstractCSQueue.java     |  16 +-
 .../scheduler/capacity/CSQueue.java             |   6 +
 .../scheduler/capacity/CapacityScheduler.java   |  77 ++++++++-
 .../CapacitySchedulerConfiguration.java         | 116 +++++++++++++
 .../scheduler/capacity/LeafQueue.java           |  49 +++---
 .../scheduler/capacity/ParentQueue.java         |   4 +-
 .../allocator/RegularContainerAllocator.java    |  35 ++--
 .../common/ApplicationSchedulingConfig.java     |   4 +
 .../scheduler/common/fica/FiCaSchedulerApp.java |  23 +++
 .../LocalityAppPlacementAllocator.java          |  34 +++-
 .../placement/MultiNodeLookupPolicy.java        |  67 ++++++++
 .../placement/MultiNodePolicySpec.java          |  56 +++++++
 .../scheduler/placement/MultiNodeSorter.java    | 167 +++++++++++++++++++
 .../placement/MultiNodeSortingManager.java      | 139 +++++++++++++++
 .../ResourceUsageMultiNodeLookupPolicy.java     |  79 +++++++++
 .../reservation/ReservationSystemTestUtil.java  |   3 +
 .../scheduler/TestAppSchedulingInfo.java        |   3 +-
 .../capacity/CapacitySchedulerTestBase.java     |  13 ++
 .../capacity/TestCapacityScheduler.java         |  15 --
 .../TestCapacitySchedulerMultiNodes.java        | 166 ++++++++++++++++++
 .../TestCapacitySchedulerNodeLabelUpdate.java   |  70 ++++++++
 29 files changed, 1211 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 66065e3..8fb0de6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -43,9 +43,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -113,6 +115,7 @@ public class RMActiveServiceContext {
   private AllocationTagsManager allocationTagsManager;
   private PlacementConstraintManager placementConstraintManager;
   private ResourceProfilesManager resourceProfilesManager;
+  private MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager;
 
   public RMActiveServiceContext() {
     queuePlacementManager = new PlacementManager();
@@ -443,6 +446,19 @@ public class RMActiveServiceContext {
 
   @Private
   @Unstable
+  public MultiNodeSortingManager<SchedulerNode> getMultiNodeSortingManager() {
+    return multiNodeSortingManager;
+  }
+
+  @Private
+  @Unstable
+  public void setMultiNodeSortingManager(
+      MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager) {
+    this.multiNodeSortingManager = multiNodeSortingManager;
+  }
+
+  @Private
+  @Unstable
   public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
     this.schedulerRecoveryStartTime = systemClock.getTime();
     this.schedulerRecoveryWaitTime = waitTime;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index eb91a31..a30ff76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -42,10 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -177,4 +178,9 @@ public interface RMContext extends ApplicationMasterServiceContext {
 
   void setPlacementConstraintManager(
       PlacementConstraintManager placementConstraintManager);
+
+  MultiNodeSortingManager<SchedulerNode> getMultiNodeSortingManager();
+
+  void setMultiNodeSortingManager(
+      MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 84e0f6f..cb1d56f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -48,10 +48,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -538,6 +539,17 @@ public class RMContextImpl implements RMContext {
         delegatedNodeLabelsUpdater);
   }
 
+  @Override
+  public MultiNodeSortingManager<SchedulerNode> getMultiNodeSortingManager() {
+    return activeServiceContext.getMultiNodeSortingManager();
+  }
+
+  @Override
+  public void setMultiNodeSortingManager(
+      MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager) {
+    activeServiceContext.setMultiNodeSortingManager(multiNodeSortingManager);
+  }
+
   public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
     activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index d459f0e..bdda871 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -96,11 +96,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManagerService;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
@@ -546,6 +548,10 @@ public class ResourceManager extends CompositeService
     return new FederationStateStoreService(rmContext);
   }
 
+  protected MultiNodeSortingManager<SchedulerNode> createMultiNodeSortingManager() {
+    return new MultiNodeSortingManager<SchedulerNode>();
+  }
+
   protected SystemMetricsPublisher createSystemMetricsPublisher() {
     List<SystemMetricsPublisher> publishers =
         new ArrayList<SystemMetricsPublisher>();
@@ -665,6 +671,12 @@ public class ResourceManager extends CompositeService
       resourceProfilesManager.init(conf);
       rmContext.setResourceProfilesManager(resourceProfilesManager);
 
+      MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager =
+          createMultiNodeSortingManager();
+      multiNodeSortingManager.setRMContext(rmContext);
+      addService(multiNodeSortingManager);
+      rmContext.setMultiNodeSortingManager(multiNodeSortingManager);
+
       RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
           createRMDelegatedNodeLabelsUpdater();
       if (delegatedNodeLabelsUpdater != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index d63d2b82..ca7d9ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -93,7 +93,7 @@ public class AppSchedulingInfo {
   private final ReentrantReadWriteLock.WriteLock writeLock;
 
   public final ContainerUpdateContext updateContext;
-  public final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
+  private final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
   private final RMContext rmContext;
 
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
@@ -782,4 +782,13 @@ public class AppSchedulingInfo {
       this.readLock.unlock();
     }
   }
+
+  /**
+   * Get scheduling envs configured for this application.
+   *
+   * @return a map of applicationSchedulingEnvs
+   */
+  public Map<String, String> getApplicationSchedulingEnvs() {
+    return applicationSchedulingEnvs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index 66d8810..8c7e447 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -37,6 +37,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -57,6 +58,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
   private HashMap<NodeId, N> nodes = new HashMap<>();
   private Map<String, N> nodeNameToNodeMap = new HashMap<>();
   private Map<String, List<N>> nodesPerRack = new HashMap<>();
+  private Map<String, List<N>> nodesPerLabel = new HashMap<>();
 
   private Resource clusterCapacity = Resources.createResource(0, 0);
   private volatile Resource staleClusterCapacity =
@@ -80,6 +82,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
       nodes.put(node.getNodeID(), node);
       nodeNameToNodeMap.put(node.getNodeName(), node);
 
+      List<N> nodesPerLabels = nodesPerLabel.get(node.getPartition());
+
+      if (nodesPerLabels == null) {
+        nodesPerLabels = new ArrayList<N>();
+      }
+      nodesPerLabels.add(node);
+
+      // Update new set of nodes for given partition.
+      nodesPerLabel.put(node.getPartition(), nodesPerLabels);
+
       // Update nodes per rack as well
       String rackName = node.getRackName();
       List<N> nodesList = nodesPerRack.get(rackName);
@@ -174,6 +186,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
         }
       }
 
+      List<N> nodesPerPartition = nodesPerLabel.get(node.getPartition());
+      nodesPerPartition.remove(node);
+
+      // Update new set of nodes for given partition.
+      if (nodesPerPartition.isEmpty()) {
+        nodesPerLabel.remove(node.getPartition());
+      } else {
+        nodesPerLabel.put(node.getPartition(), nodesPerPartition);
+      }
+
       // Update cluster capacity
       Resources.subtractFrom(clusterCapacity, node.getTotalResource());
       staleClusterCapacity = Resources.clone(clusterCapacity);
@@ -420,4 +442,43 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     }
     return retNodes;
   }
+
+  /**
+   * update cached nodes per partition on a node label change event.
+   * @param partition nodeLabel
+   * @param nodeIds List of Node IDs
+   */
+  public void updateNodesPerPartition(String partition, Set<NodeId> nodeIds) {
+    writeLock.lock();
+    try {
+      // Clear all entries.
+      nodesPerLabel.remove(partition);
+
+      List<N> nodesPerPartition = new ArrayList<N>();
+      for (NodeId nodeId : nodeIds) {
+        N n = getNode(nodeId);
+        if (n != null) {
+          nodesPerPartition.add(n);
+        }
+      }
+
+      // Update new set of nodes for given partition.
+      nodesPerLabel.put(partition, nodesPerPartition);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public List<N> getNodesPerPartition(String partition) {
+    List<N> nodesPerPartition = null;
+    readLock.lock();
+    try {
+      if (nodesPerLabel.containsKey(partition)) {
+        nodesPerPartition = new ArrayList<N>(nodesPerLabel.get(partition));
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return nodesPerPartition;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
index 0c351b6..8a3ffce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
@@ -21,13 +21,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 
 /**
  * Utility for logging scheduler activities
@@ -63,7 +63,7 @@ public class ActivitiesLogger {
         SchedulerApplicationAttempt application, Priority priority,
         String diagnostic) {
       String type = "app";
-      if (activitiesManager == null) {
+      if (node == null || activitiesManager == null) {
         return;
       }
       if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
@@ -84,18 +84,18 @@ public class ActivitiesLogger {
         ActivitiesManager activitiesManager, SchedulerNode node,
         SchedulerApplicationAttempt application, Priority priority,
         String diagnostic, ActivityState appState) {
-      if (activitiesManager == null) {
+      if (node == null || activitiesManager == null) {
         return;
       }
       if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
         String type = "container";
         // Add application-container activity into specific node allocation.
-        activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+        activitiesManager.addSchedulingActivityForNode(node,
             application.getApplicationId().toString(), null,
             priority.toString(), ActivityState.SKIPPED, diagnostic, type);
         type = "app";
         // Add queue-application activity into specific node allocation.
-        activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+        activitiesManager.addSchedulingActivityForNode(node,
             application.getQueueName(),
             application.getApplicationId().toString(),
             application.getPriority().toString(), ActivityState.SKIPPED,
@@ -121,20 +121,20 @@ public class ActivitiesLogger {
         ActivitiesManager activitiesManager, SchedulerNode node,
         SchedulerApplicationAttempt application, RMContainer updatedContainer,
         ActivityState activityState) {
-      if (activitiesManager == null) {
+      if (node == null || activitiesManager == null) {
         return;
       }
       if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
         String type = "container";
         // Add application-container activity into specific node allocation.
-        activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+        activitiesManager.addSchedulingActivityForNode(node,
             application.getApplicationId().toString(),
             updatedContainer.getContainer().toString(),
             updatedContainer.getContainer().getPriority().toString(),
             activityState, ActivityDiagnosticConstant.EMPTY, type);
         type = "app";
         // Add queue-application activity into specific node allocation.
-        activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
+        activitiesManager.addSchedulingActivityForNode(node,
             application.getQueueName(),
             application.getApplicationId().toString(),
             application.getPriority().toString(), ActivityState.ACCEPTED,
@@ -157,13 +157,15 @@ public class ActivitiesLogger {
      * update.
      */
     public static void startAppAllocationRecording(
-        ActivitiesManager activitiesManager, NodeId nodeId, long currentTime,
+        ActivitiesManager activitiesManager, FiCaSchedulerNode node,
+        long currentTime,
         SchedulerApplicationAttempt application) {
-      if (activitiesManager == null) {
+      if (node == null || activitiesManager == null) {
         return;
       }
-      activitiesManager.startAppAllocationRecording(nodeId, currentTime,
-          application);
+      activitiesManager
+          .startAppAllocationRecording(node.getNodeID(), currentTime,
+              application);
     }
 
     /*
@@ -208,7 +210,7 @@ public class ActivitiesLogger {
     public static void recordQueueActivity(ActivitiesManager activitiesManager,
         SchedulerNode node, String parentQueueName, String queueName,
         ActivityState state, String diagnostic) {
-      if (activitiesManager == null) {
+      if (node == null || activitiesManager == null) {
         return;
       }
       if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
@@ -240,7 +242,7 @@ public class ActivitiesLogger {
     public static void finishAllocatedNodeAllocation(
         ActivitiesManager activitiesManager, SchedulerNode node,
         ContainerId containerId, AllocationState containerState) {
-      if (activitiesManager == null) {
+      if (node == null || activitiesManager == null) {
         return;
       }
       if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
@@ -277,7 +279,7 @@ public class ActivitiesLogger {
       SchedulerNode node, String parentName, String childName,
       Priority priority, ActivityState state, String diagnostic, String type) {
 
-    activitiesManager.addSchedulingActivityForNode(node.getNodeID(), parentName,
+    activitiesManager.addSchedulingActivityForNode(node, parentName,
         childName, priority != null ? priority.toString() : null, state,
         diagnostic, type);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
index 8498c40..5d96b17 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
 import org.apache.hadoop.yarn.util.SystemClock;
@@ -197,11 +198,12 @@ public class ActivitiesManager extends AbstractService {
   }
 
   // Add queue, application or container activity into specific node allocation.
-  void addSchedulingActivityForNode(NodeId nodeID, String parentName,
+  void addSchedulingActivityForNode(SchedulerNode node, String parentName,
       String childName, String priority, ActivityState state, String diagnostic,
       String type) {
-    if (shouldRecordThisNode(nodeID)) {
-      NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID);
+    if (shouldRecordThisNode(node.getNodeID())) {
+      NodeAllocation nodeAllocation = getCurrentNodeAllocation(
+          node.getNodeID());
       nodeAllocation.addAllocationActivity(parentName, childName, priority,
           state, diagnostic, type);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 9c3e98f..2c9f9a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -92,7 +92,8 @@ public abstract class AbstractCSQueue implements CSQueue {
   Set<String> resourceTypes;
   final RMNodeLabelsManager labelManager;
   String defaultLabelExpression;
-  
+  private String multiNodeSortingPolicyName = null;
+
   Map<AccessType, AccessControlList> acls = 
       new HashMap<AccessType, AccessControlList>();
   volatile boolean reservationsContinueLooking;
@@ -414,6 +415,10 @@ public abstract class AbstractCSQueue implements CSQueue {
       this.priority = configuration.getQueuePriority(
           getQueuePath());
 
+      // Update multi-node sorting algorithm for scheduling as configured.
+      setMultiNodeSortingPolicyName(
+          configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePath()));
+
       this.userWeights = getUserWeightsFromHierarchy(configuration);
     } finally {
       writeLock.unlock();
@@ -1259,4 +1264,13 @@ public abstract class AbstractCSQueue implements CSQueue {
       this.writeLock.unlock();
     }
   }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Returns whatever was last stored through
+   * {@link #setMultiNodeSortingPolicyName(String)}; {@code null} until a
+   * policy has been set.
+   */
+  @Override
+  public String getMultiNodeSortingPolicyName() {
+    final String policy = multiNodeSortingPolicyName;
+    return policy;
+  }
+
+  /**
+   * Store the multi-node sorting policy for this queue. Currently invoked
+   * from the queue's configuration setup while the queue write lock is held.
+   *
+   * @param policyName policy value to store; {@code null} clears it
+   */
+  public void setMultiNodeSortingPolicyName(String policyName) {
+    multiNodeSortingPolicyName = policyName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 3963dc0..c0c280e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -430,4 +430,10 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
    * @return effective max queue capacity
    */
   Resource getEffectiveMaxCapacityDown(String label, Resource factor);
+
+  /**
+   * Get the multi-node sorting policy configured for this queue.
+   * <p>
+   * Despite the method name, the value is not the short policy alias used in
+   * the configuration keys. It is the fully qualified class name of the
+   * MultiNodeLookupPolicy implementation selected for the queue. In
+   * AbstractCSQueue it is populated from
+   * CapacitySchedulerConfiguration#getMultiNodesSortingAlgorithmPolicy.
+   *
+   * @return policy class name, or {@code null} when no multi-node sorting
+   *         policy is configured for the queue
+   */
+  String getMultiNodeSortingPolicyName();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 0b7fe92..dec1301 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -251,6 +252,7 @@ public class CapacityScheduler extends
   private ResourceCommitterService resourceCommitterService;
   private RMNodeLabelsManager labelManager;
   private AppPriorityACLsManager appPriorityACLManager;
+  // When true, allocation candidates are built from all tracked nodes of the
+  // heartbeating node's partition rather than that single node. The value is
+  // read from CapacitySchedulerConfiguration#getMultiNodePlacementEnabled
+  // during scheduler initialization.
+  private boolean multiNodePlacementEnabled;
 
   private static boolean printedVerboseLoggingForAsyncScheduling = false;
 
@@ -391,12 +393,23 @@ public class CapacityScheduler extends
       // Setup how many containers we can allocate for each round
       offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
 
+      // Register CS specific multi-node policies to common MultiNodeManager
+      // which will add to a MultiNodeSorter which gives a pre-sorted list of
+      // nodes to scheduler's allocation.
+      multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled();
+      if(rmContext.getMultiNodeSortingManager() != null) {
+        rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames(
+            multiNodePlacementEnabled,
+            this.conf.getMultiNodePlacementPolicies());
+      }
+
       LOG.info("Initialized CapacityScheduler with " + "calculator="
           + getResourceCalculator().getClass() + ", " + "minimumAllocation=<"
           + getMinimumResourceCapability() + ">, " + "maximumAllocation=<"
           + getMaximumResourceCapability() + ">, " + "asynchronousScheduling="
           + scheduleAsynchronously + ", " + "asyncScheduleInterval="
-          + asyncScheduleInterval + "ms");
+          + asyncScheduleInterval + "ms" + ",multiNodePlacementEnabled="
+          + multiNodePlacementEnabled);
     } finally {
       writeLock.unlock();
     }
@@ -1373,18 +1386,23 @@ public class CapacityScheduler extends
         assignment.getAssignmentInformation().getAllocationDetails();
     List<AssignmentInformation.AssignmentDetails> reservations =
         assignment.getAssignmentInformation().getReservationDetails();
+    // Get nodeId from allocated container if incoming argument is null.
+    NodeId updatedNodeid = (nodeId == null)
+        ? allocations.get(allocations.size() - 1).rmContainer.getNodeId()
+        : nodeId;
+
     if (!allocations.isEmpty()) {
       ContainerId allocatedContainerId =
           allocations.get(allocations.size() - 1).containerId;
       String allocatedQueue = allocations.get(allocations.size() - 1).queue;
-      schedulerHealth.updateAllocation(now, nodeId, allocatedContainerId,
+      schedulerHealth.updateAllocation(now, updatedNodeid, allocatedContainerId,
         allocatedQueue);
     }
     if (!reservations.isEmpty()) {
       ContainerId reservedContainerId =
           reservations.get(reservations.size() - 1).containerId;
       String reservedQueue = reservations.get(reservations.size() - 1).queue;
-      schedulerHealth.updateReservation(now, nodeId, reservedContainerId,
+      schedulerHealth.updateReservation(now, updatedNodeid, reservedContainerId,
         reservedQueue);
     }
     schedulerHealth.updateSchedulerReservationCounts(assignment
@@ -1421,6 +1439,23 @@ public class CapacityScheduler extends
             || assignedContainers < maxAssignPerHeartbeat);
   }
 
+  /**
+   * Build the set of candidate nodes to consider for an allocation triggered
+   * by {@code node}.
+   * <p>
+   * The candidate set contains only {@code node} in two cases: multi-node
+   * placement is disabled, or the node tracker lists no nodes for the node's
+   * partition. Otherwise it contains every node the tracker lists for that
+   * partition.
+   *
+   * @param node node that triggered this scheduling attempt
+   * @return candidate node set; never {@code null}
+   */
+  private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
+      FiCaSchedulerNode node) {
+    if (!multiNodePlacementEnabled) {
+      return new SimpleCandidateNodeSet<>(node);
+    }
+
+    String partition = node.getPartition();
+    List<FiCaSchedulerNode> nodes =
+        nodeTracker.getNodesPerPartition(partition);
+    if (nodes == null || nodes.isEmpty()) {
+      // Nothing cached for this partition; fall back to the single node.
+      return new SimpleCandidateNodeSet<>(node);
+    }
+
+    Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
+    for (FiCaSchedulerNode n : nodes) {
+      nodesByPartition.put(n.getNodeID(), n);
+    }
+    return new SimpleCandidateNodeSet<>(nodesByPartition, partition);
+  }
+
   /**
    * We need to make sure when doing allocation, Node should be existed
    * And we will construct a {@link CandidateNodeSet} before proceeding
@@ -1432,8 +1467,8 @@ public class CapacityScheduler extends
       int offswitchCount = 0;
       int assignedContainers = 0;
 
-      CandidateNodeSet<FiCaSchedulerNode> candidates =
-          new SimpleCandidateNodeSet<>(node);
+      CandidateNodeSet<FiCaSchedulerNode> candidates = getCandidateNodeSet(
+          node);
       CSAssignment assignment = allocateContainersToNode(candidates,
           withNodeHeartbeat);
       // Only check if we can allocate more container on the same node when
@@ -1599,10 +1634,13 @@ public class CapacityScheduler extends
 
     if (Resources.greaterThan(calculator, getClusterResource(),
         assignment.getResource(), Resources.none())) {
+      FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
+      NodeId nodeId = null;
+      if (node != null) {
+        nodeId = node.getNodeID();
+      }
       if (withNodeHeartbeat) {
-        updateSchedulerHealth(lastNodeUpdateTime,
-            CandidateNodeSetUtils.getSingleNode(candidates).getNodeID(),
-            assignment);
+        updateSchedulerHealth(lastNodeUpdateTime, nodeId, assignment);
       }
       return assignment;
     }
@@ -1681,7 +1719,7 @@ public class CapacityScheduler extends
     // We have two different logics to handle allocation on single node / multi
     // nodes.
     CSAssignment assignment;
-    if (null != node) {
+    if (!multiNodePlacementEnabled) {
       assignment = allocateContainerOnSingleNode(candidates,
           node, withNodeHeartbeat);
     } else{
@@ -1869,12 +1907,21 @@ public class CapacityScheduler extends
       NodeLabelsUpdateSchedulerEvent labelUpdateEvent) {
     try {
       writeLock.lock();
+      Set<String> updateLabels = new HashSet<String>();
       for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
           .getUpdatedNodeToLabels().entrySet()) {
         NodeId id = entry.getKey();
         Set<String> labels = entry.getValue();
+        FiCaSchedulerNode node = nodeTracker.getNode(id);
+
+        if (node != null) {
+          // Update old partition to list.
+          updateLabels.add(node.getPartition());
+        }
         updateLabelsOnNode(id, labels);
+        updateLabels.addAll(labels);
       }
+      refreshLabelToNodeCache(updateLabels);
       Resource clusterResource = getClusterResource();
       getRootQueue().updateClusterResource(clusterResource,
           new ResourceLimits(clusterResource));
@@ -1883,6 +1930,18 @@ public class CapacityScheduler extends
     }
   }
 
+  /**
+   * Rebuild the node tracker's per-partition node lists for the given labels.
+   * The rebuild uses the label manager's current label-to-node mapping.
+   * <p>
+   * NOTE(review): labels for which the label manager returns no node set are
+   * skipped, so any node list previously cached for them is left untouched.
+   * Confirm this is intended for partitions that have lost all their nodes.
+   *
+   * @param updateLabels partitions whose cached node lists should be rebuilt
+   */
+  private void refreshLabelToNodeCache(Set<String> updateLabels) {
+    Map<String, Set<NodeId>> nodesByLabel =
+        labelManager.getLabelsToNodes(updateLabels);
+    for (String partition : updateLabels) {
+      Set<NodeId> partitionNodes = nodesByLabel.get(partition);
+      if (partitionNodes != null) {
+        nodeTracker.updateNodesPerPartition(partition, partitionNodes);
+      }
+    }
+  }
+
   private void addNode(RMNode nodeManager) {
     try {
       writeLock.lock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index e8de096..b937ae7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeLookupPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodePolicySpec;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
@@ -2129,4 +2131,118 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
       break;
     }
   }
+
+  /**
+   * Comma-separated list of multi-node sorting policy aliases to register.
+   */
+  @Private
+  public static final String MULTI_NODE_SORTING_POLICIES =
+      PREFIX + "multi-node-sorting.policy.names";
+
+  /**
+   * Cluster-level key selecting the multi-node sorting policy alias. The same
+   * suffix under a queue prefix selects a per-queue alias. This key is also
+   * the prefix of the per-alias keys {@code <alias>.class} and
+   * {@code <alias>.sorting-interval.ms}.
+   */
+  @Private
+  public static final String MULTI_NODE_SORTING_POLICY_NAME =
+      PREFIX + "multi-node-sorting.policy";
+
+  /**
+   * Alias of the built-in, resource usage based node sorting policy.
+   */
+  public static final String DEFAULT_NODE_SORTING_POLICY = "default";
+
+  /**
+   * Fallback implementation class for {@link #DEFAULT_NODE_SORTING_POLICY}
+   * when its class is not configured explicitly.
+   */
+  public static final String DEFAULT_NODE_SORTING_POLICY_CLASSNAME
+      = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy";
+
+  /** Default per-policy sorting interval, in milliseconds. */
+  public static final long DEFAULT_MULTI_NODE_SORTING_INTERVAL = 1000L;
+
+  /** Switch that enables multi-node placement for the scheduler. */
+  @Private
+  public static final String MULTI_NODE_PLACEMENT_ENABLED = PREFIX
+      + "multi-node-placement-enabled";
+
+  /** Multi-node placement is off unless explicitly enabled. */
+  @Private
+  public static final boolean DEFAULT_MULTI_NODE_PLACEMENT_ENABLED = false;
+
+  /**
+   * Resolve the multi-node sorting policy class for a queue.
+   * <p>
+   * The policy alias is read from the queue-level
+   * {@code <queue-prefix>multi-node-sorting.policy} key. If that is unset,
+   * the cluster-level {@link #MULTI_NODE_SORTING_POLICY_NAME} key is used.
+   * The alias is then mapped to a class through
+   * {@code MULTI_NODE_SORTING_POLICY_NAME.<alias>.class}. As in
+   * {@link #getMultiNodePlacementPolicies()}, the
+   * {@link #DEFAULT_NODE_SORTING_POLICY} alias falls back to
+   * {@link #DEFAULT_NODE_SORTING_POLICY_CLASSNAME} when no class is set.
+   *
+   * @param queue full queue path
+   * @return validated policy class name, or {@code null} when no (non-blank)
+   *         alias is configured for the queue or at cluster level
+   * @throws YarnRuntimeException if the alias has no class configured, or the
+   *         class cannot be loaded or is not a MultiNodeLookupPolicy
+   */
+  public String getMultiNodesSortingAlgorithmPolicy(
+      String queue) {
+
+    String policyName = get(
+        getQueuePrefix(queue) + "multi-node-sorting.policy");
+
+    if (policyName == null) {
+      policyName = get(MULTI_NODE_SORTING_POLICY_NAME);
+    }
+
+    // If no node sorting policy is configured at queue or cluster level, the
+    // queue is assumed not to use multi-node lookup. A whitespace-only value
+    // is treated the same way, instead of failing on an empty alias below.
+    if (policyName == null || policyName.trim().isEmpty()) {
+      return null;
+    }
+    policyName = policyName.trim();
+
+    String policyClassName = get(MULTI_NODE_SORTING_POLICY_NAME + DOT
+        + policyName + DOT + "class");
+    if (policyClassName == null
+        && DEFAULT_NODE_SORTING_POLICY.equals(policyName)) {
+      // Keep queue lookup consistent with getMultiNodePlacementPolicies(),
+      // which registers the built-in class for the "default" alias.
+      policyClassName = DEFAULT_NODE_SORTING_POLICY_CLASSNAME;
+    }
+
+    if (policyClassName == null || policyClassName.isEmpty()) {
+      throw new YarnRuntimeException(
+          policyName + " Class is not configured or not an instance of "
+              + MultiNodeLookupPolicy.class.getCanonicalName());
+    }
+
+    return normalizePolicyName(policyClassName.trim());
+  }
+
+  /**
+   * Check whether multi-node placement is switched on via
+   * {@link #MULTI_NODE_PLACEMENT_ENABLED}.
+   *
+   * @return the configured flag, or
+   *         {@link #DEFAULT_MULTI_NODE_PLACEMENT_ENABLED} when it is not set
+   */
+  public boolean getMultiNodePlacementEnabled() {
+    final boolean enabledByDefault = DEFAULT_MULTI_NODE_PLACEMENT_ENABLED;
+    return getBoolean(MULTI_NODE_PLACEMENT_ENABLED, enabledByDefault);
+  }
+
+  /**
+   * Build the set of multi-node sorting policies listed under
+   * {@link #MULTI_NODE_SORTING_POLICIES}.
+   * <p>
+   * For each alias, the implementation class is read from
+   * {@code MULTI_NODE_SORTING_POLICY_NAME.<alias>.class}. The
+   * {@link #DEFAULT_NODE_SORTING_POLICY} alias falls back to
+   * {@link #DEFAULT_NODE_SORTING_POLICY_CLASSNAME}. The sorting interval is
+   * read from {@code MULTI_NODE_SORTING_POLICY_NAME.<alias>.sorting-interval.ms}
+   * and defaults to {@link #DEFAULT_MULTI_NODE_SORTING_INTERVAL}.
+   *
+   * @return unmodifiable set of policy specs; empty when none are listed
+   * @throws YarnRuntimeException if a policy class is missing, cannot be
+   *         loaded or is not a MultiNodeLookupPolicy, or if a sorting
+   *         interval is negative
+   */
+  public Set<MultiNodePolicySpec> getMultiNodePlacementPolicies() {
+    // getTrimmedStrings splits the comma-separated value and trims each entry.
+    String[] policies = getTrimmedStrings(MULTI_NODE_SORTING_POLICIES);
+
+    Set<MultiNodePolicySpec> set = new HashSet<MultiNodePolicySpec>();
+    for (String str : policies) {
+      String policyName = str.trim();
+      if (policyName.isEmpty()) {
+        continue;
+      }
+      String policyPrefix = MULTI_NODE_SORTING_POLICY_NAME + DOT + policyName
+          + DOT;
+
+      // Only the built-in "default" alias has a fallback class name.
+      String policyClassName = policyName.equals(DEFAULT_NODE_SORTING_POLICY)
+          ? get(policyPrefix + "class", DEFAULT_NODE_SORTING_POLICY_CLASSNAME)
+          : get(policyPrefix + "class");
+      if (policyClassName == null) {
+        throw new YarnRuntimeException(
+            policyName + " Class is not configured or not an instance of "
+                + MultiNodeLookupPolicy.class.getCanonicalName());
+      }
+      policyClassName = normalizePolicyName(policyClassName.trim());
+
+      long policySortingInterval = getLong(
+          policyPrefix + "sorting-interval.ms",
+          DEFAULT_MULTI_NODE_SORTING_INTERVAL);
+      if (policySortingInterval < 0) {
+        throw new YarnRuntimeException(
+            policyName
+                + " multi-node policy is configured with invalid"
+                + " sorting-interval:" + policySortingInterval);
+      }
+      set.add(
+          new MultiNodePolicySpec(policyClassName, policySortingInterval));
+    }
+
+    return Collections.unmodifiableSet(set);
+  }
+
+  /**
+   * Validate that {@code policyName} names a loadable class implementing
+   * MultiNodeLookupPolicy.
+   *
+   * @param policyName fully qualified class name to check
+   * @return {@code policyName}, unchanged, when it is valid
+   * @throws YarnRuntimeException if the class cannot be found or is not
+   *         assignable to MultiNodeLookupPolicy
+   */
+  private String normalizePolicyName(String policyName) {
+    final Class<?> policyClass;
+    try {
+      policyClass = getClassByName(policyName);
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException(
+          "Could not instantiate " + "NodesSortingPolicy: " + policyName, e);
+    }
+
+    if (!MultiNodeLookupPolicy.class.isAssignableFrom(policyClass)) {
+      throw new YarnRuntimeException(
+          "Class: " + policyName + " not instance of "
+              + MultiNodeLookupPolicy.class.getCanonicalName());
+    }
+    return policyName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 366bad0..ffe862f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -53,10 +53,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
@@ -1036,23 +1032,24 @@ public class LeafQueue extends AbstractCSQueue {
   private CSAssignment allocateFromReservedContainer(Resource clusterResource,
       CandidateNodeSet<FiCaSchedulerNode> candidates,
       ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
-    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
-    if (null == node) {
-      return null;
-    }
-
-    RMContainer reservedContainer = node.getReservedContainer();
-    if (reservedContainer != null) {
-      FiCaSchedulerApp application = getApplication(
-          reservedContainer.getApplicationAttemptId());
-
-      if (null != application) {
-        ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
-            node.getNodeID(), SystemClock.getInstance().getTime(), application);
-        CSAssignment assignment = application.assignContainers(clusterResource,
-            candidates, currentResourceLimits, schedulingMode,
-            reservedContainer);
-        return assignment;
+    // Considering multi-node scheduling, its better to iterate through
+    // all candidates and stop once we get atleast one good node to allocate
+    // where reservation was made earlier. In normal case, there is only one
+    // node and hence there wont be any impact after this change.
+    for (FiCaSchedulerNode node : candidates.getAllNodes().values()) {
+      RMContainer reservedContainer = node.getReservedContainer();
+      if (reservedContainer != null) {
+        FiCaSchedulerApp application = getApplication(
+            reservedContainer.getApplicationAttemptId());
+
+        if (null != application) {
+          ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+              node, SystemClock.getInstance().getTime(), application);
+          CSAssignment assignment = application.assignContainers(
+              clusterResource, candidates, currentResourceLimits,
+              schedulingMode, reservedContainer);
+          return assignment;
+        }
       }
     }
 
@@ -1114,13 +1111,14 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerApp application = assignmentIterator.next();
 
       ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
-          node.getNodeID(), SystemClock.getInstance().getTime(), application);
+          node, SystemClock.getInstance().getTime(), application);
 
       // Check queue max-capacity limit
       Resource appReserved = application.getCurrentReservation();
       if (needAssignToQueueCheck) {
-        if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-            currentResourceLimits, appReserved, schedulingMode)) {
+        if (!super.canAssignToThisQueue(clusterResource,
+            candidates.getPartition(), currentResourceLimits, appReserved,
+            schedulingMode)) {
           ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
               activitiesManager, node, application, application.getPriority(),
               ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
@@ -1155,7 +1153,8 @@ public class LeafQueue extends AbstractCSQueue {
         userAssignable = false;
       } else {
         userAssignable = canAssignToUser(clusterResource, application.getUser(),
-            userLimit, application, node.getPartition(), currentResourceLimits);
+            userLimit, application, candidates.getPartition(),
+            currentResourceLimits);
         if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
           cul.canAssign = false;
           cul.reservation = appReserved;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 2363b88..80549ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -553,8 +553,8 @@ public class ParentQueue extends AbstractCSQueue {
 
       ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
           getParentName(), getQueueName(), ActivityState.REJECTED,
-          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
-              .getPartition());
+          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION
+              + candidates.getPartition());
       if (rootQueue) {
         ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
             node);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index a843002..3e337ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -96,11 +96,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
    * headroom, etc.
    */
   private ContainerAllocation preCheckForNodeCandidateSet(
-      Resource clusterResource, CandidateNodeSet<FiCaSchedulerNode> candidates,
+      Resource clusterResource, FiCaSchedulerNode node,
       SchedulingMode schedulingMode, ResourceLimits resourceLimits,
       SchedulerRequestKey schedulerKey) {
     Priority priority = schedulerKey.getPriority();
-    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
 
     PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey,
         ResourceRequest.ANY);
@@ -164,7 +163,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     }
 
     if (!checkHeadroom(clusterResource, resourceLimits, required,
-        candidates.getPartition())) {
+        node.getPartition())) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("cannot allocate required resource=" + required
             + " because of headroom");
@@ -801,20 +800,6 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     // Do checks before determining which node to allocate
     // Directly return if this check fails.
     ContainerAllocation result;
-    if (reservedContainer == null) {
-      result = preCheckForNodeCandidateSet(clusterResource, candidates,
-          schedulingMode, resourceLimits, schedulerKey);
-      if (null != result) {
-        return result;
-      }
-    } else {
-      // pre-check when allocating reserved container
-      if (application.getOutstandingAsksCount(schedulerKey) == 0) {
-        // Release
-        return new ContainerAllocation(reservedContainer, null,
-            AllocationState.QUEUE_SKIPPED);
-      }
-    }
 
     AppPlacementAllocator<FiCaSchedulerNode> schedulingPS =
         application.getAppSchedulingInfo().getAppPlacementAllocator(
@@ -833,6 +818,22 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     while (iter.hasNext()) {
       FiCaSchedulerNode node = iter.next();
 
+      if (reservedContainer == null) {
+        result = preCheckForNodeCandidateSet(clusterResource, node,
+            schedulingMode, resourceLimits, schedulerKey);
+        if (null != result) {
+          continue;
+        }
+      } else {
+        // pre-check when allocating reserved container
+        if (application.getOutstandingAsksCount(schedulerKey) == 0) {
+          // Release
+          result = new ContainerAllocation(reservedContainer, null,
+              AllocationState.QUEUE_SKIPPED);
+          continue;
+        }
+      }
+
       result = tryAllocateOnNode(clusterResource, node, schedulingMode,
           resourceLimits, schedulerKey, reservedContainer);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java
index 1bd3743..06f74de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java
@@ -32,4 +32,8 @@ public class ApplicationSchedulingConfig {
   @InterfaceAudience.Private
   public static final Class<? extends AppPlacementAllocator>
       DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class;
+
+  /**
+   * Application scheduling env key under which an application's multi-node
+   * sorting policy class is stored.
+   */
+  @InterfaceAudience.Private
+  public static final String ENV_MULTI_NODE_SORTING_POLICY_CLASS =
+      "MULTI_NODE_SORTING_POLICY_CLASS";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 6a5af81..4bfdae9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCap
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
@@ -170,10 +171,32 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       rc = scheduler.getResourceCalculator();
     }
 
+    // Update multi-node sorting algorithm to scheduler envs
+    updateMultiNodeSortingPolicy(rmApp);
+
     containerAllocator = new ContainerAllocator(this, rc, rmContext,
         activitiesManager);
   }
 
+  /**
+   * Copy the leaf queue's multi-node sorting policy name into this
+   * application's scheduling envs, unless the envs already hold a value for
+   * {@link ApplicationSchedulingConfig#ENV_MULTI_NODE_SORTING_POLICY_CLASS}.
+   * A queue policy name is only looked up when the scheduler is a
+   * CapacityScheduler.
+   *
+   * @param rmApp the RMApp of this attempt; when null nothing is done
+   */
+  private void updateMultiNodeSortingPolicy(RMApp rmApp) {
+    if (rmApp == null) {
+      return;
+    }
+
+    String policyName = (scheduler instanceof CapacityScheduler)
+        ? getCSLeafQueue().getMultiNodeSortingPolicyName()
+        : null;
+
+    // An existing env value is kept; the queue's policy only acts as default.
+    boolean envAlreadySet = appSchedulingInfo.getApplicationSchedulingEnvs()
+        .containsKey(
+            ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS);
+    if (!envAlreadySet && policyName != null) {
+      appSchedulingInfo.getApplicationSchedulingEnvs().put(
+          ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS,
+          policyName);
+    }
+  }
+
   public boolean containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event,
       String partition) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
index f1df343..9d30e90 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -24,11 +24,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@@ -55,6 +58,8 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
       new ConcurrentHashMap<>();
   private volatile String primaryRequestedPartition =
       RMNodeLabelsManager.NO_LABEL;
+  private MultiNodeSortingManager<N> multiNodeSortingManager = null;
+  private String multiNodeSortPolicyName;
 
   private final ReentrantReadWriteLock.ReadLock readLock;
   private final ReentrantReadWriteLock.WriteLock writeLock;
@@ -65,6 +70,26 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
     writeLock = lock.writeLock();
   }
 
+  /**
+   * Initialize the allocator and capture the multi-node sorting settings:
+   * the policy name from the app's scheduling envs and the shared
+   * MultiNodeSortingManager from the RM context.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void initialize(AppSchedulingInfo appSchedulingInfo,
+      SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
+    super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+
+    // The policy name travels with the app's scheduling envs and may be null.
+    String policyEnvKey =
+        ApplicationSchedulingConfig.ENV_MULTI_NODE_SORTING_POLICY_CLASS;
+    multiNodeSortPolicyName =
+        appSchedulingInfo.getApplicationSchedulingEnvs().get(policyEnvKey);
+
+    // Unchecked cast: bind the context's manager to this allocator's N.
+    multiNodeSortingManager =
+        (MultiNodeSortingManager<N>) rmContext.getMultiNodeSortingManager();
+
+    if (LOG.isDebugEnabled()) {
+      String policyForLog =
+          (multiNodeSortPolicyName == null) ? "" : multiNodeSortPolicyName;
+      LOG.debug("nodeLookupPolicy used for "
+          + appSchedulingInfo.getApplicationId() + " is " + policyForLog);
+    }
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public Iterator<N> getPreferredNodeIterator(
@@ -74,11 +99,16 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
     // in.
 
     N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet);
-    if (null != singleNode) {
+    if (singleNode != null) {
       return IteratorUtils.singletonIterator(singleNode);
     }
 
-    return IteratorUtils.emptyIterator();
+    // singleNode will be null if Multi-node placement lookup is enabled, and
+    // hence could consider sorting policies.
+    return multiNodeSortingManager.getMultiNodeSortIterator(
+        candidateNodeSet.getAllNodes().values(),
+        candidateNodeSet.getPartition(),
+        multiNodeSortPolicyName);
   }
 
   private boolean hasRequestLabelChanged(ResourceRequest requestOne,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java
new file mode 100644
index 0000000..662e34d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeLookupPolicy.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * <p>
+ * Pluggable policy used by multi-node placement to order candidate nodes.
+ * </p>
+ *
+ * <p>
+ * Provides an interface for MultiNodeLookupPolicy so that different placement
+ * allocators can choose nodes based on need. Implementations are loaded by
+ * class name and created through reflection by MultiNodeSorter.
+ * </p>
+ *
+ * @param <N> the scheduler node type handled by this policy
+ */
+public interface MultiNodeLookupPolicy<N extends SchedulerNode> {
+  /**
+   * Get iterator of preferred nodes depending on requirement and/or
+   * availability.
+   *
+   * @param nodes
+   *          candidate nodes
+   * @param partition
+   *          node label (partition) of the candidates
+   *
+   * @return iterator of preferred nodes
+   */
+  Iterator<N> getPreferredNodeIterator(Collection<N> nodes, String partition);
+
+  /**
+   * Refresh the working node set of a partition so that it can be re-ordered
+   * by the algorithm this policy implements. Invoked periodically by the
+   * sorter thread, or on lookup when no sorter thread is running.
+   *
+   * @param nodes
+   *          collection of working NodeManagers' scheduler nodes
+   * @param partition
+   *          node label (partition) the nodes belong to
+   */
+  void addAndRefreshNodesSet(Collection<N> nodes, String partition);
+
+  /**
+   * Get sorted nodes per partition.
+   *
+   * @param partition
+   *          node label (partition)
+   *
+   * @return set of sorted nodes of the partition
+   */
+  Set<N> getNodesPerPartition(String partition);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java
new file mode 100644
index 0000000..8386d78
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodePolicySpec.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+/**
+ * MultiNodePolicySpec contains policyName and sortingInterval.
+ *
+ * <p>
+ * The policy name is the class name of a multi-node lookup policy; the
+ * sorting interval is how often, in milliseconds, its sorter re-sorts nodes
+ * (0 means no periodic sorting).
+ * </p>
+ *
+ * <p>
+ * Specs have value semantics: two specs are equal when both policy name and
+ * sorting interval are equal, so duplicate registrations collapse in
+ * hash-based collections. Do not mutate a spec while it is stored in one.
+ * </p>
+ */
+public class MultiNodePolicySpec {
+
+  private String policyName;
+  private long sortingInterval;
+
+  /**
+   * @param policyName class name of the sorting policy
+   * @param sortingInterval sorting interval in milliseconds
+   */
+  public MultiNodePolicySpec(String policyName, long sortingInterval) {
+    this.setSortingInterval(sortingInterval);
+    this.setPolicyName(policyName);
+  }
+
+  public long getSortingInterval() {
+    return sortingInterval;
+  }
+
+  public void setSortingInterval(long sortingInterval) {
+    this.sortingInterval = sortingInterval;
+  }
+
+  public String getPolicyName() {
+    return policyName;
+  }
+
+  public void setPolicyName(String policyName) {
+    this.policyName = policyName;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    MultiNodePolicySpec that = (MultiNodePolicySpec) o;
+    return sortingInterval == that.sortingInterval
+        && java.util.Objects.equals(policyName, that.policyName);
+  }
+
+  @Override
+  public int hashCode() {
+    return java.util.Objects.hash(policyName, sortingInterval);
+  }
+
+  @Override
+  public String toString() {
+    return "MultiNodePolicySpec {" +
+        "policyName='" + policyName + '\'' +
+        ", sortingInterval=" + sortingInterval +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java
new file mode 100644
index 0000000..7e27c34
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSorter.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Common node sorting class which will do sorting based on policy spec.
+ *
+ * <p>
+ * On init the sorter instantiates the policy class named by its
+ * {@link MultiNodePolicySpec}. On start, if the spec's sorting interval is
+ * positive, a single background thread re-sorts the nodes of every cluster
+ * partition at that interval (milliseconds). Otherwise no thread is started
+ * and {@link #isSorterThreadRunning()} returns false.
+ * </p>
+ * @param <N> extends SchedulerNode.
+ */
+public class MultiNodeSorter<N extends SchedulerNode> extends AbstractService {
+
+  private MultiNodeLookupPolicy<N> multiNodePolicy;
+  private static final Log LOG = LogFactory.getLog(MultiNodeSorter.class);
+
+  // Single-threaded executor which runs SortingThread periodically; only
+  // created when the sorting interval is positive.
+  private ScheduledExecutorService ses;
+  // Volatile: isSorterThreadRunning() may be called from threads other than
+  // the one that ran serviceStart().
+  private volatile ScheduledFuture<?> handler;
+  private volatile boolean stopped;
+  private RMContext rmContext;
+  private MultiNodePolicySpec policySpec;
+
+  public MultiNodeSorter(RMContext rmContext,
+      MultiNodePolicySpec policy) {
+    super("MultiNodeLookupPolicy");
+    this.rmContext = rmContext;
+    this.policySpec = policy;
+  }
+
+  @VisibleForTesting
+  public synchronized MultiNodeLookupPolicy<N> getMultiNodeLookupPolicy() {
+    return multiNodePolicy;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    LOG.info("Initializing MultiNodeSorter=" + policySpec.getPolicyName()
+        + ", with sorting interval=" + policySpec.getSortingInterval());
+    initPolicy(policySpec.getPolicyName());
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Instantiate the lookup policy from its class name.
+   *
+   * @param policyName fully qualified name of a class implementing
+   *          {@link MultiNodeLookupPolicy}
+   * @throws YarnException if the class cannot be loaded or does not
+   *           implement {@link MultiNodeLookupPolicy}
+   */
+  @SuppressWarnings("unchecked")
+  void initPolicy(String policyName) throws YarnException {
+    Class<?> policyClass;
+    try {
+      policyClass = Class.forName(policyName);
+    } catch (ClassNotFoundException e) {
+      // Keep the cause so the stack trace shows why loading failed.
+      throw new YarnException(
+          "Invalid policy name:" + policyName + ", " + e.getMessage(), e);
+    }
+    if (!MultiNodeLookupPolicy.class.isAssignableFrom(policyClass)) {
+      throw new YarnException("Invalid policy name:" + policyName
+          + ", class does not implement "
+          + MultiNodeLookupPolicy.class.getName());
+    }
+    this.multiNodePolicy = (MultiNodeLookupPolicy<N>) ReflectionUtils
+        .newInstance(policyClass, null);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    LOG.info("Starting MultiNodeSorter=" + getName());
+    assert !stopped : "starting when already stopped";
+
+    // Start sorter thread only if sorting interval is a +ve value. A zero or
+    // negative period would make scheduleAtFixedRate throw
+    // IllegalArgumentException and fail the service start.
+    long interval = policySpec.getSortingInterval();
+    if (interval > 0) {
+      ses = Executors.newSingleThreadScheduledExecutor(r -> {
+        Thread t = new Thread(r);
+        t.setName(getName());
+        return t;
+      });
+      handler = ses.scheduleAtFixedRate(new SortingThread(),
+          0, interval, TimeUnit.MILLISECONDS);
+    }
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    stopped = true;
+    if (handler != null) {
+      LOG.info("Stop " + getName());
+      handler.cancel(true);
+    }
+    // Always release the executor, whether or not a task was scheduled.
+    if (ses != null) {
+      ses.shutdown();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Re-sort the nodes of every cluster partition (all node labels plus the
+   * default NO_LABEL partition) with the configured policy.
+   */
+  @SuppressWarnings("unchecked")
+  @VisibleForTesting
+  public void reSortClusterNodes() {
+    Set<String> nodeLabels = new HashSet<>();
+    nodeLabels
+        .addAll(rmContext.getNodeLabelManager().getClusterNodeLabelNames());
+    nodeLabels.add(RMNodeLabelsManager.NO_LABEL);
+    for (String label : nodeLabels) {
+      // Keyed by NodeId so each node is handed to the policy at most once.
+      Map<NodeId, SchedulerNode> nodesByPartition = new HashMap<>();
+      List<SchedulerNode> nodes = ((AbstractYarnScheduler) rmContext
+          .getScheduler()).getNodeTracker().getNodesPerPartition(label);
+      if (nodes != null && !nodes.isEmpty()) {
+        nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
+        multiNodePolicy.addAndRefreshNodesSet(
+            (Collection<N>) nodesByPartition.values(), label);
+      }
+    }
+  }
+
+  private class SortingThread implements Runnable {
+    @Override
+    public void run() {
+      try {
+        reSortClusterNodes();
+      } catch (Throwable t) {
+        // Catch everything: if a periodic task throws, the executor
+        // suppresses all of its subsequent executions. Log, skip this run,
+        // and retry on the next tick.
+        LOG.error("Exception raised while executing multinode"
+            + " sorter, skip this run..., exception=", t);
+      }
+    }
+  }
+
+  /**
+   * Verify whether sorter thread is running or not.
+   *
+   * @return true if a periodic sorting task was scheduled and has not been
+   *         cancelled, false otherwise.
+   */
+  public boolean isSorterThreadRunning() {
+    ScheduledFuture<?> current = handler;
+    return current != null && !current.isDone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c3fc3ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java
new file mode 100644
index 0000000..e872317
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/MultiNodeSortingManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+/**
+ * Node Sorting Manager which runs all sorter threads and policies.
+ *
+ * <p>
+ * Policies are registered through
+ * {@link #registerMultiNodePolicyNames(boolean, Set)}. On service start, if
+ * multi-node placement is enabled, one {@link MultiNodeSorter} is created,
+ * initialized and started per registered {@link MultiNodePolicySpec}, keyed
+ * by policy name.
+ * </p>
+ * @param <N> extends SchedulerNode
+ */
+public class MultiNodeSortingManager<N extends SchedulerNode>
+    extends AbstractService {
+
+  private static final Log LOG = LogFactory
+      .getLog(MultiNodeSortingManager.class);
+
+  private RMContext rmContext;
+  private Map<String, MultiNodeSorter<N>> runningMultiNodeSorters;
+  private Set<MultiNodePolicySpec> policySpecs =
+      new HashSet<MultiNodePolicySpec>();
+  private Configuration conf;
+  private boolean multiNodePlacementEnabled;
+
+  public MultiNodeSortingManager() {
+    super("MultiNodeSortingManager");
+    this.runningMultiNodeSorters = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void serviceInit(Configuration configuration) throws Exception {
+    LOG.info("Initializing NodeSortingService=" + getName());
+    super.serviceInit(configuration);
+    this.conf = configuration;
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    LOG.info("Starting NodeSortingService=" + getName());
+    createAllPolicies();
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    for (MultiNodeSorter<N> sorter : runningMultiNodeSorters.values()) {
+      sorter.stop();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Create, init and start one sorter per registered policy spec when
+   * multi-node placement is enabled; do nothing otherwise.
+   */
+  private void createAllPolicies() {
+    if (!multiNodePlacementEnabled) {
+      return;
+    }
+    for (MultiNodePolicySpec policy : policySpecs) {
+      MultiNodeSorter<N> mon = new MultiNodeSorter<N>(rmContext, policy);
+      mon.init(conf);
+      mon.start();
+      runningMultiNodeSorters.put(policy.getPolicyName(), mon);
+    }
+  }
+
+  /**
+   * @param name policy name
+   * @return the sorter started for the policy, or null if there is none
+   */
+  public MultiNodeSorter<N> getMultiNodePolicy(String name) {
+    return runningMultiNodeSorters.get(name);
+  }
+
+  public void setRMContext(RMContext context) {
+    this.rmContext = context;
+  }
+
+  /**
+   * Register the policies whose sorters are created on service start.
+   *
+   * @param isMultiNodePlacementEnabled whether sorters should be created
+   * @param multiNodePlacementPolicies policy specs to add to the registry
+   */
+  public void registerMultiNodePolicyNames(
+      boolean isMultiNodePlacementEnabled,
+      Set<MultiNodePolicySpec> multiNodePlacementPolicies) {
+    this.policySpecs.addAll(multiNodePlacementPolicies);
+    this.multiNodePlacementEnabled = isMultiNodePlacementEnabled;
+    LOG.info("MultiNode scheduling is '" + multiNodePlacementEnabled +
+        "', and configured policies are " + StringUtils
+        .join(policySpecs.iterator(), ","));
+  }
+
+  /**
+   * Get an iterator over candidate nodes ordered by the named sorting policy.
+   *
+   * <p>
+   * If no policy name is given, or no sorter exists for it, a warning is
+   * logged and the iterator yields only the first candidate node, or nothing
+   * when there are no candidates. If the policy's sorter has no background
+   * thread, the policy's node set is refreshed with {@code nodes} first.
+   * </p>
+   *
+   * @param nodes candidate nodes
+   * @param partition node partition (label) of the candidates
+   * @param policyName name of the sorting policy, may be null
+   * @return iterator of preferred nodes
+   */
+  public Iterator<N> getMultiNodeSortIterator(Collection<N> nodes,
+      String partition, String policyName) {
+    // nodeLookupPolicy can be null if app is configured with invalid policy.
+    // In such cases, use the first node.
+    if (policyName == null) {
+      LOG.warn("Multi Node scheduling is enabled, however invalid class is"
+          + " configured. Valid sorting policy has to be configured in"
+          + " yarn.scheduler.capacity.<queue>.multi-node-sorting.policy");
+      return firstNodeIterator(nodes);
+    }
+
+    MultiNodeSorter<N> multiNodeSorter = getMultiNodePolicy(policyName);
+    if (multiNodeSorter == null) {
+      LOG.warn(
+          "MultiNode policy '" + policyName + "' is configured, however " +
+              "yarn.scheduler.capacity.multi-node-placement-enabled is false");
+      return firstNodeIterator(nodes);
+    }
+
+    MultiNodeLookupPolicy<N> policy = multiNodeSorter
+        .getMultiNodeLookupPolicy();
+    // If sorter thread is not running, refresh node set.
+    if (!multiNodeSorter.isSorterThreadRunning()) {
+      policy.addAndRefreshNodesSet(nodes, partition);
+    }
+
+    return policy.getPreferredNodeIterator(nodes, partition);
+  }
+
+  /**
+   * Fallback used when no sorting policy applies.
+   *
+   * @param nodes candidate nodes, may be null or empty
+   * @return an iterator over the first node only, or an empty iterator when
+   *         there are no nodes (instead of throwing NoSuchElementException)
+   */
+  @SuppressWarnings("unchecked")
+  private Iterator<N> firstNodeIterator(Collection<N> nodes) {
+    if (nodes == null || nodes.isEmpty()) {
+      return IteratorUtils.emptyIterator();
+    }
+    return IteratorUtils.singletonIterator(nodes.iterator().next());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org