Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/02/19 10:59:11 UTC

[1/2] hadoop git commit: YARN-4412. Create ClusterMonitor to compute ordered list of preferred NMs for OPPORTUNISTIC containers (asuresh)

Repository: hadoop
Updated Branches:
  refs/heads/yarn-2877 b00875e60 -> 2340511f7


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
new file mode 100644
index 0000000..5aedbed
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TopKNodeSelector implements ClusterMonitor, NodeSelector {
+
+  final static Log LOG = LogFactory.getLog(TopKNodeSelector.class);
+
+  enum TopKComparator implements Comparator<ClusterNode> {
+    WAIT_TIME,
+    QUEUE_LENGTH;
+
+    @Override
+    public int compare(ClusterNode o1, ClusterNode o2) {
+      if (getQuant(o1) == getQuant(o2)) {
+        return o1.timestamp < o2.timestamp ? +1 : -1;
+      }
+      return getQuant(o1) > getQuant(o2) ? +1 : -1;
+    }
+
+    private int getQuant(ClusterNode c) {
+      return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength;
+    }
+  }
+
+  static class ClusterNode {
+    int queueTime = -1;
+    int waitQueueLength = 0;
+    double timestamp;
+    final NodeId nodeId;
+
+    public ClusterNode(NodeId nodeId) {
+      this.nodeId = nodeId;
+      updateTimestamp();
+    }
+
+    public ClusterNode setQueueTime(int queueTime) {
+      this.queueTime = queueTime;
+      return this;
+    }
+
+    public ClusterNode setWaitQueueLength(int queueLength) {
+      this.waitQueueLength = queueLength;
+      return this;
+    }
+
+    public ClusterNode updateTimestamp() {
+      this.timestamp = System.currentTimeMillis();
+      return this;
+    }
+  }
+
+  private final int k;
+  private final List<NodeId> topKNodes;
+  private final ScheduledExecutorService scheduledExecutor;
+  private final HashMap<NodeId, ClusterNode> clusterNodes = new HashMap<>();
+  private final Comparator<ClusterNode> comparator;
+
+  Runnable computeTask = new Runnable() {
+    @Override
+    public void run() {
+      synchronized (topKNodes) {
+        topKNodes.clear();
+        topKNodes.addAll(computeTopKNodes());
+      }
+    }
+  };
+
+  @VisibleForTesting
+  TopKNodeSelector(int k, TopKComparator comparator) {
+    this.k = k;
+    this.topKNodes = new ArrayList<>();
+    this.comparator = comparator;
+    this.scheduledExecutor = null;
+  }
+
+  public TopKNodeSelector(int k, long nodeComputationInterval,
+      TopKComparator comparator) {
+    this.k = k;
+    this.topKNodes = new ArrayList<>();
+    this.scheduledExecutor = Executors.newScheduledThreadPool(1);
+    this.comparator = comparator;
+    this.scheduledExecutor.scheduleAtFixedRate(computeTask,
+        nodeComputationInterval, nodeComputationInterval,
+        TimeUnit.MILLISECONDS);
+  }
+
+
+  @Override
+  public void addNode(List<NMContainerStatus> containerStatuses, RMNode
+      rmNode) {
+    LOG.debug("Node added event from: " + rmNode.getNode().getName());
+    // Ignoring this currently: at least one NODE_UPDATE heartbeat is
+    // required to ensure node eligibility.
+  }
+
+  @Override
+  public void removeNode(RMNode removedRMNode) {
+    LOG.debug("Node delete event for: " + removedRMNode.getNode().getName());
+    synchronized (this.clusterNodes) {
+      if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) {
+        this.clusterNodes.remove(removedRMNode.getNodeID());
+        LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
+      } else {
+        LOG.debug("Node not in list!");
+      }
+    }
+  }
+
+  @Override
+  public void nodeUpdate(RMNode rmNode) {
+    LOG.debug("Node update event from: " + rmNode.getNodeID());
+    QueuedContainersStatus queuedContainersStatus =
+        rmNode.getQueuedContainersStatus();
+    int estimatedQueueWaitTime =
+        queuedContainersStatus.getEstimatedQueueWaitTime();
+    int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
+    // Add node to clusterNodes; if estimatedQueueWaitTime is -1, ignore the
+    // node unless the comparator is based on queue length, in which case add it.
+    synchronized (this.clusterNodes) {
+      ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
+      if (currentNode == null) {
+        if (estimatedQueueWaitTime != -1
+            || comparator == TopKComparator.QUEUE_LENGTH) {
+          this.clusterNodes.put(rmNode.getNodeID(),
+              new ClusterNode(rmNode.getNodeID())
+                  .setQueueTime(estimatedQueueWaitTime)
+                  .setWaitQueueLength(waitQueueLength));
+          LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        } else {
+          LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        }
+      } else {
+        if (estimatedQueueWaitTime != -1
+            || comparator == TopKComparator.QUEUE_LENGTH) {
+          currentNode
+              .setQueueTime(estimatedQueueWaitTime)
+              .setWaitQueueLength(waitQueueLength)
+              .updateTimestamp();
+          LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        } else {
+          this.clusterNodes.remove(rmNode.getNodeID());
+          LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
+              "with queue wait time [" + currentNode.queueTime + "] and " +
+              "wait queue length [" + currentNode.waitQueueLength + "]");
+        }
+      }
+    }
+  }
+
+  @Override
+  public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
+    LOG.debug("Node resource update event from: " + rmNode.getNodeID());
+    // Ignoring this currently...
+  }
+
+  @Override
+  public List<NodeId> selectNodes() {
+    synchronized (this.topKNodes) {
+      return this.k < this.topKNodes.size() ?
+          new ArrayList<>(this.topKNodes).subList(0, this.k) :
+          new ArrayList<>(this.topKNodes);
+    }
+  }
+
+  @Override
+  public List<NodeId> selectNodes(Collection<SelectionHint> hints) {
+    List<NodeId> retList = selectNodes();
+    Set<NodeId> alreadyAdded = new HashSet<>(retList);
+    TreeSet<ClusterNode> toAdd = new TreeSet<>(this.comparator);
+    synchronized (this.clusterNodes) {
+      for (SelectionHint hint : hints) {
+        // Sort the nodes in the criteria (We need the best nodes)
+        PriorityQueue<ClusterNode> temp =
+            new PriorityQueue<>(hint.getNodeIds().length, this.comparator);
+        for (NodeId n : hint.getNodeIds()) {
+          if (!alreadyAdded.contains(n) &&
+              !toAdd.contains(clusterNodes.get(n))) {
+            temp.add(clusterNodes.get(n));
+          }
+        }
+        // From the Sorted list, select the 'minToInclude' best nodes
+        int numIncluded = 0;
+        while (!temp.isEmpty()) {
+          if (numIncluded < hint.getMinToInclude()) {
+            ClusterNode cn = temp.remove();
+            toAdd.add(cn);
+            alreadyAdded.add(cn.nodeId);
+            numIncluded++;
+          } else {
+            break;
+          }
+        }
+      }
+    }
+
+    if (toAdd.size() > 0) {
+      ArrayList<NodeId> newList = new ArrayList<>();
+      for (ClusterNode cn : toAdd) {
+        newList.add(cn.nodeId);
+      }
+      newList.addAll(retList);
+      retList = newList;
+    }
+    return retList;
+  }
+
+  private List<NodeId> computeTopKNodes() {
+    synchronized (this.clusterNodes) {
+      ArrayList aList = new ArrayList<>(this.clusterNodes.values());
+      List<NodeId> retList = new ArrayList<>();
+      Object[] nodes = aList.toArray();
+      // Collections.sort would do something similar by calling Arrays.sort
+      // internally, but would then iterate over the input list (aList) to
+      // reset each element. Since we don't really care about 'aList' itself,
+      // we use that iteration to build the list of nodeIds, which is what we
+      // ultimately need.
+      Arrays.sort(nodes, (Comparator)comparator);
+      for (int j=0; j < nodes.length; j++) {
+        retList.add(((ClusterNode)nodes[j]).nodeId);
+      }
+      return retList;
+    }
+  }
+}
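
A minimal usage sketch of the new TopKNodeSelector follows, for readers skimming the patch. It is not part of the commit: only the constructor, nodeUpdate() and selectNodes() signatures come from the class above; the wrapper class, the driving loop and the 1000 ms interval are illustrative assumptions.

    package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;

    import java.util.List;

    import org.apache.hadoop.yarn.api.records.NodeId;
    import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;

    /** Sketch only: drives the selector the way a ClusterMonitor owner might. */
    public class TopKNodeSelectorSketch {

      public static List<NodeId> preferred(Iterable<RMNode> heartbeats) {
        // Track the 5 "best" nodes, ordered by estimated queue wait time, and
        // recompute the ordering every 1000 ms on the selector's own thread.
        TopKNodeSelector selector = new TopKNodeSelector(5, 1000L,
            TopKNodeSelector.TopKComparator.WAIT_TIME);

        // Feed NODE_UPDATE heartbeats; under WAIT_TIME, nodes reporting a wait
        // time of -1 are ignored (they are still kept under QUEUE_LENGTH).
        for (RMNode node : heartbeats) {
          selector.nodeUpdate(node);
        }

        // The scheduled task refreshes the ordered list every interval; callers
        // receive at most k node ids, lowest wait time first, ties broken in
        // favour of the most recently updated node.
        return selector.selectNodes();
      }
    }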

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 89aff29..f5b61a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 
@@ -260,6 +260,10 @@ public class MockNodes {
     public ResourceUtilization getNodeUtilization() {
       return this.nodeUtilization;
     }
+
+    public QueuedContainersStatus getQueuedContainersStatus() {
+      return null;
+    }
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 6182b07..9b4e1c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -90,6 +90,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed
+    .DistributedSchedulingService;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
index 3fa377e..c45fba8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -167,10 +169,11 @@ public class TestApplicationCleanup {
     MockRM rm = new MockRM() {
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new SchedulerEventDispatcher(this.scheduler) {
+        return new EventDispatcher<SchedulerEvent>(this.scheduler,
+            this.scheduler.getClass().getName()) {
           @Override
           public void handle(SchedulerEvent event) {
-            scheduler.handle(event);
+            super.handle(event);
           }
         };
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
deleted file mode 100644
index 262fd5a..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
-    .AMLivelinessMonitor;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-
-public class TestDistributedSchedulingService {
-
-  // Test if the DistributedSchedulingService can handle both DSProtocol as
-  // well as AMProtocol clients
-  @Test
-  public void testRPCWrapping() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
-        .getName());
-    YarnRPC rpc = YarnRPC.create(conf);
-    String bindAddr = "localhost:0";
-    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
-    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
-    final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
-    final RMContext rmContext = new RMContextImpl() {
-      @Override
-      public AMLivelinessMonitor getAMLivelinessMonitor() {
-        return null;
-      }
-    };
-    DistributedSchedulingService service =
-        new DistributedSchedulingService(rmContext, null) {
-          @Override
-          public RegisterApplicationMasterResponse registerApplicationMaster
-              (RegisterApplicationMasterRequest request) throws
-              YarnException, IOException {
-            RegisterApplicationMasterResponse resp = factory.newRecordInstance(
-                RegisterApplicationMasterResponse.class);
-            // Dummy Entry to Assert that we get this object back
-            resp.setQueue("dummyQueue");
-            return resp;
-          }
-
-          @Override
-          public FinishApplicationMasterResponse finishApplicationMaster
-              (FinishApplicationMasterRequest request) throws YarnException,
-              IOException {
-            FinishApplicationMasterResponse resp = factory.newRecordInstance(
-                FinishApplicationMasterResponse.class);
-            // Dummy Entry to Assert that we get this object back
-            resp.setIsUnregistered(false);
-            return resp;
-          }
-
-          @Override
-          public AllocateResponse allocate(AllocateRequest request) throws
-              YarnException, IOException {
-            AllocateResponse response = factory.newRecordInstance
-                (AllocateResponse.class);
-            response.setNumClusterNodes(12345);
-            return response;
-          }
-
-          @Override
-          public DistSchedRegisterResponse
-          registerApplicationMasterForDistributedScheduling
-              (RegisterApplicationMasterRequest request) throws
-              YarnException, IOException {
-            DistSchedRegisterResponse resp = factory.newRecordInstance(
-                DistSchedRegisterResponse.class);
-            resp.setContainerIdStart(54321l);
-            return resp;
-          }
-
-          @Override
-          public DistSchedAllocateResponse allocateForDistributedScheduling
-              (AllocateRequest request) throws YarnException, IOException {
-            DistSchedAllocateResponse resp =
-                factory.newRecordInstance(DistSchedAllocateResponse.class);
-            resp.setNodesForScheduling(
-                Arrays.asList(NodeId.newInstance("h1", 1234)));
-            return resp;
-          }
-        };
-    Server server = service.getServer(rpc, conf, addr, null);
-    server.start();
-
-    // Verify that the DistrubutedSchedulingService can handle vanilla
-    // ApplicationMasterProtocol clients
-    RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
-        ProtobufRpcEngine.class);
-    ApplicationMasterProtocol ampProxy =
-        (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol
-            .class, NetUtils.getConnectAddress(server), conf);
-    RegisterApplicationMasterResponse regResp = ampProxy.registerApplicationMaster(
-            factory.newRecordInstance(RegisterApplicationMasterRequest.class));
-    Assert.assertEquals("dummyQueue", regResp.getQueue());
-    FinishApplicationMasterResponse finishResp = ampProxy
-        .finishApplicationMaster(factory.newRecordInstance(
-            FinishApplicationMasterRequest.class));
-    Assert.assertEquals(false, finishResp.getIsUnregistered());
-    AllocateResponse allocResp = ampProxy
-        .allocate(factory.newRecordInstance(AllocateRequest.class));
-    Assert.assertEquals(12345, allocResp.getNumClusterNodes());
-
-
-    // Verify that the DistrubutedSchedulingService can handle the
-    // DistributedSchedulerProtocol clients as well
-    RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
-        ProtobufRpcEngine.class);
-    DistributedSchedulerProtocol dsProxy =
-        (DistributedSchedulerProtocol) rpc.getProxy(DistributedSchedulerProtocol
-            .class, NetUtils.getConnectAddress(server), conf);
-
-    DistSchedRegisterResponse dsRegResp =
-        dsProxy.registerApplicationMasterForDistributedScheduling(
-        factory.newRecordInstance(RegisterApplicationMasterRequest.class));
-    Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
-    DistSchedAllocateResponse dsAllocResp =
-        dsProxy.allocateForDistributedScheduling(
-            factory.newRecordInstance(AllocateRequest.class));
-    Assert.assertEquals(
-        "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
index d9306dd..429817e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
@@ -27,7 +27,7 @@ import static org.mockito.Mockito.verify;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
@@ -44,8 +44,8 @@ public class TestRMDispatcher {
     AsyncDispatcher rmDispatcher = new AsyncDispatcher();
     CapacityScheduler sched = spy(new CapacityScheduler());
     YarnConfiguration conf = new YarnConfiguration();
-    SchedulerEventDispatcher schedulerDispatcher =
-        new SchedulerEventDispatcher(sched);
+    EventDispatcher schedulerDispatcher =
+        new EventDispatcher(sched, sched.getClass().getName());
     rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
     rmDispatcher.init(conf);
     rmDispatcher.start();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index e0fd9ab..119ae09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -987,7 +988,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     rm = new MockRM() {
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new SchedulerEventDispatcher(this.scheduler) {
+        return new EventDispatcher<SchedulerEvent>(this.scheduler,
+            this.scheduler.getClass().getName()) {
           @Override
           public void handle(SchedulerEvent event) {
             scheduler.handle(event);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
index f4cb3b3..458f94d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
 
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -65,10 +66,11 @@ public class TestAMRMRPCNodeUpdates {
       }
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new SchedulerEventDispatcher(this.scheduler) {
+        return new EventDispatcher<SchedulerEvent>(this.scheduler,
+            this.scheduler.getClass().getName()) {
           @Override
           public void handle(SchedulerEvent event) {
-            scheduler.handle(event);
+            super.handle(event);
           }
         };
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 5035afe..e5ba470 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -423,10 +424,11 @@ public class TestAMRestart {
     MockRM rm1 = new MockRM(conf, memStore) {
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new SchedulerEventDispatcher(this.scheduler) {
+        return new EventDispatcher<SchedulerEvent>(this.scheduler,
+            this.scheduler.getClass().getName()) {
           @Override
           public void handle(SchedulerEvent event) {
-            scheduler.handle(event);
+            super.handle(event);
           }
         };
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java
new file mode 100644
index 0000000..35ab6a9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
+    .AMLivelinessMonitor;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed
+    .DistributedSchedulingService;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+public class TestDistributedSchedulingService {
+
+  // Test if the DistributedSchedulingService can handle both DSProtocol as
+  // well as AMProtocol clients
+  @Test
+  public void testRPCWrapping() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+        .getName());
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
+    final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
+    final RMContext rmContext = new RMContextImpl() {
+      @Override
+      public AMLivelinessMonitor getAMLivelinessMonitor() {
+        return null;
+      }
+
+      @Override
+      public Configuration getYarnConfiguration() {
+        return new YarnConfiguration();
+      }
+    };
+    DistributedSchedulingService service =
+        new DistributedSchedulingService(rmContext, null) {
+          @Override
+          public RegisterApplicationMasterResponse registerApplicationMaster
+              (RegisterApplicationMasterRequest request) throws
+              YarnException, IOException {
+            RegisterApplicationMasterResponse resp = factory.newRecordInstance(
+                RegisterApplicationMasterResponse.class);
+            // Dummy Entry to Assert that we get this object back
+            resp.setQueue("dummyQueue");
+            return resp;
+          }
+
+          @Override
+          public FinishApplicationMasterResponse finishApplicationMaster
+              (FinishApplicationMasterRequest request) throws YarnException,
+              IOException {
+            FinishApplicationMasterResponse resp = factory.newRecordInstance(
+                FinishApplicationMasterResponse.class);
+            // Dummy Entry to Assert that we get this object back
+            resp.setIsUnregistered(false);
+            return resp;
+          }
+
+          @Override
+          public AllocateResponse allocate(AllocateRequest request) throws
+              YarnException, IOException {
+            AllocateResponse response = factory.newRecordInstance
+                (AllocateResponse.class);
+            response.setNumClusterNodes(12345);
+            return response;
+          }
+
+          @Override
+          public DistSchedRegisterResponse
+          registerApplicationMasterForDistributedScheduling
+              (RegisterApplicationMasterRequest request) throws
+              YarnException, IOException {
+            DistSchedRegisterResponse resp = factory.newRecordInstance(
+                DistSchedRegisterResponse.class);
+            resp.setContainerIdStart(54321l);
+            return resp;
+          }
+
+          @Override
+          public DistSchedAllocateResponse allocateForDistributedScheduling
+              (AllocateRequest request) throws YarnException, IOException {
+            DistSchedAllocateResponse resp =
+                factory.newRecordInstance(DistSchedAllocateResponse.class);
+            resp.setNodesForScheduling(
+                Arrays.asList(NodeId.newInstance("h1", 1234)));
+            return resp;
+          }
+        };
+    Server server = service.getServer(rpc, conf, addr, null);
+    server.start();
+
+    // Verify that the DistributedSchedulingService can handle vanilla
+    // ApplicationMasterProtocol clients
+    RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
+        ProtobufRpcEngine.class);
+    ApplicationMasterProtocol ampProxy =
+        (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol
+            .class, NetUtils.getConnectAddress(server), conf);
+    RegisterApplicationMasterResponse regResp = ampProxy.registerApplicationMaster(
+            factory.newRecordInstance(RegisterApplicationMasterRequest.class));
+    Assert.assertEquals("dummyQueue", regResp.getQueue());
+    FinishApplicationMasterResponse finishResp = ampProxy
+        .finishApplicationMaster(factory.newRecordInstance(
+            FinishApplicationMasterRequest.class));
+    Assert.assertEquals(false, finishResp.getIsUnregistered());
+    AllocateResponse allocResp = ampProxy
+        .allocate(factory.newRecordInstance(AllocateRequest.class));
+    Assert.assertEquals(12345, allocResp.getNumClusterNodes());
+
+
+    // Verify that the DistributedSchedulingService can handle the
+    // DistributedSchedulerProtocol clients as well
+    RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
+        ProtobufRpcEngine.class);
+    DistributedSchedulerProtocol dsProxy =
+        (DistributedSchedulerProtocol) rpc.getProxy(DistributedSchedulerProtocol
+            .class, NetUtils.getConnectAddress(server), conf);
+
+    DistSchedRegisterResponse dsRegResp =
+        dsProxy.registerApplicationMasterForDistributedScheduling(
+        factory.newRecordInstance(RegisterApplicationMasterRequest.class));
+    Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
+    DistSchedAllocateResponse dsAllocResp =
+        dsProxy.allocateForDistributedScheduling(
+            factory.newRecordInstance(AllocateRequest.class));
+    Assert.assertEquals(
+        "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
+  }
+}
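
The test above stubs allocateForDistributedScheduling() with a fixed node list; the intent is that the real service fills that field from a NodeSelector. Below is a hedged sketch of that wiring, not taken from this commit: the helper class and the injection point are assumptions; only NodeSelector#selectNodes(), DistSchedAllocateResponse#setNodesForScheduling() and the record factory appear in the patch.

    package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;

    import org.apache.hadoop.yarn.factories.RecordFactory;
    import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
    import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
    import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector;

    /** Sketch only: backs nodes-for-scheduling with a NodeSelector's output. */
    public class NodesForSchedulingSketch {

      private static final RecordFactory FACTORY =
          RecordFactoryProvider.getRecordFactory(null);

      public static DistSchedAllocateResponse buildResponse(NodeSelector selector) {
        DistSchedAllocateResponse resp =
            FACTORY.newRecordInstance(DistSchedAllocateResponse.class);
        // Hand the AM the currently preferred NMs for OPPORTUNISTIC containers,
        // best node first, as computed by the selector's background task.
        resp.setNodesForScheduling(selector.selectNodes());
        return resp;
      }
    }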

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java
new file mode 100644
index 0000000..a21ae19
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class TestTopKNodeSelector {
+
+  static class FakeNodeId extends NodeId {
+    final String host;
+    final int port;
+
+    public FakeNodeId(String host, int port) {
+      this.host = host;
+      this.port = port;
+    }
+
+    @Override
+    public String getHost() {
+      return host;
+    }
+
+    @Override
+    public int getPort() {
+      return port;
+    }
+
+    @Override
+    protected void setHost(String host) {}
+    @Override
+    protected void setPort(int port) {}
+    @Override
+    protected void build() {}
+
+    @Override
+    public String toString() {
+      return host + ":" + port;
+    }
+  }
+
+  @Test
+  public void testQueueTimeSort() {
+    TopKNodeSelector selector = new TopKNodeSelector(5,
+        TopKNodeSelector.TopKComparator.WAIT_TIME);
+    selector.nodeUpdate(createRMNode("h1", 1, 15, 10));
+    selector.nodeUpdate(createRMNode("h2", 2, 5, 10));
+    selector.nodeUpdate(createRMNode("h3", 3, 10, 10));
+    selector.computeTask.run();
+    List<NodeId> nodeIds = selector.selectNodes();
+    System.out.println("1-> " + nodeIds);
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h3:3", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+    // Now update node3
+    selector.nodeUpdate(createRMNode("h3", 3, 2, 10));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("2-> "+ nodeIds);
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+    // Now send update with -1 wait time
+    selector.nodeUpdate(createRMNode("h4", 4, -1, 10));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("3-> "+ nodeIds);
+    // No change
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+  }
+
+  @Test
+  public void testSelectionHints() {
+    TopKNodeSelector selector = new TopKNodeSelector(3,
+        TopKNodeSelector.TopKComparator.WAIT_TIME);
+    selector.nodeUpdate(createRMNode("h1", 1, 100, 10));
+    selector.nodeUpdate(createRMNode("h2", 2, 5, 10));
+    selector.nodeUpdate(createRMNode("h3", 3, 95, 10));
+    selector.nodeUpdate(createRMNode("h4", 4, 10, 10));
+    selector.nodeUpdate(createRMNode("h5", 5, 90, 10));
+    selector.nodeUpdate(createRMNode("h6", 6, 15, 10));
+    selector.computeTask.run();
+
+    List<NodeId> nodeIds = selector.selectNodes();
+    System.out.println("1-> " + nodeIds);
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(1).toString());
+    Assert.assertEquals("h6:6", nodeIds.get(2).toString());
+
+    nodeIds = selector.selectNodes(
+        Arrays.asList(
+            new NodeSelector.SelectionHint(Arrays.asList(
+                (NodeId) new FakeNodeId("h1", 1)
+            ), 1),
+            new NodeSelector.SelectionHint(Arrays.asList(
+                (NodeId) new FakeNodeId("h3", 3)
+            ), 1)
+        ));
+    System.out.println("2-> " + nodeIds);
+    // Ensure the hinted nodes are at the top
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(2).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+    Assert.assertEquals("h6:6", nodeIds.get(4).toString());
+
+    nodeIds = selector.selectNodes(
+        Arrays.asList(
+            new NodeSelector.SelectionHint(Arrays.asList(
+                (NodeId) new FakeNodeId("h1", 1),
+                new FakeNodeId("h3", 3),
+                new FakeNodeId("h5", 5)
+            ), 2)
+        ));
+    System.out.println("3-> " + nodeIds);
+    // Ensure the hinted nodes are at the top
+    Assert.assertEquals("h5:5", nodeIds.get(0).toString());
+    Assert.assertEquals("h3:3", nodeIds.get(1).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(2).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+    Assert.assertEquals("h6:6", nodeIds.get(4).toString());
+  }
+
+  @Test
+  public void testQueueLengthSort() {
+    TopKNodeSelector selector = new TopKNodeSelector(5,
+        TopKNodeSelector.TopKComparator.QUEUE_LENGTH);
+    selector.nodeUpdate(createRMNode("h1", 1, -1, 15));
+    selector.nodeUpdate(createRMNode("h2", 2, -1, 5));
+    selector.nodeUpdate(createRMNode("h3", 3, -1, 10));
+    selector.computeTask.run();
+    List<NodeId> nodeIds = selector.selectNodes();
+    System.out.println("1-> " + nodeIds);
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h3:3", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+    // Now update node3
+    selector.nodeUpdate(createRMNode("h3", 3, -1, 2));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("2-> "+ nodeIds);
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+    // Now send update with -1 wait time but valid length
+    selector.nodeUpdate(createRMNode("h4", 4, -1, 20));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("3-> "+ nodeIds);
+    // No change
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+  }
+
+  private RMNode createRMNode(String host, int port,
+      int waitTime, int queueLength) {
+    RMNode node1 = Mockito.mock(RMNode.class);
+    NodeId nID1 = new FakeNodeId(host, port);
+    Mockito.when(node1.getNodeID()).thenReturn(nID1);
+    QueuedContainersStatus status1 =
+        Mockito.mock(QueuedContainersStatus.class);
+    Mockito.when(status1.getEstimatedQueueWaitTime())
+        .thenReturn(waitTime);
+    Mockito.when(status1.getWaitQueueLength())
+        .thenReturn(queueLength);
+    Mockito.when(node1.getQueuedContainersStatus()).thenReturn(status1);
+    return node1;
+  }
+}


[2/2] hadoop git commit: YARN-4412. Create ClusterMonitor to compute ordered list of preferred NMs for OPPORTUNISTIC containers (asuresh)

Posted by as...@apache.org.
YARN-4412. Create ClusterMonitor to compute ordered list of preferred NMs for OPPORTUNISTIC containers (asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2340511f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2340511f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2340511f

Branch: refs/heads/yarn-2877
Commit: 2340511f730a8af4920d8459e2c6b4aefc94bd43
Parents: b00875e
Author: Arun Suresh <as...@apache.org>
Authored: Thu Feb 11 13:48:36 2016 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Fri Feb 19 01:58:53 2016 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java   |   5 +
 .../yarn/sls/scheduler/RMNodeWrapper.java       |   5 +
 hadoop-yarn-project/CHANGES-yarn-2877.txt       |   2 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  17 +-
 .../hadoop/yarn/event/EventDispatcher.java      | 137 ++++++++
 .../src/main/resources/yarn-default.xml         |  30 ++
 .../yarn/server/api/records/NodeStatus.java     |   9 +
 .../api/records/QueuedContainersStatus.java     |  45 +++
 .../api/records/impl/pb/NodeStatusPBImpl.java   |  40 ++-
 .../impl/pb/QueuedContainersStatusPBImpl.java   |  80 +++++
 .../main/proto/yarn_server_common_protos.proto  |   6 +
 .../protocolrecords/TestProtocolRecords.java    |  30 ++
 .../nodemanager/NodeStatusUpdaterImpl.java      |  10 +
 .../monitor/ContainersMonitor.java              |   2 +
 .../monitor/ContainersMonitorImpl.java          |  12 +
 .../server/resourcemanager/ClusterMonitor.java  |  36 ++
 .../DistributedSchedulingService.java           | 162 ---------
 .../server/resourcemanager/NodeSelector.java    |  74 ++++
 .../server/resourcemanager/ResourceManager.java | 119 +------
 .../server/resourcemanager/rmnode/RMNode.java   |   4 +
 .../resourcemanager/rmnode/RMNodeImpl.java      |  29 +-
 .../rmnode/RMNodeStatusEvent.java               |   7 +
 .../DistributedSchedulingService.java           | 341 +++++++++++++++++++
 .../scheduler/distributed/TopKNodeSelector.java | 273 +++++++++++++++
 .../yarn/server/resourcemanager/MockNodes.java  |   6 +-
 .../yarn/server/resourcemanager/MockRM.java     |   2 +
 .../resourcemanager/TestApplicationCleanup.java |   7 +-
 .../TestDistributedSchedulingService.java       | 170 ---------
 .../resourcemanager/TestRMDispatcher.java       |   6 +-
 .../TestResourceTrackerService.java             |   4 +-
 .../TestAMRMRPCNodeUpdates.java                 |   6 +-
 .../applicationsmanager/TestAMRestart.java      |   6 +-
 .../TestDistributedSchedulingService.java       | 179 ++++++++++
 .../distributed/TestTopKNodeSelector.java       | 201 +++++++++++
 34 files changed, 1609 insertions(+), 453 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 92d586b..85096ba 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -190,6 +191,10 @@ public class NodeInfo {
       return null;
     }
 
+    public QueuedContainersStatus getQueuedContainersStatus() {
+      return null;
+    }
+
     @Override
     public ResourceUtilization getAggregatedContainersUtilization() {
       return null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 2e9cccb..ab82e66 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -179,6 +180,10 @@ public class RMNodeWrapper implements RMNode {
     return Collections.EMPTY_LIST;
   }
 
+  public QueuedContainersStatus getQueuedContainersStatus() {
+    return null;
+  }
+
   @Override
   public ResourceUtilization getAggregatedContainersUtilization() {
     return node.getAggregatedContainersUtilization();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/CHANGES-yarn-2877.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES-yarn-2877.txt b/hadoop-yarn-project/CHANGES-yarn-2877.txt
index a147866..e3b4c50 100644
--- a/hadoop-yarn-project/CHANGES-yarn-2877.txt
+++ b/hadoop-yarn-project/CHANGES-yarn-2877.txt
@@ -16,3 +16,5 @@ yarn-2877 distributed scheduling (Unreleased)
     YARN-2885. Create AMRMProxy request interceptor and ContainerAllocator
     to distribute OPPORTUNISTIC containers to appropriate Nodes (asuresh)
 
+    YARN-4412. Create ClusterMonitor to compute ordered list of preferred
+    NMs for OPPORTUNISTIC containers (asuresh)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index edae3eb..c0265ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -326,8 +326,21 @@ public class YarnConfiguration extends Configuration {
       YARN_PREFIX + "distributed-scheduling.incr-vcores";
   public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1;
 
-  /** Container token expiry for container allocated via Distributed
-   * Scheduling. */
+  public static final String DIST_SCHEDULING_TOP_K =
+      YARN_PREFIX + "distributed-scheduling.top-k";
+  public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
+
+  public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS =
+      YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms";
+  public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000;
+
+  public static final String DIST_SCHEDULING_TOP_K_COMPARATOR =
+      YARN_PREFIX + "distributed-scheduling.top-k-comparator";
+  public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT =
+      "WAIT_TIME";
+
+  /** Container token expiry for containers allocated via
+   * Distributed Scheduling. */
   public static String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
       YARN_PREFIX + "distributed-scheduling.container-token-expiry";
   public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =

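For illustration only (not part of this patch): a minimal sketch of reading
the new top-K knobs from a Configuration. The property names and defaults are
the YarnConfiguration constants added in the hunk above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class TopKConfigExample {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // How many nodes the cluster monitor should hand back (the K in top-K).
    int k = conf.getInt(YarnConfiguration.DIST_SCHEDULING_TOP_K,
        YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
    // How often, in milliseconds, the top-K list is recomputed.
    long intervalMs = conf.getLong(
        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS,
        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT);
    // Which comparator orders the nodes: "WAIT_TIME" or "QUEUE_LENGTH".
    String comparator = conf.get(
        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR,
        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT);
    System.out.println("k=" + k + ", interval=" + intervalMs
        + "ms, comparator=" + comparator);
  }
}
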
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java
new file mode 100644
index 0000000..8a5ad92
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.event;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * This is a specialized EventHandler to be used by Services that are expected
+ * to handle a large number of events efficiently by ensuring that the caller
+ * thread is not blocked. Events are immediately stored in a BlockingQueue and
+ * a separate dedicated Thread consumes events from the queue and handles them
+ * appropriately.
+ * @param <T> Type of Event
+ */
+public class EventDispatcher<T extends Event> extends
+    AbstractService implements EventHandler<T> {
+
+  private final EventHandler<T> handler;
+  private final BlockingQueue<T> eventQueue =
+      new LinkedBlockingDeque<>();
+  private final Thread eventProcessor;
+  private volatile boolean stopped = false;
+  private boolean shouldExitOnError = false;
+
+  private static final Log LOG = LogFactory.getLog(EventDispatcher.class);
+
+  private final class EventProcessor implements Runnable {
+    @Override
+    public void run() {
+
+      T event;
+
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        try {
+          event = eventQueue.take();
+        } catch (InterruptedException e) {
+          LOG.error("Returning, interrupted : " + e);
+          return; // TODO: Kill RM.
+        }
+
+        try {
+          handler.handle(event);
+        } catch (Throwable t) {
+          // An error occurred, but we are shutting down anyway.
+          // If it was an InterruptedException, the very act of
+          // shutdown could have caused it and is probably harmless.
+          if (stopped) {
+            LOG.warn("Exception during shutdown: ", t);
+            break;
+          }
+          LOG.fatal("Error in handling event type " + event.getType()
+              + " to the Event Dispatcher", t);
+          if (shouldExitOnError
+              && !ShutdownHookManager.get().isShutdownInProgress()) {
+            LOG.info("Exiting, bbye..");
+            System.exit(-1);
+          }
+        }
+      }
+    }
+  }
+
+  public EventDispatcher(EventHandler<T> handler, String name) {
+    super(name);
+    this.handler = handler;
+    this.eventProcessor = new Thread(new EventProcessor());
+    this.eventProcessor.setName(getName() + ":Event Processor");
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.shouldExitOnError =
+        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+            Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    this.eventProcessor.start();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    this.stopped = true;
+    this.eventProcessor.interrupt();
+    try {
+      this.eventProcessor.join();
+    } catch (InterruptedException e) {
+      throw new YarnRuntimeException(e);
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public void handle(T event) {
+    try {
+      int qSize = eventQueue.size();
+      if (qSize != 0 && qSize % 1000 == 0) {
+        LOG.info("Size of " + getName() + " event-queue is " + qSize);
+      }
+      int remCapacity = eventQueue.remainingCapacity();
+      if (remCapacity < 1000) {
+        LOG.info("Very low remaining capacity on " + getName() + "" +
+            "event queue: " + remCapacity);
+      }
+      this.eventQueue.put(event);
+    } catch (InterruptedException e) {
+      LOG.info("Interrupted. Trying to exit gracefully.");
+    }
+  }
+}

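For illustration only (not part of this patch): a minimal usage sketch for
the EventDispatcher added above. ExampleEvent and ExampleEventType are
hypothetical placeholders; the point is that handle() only enqueues, and the
dedicated processor thread drains the queue and invokes the wrapped handler.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;

public class EventDispatcherExample {

  enum ExampleEventType { NODE_UPDATE }

  static class ExampleEvent extends AbstractEvent<ExampleEventType> {
    ExampleEvent() {
      super(ExampleEventType.NODE_UPDATE);
    }
  }

  public static void main(String[] args) throws Exception {
    // The wrapped handler runs on the dispatcher's own thread.
    EventHandler<ExampleEvent> handler = new EventHandler<ExampleEvent>() {
      @Override
      public void handle(ExampleEvent event) {
        System.out.println("Handled " + event.getType());
      }
    };
    EventDispatcher<ExampleEvent> dispatcher =
        new EventDispatcher<ExampleEvent>(handler, "ExampleDispatcher");
    dispatcher.init(new Configuration());
    dispatcher.start();
    dispatcher.handle(new ExampleEvent()); // returns immediately; just queues
    Thread.sleep(100);                     // give the processor thread a beat
    dispatcher.stop();
  }
}

Behaviour-wise this is the same queue-plus-thread pattern as the RM's old
inner SchedulerEventDispatcher (removed later in this patch); the class is
just reusable for handlers other than the scheduler.
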
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index d8ea3ad..cbadd63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2240,6 +2240,36 @@
     <value>4096</value>
   </property>
 
+  <!-- Distributed Scheduling configuration -->
+  <property>
+    <description>
+      The interval in milliseconds specifying the frequency at which the
+      Distributed Scheduling Cluster Monitor will recompute the top K
+      viable nodes on which OPPORTUNISTIC containers can be scheduled.
+    </description>
+    <name>yarn.distributed-scheduling.top-k-compute-interval-ms</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <description>
+      The default comparator used by the Distributed Scheduling Cluster
+      Monitor to order the top K nodes on which OPPORTUNISTIC containers can
+      be scheduled. The allowed values are "WAIT_TIME" and "QUEUE_LENGTH".
+    </description>
+    <name>yarn.distributed-scheduling.top-k-comparator</name>
+    <value>WAIT_TIME</value>
+  </property>
+
+  <property>
+    <description>
+      The maximum number of nodes returned by the Distributed Scheduling
+      Cluster Monitor (the value of K in top-K).
+    </description>
+    <name>yarn.distributed-scheduling.top-k</name>
+    <value>10</value>
+  </property>
+
   <!-- Node Labels Configuration -->
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
index 836cd4b..89e054b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
@@ -122,4 +122,13 @@ public abstract class NodeStatus {
   @Unstable
   public abstract void setIncreasedContainers(
       List<Container> increasedContainers);
+
+  @Private
+  @Unstable
+  public abstract QueuedContainersStatus getQueuedContainersStatus();
+
+  @Private
+  @Unstable
+  public abstract void setQueuedContainersStatus(
+      QueuedContainersStatus queuedContainersStatus);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
new file mode 100644
index 0000000..a7f0ece
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * <p>
+ * <code>QueuedContainersStatus</code> captures information pertaining to the
+ * state of execution of the Queueable containers within a node.
+ * </p>
+ */
+@Private
+@Evolving
+public abstract class QueuedContainersStatus {
+  public static QueuedContainersStatus newInstance() {
+    return Records.newRecord(QueuedContainersStatus.class);
+  }
+
+  public abstract int getEstimatedQueueWaitTime();
+
+  public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
+
+  public abstract int getWaitQueueLength();
+
+  public abstract void setWaitQueueLength(int waitQueueLength);
+}

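For illustration only (not part of this patch): creating and populating the
new record. The wait-time unit is assumed to be milliseconds here; the record
itself does not pin down a unit.

import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;

public class QueuedContainersStatusExample {
  public static void main(String[] args) {
    QueuedContainersStatus status = QueuedContainersStatus.newInstance();
    status.setEstimatedQueueWaitTime(250); // estimated wait (assumed ms)
    status.setWaitQueueLength(4);          // containers currently queued
    System.out.println("wait=" + status.getEstimatedQueueWaitTime()
        + ", queueLength=" + status.getWaitQueueLength());
  }
}
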
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
index 8dd4832..9a9a83a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
@@ -33,14 +33,17 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
-import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto;
+
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 
@@ -400,6 +403,27 @@ public class NodeStatusPBImpl extends NodeStatus {
     this.increasedContainers = increasedContainers;
   }
 
+  @Override
+  public QueuedContainersStatus getQueuedContainersStatus() {
+    NodeStatusProtoOrBuilder p =
+        this.viaProto ? this.proto : this.builder;
+    if (!p.hasQueuedContainerStatus()) {
+      return null;
+    }
+    return convertFromProtoFormat(p.getQueuedContainerStatus());
+  }
+
+  @Override
+  public void setQueuedContainersStatus(QueuedContainersStatus queuedContainersStatus) {
+    maybeInitBuilder();
+    if (queuedContainersStatus == null) {
+      this.builder.clearQueuedContainerStatus();
+      return;
+    }
+    this.builder.setQueuedContainerStatus(
+        convertToProtoFormat(queuedContainersStatus));
+  }
+
   private NodeIdProto convertToProtoFormat(NodeId nodeId) {
     return ((NodeIdPBImpl)nodeId).getProto();
   }
@@ -433,15 +457,25 @@ public class NodeStatusPBImpl extends NodeStatus {
     return ((ApplicationIdPBImpl)c).getProto();
   }
 
-  private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) {
+  private YarnProtos.ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) {
     return ((ResourceUtilizationPBImpl) r).getProto();
   }
 
   private ResourceUtilizationPBImpl convertFromProtoFormat(
-      ResourceUtilizationProto p) {
+      YarnProtos.ResourceUtilizationProto p) {
     return new ResourceUtilizationPBImpl(p);
   }
 
+  private YarnServerCommonProtos.QueuedContainersStatusProto convertToProtoFormat(
+      QueuedContainersStatus r) {
+    return ((QueuedContainersStatusPBImpl) r).getProto();
+  }
+
+  private QueuedContainersStatus convertFromProtoFormat(
+      YarnServerCommonProtos.QueuedContainersStatusProto p) {
+    return new QueuedContainersStatusPBImpl(p);
+  }
+
   private ContainerPBImpl convertFromProtoFormat(
       ContainerProto c) {
     return new ContainerPBImpl(c);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java
new file mode 100644
index 0000000..54470f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+
+public class QueuedContainersStatusPBImpl extends QueuedContainersStatus {
+
+  private YarnServerCommonProtos.QueuedContainersStatusProto proto =
+      YarnServerCommonProtos.QueuedContainersStatusProto.getDefaultInstance();
+  private YarnServerCommonProtos.QueuedContainersStatusProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  public QueuedContainersStatusPBImpl() {
+    builder = YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder();
+  }
+
+  public QueuedContainersStatusPBImpl(YarnServerCommonProtos
+      .QueuedContainersStatusProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public YarnServerCommonProtos.QueuedContainersStatusProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder =
+          YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public int getEstimatedQueueWaitTime() {
+    YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
+        viaProto ? proto : builder;
+    return p.getEstimatedQueueWaitTime();
+  }
+
+  @Override
+  public void setEstimatedQueueWaitTime(int queueWaitTime) {
+    maybeInitBuilder();
+    builder.setEstimatedQueueWaitTime(queueWaitTime);
+  }
+
+  @Override
+  public int getWaitQueueLength() {
+    YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
+        viaProto ? proto : builder;
+    return p.getWaitQueueLength();
+  }
+
+  @Override
+  public void setWaitQueueLength(int waitQueueLength) {
+    maybeInitBuilder();
+    builder.setWaitQueueLength(waitQueueLength);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 77064a0..c23d557 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -39,6 +39,12 @@ message NodeStatusProto {
   optional ResourceUtilizationProto containers_utilization = 6;
   optional ResourceUtilizationProto node_utilization = 7;
   repeated ContainerProto increased_containers = 8;
+  optional QueuedContainersStatusProto queued_container_status = 9;
+}
+
+message QueuedContainersStatusProto {
+  optional int32 estimated_queue_wait_time = 1;
+  optional int32 wait_queue_length = 2;
 }
 
 message MasterKeyProto {

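For illustration only (not part of this patch): the wire-level message added
above, built directly through the generated protobuf builder. The camel-case
accessors follow standard protobuf Java codegen for the snake_case fields.

import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.QueuedContainersStatusProto;

public class QueuedStatusProtoExample {
  public static void main(String[] args) {
    // This message rides on NodeStatusProto as optional field 9.
    QueuedContainersStatusProto proto =
        QueuedContainersStatusProto.newBuilder()
            .setEstimatedQueueWaitTime(123)
            .setWaitQueueLength(321)
            .build();
    System.out.println(proto.getEstimatedQueueWaitTime() + " "
        + proto.getWaitQueueLength()); // 123 321
  }
}
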
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
index 86e49f0..27bdfff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
@@ -39,8 +39,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
+    .NodeHeartbeatRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
+
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -131,4 +137,28 @@ public class TestProtocolRecords {
           ((NodeHeartbeatResponsePBImpl) record).getProto());
     Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps());
   }
+
+  @Test
+  public void testNodeHeartBeatRequest() throws IOException {
+    NodeHeartbeatRequest record =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    NodeStatus nodeStatus =
+        Records.newRecord(NodeStatus.class);
+    QueuedContainersStatus queuedContainersStatus = Records.newRecord
+        (QueuedContainersStatus.class);
+    queuedContainersStatus.setEstimatedQueueWaitTime(123);
+    queuedContainersStatus.setWaitQueueLength(321);
+    nodeStatus.setQueuedContainersStatus(queuedContainersStatus);
+    record.setNodeStatus(nodeStatus);
+
+    NodeHeartbeatRequestPBImpl pb = new
+        NodeHeartbeatRequestPBImpl(
+        ((NodeHeartbeatRequestPBImpl) record).getProto());
+
+    Assert.assertEquals(123,
+        pb.getNodeStatus()
+            .getQueuedContainersStatus().getEstimatedQueueWaitTime());
+    Assert.assertEquals(321,
+        pb.getNodeStatus().getQueuedContainersStatus().getWaitQueueLength());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 5806731..5fad500 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -71,6 +71,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
+
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@@ -440,9 +442,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
           createKeepAliveApplicationList(), nodeHealthStatus,
           containersUtilization, nodeUtilization, increasedContainers);
 
+    nodeStatus.setQueuedContainersStatus(getContainerQueueInfo());
     return nodeStatus;
   }
 
+  private QueuedContainersStatus getContainerQueueInfo() {
+    ContainerManagerImpl containerManager =
+        (ContainerManagerImpl) this.context.getContainerManager();
+    ContainersMonitor containersMonitor =
+        containerManager.getContainersMonitor();
+    return containersMonitor.getQueuedContainersStatus();
+  }
   /**
    * Get the aggregated utilization of the containers in this node.
    * @return Resource utilization of all the containers.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
index 4d69dbf..e54e298 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
 
 public interface ContainersMonitor extends Service,
     EventHandler<ContainersMonitorEvent>, ResourceView {
   public ResourceUtilization getContainersUtilization();
+  public QueuedContainersStatus getQueuedContainersStatus();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 446e7a1..e6c3642 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -84,6 +85,7 @@ public class ContainersMonitorImpl extends AbstractService implements
   private ResourceUtilization containersUtilization;
 
   private volatile boolean stopped = false;
+  private QueuedContainersStatus queuedContainersStatus;
 
   public ContainersMonitorImpl(ContainerExecutor exec,
       AsyncDispatcher dispatcher, Context context) {
@@ -96,6 +98,7 @@ public class ContainersMonitorImpl extends AbstractService implements
     this.monitoringThread = new MonitoringThread();
 
     this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
+    this.queuedContainersStatus = QueuedContainersStatus.newInstance();
   }
 
   @Override
@@ -697,6 +700,15 @@ public class ContainersMonitorImpl extends AbstractService implements
     this.containersUtilization = utilization;
   }
 
+  public QueuedContainersStatus getQueuedContainersStatus() {
+    return this.queuedContainersStatus;
+  }
+
+  public void setQueuedContainersStatus(QueuedContainersStatus
+      queuedContainersStatus) {
+    this.queuedContainersStatus = queuedContainersStatus;
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void handle(ContainersMonitorEvent monitoringEvent) {

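For illustration only (not part of this patch): a hypothetical NM-side
component that publishes queue metrics through the setter added above, so
that the next heartbeat reports them. The actual producer of these numbers
is not part of this change.

import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;

public class QueueStatusPublisher {

  private final ContainersMonitorImpl containersMonitor;

  public QueueStatusPublisher(ContainersMonitorImpl containersMonitor) {
    this.containersMonitor = containersMonitor;
  }

  /** Push the latest queue metrics; the heartbeat thread picks them up. */
  public void publish(int estimatedWaitTime, int queueLength) {
    QueuedContainersStatus status = QueuedContainersStatus.newInstance();
    status.setEstimatedQueueWaitTime(estimatedWaitTime);
    status.setWaitQueueLength(queueLength);
    containersMonitor.setQueuedContainersStatus(status);
  }
}
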
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
new file mode 100644
index 0000000..4fd62d0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+import java.util.List;
+
+public interface ClusterMonitor {
+
+  void addNode(List<NMContainerStatus> containerStatuses, RMNode rmNode);
+
+  void removeNode(RMNode removedRMNode);
+
+  void nodeUpdate(RMNode rmNode);
+
+  void updateNodeResource(RMNode rmNode, ResourceOption resourceOption);
+}

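For illustration only (not part of this patch): a minimal ClusterMonitor
implementation that just logs membership changes, showing the shape of the
callback contract.

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;

public class LoggingClusterMonitor implements ClusterMonitor {

  private static final Log LOG =
      LogFactory.getLog(LoggingClusterMonitor.class);

  @Override
  public void addNode(List<NMContainerStatus> containerStatuses,
      RMNode rmNode) {
    LOG.info("Node added: " + rmNode.getNodeID());
  }

  @Override
  public void removeNode(RMNode removedRMNode) {
    LOG.info("Node removed: " + removedRMNode.getNodeID());
  }

  @Override
  public void nodeUpdate(RMNode rmNode) {
    LOG.info("Node updated: " + rmNode.getNodeID());
  }

  @Override
  public void updateNodeResource(RMNode rmNode,
      ResourceOption resourceOption) {
    LOG.info("Node resource updated: " + rmNode.getNodeID());
  }
}
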
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
deleted file mode 100644
index 5210f7f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
-import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
-
-
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.security
-    .AMRMTokenSecretManager;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-
-public class DistributedSchedulingService extends ApplicationMasterService
-    implements DistributedSchedulerProtocol {
-
-  public DistributedSchedulingService(RMContext rmContext,
-      YarnScheduler scheduler) {
-    super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
-  }
-
-  @Override
-  public Server getServer(YarnRPC rpc, Configuration serverConf,
-      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
-    Server server = rpc.getServer(DistributedSchedulerProtocol.class, this,
-        addr, serverConf, secretManager,
-        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
-    // To support application running no NMs that DO NOT support
-    // Dist Scheduling...
-    ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
-        ApplicationMasterProtocolPB.class,
-        ApplicationMasterProtocolService.newReflectiveBlockingService(
-            new ApplicationMasterProtocolPBServiceImpl(this)));
-    return server;
-  }
-
-  @Override
-  public RegisterApplicationMasterResponse registerApplicationMaster
-      (RegisterApplicationMasterRequest request) throws YarnException,
-      IOException {
-    return super.registerApplicationMaster(request);
-  }
-
-  @Override
-  public FinishApplicationMasterResponse finishApplicationMaster
-      (FinishApplicationMasterRequest request) throws YarnException,
-      IOException {
-    return super.finishApplicationMaster(request);
-  }
-
-  @Override
-  public AllocateResponse allocate(AllocateRequest request) throws
-      YarnException, IOException {
-    return super.allocate(request);
-  }
-
-  @Override
-  public DistSchedRegisterResponse
-  registerApplicationMasterForDistributedScheduling(
-      RegisterApplicationMasterRequest request) throws YarnException,
-      IOException {
-    RegisterApplicationMasterResponse response =
-        registerApplicationMaster(request);
-    DistSchedRegisterResponse dsResp = recordFactory
-        .newRecordInstance(DistSchedRegisterResponse.class);
-    dsResp.setRegisterResponse(response);
-    dsResp.setMinAllocatableCapabilty(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
-                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
-        )
-    );
-    dsResp.setMaxAllocatableCapabilty(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
-                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
-        )
-    );
-    dsResp.setIncrAllocatableCapabilty(
-        Resource.newInstance(
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
-                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
-            getConfig().getInt(
-                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
-                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
-        )
-    );
-    dsResp.setContainerTokenExpiryInterval(
-        getConfig().getInt(
-            YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
-            YarnConfiguration.
-                DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
-    dsResp.setContainerIdStart(
-        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
-
-    // Set nodes to be used for scheduling
-    // TODO: The actual computation of the list will happen in YARN-4412
-    // TODO: Till then, send the complete list
-    dsResp.setNodesForScheduling(
-        new ArrayList<>(this.rmContext.getRMNodes().keySet()));
-    return dsResp;
-  }
-
-  @Override
-  public DistSchedAllocateResponse allocateForDistributedScheduling
-      (AllocateRequest request) throws YarnException, IOException {
-    AllocateResponse response = allocate(request);
-    DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
-        (DistSchedAllocateResponse.class);
-    dsResp.setAllocateResponse(response);
-    dsResp.setNodesForScheduling(
-        new ArrayList<>(this.rmContext.getRMNodes().keySet()));
-    return dsResp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java
new file mode 100644
index 0000000..d6a031c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Simple Node selector interface contractually obligating the implementor to
+ * provide the caller with an ordered list of nodes. It also provides
+ * convenience methods to specify selection criteria.
+ */
+public interface NodeSelector {
+
+  /**
+   * SelectionHint allows callers to provide additional suggestions to be
+   * used for selection.
+   */
+  class SelectionHint {
+
+    private final NodeId[] nodeIds;
+
+    // minimum number of nodes to include from the Hint in the returned list
+    private final int minToInclude;
+
+    public SelectionHint(Collection<NodeId> nodeIds,
+        int minNodesToInclude) {
+      this.nodeIds = nodeIds.toArray(new NodeId[0]);
+      this.minToInclude = minNodesToInclude;
+    }
+
+    public NodeId[] getNodeIds() {
+      return nodeIds;
+    }
+
+    public int getMinToInclude() {
+      return minToInclude;
+    }
+
+  }
+
+  /**
+   * Select an ordered list of Nodes based on the Implementation.
+   * @return Ordered list of Nodes
+   */
+  List<NodeId> selectNodes();
+
+  /**
+   * Select an ordered list of Nodes based on the Implementation. Also
+   * allows callers to specify hints in terms of a specific node or
+   * list of nodes (as well as how many from that list must be included).
+   * @return Ordered list of Nodes
+   */
+  List<NodeId> selectNodes(Collection<SelectionHint> hints);
+
+}

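For illustration only (not part of this patch): calling a NodeSelector with a
SelectionHint. The helper below is hypothetical; it asks the selector for its
ordered list while suggesting two preferred nodes, at least one of which
should be included in the result.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector;

public class NodeSelectorUsage {

  static List<NodeId> pickNodes(NodeSelector selector,
      NodeId preferredA, NodeId preferredB) {
    // Suggest two nodes and require at least one of them in the result.
    NodeSelector.SelectionHint hint = new NodeSelector.SelectionHint(
        Arrays.asList(preferredA, preferredB), 1);
    return selector.selectNodes(Collections.singletonList(hint));
  }
}
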
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index b51f00d..ebf6027 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
 import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.ExitUtil;
@@ -59,6 +58,7 @@ import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
@@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.DistributedSchedulingService;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -118,8 +119,6 @@ import java.security.PrivilegedExceptionAction;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * The ResourceManager is the main class that is a set of components.
@@ -370,7 +369,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-    return new SchedulerEventDispatcher(this.scheduler);
+    return new EventDispatcher(this.scheduler, "SchedulerEventDispatcher");
   }
 
   protected Dispatcher createDispatcher() {
@@ -725,104 +724,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
     }
   }
 
-  @Private
-  public static class SchedulerEventDispatcher extends AbstractService
-      implements EventHandler<SchedulerEvent> {
-
-    private final ResourceScheduler scheduler;
-    private final BlockingQueue<SchedulerEvent> eventQueue =
-      new LinkedBlockingQueue<SchedulerEvent>();
-    private final Thread eventProcessor;
-    private volatile boolean stopped = false;
-    private boolean shouldExitOnError = false;
-
-    public SchedulerEventDispatcher(ResourceScheduler scheduler) {
-      super(SchedulerEventDispatcher.class.getName());
-      this.scheduler = scheduler;
-      this.eventProcessor = new Thread(new EventProcessor());
-      this.eventProcessor.setName("ResourceManager Event Processor");
-    }
-
-    @Override
-    protected void serviceInit(Configuration conf) throws Exception {
-      this.shouldExitOnError =
-          conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
-            Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
-      super.serviceInit(conf);
-    }
-
-    @Override
-    protected void serviceStart() throws Exception {
-      this.eventProcessor.start();
-      super.serviceStart();
-    }
-
-    private final class EventProcessor implements Runnable {
-      @Override
-      public void run() {
-
-        SchedulerEvent event;
-
-        while (!stopped && !Thread.currentThread().isInterrupted()) {
-          try {
-            event = eventQueue.take();
-          } catch (InterruptedException e) {
-            LOG.error("Returning, interrupted : " + e);
-            return; // TODO: Kill RM.
-          }
-
-          try {
-            scheduler.handle(event);
-          } catch (Throwable t) {
-            // An error occurred, but we are shutting down anyway.
-            // If it was an InterruptedException, the very act of 
-            // shutdown could have caused it and is probably harmless.
-            if (stopped) {
-              LOG.warn("Exception during shutdown: ", t);
-              break;
-            }
-            LOG.fatal("Error in handling event type " + event.getType()
-                + " to the scheduler", t);
-            if (shouldExitOnError
-                && !ShutdownHookManager.get().isShutdownInProgress()) {
-              LOG.info("Exiting, bbye..");
-              System.exit(-1);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    protected void serviceStop() throws Exception {
-      this.stopped = true;
-      this.eventProcessor.interrupt();
-      try {
-        this.eventProcessor.join();
-      } catch (InterruptedException e) {
-        throw new YarnRuntimeException(e);
-      }
-      super.serviceStop();
-    }
-
-    @Override
-    public void handle(SchedulerEvent event) {
-      try {
-        int qSize = eventQueue.size();
-        if (qSize !=0 && qSize %1000 == 0) {
-          LOG.info("Size of scheduler event-queue is " + qSize);
-        }
-        int remCapacity = eventQueue.remainingCapacity();
-        if (remCapacity < 1000) {
-          LOG.info("Very low remaining capacity on scheduler event queue: "
-              + remCapacity);
-        }
-        this.eventQueue.put(event);
-      } catch (InterruptedException e) {
-        LOG.info("Interrupted. Trying to exit gracefully.");
-      }
-    }
-  }
 
   @Private
   public static class RMFatalEventDispatcher
@@ -1230,7 +1131,19 @@ public class ResourceManager extends CompositeService implements Recoverable {
     if (this.rmContext.getYarnConfiguration().getBoolean(
         YarnConfiguration.DIST_SCHEDULING_ENABLED,
         YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-      return new DistributedSchedulingService(this.rmContext, scheduler);
+      DistributedSchedulingService distributedSchedulingService = new
+          DistributedSchedulingService(this.rmContext, scheduler);
+      EventDispatcher distSchedulerEventDispatcher =
+          new EventDispatcher(distributedSchedulingService,
+              DistributedSchedulingService.class.getName());
+      // Add an event dispatcher for the DistributedSchedulingService
+      // to handle node updates/additions and removals.
+      // Since the SchedulerEvent is currently a superset of these,
+      // we register interest in it.
+      addService(distSchedulerEventDispatcher);
+      rmDispatcher.register(SchedulerEventType.class,
+          distSchedulerEventDispatcher);
+      return distributedSchedulingService;
     }
     return new ApplicationMasterService(this.rmContext, scheduler);
   }

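For illustration only (not part of this patch): the wiring above follows a
reusable pattern, restated here outside the RM. Wrap a second EventHandler in
its own EventDispatcher, add it as a child service, and register it for an
event type that is already being dispatched, so it sees the same events as
the primary handler.

import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;

public class SecondaryHandlerWiring extends CompositeService {

  public SecondaryHandlerWiring() {
    super("SecondaryHandlerWiring");
  }

  void wire(AsyncDispatcher rmDispatcher,
      EventHandler<SchedulerEvent> secondaryHandler) {
    // Give the secondary handler its own queue and consumer thread.
    EventDispatcher<SchedulerEvent> dispatcher =
        new EventDispatcher<SchedulerEvent>(secondaryHandler,
            "SecondaryHandler");
    addService(dispatcher);
    // It now receives every SchedulerEvent the primary scheduler receives.
    rmDispatcher.register(SchedulerEventType.class, dispatcher);
  }
}
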
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index d8df9f1..3bf9538 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 
 /**
  * Node managers information on available resources 
@@ -168,4 +169,7 @@ public interface RMNode {
       NodeHeartbeatResponse response);
   
   public List<Container> pullNewlyIncreasedContainers();
+
+  public QueuedContainersStatus getQueuedContainersStatus();
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index f4e483b..cdfb91f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
@@ -122,6 +123,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   /* Resource utilization for the node. */
   private ResourceUtilization nodeUtilization;
 
+  /* Container queue information for the node. Used by the Distributed Scheduler. */
+  private QueuedContainersStatus queuedContainersStatus;
+
   private final ContainerAllocationExpirer containerAllocationExpirer;
   /* set of containers that have just launched */
   private final Set<ContainerId> launchedContainers =
@@ -1095,7 +1099,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
 
       RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
-
+      rmNode.setQueuedContainersStatus(statusEvent.getContainerQueueInfo());
       NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents(
           rmNode, statusEvent);
       NodeState initialState = rmNode.getState();
@@ -1353,4 +1357,25 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       writeLock.unlock();
     }
    }
- }
+
+  public QueuedContainersStatus getQueuedContainersStatus() {
+    this.readLock.lock();
+
+    try {
+      return this.queuedContainersStatus;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public void setQueuedContainersStatus(QueuedContainersStatus
+      queuedContainersStatus) {
+    this.writeLock.lock();
+
+    try {
+      this.queuedContainersStatus = queuedContainersStatus;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+}
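
As an aside, the new accessors above follow the RMNodeImpl convention of guarding every mutable field with the node's read/write lock. A minimal sketch of the same pattern in isolation (the holder class and its type parameter are hypothetical, not part of the patch):

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Generic holder that serialises writers while allowing concurrent readers,
// the same discipline RMNodeImpl applies to queuedContainersStatus.
class GuardedStatusHolder<S> {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private S status;

  S get() {
    lock.readLock().lock();
    try {
      return status;
    } finally {
      lock.readLock().unlock();
    }
  }

  void set(S newStatus) {
    lock.writeLock().lock();
    try {
      status = newStatus;
    } finally {
      lock.writeLock().unlock();
    }
  }
}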

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index ba6ac9b..5eeaabe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 
@@ -79,6 +80,10 @@ public class RMNodeStatusEvent extends RMNodeEvent {
     return this.logAggregationReportsForApps;
   }
 
+  public QueuedContainersStatus getContainerQueueInfo() {
+    return this.nodeStatus.getQueuedContainersStatus();
+  }
+
   public void setLogAggregationReportsForApps(
       List<LogAggregationReport> logAggregationReportsForApps) {
     this.logAggregationReportsForApps = logAggregationReportsForApps;
@@ -89,4 +94,6 @@ public class RMNodeStatusEvent extends RMNodeEvent {
     return this.nodeStatus.getIncreasedContainers() == null ?
         Collections.EMPTY_LIST : this.nodeStatus.getIncreasedContainers();
   }
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java
new file mode 100644
index 0000000..f0235f7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+    .NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+    .NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+    .NodeResourceUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+    .NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+    .SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security
+    .AMRMTokenSecretManager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The DistributedSchedulingService is started instead of the
+ * ApplicationMasterService if DistributedScheduling is enabled for the YARN
+ * cluster.
+ * It extends the functionality of the ApplicationMasterService by servicing
+ * clients (AMs and AMRMProxy request interceptors) that understand the
+ * DistributedSchedulingProtocol.
+ */
+public class DistributedSchedulingService extends ApplicationMasterService
+    implements DistributedSchedulerProtocol, EventHandler<SchedulerEvent> {
+
+  private static final Log LOG =
+      LogFactory.getLog(DistributedSchedulingService.class);
+
+  private final ClusterMonitor clusterMonitor;
+  private final NodeSelector nodeSelector;
+
+  private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
+      new ConcurrentHashMap<>();
+
+  public DistributedSchedulingService(RMContext rmContext,
+      YarnScheduler scheduler) {
+    super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
+    int k = rmContext.getYarnConfiguration().getInt(
+        YarnConfiguration.DIST_SCHEDULING_TOP_K,
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
+    long topKComputationInterval = rmContext.getYarnConfiguration().getLong(
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS,
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT);
+    TopKNodeSelector.TopKComparator comparator =
+        TopKNodeSelector.TopKComparator.valueOf(
+            rmContext.getYarnConfiguration().get(
+                YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR,
+                YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT));
+    TopKNodeSelector topKSelector =
+        new TopKNodeSelector(k, topKComputationInterval, comparator);
+    this.clusterMonitor = topKSelector;
+    this.nodeSelector = topKSelector;
+  }
+
+  @Override
+  public Server getServer(YarnRPC rpc, Configuration serverConf,
+      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
+    Server server = rpc.getServer(DistributedSchedulerProtocol.class, this,
+        addr, serverConf, secretManager,
+        serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
+    // To support applications running on NMs that DO NOT support
+    // distributed scheduling, the server multiplexes both the
+    // ApplicationMasterProtocol and the DistributedSchedulerProtocol.
+    ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        ApplicationMasterProtocolPB.class,
+        ApplicationMasterProtocolService.newReflectiveBlockingService(
+            new ApplicationMasterProtocolPBServiceImpl(this)));
+    return server;
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster
+      (RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return super.registerApplicationMaster(request);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster
+      (FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return super.finishApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request) throws
+      YarnException, IOException {
+    return super.allocate(request);
+  }
+
+  @Override
+  public DistSchedRegisterResponse
+  registerApplicationMasterForDistributedScheduling(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    RegisterApplicationMasterResponse response =
+        registerApplicationMaster(request);
+    DistSchedRegisterResponse dsResp = recordFactory
+        .newRecordInstance(DistSchedRegisterResponse.class);
+    dsResp.setRegisterResponse(response);
+    dsResp.setMinAllocatableCapabilty(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
+                YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setMaxAllocatableCapabilty(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
+                YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setIncrAllocatableCapabilty(
+        Resource.newInstance(
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
+                YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
+            getConfig().getInt(
+                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
+                YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
+        )
+    );
+    dsResp.setContainerTokenExpiryInterval(
+        getConfig().getInt(
+            YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
+            YarnConfiguration.
+                DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
+    dsResp.setContainerIdStart(
+        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
+
+    // Set nodes to be used for scheduling
+    dsResp.setNodesForScheduling(
+        new ArrayList<>(this.nodeSelector.selectNodes()));
+    return dsResp;
+  }
+
+  @Override
+  public DistSchedAllocateResponse allocateForDistributedScheduling
+      (AllocateRequest request) throws YarnException, IOException {
+    AllocateResponse response = allocate(request);
+    DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
+        (DistSchedAllocateResponse.class);
+    dsResp.setAllocateResponse(response);
+    dsResp.setNodesForScheduling(
+        new ArrayList<>(
+            this.nodeSelector.selectNodes(createSelectionHints(request))));
+    return dsResp;
+  }
+
+  /**
+   * Create selection criteria. Essentially, it checks for requests that have
+   * relax locality set to false and are rack-local or node-local requests.
+   * If so, it adds all nodes in that rack / on that host (if multiple NMs
+   * are running on the host) to the selection criteria. It also sets the
+   * minimum number of nodes required (1 in the case of node-local requests)
+   * equal to the number of containers requested, so that the local RM can
+   * spread the containers across the returned nodes.
+   * @param request the allocate request to derive selection hints from
+   * @return Collection of SelectionHint
+   */
+  private Collection<NodeSelector.SelectionHint> createSelectionHints(
+      AllocateRequest request) {
+    List<NodeSelector.SelectionHint> retList = new ArrayList<>();
+    // TODO: Add support for node labels (support obtaining a list of nodes
+    //       given a label expression)
+    for (ResourceRequest rr : request.getAskList()) {
+      if (!rr.getRelaxLocality()
+          && rackToNode.containsKey(rr.getResourceName())) {
+        retList.add(new NodeSelector.SelectionHint(
+            rackToNode.get(rr.getResourceName()), rr.getNumContainers()));
+      }
+      if (!rr.getRelaxLocality()
+          && hostToNode.containsKey(rr.getResourceName())) {
+        retList.add(new NodeSelector.SelectionHint(
+            hostToNode.get(rr.getResourceName()), 1));
+      }
+    }
+    return retList;
+  }
+
+  private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
+      String rackName, NodeId nodeId) {
+    if (rackName != null) {
+      mapping.putIfAbsent(rackName, new HashSet<NodeId>());
+      Set<NodeId> nodeIds = mapping.get(rackName);
+      synchronized (nodeIds) {
+        nodeIds.add(nodeId);
+      }
+    }
+  }
+
+  private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
+      String rackName, NodeId nodeId) {
+    if (rackName != null) {
+      Set<NodeId> nodeIds = mapping.get(rackName);
+      synchronized (nodeIds) {
+        nodeIds.remove(nodeId);
+      }
+    }
+  }
+
+  @Override
+  public void handle(SchedulerEvent event) {
+    switch (event.getType()) {
+      case NODE_ADDED:
+        if (!(event instanceof NodeAddedSchedulerEvent)) {
+          throw new RuntimeException("Unexpected event type: " + event);
+        }
+        NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
+        clusterMonitor.addNode(nodeAddedEvent.getContainerReports(),
+            nodeAddedEvent.getAddedRMNode());
+        addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
+            nodeAddedEvent.getAddedRMNode().getNodeID());
+        addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
+            nodeAddedEvent.getAddedRMNode().getNodeID());
+        break;
+      case NODE_REMOVED:
+        if (!(event instanceof NodeRemovedSchedulerEvent)) {
+          throw new RuntimeException("Unexpected event type: " + event);
+        }
+        NodeRemovedSchedulerEvent nodeRemovedEvent =
+            (NodeRemovedSchedulerEvent)event;
+        clusterMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
+        removeFromMapping(rackToNode,
+            nodeRemovedEvent.getRemovedRMNode().getRackName(),
+            nodeRemovedEvent.getRemovedRMNode().getNodeID());
+        removeFromMapping(hostToNode,
+            nodeRemovedEvent.getRemovedRMNode().getHostName(),
+            nodeRemovedEvent.getRemovedRMNode().getNodeID());
+        break;
+      case NODE_UPDATE:
+        if (!(event instanceof NodeUpdateSchedulerEvent)) {
+          throw new RuntimeException("Unexpected event type: " + event);
+        }
+        NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
+        clusterMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode());
+        break;
+      case NODE_RESOURCE_UPDATE:
+        if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
+          throw new RuntimeException("Unexpected event type: " + event);
+        }
+        NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
+            (NodeResourceUpdateSchedulerEvent)event;
+        clusterMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
+            nodeResourceUpdatedEvent.getResourceOption());
+        break;
+
+      // <-- IGNORED EVENTS : START -->
+      case APP_ADDED:
+        break;
+      case APP_REMOVED:
+        break;
+      case APP_ATTEMPT_ADDED:
+        break;
+      case APP_ATTEMPT_REMOVED:
+        break;
+      case CONTAINER_EXPIRED:
+        break;
+      case NODE_LABELS_UPDATE:
+        break;
+      // <-- IGNORED EVENTS : END -->
+      default:
+        LOG.error("Unknown event arrived at DistributedSchedulingService: "
+            + event.toString());
+    }
+  }
+
+}
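
To make the createSelectionHints() behaviour above concrete, here is a standalone, simplified sketch of the same mapping (it uses Java records for brevity). ResourceRequestView and SelectionHint are hypothetical stand-ins for the YARN record classes; the logic mirrors the patch: a strict (relaxLocality == false) rack-local request asks for as many candidate nodes as containers, a strict node-local request asks for at least one.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public final class SelectionHintSketch {

  record ResourceRequestView(String resourceName, boolean relaxLocality,
      int numContainers) { }

  record SelectionHint(Set<String> candidateNodes, int minNodes) { }

  // Derive (candidate-node-set, minimum-node-count) hints from strict
  // rack-local and node-local asks, ignoring relaxed-locality requests.
  static List<SelectionHint> deriveHints(List<ResourceRequestView> asks,
      Map<String, Set<String>> rackToNodes,
      Map<String, Set<String>> hostToNodes) {
    List<SelectionHint> hints = new ArrayList<>();
    for (ResourceRequestView ask : asks) {
      if (ask.relaxLocality()) {
        continue;  // relaxed requests place no constraint on node choice
      }
      Set<String> rackNodes = rackToNodes.get(ask.resourceName());
      if (rackNodes != null) {
        hints.add(new SelectionHint(rackNodes, ask.numContainers()));
      }
      Set<String> hostNodes = hostToNodes.get(ask.resourceName());
      if (hostNodes != null) {
        hints.add(new SelectionHint(hostNodes, 1));
      }
    }
    return hints;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> rackToNodes =
        Map.of("/rack1", Set.of("nm1:8041", "nm2:8041"));
    Map<String, Set<String>> hostToNodes =
        Map.of("nm1", Set.of("nm1:8041"));
    List<ResourceRequestView> asks = List.of(
        new ResourceRequestView("/rack1", false, 3),  // strict rack-local
        new ResourceRequestView("nm1", false, 1),     // strict node-local
        new ResourceRequestView("*", true, 5));       // relaxed: skipped
    deriveHints(asks, rackToNodes, hostToNodes).forEach(System.out::println);
  }
}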