You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/02/19 10:59:11 UTC
[1/2] hadoop git commit: YARN-4412. Create ClusterMonitor to compute
ordered list of preferred NMs for OPPORTUNISTIC containers (asuresh)
Repository: hadoop
Updated Branches:
refs/heads/yarn-2877 b00875e60 -> 2340511f7
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
new file mode 100644
index 0000000..5aedbed
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TopKNodeSelector implements ClusterMonitor, NodeSelector {
+
+ final static Log LOG = LogFactory.getLog(TopKNodeSelector.class);
+
+  /**
+   * Orderings used to rank {@link ClusterNode}s in ascending order of load.
+   * <p>
+   * {@link #WAIT_TIME} ranks by {@code queueTime} and {@link #QUEUE_LENGTH}
+   * ranks by {@code waitQueueLength}. Nodes reporting the same load are
+   * ordered most recently updated first; the node id breaks any remaining
+   * tie.
+   */
+  enum TopKComparator implements Comparator<ClusterNode> {
+    WAIT_TIME,
+    QUEUE_LENGTH;
+
+    /**
+     * Compares two nodes by load, then by recency, then by node id.
+     * <p>
+     * The result is antisymmetric and is 0 only when load, timestamp and
+     * node id all compare equal, which sorted collections and
+     * {@link java.util.Arrays#sort(Object[], Comparator)} rely on.
+     *
+     * @param o1 first node, must not be null
+     * @param o2 second node, must not be null
+     * @return negative, zero or positive as {@code o1} ranks before, equal
+     *         to, or after {@code o2}
+     */
+    @Override
+    public int compare(ClusterNode o1, ClusterNode o2) {
+      int byLoad = Integer.compare(getQuant(o1), getQuant(o2));
+      if (byLoad != 0) {
+        return byLoad;
+      }
+      // Arguments are swapped on purpose: the newer timestamp sorts first.
+      int byRecency = Double.compare(o2.timestamp, o1.timestamp);
+      if (byRecency != 0) {
+        return byRecency;
+      }
+      // NOTE(review): assumes NodeId.compareTo is consistent with equals, so
+      // that two distinct nodes never compare as 0 here.
+      return o1.nodeId.compareTo(o2.nodeId);
+    }
+
+    /** Returns the metric this constant ranks by. */
+    private int getQuant(ClusterNode c) {
+      return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength;
+    }
+  }
+
+  /**
+   * Mutable record of the queueing load last reported by a single node.
+   * Every mutator returns {@code this} so that updates can be chained.
+   */
+  static class ClusterNode {
+    final NodeId nodeId;
+    // Wall-clock time of the last refresh, from System.currentTimeMillis().
+    double timestamp;
+    // Initialised to -1 until a wait time is set.
+    int queueTime = -1;
+    int waitQueueLength = 0;
+
+    /**
+     * Creates an entry for {@code nodeId} stamped with the current time.
+     *
+     * @param nodeId id of the node this entry describes
+     */
+    public ClusterNode(NodeId nodeId) {
+      this.nodeId = nodeId;
+      this.timestamp = System.currentTimeMillis();
+    }
+
+    /**
+     * Stamps this entry with the current time.
+     *
+     * @return this entry
+     */
+    public ClusterNode updateTimestamp() {
+      this.timestamp = System.currentTimeMillis();
+      return this;
+    }
+
+    /**
+     * Records the queue length.
+     *
+     * @param queueLength new value for {@code waitQueueLength}
+     * @return this entry
+     */
+    public ClusterNode setWaitQueueLength(int queueLength) {
+      this.waitQueueLength = queueLength;
+      return this;
+    }
+
+    /**
+     * Records the queue wait time.
+     *
+     * @param queueTime new value for {@code queueTime}
+     * @return this entry
+     */
+    public ClusterNode setQueueTime(int queueTime) {
+      this.queueTime = queueTime;
+      return this;
+    }
+  }
+
+  // Maximum number of nodes returned by selectNodes().
+  private final int k;
+  // Most recent ranking of all tracked nodes; also used as its own lock.
+  private final List<NodeId> topKNodes;
+  // Runs computeTask periodically; null when built by the test constructor.
+  private final ScheduledExecutorService scheduledExecutor;
+  // Load information per node; every access synchronizes on this map.
+  private final HashMap<NodeId, ClusterNode> clusterNodes = new HashMap<>();
+  private final Comparator<ClusterNode> comparator;
+
+  /**
+   * Recomputes the ranking of the tracked nodes and publishes it in
+   * {@code topKNodes}.
+   * <p>
+   * The ranking is computed before the {@code topKNodes} lock is taken, so
+   * that lock is held only for the swap and is never nested around the
+   * {@code clusterNodes} lock. Runtime failures are logged instead of
+   * propagated: an exception escaping a task submitted through
+   * {@code scheduleAtFixedRate} suppresses all of its later executions.
+   */
+  Runnable computeTask = new Runnable() {
+    @Override
+    public void run() {
+      try {
+        List<NodeId> ranked = computeTopKNodes();
+        synchronized (topKNodes) {
+          topKNodes.clear();
+          topKNodes.addAll(ranked);
+        }
+      } catch (RuntimeException e) {
+        LOG.error("Failed to recompute the ordered list of nodes", e);
+      }
+    }
+  };
+
+  /**
+   * Test-only constructor: no background task is scheduled, so callers have
+   * to run {@code computeTask} themselves to refresh the ranking.
+   *
+   * @param k maximum number of nodes returned by {@link #selectNodes()}
+   * @param comparator ordering used to rank the nodes
+   */
+  @VisibleForTesting
+  TopKNodeSelector(int k, TopKComparator comparator) {
+    this.comparator = comparator;
+    this.k = k;
+    this.scheduledExecutor = null;
+    this.topKNodes = new ArrayList<>();
+  }
+
+  /**
+   * Creates a selector whose ranking is refreshed periodically on a
+   * single-threaded scheduled executor.
+   *
+   * @param k maximum number of nodes returned by {@link #selectNodes()}
+   * @param nodeComputationInterval initial delay and period, in
+   *        milliseconds, between two refreshes of the ranking
+   * @param comparator ordering used to rank the nodes
+   */
+  public TopKNodeSelector(int k, long nodeComputationInterval,
+      TopKComparator comparator) {
+    this.comparator = comparator;
+    this.k = k;
+    this.topKNodes = new ArrayList<>();
+    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+    executor.scheduleAtFixedRate(computeTask,
+        nodeComputationInterval, nodeComputationInterval,
+        TimeUnit.MILLISECONDS);
+    this.scheduledExecutor = executor;
+  }
+
+
+  /**
+   * Handles a node-added event. The node is only logged here; it is not
+   * tracked until its first {@link #nodeUpdate(RMNode)}.
+   *
+   * @param containerStatuses container statuses reported by the node (unused)
+   * @param rmNode the node that was added
+   */
+  @Override
+  public void addNode(List<NMContainerStatus> containerStatuses, RMNode
+      rmNode) {
+    // Guarded so the message is not built when debug logging is off.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node added event from: " + rmNode.getNode().getName());
+    }
+    // Ignoring this currently: at least one NODE_UPDATE heartbeat is
+    // required to ensure node eligibility.
+  }
+
+  /**
+   * Stops tracking a node that was removed from the cluster.
+   *
+   * @param removedRMNode the node that was removed
+   */
+  @Override
+  public void removeNode(RMNode removedRMNode) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node delete event for: "
+          + removedRMNode.getNode().getName());
+    }
+    NodeId removedId = removedRMNode.getNodeID();
+    boolean wasTracked;
+    synchronized (this.clusterNodes) {
+      // Stored values are never null, so a non-null result means the node
+      // was tracked; this replaces a containsKey + remove double lookup.
+      wasTracked = this.clusterNodes.remove(removedId) != null;
+    }
+    if (LOG.isDebugEnabled()) {
+      if (wasTracked) {
+        LOG.debug("Delete ClusterNode: " + removedId);
+      } else {
+        LOG.debug("Node not in list!");
+      }
+    }
+  }
+
+  /**
+   * Records the queueing load reported by a node heartbeat.
+   * <p>
+   * A node is tracked when it reports an estimated queue wait time other
+   * than -1, or unconditionally when ranking by queue length. A tracked node
+   * that stops satisfying this is dropped. Updates carrying no queue status
+   * at all are ignored and leave the tracked state untouched.
+   *
+   * @param rmNode the node that sent the update
+   */
+  @Override
+  public void nodeUpdate(RMNode rmNode) {
+    NodeId nodeId = rmNode.getNodeID();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node update event from: " + nodeId);
+    }
+    QueuedContainersStatus queuedContainersStatus =
+        rmNode.getQueuedContainersStatus();
+    if (queuedContainersStatus == null) {
+      // Some RMNode implementations return null here; without a status
+      // there is nothing to rank the node on.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ignoring update without queue status from: " + nodeId);
+      }
+      return;
+    }
+    int estimatedQueueWaitTime =
+        queuedContainersStatus.getEstimatedQueueWaitTime();
+    int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
+    // An estimated wait time of -1 makes the node ineligible UNLESS the
+    // comparator is based on queue length, in which case it is still ranked.
+    boolean eligible = estimatedQueueWaitTime != -1
+        || comparator == TopKComparator.QUEUE_LENGTH;
+    synchronized (this.clusterNodes) {
+      ClusterNode currentNode = this.clusterNodes.get(nodeId);
+      if (currentNode == null) {
+        if (eligible) {
+          this.clusterNodes.put(nodeId,
+              new ClusterNode(nodeId)
+                  .setQueueTime(estimatedQueueWaitTime)
+                  .setWaitQueueLength(waitQueueLength));
+          LOG.info("Inserting ClusterNode [" + nodeId + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        } else {
+          LOG.warn("IGNORING ClusterNode [" + nodeId + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        }
+      } else if (eligible) {
+        currentNode
+            .setQueueTime(estimatedQueueWaitTime)
+            .setWaitQueueLength(waitQueueLength)
+            .updateTimestamp();
+        LOG.info("Updating ClusterNode [" + nodeId + "]" +
+            "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+            "wait queue length [" + waitQueueLength + "]");
+      } else {
+        this.clusterNodes.remove(nodeId);
+        // Logs the values the node had before it became ineligible.
+        LOG.info("Deleting ClusterNode [" + nodeId + "]" +
+            "with queue wait time [" + currentNode.queueTime + "] and " +
+            "wait queue length [" + currentNode.waitQueueLength + "]");
+      }
+    }
+  }
+
+  /**
+   * Handles a node resource update. The event is only logged; resource
+   * changes do not affect the ranking.
+   *
+   * @param rmNode the node whose resource changed
+   * @param resourceOption the new resource option (unused)
+   */
+  @Override
+  public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
+    // Guarded so the message is not built when debug logging is off.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node resource update event from: " + rmNode.getNodeID());
+    }
+    // Ignoring this currently...
+  }
+
+  /**
+   * Returns the first {@code k} nodes of the most recently computed ranking,
+   * or the whole ranking when it holds fewer than {@code k} nodes.
+   *
+   * @return a new list, independent of the internal ranking
+   */
+  @Override
+  public List<NodeId> selectNodes() {
+    synchronized (this.topKNodes) {
+      // Copy only the requested prefix rather than copying the whole
+      // ranking and returning a view that keeps the full copy reachable.
+      int limit = Math.min(this.k, this.topKNodes.size());
+      return new ArrayList<>(this.topKNodes.subList(0, limit));
+    }
+  }
+
+  /**
+   * Returns the nodes of {@link #selectNodes()} preceded by extra nodes
+   * requested through {@code hints}.
+   * <p>
+   * For each hint, up to {@code getMinToInclude()} of its nodes are added,
+   * best ranked first. A hinted node is skipped when it is not currently
+   * tracked, is already part of the result, or is listed more than once.
+   * The added nodes come first in the result, ordered by the comparator.
+   *
+   * @param hints selection hints; null or empty yields the same result as
+   *        {@link #selectNodes()}
+   * @return the hinted nodes followed by the top-k nodes
+   */
+  @Override
+  public List<NodeId> selectNodes(Collection<SelectionHint> hints) {
+    List<NodeId> retList = selectNodes();
+    if (hints == null || hints.isEmpty()) {
+      return retList;
+    }
+    Set<NodeId> alreadyAdded = new HashSet<>(retList);
+    TreeSet<ClusterNode> toAdd = new TreeSet<>(this.comparator);
+    synchronized (this.clusterNodes) {
+      for (SelectionHint hint : hints) {
+        // Sort the hinted nodes (we need the best ones). PriorityQueue
+        // rejects an initial capacity below 1, so clamp for empty hints.
+        PriorityQueue<ClusterNode> candidates = new PriorityQueue<>(
+            Math.max(1, hint.getNodeIds().length), this.comparator);
+        Set<NodeId> queued = new HashSet<>();
+        for (NodeId n : hint.getNodeIds()) {
+          ClusterNode hinted = clusterNodes.get(n);
+          // Untracked nodes have no load information and cannot be ranked.
+          if (hinted != null && !alreadyAdded.contains(n) && queued.add(n)) {
+            candidates.add(hinted);
+          }
+        }
+        // From the sorted candidates, take the 'minToInclude' best nodes.
+        for (int numIncluded = 0;
+            numIncluded < hint.getMinToInclude() && !candidates.isEmpty();
+            numIncluded++) {
+          ClusterNode cn = candidates.remove();
+          toAdd.add(cn);
+          alreadyAdded.add(cn.nodeId);
+        }
+      }
+    }
+
+    if (toAdd.isEmpty()) {
+      return retList;
+    }
+    List<NodeId> newList = new ArrayList<>(toAdd.size() + retList.size());
+    for (ClusterNode cn : toAdd) {
+      newList.add(cn.nodeId);
+    }
+    newList.addAll(retList);
+    return newList;
+  }
+
+  /**
+   * Ranks every tracked node with the configured comparator.
+   * <p>
+   * The result is not truncated to {@code k}; {@link #selectNodes()} applies
+   * that limit. Sorting happens while holding the {@code clusterNodes} lock
+   * because the fields the comparator reads are updated under that lock.
+   *
+   * @return the ids of all tracked nodes, best ranked first
+   */
+  private List<NodeId> computeTopKNodes() {
+    synchronized (this.clusterNodes) {
+      // Sort a typed array snapshot of the values, then build the id list
+      // in the same pass; no raw types or unchecked casts are needed.
+      ClusterNode[] nodes = this.clusterNodes.values()
+          .toArray(new ClusterNode[this.clusterNodes.size()]);
+      Arrays.sort(nodes, this.comparator);
+      List<NodeId> retList = new ArrayList<>(nodes.length);
+      for (ClusterNode node : nodes) {
+        retList.add(node.nodeId);
+      }
+      return retList;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 89aff29..f5b61a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@@ -260,6 +260,10 @@ public class MockNodes {
public ResourceUtilization getNodeUtilization() {
return this.nodeUtilization;
}
+
+ public QueuedContainersStatus getQueuedContainersStatus() {
+ return null;
+ }
};
private static RMNode buildRMNode(int rack, final Resource perNode,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 6182b07..9b4e1c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -90,6 +90,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed
+ .DistributedSchedulingService;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
index 3fa377e..c45fba8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -167,10 +169,11 @@ public class TestApplicationCleanup {
MockRM rm = new MockRM() {
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
- return new SchedulerEventDispatcher(this.scheduler) {
+ return new EventDispatcher<SchedulerEvent>(this.scheduler,
+ this.scheduler.getClass().getName()) {
@Override
public void handle(SchedulerEvent event) {
- scheduler.handle(event);
+ super.handle(event);
}
};
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
deleted file mode 100644
index 262fd5a..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
- .AMLivelinessMonitor;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-
-public class TestDistributedSchedulingService {
-
- // Test if the DistributedSchedulingService can handle both DSProtocol as
- // well as AMProtocol clients
- @Test
- public void testRPCWrapping() throws Exception {
- Configuration conf = new Configuration();
- conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
- .getName());
- YarnRPC rpc = YarnRPC.create(conf);
- String bindAddr = "localhost:0";
- InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
- conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
- final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
- final RMContext rmContext = new RMContextImpl() {
- @Override
- public AMLivelinessMonitor getAMLivelinessMonitor() {
- return null;
- }
- };
- DistributedSchedulingService service =
- new DistributedSchedulingService(rmContext, null) {
- @Override
- public RegisterApplicationMasterResponse registerApplicationMaster
- (RegisterApplicationMasterRequest request) throws
- YarnException, IOException {
- RegisterApplicationMasterResponse resp = factory.newRecordInstance(
- RegisterApplicationMasterResponse.class);
- // Dummy Entry to Assert that we get this object back
- resp.setQueue("dummyQueue");
- return resp;
- }
-
- @Override
- public FinishApplicationMasterResponse finishApplicationMaster
- (FinishApplicationMasterRequest request) throws YarnException,
- IOException {
- FinishApplicationMasterResponse resp = factory.newRecordInstance(
- FinishApplicationMasterResponse.class);
- // Dummy Entry to Assert that we get this object back
- resp.setIsUnregistered(false);
- return resp;
- }
-
- @Override
- public AllocateResponse allocate(AllocateRequest request) throws
- YarnException, IOException {
- AllocateResponse response = factory.newRecordInstance
- (AllocateResponse.class);
- response.setNumClusterNodes(12345);
- return response;
- }
-
- @Override
- public DistSchedRegisterResponse
- registerApplicationMasterForDistributedScheduling
- (RegisterApplicationMasterRequest request) throws
- YarnException, IOException {
- DistSchedRegisterResponse resp = factory.newRecordInstance(
- DistSchedRegisterResponse.class);
- resp.setContainerIdStart(54321l);
- return resp;
- }
-
- @Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
- DistSchedAllocateResponse resp =
- factory.newRecordInstance(DistSchedAllocateResponse.class);
- resp.setNodesForScheduling(
- Arrays.asList(NodeId.newInstance("h1", 1234)));
- return resp;
- }
- };
- Server server = service.getServer(rpc, conf, addr, null);
- server.start();
-
- // Verify that the DistrubutedSchedulingService can handle vanilla
- // ApplicationMasterProtocol clients
- RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
- ProtobufRpcEngine.class);
- ApplicationMasterProtocol ampProxy =
- (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol
- .class, NetUtils.getConnectAddress(server), conf);
- RegisterApplicationMasterResponse regResp = ampProxy.registerApplicationMaster(
- factory.newRecordInstance(RegisterApplicationMasterRequest.class));
- Assert.assertEquals("dummyQueue", regResp.getQueue());
- FinishApplicationMasterResponse finishResp = ampProxy
- .finishApplicationMaster(factory.newRecordInstance(
- FinishApplicationMasterRequest.class));
- Assert.assertEquals(false, finishResp.getIsUnregistered());
- AllocateResponse allocResp = ampProxy
- .allocate(factory.newRecordInstance(AllocateRequest.class));
- Assert.assertEquals(12345, allocResp.getNumClusterNodes());
-
-
- // Verify that the DistrubutedSchedulingService can handle the
- // DistributedSchedulerProtocol clients as well
- RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
- ProtobufRpcEngine.class);
- DistributedSchedulerProtocol dsProxy =
- (DistributedSchedulerProtocol) rpc.getProxy(DistributedSchedulerProtocol
- .class, NetUtils.getConnectAddress(server), conf);
-
- DistSchedRegisterResponse dsRegResp =
- dsProxy.registerApplicationMasterForDistributedScheduling(
- factory.newRecordInstance(RegisterApplicationMasterRequest.class));
- Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
- DistSchedAllocateResponse dsAllocResp =
- dsProxy.allocateForDistributedScheduling(
- factory.newRecordInstance(AllocateRequest.class));
- Assert.assertEquals(
- "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
index d9306dd..429817e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
@@ -27,7 +27,7 @@ import static org.mockito.Mockito.verify;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher;
+import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
@@ -44,8 +44,8 @@ public class TestRMDispatcher {
AsyncDispatcher rmDispatcher = new AsyncDispatcher();
CapacityScheduler sched = spy(new CapacityScheduler());
YarnConfiguration conf = new YarnConfiguration();
- SchedulerEventDispatcher schedulerDispatcher =
- new SchedulerEventDispatcher(sched);
+ EventDispatcher schedulerDispatcher =
+ new EventDispatcher(sched, sched.getClass().getName());
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
rmDispatcher.init(conf);
rmDispatcher.start();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index e0fd9ab..119ae09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -987,7 +988,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
rm = new MockRM() {
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
- return new SchedulerEventDispatcher(this.scheduler) {
+ return new EventDispatcher<SchedulerEvent>(this.scheduler,
+ this.scheduler.getClass().getName()) {
@Override
public void handle(SchedulerEvent event) {
scheduler.handle(event);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
index f4cb3b3..458f94d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
import java.security.PrivilegedExceptionAction;
import java.util.List;
+import org.apache.hadoop.yarn.event.EventDispatcher;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
@@ -65,10 +66,11 @@ public class TestAMRMRPCNodeUpdates {
}
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
- return new SchedulerEventDispatcher(this.scheduler) {
+ return new EventDispatcher<SchedulerEvent>(this.scheduler,
+ this.scheduler.getClass().getName()) {
@Override
public void handle(SchedulerEvent event) {
- scheduler.handle(event);
+ super.handle(event);
}
};
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 5035afe..e5ba470 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -423,10 +424,11 @@ public class TestAMRestart {
MockRM rm1 = new MockRM(conf, memStore) {
@Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
- return new SchedulerEventDispatcher(this.scheduler) {
+ return new EventDispatcher<SchedulerEvent>(this.scheduler,
+ this.scheduler.getClass().getName()) {
@Override
public void handle(SchedulerEvent event) {
- scheduler.handle(event);
+ super.handle(event);
}
};
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java
new file mode 100644
index 0000000..35ab6a9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestDistributedSchedulingService.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
+ .AMLivelinessMonitor;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed
+ .DistributedSchedulingService;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+public class TestDistributedSchedulingService {
+
+ // Test if the DistributedSchedulingService can handle both DSProtocol as
+ // well as AMProtocol clients
+ @Test
+ public void testRPCWrapping() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+ .getName());
+ YarnRPC rpc = YarnRPC.create(conf);
+ String bindAddr = "localhost:0";
+ InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+ conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, addr);
+ final RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
+ final RMContext rmContext = new RMContextImpl() {
+ @Override
+ public AMLivelinessMonitor getAMLivelinessMonitor() {
+ return null;
+ }
+
+ @Override
+ public Configuration getYarnConfiguration() {
+ return new YarnConfiguration();
+ }
+ };
+ DistributedSchedulingService service =
+ new DistributedSchedulingService(rmContext, null) {
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster
+ (RegisterApplicationMasterRequest request) throws
+ YarnException, IOException {
+ RegisterApplicationMasterResponse resp = factory.newRecordInstance(
+ RegisterApplicationMasterResponse.class);
+ // Dummy Entry to Assert that we get this object back
+ resp.setQueue("dummyQueue");
+ return resp;
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster
+ (FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ FinishApplicationMasterResponse resp = factory.newRecordInstance(
+ FinishApplicationMasterResponse.class);
+ // Dummy Entry to Assert that we get this object back
+ resp.setIsUnregistered(false);
+ return resp;
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request) throws
+ YarnException, IOException {
+ AllocateResponse response = factory.newRecordInstance
+ (AllocateResponse.class);
+ response.setNumClusterNodes(12345);
+ return response;
+ }
+
+ @Override
+ public DistSchedRegisterResponse
+ registerApplicationMasterForDistributedScheduling
+ (RegisterApplicationMasterRequest request) throws
+ YarnException, IOException {
+ DistSchedRegisterResponse resp = factory.newRecordInstance(
+ DistSchedRegisterResponse.class);
+ resp.setContainerIdStart(54321l);
+ return resp;
+ }
+
+ @Override
+ public DistSchedAllocateResponse allocateForDistributedScheduling
+ (AllocateRequest request) throws YarnException, IOException {
+ DistSchedAllocateResponse resp =
+ factory.newRecordInstance(DistSchedAllocateResponse.class);
+ resp.setNodesForScheduling(
+ Arrays.asList(NodeId.newInstance("h1", 1234)));
+ return resp;
+ }
+ };
+ Server server = service.getServer(rpc, conf, addr, null);
+ server.start();
+
+ // Verify that the DistributedSchedulingService can handle vanilla
+ // ApplicationMasterProtocol clients
+ RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
+ ProtobufRpcEngine.class);
+ ApplicationMasterProtocol ampProxy =
+ (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol
+ .class, NetUtils.getConnectAddress(server), conf);
+ RegisterApplicationMasterResponse regResp = ampProxy.registerApplicationMaster(
+ factory.newRecordInstance(RegisterApplicationMasterRequest.class));
+ Assert.assertEquals("dummyQueue", regResp.getQueue());
+ FinishApplicationMasterResponse finishResp = ampProxy
+ .finishApplicationMaster(factory.newRecordInstance(
+ FinishApplicationMasterRequest.class));
+ Assert.assertEquals(false, finishResp.getIsUnregistered());
+ AllocateResponse allocResp = ampProxy
+ .allocate(factory.newRecordInstance(AllocateRequest.class));
+ Assert.assertEquals(12345, allocResp.getNumClusterNodes());
+
+
+ // Verify that the DistributedSchedulingService can handle the
+ // DistributedSchedulerProtocol clients as well
+ RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
+ ProtobufRpcEngine.class);
+ DistributedSchedulerProtocol dsProxy =
+ (DistributedSchedulerProtocol) rpc.getProxy(DistributedSchedulerProtocol
+ .class, NetUtils.getConnectAddress(server), conf);
+
+ DistSchedRegisterResponse dsRegResp =
+ dsProxy.registerApplicationMasterForDistributedScheduling(
+ factory.newRecordInstance(RegisterApplicationMasterRequest.class));
+ Assert.assertEquals(54321l, dsRegResp.getContainerIdStart());
+ DistSchedAllocateResponse dsAllocResp =
+ dsProxy.allocateForDistributedScheduling(
+ factory.newRecordInstance(AllocateRequest.class));
+ Assert.assertEquals(
+ "h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java
new file mode 100644
index 0000000..a21ae19
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class TestTopKNodeSelector {
+
+ static class FakeNodeId extends NodeId {
+ final String host;
+ final int port;
+
+ public FakeNodeId(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ @Override
+ public String getHost() {
+ return host;
+ }
+
+ @Override
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ protected void setHost(String host) {}
+ @Override
+ protected void setPort(int port) {}
+ @Override
+ protected void build() {}
+
+ @Override
+ public String toString() {
+ return host + ":" + port;
+ }
+ }
+
+ @Test
+ public void testQueueTimeSort() {
+ TopKNodeSelector selector = new TopKNodeSelector(5,
+ TopKNodeSelector.TopKComparator.WAIT_TIME);
+ selector.nodeUpdate(createRMNode("h1", 1, 15, 10));
+ selector.nodeUpdate(createRMNode("h2", 2, 5, 10));
+ selector.nodeUpdate(createRMNode("h3", 3, 10, 10));
+ selector.computeTask.run();
+ List<NodeId> nodeIds = selector.selectNodes();
+ System.out.println("1-> " + nodeIds);
+ Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+ Assert.assertEquals("h3:3", nodeIds.get(1).toString());
+ Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+ // Now update node3
+ selector.nodeUpdate(createRMNode("h3", 3, 2, 10));
+ selector.computeTask.run();
+ nodeIds = selector.selectNodes();
+ System.out.println("2-> "+ nodeIds);
+ Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+ Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+ Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+ // Now send update with -1 wait time
+ selector.nodeUpdate(createRMNode("h4", 4, -1, 10));
+ selector.computeTask.run();
+ nodeIds = selector.selectNodes();
+ System.out.println("3-> "+ nodeIds);
+ // No change
+ Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+ Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+ Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+ }
+
+ @Test
+ public void testSelectionHints() {
+ TopKNodeSelector selector = new TopKNodeSelector(3,
+ TopKNodeSelector.TopKComparator.WAIT_TIME);
+ selector.nodeUpdate(createRMNode("h1", 1, 100, 10));
+ selector.nodeUpdate(createRMNode("h2", 2, 5, 10));
+ selector.nodeUpdate(createRMNode("h3", 3, 95, 10));
+ selector.nodeUpdate(createRMNode("h4", 4, 10, 10));
+ selector.nodeUpdate(createRMNode("h5", 5, 90, 10));
+ selector.nodeUpdate(createRMNode("h6", 6, 15, 10));
+ selector.computeTask.run();
+
+ List<NodeId> nodeIds = selector.selectNodes();
+ System.out.println("1-> " + nodeIds);
+ Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+ Assert.assertEquals("h4:4", nodeIds.get(1).toString());
+ Assert.assertEquals("h6:6", nodeIds.get(2).toString());
+
+ nodeIds = selector.selectNodes(
+ Arrays.asList(
+ new NodeSelector.SelectionHint(Arrays.asList(
+ (NodeId) new FakeNodeId("h1", 1)
+ ), 1),
+ new NodeSelector.SelectionHint(Arrays.asList(
+ (NodeId) new FakeNodeId("h3", 3)
+ ), 1)
+ ));
+ System.out.println("2-> " + nodeIds);
+ // Ensure the hinted nodes are on the top
+ Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+ Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+ Assert.assertEquals("h2:2", nodeIds.get(2).toString());
+ Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+ Assert.assertEquals("h6:6", nodeIds.get(4).toString());
+
+ nodeIds = selector.selectNodes(
+ Arrays.asList(
+ new NodeSelector.SelectionHint(Arrays.asList(
+ (NodeId) new FakeNodeId("h1", 1),
+ new FakeNodeId("h3", 3),
+ new FakeNodeId("h5", 5)
+ ), 2)
+ ));
+ System.out.println("3-> " + nodeIds);
+ // Ensure the hinted nodes are on the top
+ Assert.assertEquals("h5:5", nodeIds.get(0).toString());
+ Assert.assertEquals("h3:3", nodeIds.get(1).toString());
+ Assert.assertEquals("h2:2", nodeIds.get(2).toString());
+ Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+ Assert.assertEquals("h6:6", nodeIds.get(4).toString());
+ }
+
+ @Test
+ public void testQueueLengthSort() {
+ TopKNodeSelector selector = new TopKNodeSelector(5,
+ TopKNodeSelector.TopKComparator.QUEUE_LENGTH);
+ selector.nodeUpdate(createRMNode("h1", 1, -1, 15));
+ selector.nodeUpdate(createRMNode("h2", 2, -1, 5));
+ selector.nodeUpdate(createRMNode("h3", 3, -1, 10));
+ selector.computeTask.run();
+ List<NodeId> nodeIds = selector.selectNodes();
+ System.out.println("1-> " + nodeIds);
+ Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+ Assert.assertEquals("h3:3", nodeIds.get(1).toString());
+ Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+ // Now update node3
+ selector.nodeUpdate(createRMNode("h3", 3, -1, 2));
+ selector.computeTask.run();
+ nodeIds = selector.selectNodes();
+ System.out.println("2-> "+ nodeIds);
+ Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+ Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+ Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+ // Now send update with -1 wait time but valid length
+ selector.nodeUpdate(createRMNode("h4", 4, -1, 20));
+ selector.computeTask.run();
+ nodeIds = selector.selectNodes();
+ System.out.println("3-> "+ nodeIds);
+ // Order of the first three is unchanged; h4 is appended last
+ Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+ Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+ Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+ Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+ }
+
+ private RMNode createRMNode(String host, int port,
+ int waitTime, int queueLength) {
+ RMNode node1 = Mockito.mock(RMNode.class);
+ NodeId nID1 = new FakeNodeId(host, port);
+ Mockito.when(node1.getNodeID()).thenReturn(nID1);
+ QueuedContainersStatus status1 =
+ Mockito.mock(QueuedContainersStatus.class);
+ Mockito.when(status1.getEstimatedQueueWaitTime())
+ .thenReturn(waitTime);
+ Mockito.when(status1.getWaitQueueLength())
+ .thenReturn(queueLength);
+ Mockito.when(node1.getQueuedContainersStatus()).thenReturn(status1);
+ return node1;
+ }
+}
[2/2] hadoop git commit: YARN-4412. Create ClusterMonitor to compute
ordered list of preferred NMs for OPPORTUNITIC containers (asuresh)
Posted by as...@apache.org.
YARN-4412. Create ClusterMonitor to compute ordered list of preferred NMs for OPPORTUNITIC containers (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2340511f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2340511f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2340511f
Branch: refs/heads/yarn-2877
Commit: 2340511f730a8af4920d8459e2c6b4aefc94bd43
Parents: b00875e
Author: Arun Suresh <as...@apache.org>
Authored: Thu Feb 11 13:48:36 2016 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Fri Feb 19 01:58:53 2016 -0800
----------------------------------------------------------------------
.../hadoop/yarn/sls/nodemanager/NodeInfo.java | 5 +
.../yarn/sls/scheduler/RMNodeWrapper.java | 5 +
hadoop-yarn-project/CHANGES-yarn-2877.txt | 2 +
.../hadoop/yarn/conf/YarnConfiguration.java | 17 +-
.../hadoop/yarn/event/EventDispatcher.java | 137 ++++++++
.../src/main/resources/yarn-default.xml | 30 ++
.../yarn/server/api/records/NodeStatus.java | 9 +
.../api/records/QueuedContainersStatus.java | 45 +++
.../api/records/impl/pb/NodeStatusPBImpl.java | 40 ++-
.../impl/pb/QueuedContainersStatusPBImpl.java | 80 +++++
.../main/proto/yarn_server_common_protos.proto | 6 +
.../protocolrecords/TestProtocolRecords.java | 30 ++
.../nodemanager/NodeStatusUpdaterImpl.java | 10 +
.../monitor/ContainersMonitor.java | 2 +
.../monitor/ContainersMonitorImpl.java | 12 +
.../server/resourcemanager/ClusterMonitor.java | 36 ++
.../DistributedSchedulingService.java | 162 ---------
.../server/resourcemanager/NodeSelector.java | 74 ++++
.../server/resourcemanager/ResourceManager.java | 119 +------
.../server/resourcemanager/rmnode/RMNode.java | 4 +
.../resourcemanager/rmnode/RMNodeImpl.java | 29 +-
.../rmnode/RMNodeStatusEvent.java | 7 +
.../DistributedSchedulingService.java | 341 +++++++++++++++++++
.../scheduler/distributed/TopKNodeSelector.java | 273 +++++++++++++++
.../yarn/server/resourcemanager/MockNodes.java | 6 +-
.../yarn/server/resourcemanager/MockRM.java | 2 +
.../resourcemanager/TestApplicationCleanup.java | 7 +-
.../TestDistributedSchedulingService.java | 170 ---------
.../resourcemanager/TestRMDispatcher.java | 6 +-
.../TestResourceTrackerService.java | 4 +-
.../TestAMRMRPCNodeUpdates.java | 6 +-
.../applicationsmanager/TestAMRestart.java | 6 +-
.../TestDistributedSchedulingService.java | 179 ++++++++++
.../distributed/TestTopKNodeSelector.java | 201 +++++++++++
34 files changed, 1609 insertions(+), 453 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 92d586b..85096ba 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -190,6 +191,10 @@ public class NodeInfo {
return null;
}
+ public QueuedContainersStatus getQueuedContainersStatus() {
+ return null;
+ }
+
@Override
public ResourceUtilization getAggregatedContainersUtilization() {
return null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 2e9cccb..ab82e66 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -179,6 +180,10 @@ public class RMNodeWrapper implements RMNode {
return Collections.EMPTY_LIST;
}
+ public QueuedContainersStatus getQueuedContainersStatus() {
+ return null;
+ }
+
@Override
public ResourceUtilization getAggregatedContainersUtilization() {
return node.getAggregatedContainersUtilization();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/CHANGES-yarn-2877.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES-yarn-2877.txt b/hadoop-yarn-project/CHANGES-yarn-2877.txt
index a147866..e3b4c50 100644
--- a/hadoop-yarn-project/CHANGES-yarn-2877.txt
+++ b/hadoop-yarn-project/CHANGES-yarn-2877.txt
@@ -16,3 +16,5 @@ yarn-2877 distributed scheduling (Unreleased)
YARN-2885. Create AMRMProxy request interceptor and ContainerAllocator
to distribute OPPORTUNISTIC containers to appropriate Nodes (asuresh)
+ YARN-4412. Create ClusterMonitor to compute ordered list of preferred
+ NMs for OPPORTUNITIC containers (asuresh)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index edae3eb..c0265ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -326,8 +326,21 @@ public class YarnConfiguration extends Configuration {
YARN_PREFIX + "distributed-scheduling.incr-vcores";
public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1;
- /** Container token expiry for container allocated via Distributed
- * Scheduling. */
+ public static final String DIST_SCHEDULING_TOP_K =
+ YARN_PREFIX + "distributed-scheduling.top-k";
+ public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
+
+ public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS =
+ YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms";
+ public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000;
+
+ public static final String DIST_SCHEDULING_TOP_K_COMPARATOR =
+ YARN_PREFIX + "distributed-scheduling.top-k-comparator";
+ public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT =
+ "WAIT_TIME";
+
+ /** Container token expiry for container allocated via
+ * Distributed Scheduling */
public static String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
YARN_PREFIX + "distributed-scheduling.container-token-expiry";
public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java
new file mode 100644
index 0000000..8a5ad92
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.event;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * This is a specialized EventHandler to be used by Services that are expected
+ * to handle a large number of events efficiently by ensuring that the caller
+ * thread is not blocked. Events are immediately stored in a BlockingQueue and
+ * a separate dedicated Thread consumes events from the queue and handles
+ * them appropriately.
+ * @param <T> Type of Event
+ */
+public class EventDispatcher<T extends Event> extends
+ AbstractService implements EventHandler<T> {
+
+ private final EventHandler<T> handler;
+ private final BlockingQueue<T> eventQueue =
+ new LinkedBlockingDeque<>();
+ private final Thread eventProcessor;
+ private volatile boolean stopped = false;
+ private boolean shouldExitOnError = false;
+
+ private static final Log LOG = LogFactory.getLog(EventDispatcher.class);
+
+ private final class EventProcessor implements Runnable {
+ @Override
+ public void run() {
+
+ T event;
+
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ try {
+ event = eventQueue.take();
+ } catch (InterruptedException e) {
+ LOG.error("Returning, interrupted : " + e);
+ return; // TODO: Kill RM.
+ }
+
+ try {
+ handler.handle(event);
+ } catch (Throwable t) {
+ // An error occurred, but we are shutting down anyway.
+ // If it was an InterruptedException, the very act of
+ // shutdown could have caused it and is probably harmless.
+ if (stopped) {
+ LOG.warn("Exception during shutdown: ", t);
+ break;
+ }
+ LOG.fatal("Error in handling event type " + event.getType()
+ + " to the Event Dispatcher", t);
+ if (shouldExitOnError
+ && !ShutdownHookManager.get().isShutdownInProgress()) {
+ LOG.info("Exiting, bbye..");
+ System.exit(-1);
+ }
+ }
+ }
+ }
+ }
+
+ public EventDispatcher(EventHandler<T> handler, String name) {
+ super(name);
+ this.handler = handler;
+ this.eventProcessor = new Thread(new EventProcessor());
+ this.eventProcessor.setName(getName() + ":Event Processor");
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ this.shouldExitOnError =
+ conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+ Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ this.eventProcessor.start();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ this.stopped = true;
+ this.eventProcessor.interrupt();
+ try {
+ this.eventProcessor.join();
+ } catch (InterruptedException e) {
+ throw new YarnRuntimeException(e);
+ }
+ super.serviceStop();
+ }
+
+ @Override
+ public void handle(T event) {
+ try {
+ int qSize = eventQueue.size();
+ if (qSize !=0 && qSize %1000 == 0) {
+ LOG.info("Size of " + getName() + " event-queue is " + qSize);
+ }
+ int remCapacity = eventQueue.remainingCapacity();
+ if (remCapacity < 1000) {
+ LOG.info("Very low remaining capacity on " + getName() + "" +
+ "event queue: " + remCapacity);
+ }
+ this.eventQueue.put(event);
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted. Trying to exit gracefully.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index d8ea3ad..cbadd63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2240,6 +2240,36 @@
<value>4096</value>
</property>
+ <!-- Distributed Scheduling configuration -->
+ <property>
+ <description>
+ The interval in milliseconds specifying the frequency at which the
+ Distributed Scheduling Cluster Monitor will recompute the top K
+ viable nodes on which OPPORTUNISTIC containers can be scheduled
+ </description>
+ <name>yarn.distributed-scheduling.top-k-compute-interval-ms</name>
+ <value>1000</value>
+ </property>
+
+ <property>
+ <description>
+ The Default comparator used by the Distributed Scheduling Cluster
+ Monitor to order the top K nodes on which OPPORTUNISTIC containers can
+ be scheduled. The allowed values are "WAIT_TIME" and "QUEUE_LENGTH"
+ </description>
+ <name>yarn.distributed-scheduling.top-k-comparator</name>
+ <value>WAIT_TIME</value>
+ </property>
+
+ <property>
+ <description>
+ The max number of nodes returned by the Distributed Scheduling Cluster
+ Monitor. (The value of K in top-K)
+ </description>
+ <name>yarn.distributed-scheduling.top-k</name>
+ <value>10</value>
+ </property>
+
<!-- Node Labels Configuration -->
<property>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
index 836cd4b..89e054b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
@@ -122,4 +122,13 @@ public abstract class NodeStatus {
@Unstable
public abstract void setIncreasedContainers(
List<Container> increasedContainers);
+
+ @Private
+ @Unstable
+ public abstract QueuedContainersStatus getQueuedContainersStatus();
+
+ @Private
+ @Unstable
+ public abstract void setQueuedContainersStatus(
+ QueuedContainersStatus queuedContainersStatus);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
new file mode 100644
index 0000000..a7f0ece
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * <p>
+ * <code>QueuedContainersStatus</code> captures information pertaining to the
+ * state of execution of the Queueable containers within a node.
+ * </p>
+ */
+@Private
+@Evolving
+public abstract class QueuedContainersStatus {
+ public static QueuedContainersStatus newInstance() {
+ return Records.newRecord(QueuedContainersStatus.class);
+ }
+
+ public abstract int getEstimatedQueueWaitTime();
+
+ public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
+
+ public abstract int getWaitQueueLength();
+
+ public abstract void setWaitQueueLength(int waitQueueLength);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
index 8dd4832..9a9a83a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
@@ -33,14 +33,17 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
-import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto;
+
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -400,6 +403,27 @@ public class NodeStatusPBImpl extends NodeStatus {
this.increasedContainers = increasedContainers;
}
+ @Override
+ public QueuedContainersStatus getQueuedContainersStatus() {
+ NodeStatusProtoOrBuilder p =
+ this.viaProto ? this.proto : this.builder;
+ if (!p.hasQueuedContainerStatus()) {
+ return null;
+ }
+ return convertFromProtoFormat(p.getQueuedContainerStatus());
+ }
+
+ @Override
+ public void setQueuedContainersStatus(QueuedContainersStatus queuedContainersStatus) {
+ maybeInitBuilder();
+ if (queuedContainersStatus == null) {
+ this.builder.clearQueuedContainerStatus();
+ return;
+ }
+ this.builder.setQueuedContainerStatus(
+ convertToProtoFormat(queuedContainersStatus));
+ }
+
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
return ((NodeIdPBImpl)nodeId).getProto();
}
@@ -433,15 +457,25 @@ public class NodeStatusPBImpl extends NodeStatus {
return ((ApplicationIdPBImpl)c).getProto();
}
- private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) {
+ private YarnProtos.ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) {
return ((ResourceUtilizationPBImpl) r).getProto();
}
private ResourceUtilizationPBImpl convertFromProtoFormat(
- ResourceUtilizationProto p) {
+ YarnProtos.ResourceUtilizationProto p) {
return new ResourceUtilizationPBImpl(p);
}
+ private YarnServerCommonProtos.QueuedContainersStatusProto convertToProtoFormat(
+ QueuedContainersStatus r) {
+ return ((QueuedContainersStatusPBImpl) r).getProto();
+ }
+
+ private QueuedContainersStatus convertFromProtoFormat(
+ YarnServerCommonProtos.QueuedContainersStatusProto p) {
+ return new QueuedContainersStatusPBImpl(p);
+ }
+
private ContainerPBImpl convertFromProtoFormat(
ContainerProto c) {
return new ContainerPBImpl(c);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java
new file mode 100644
index 0000000..54470f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+
+public class QueuedContainersStatusPBImpl extends QueuedContainersStatus {
+
+ private YarnServerCommonProtos.QueuedContainersStatusProto proto =
+ YarnServerCommonProtos.QueuedContainersStatusProto.getDefaultInstance();
+ private YarnServerCommonProtos.QueuedContainersStatusProto.Builder builder = null;
+ private boolean viaProto = false;
+
+ public QueuedContainersStatusPBImpl() {
+ builder = YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder();
+ }
+
+ public QueuedContainersStatusPBImpl(YarnServerCommonProtos
+ .QueuedContainersStatusProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public YarnServerCommonProtos.QueuedContainersStatusProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder =
+ YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public int getEstimatedQueueWaitTime() {
+ YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getEstimatedQueueWaitTime();
+ }
+
+ @Override
+ public void setEstimatedQueueWaitTime(int queueWaitTime) {
+ maybeInitBuilder();
+ builder.setEstimatedQueueWaitTime(queueWaitTime);
+ }
+
+ @Override
+ public int getWaitQueueLength() {
+ YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getWaitQueueLength();
+ }
+
+ @Override
+ public void setWaitQueueLength(int waitQueueLength) {
+ maybeInitBuilder();
+ builder.setWaitQueueLength(waitQueueLength);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 77064a0..c23d557 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -39,6 +39,12 @@ message NodeStatusProto {
optional ResourceUtilizationProto containers_utilization = 6;
optional ResourceUtilizationProto node_utilization = 7;
repeated ContainerProto increased_containers = 8;
+ optional QueuedContainersStatusProto queued_container_status = 9;
+}
+
+message QueuedContainersStatusProto {
+ optional int32 estimated_queue_wait_time = 1;
+ optional int32 wait_queue_length = 2;
}
message MasterKeyProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
index 86e49f0..27bdfff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
@@ -39,8 +39,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
+ .NodeHeartbeatRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
+
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@@ -131,4 +137,28 @@ public class TestProtocolRecords {
((NodeHeartbeatResponsePBImpl) record).getProto());
Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps());
}
+
+ @Test
+ public void testNodeHeartBeatRequest() throws IOException {
+ NodeHeartbeatRequest record =
+ Records.newRecord(NodeHeartbeatRequest.class);
+ NodeStatus nodeStatus =
+ Records.newRecord(NodeStatus.class);
+ QueuedContainersStatus queuedContainersStatus = Records.newRecord
+ (QueuedContainersStatus.class);
+ queuedContainersStatus.setEstimatedQueueWaitTime(123);
+ queuedContainersStatus.setWaitQueueLength(321);
+ nodeStatus.setQueuedContainersStatus(queuedContainersStatus);
+ record.setNodeStatus(nodeStatus);
+
+ NodeHeartbeatRequestPBImpl pb = new
+ NodeHeartbeatRequestPBImpl(
+ ((NodeHeartbeatRequestPBImpl) record).getProto());
+
+ Assert.assertEquals(123,
+ pb.getNodeStatus()
+ .getQueuedContainersStatus().getEstimatedQueueWaitTime());
+ Assert.assertEquals(321,
+ pb.getNodeStatus().getQueuedContainersStatus().getWaitQueueLength());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 5806731..5fad500 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -71,6 +71,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
+
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@@ -440,9 +442,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
createKeepAliveApplicationList(), nodeHealthStatus,
containersUtilization, nodeUtilization, increasedContainers);
+ nodeStatus.setQueuedContainersStatus(getContainerQueueInfo());
return nodeStatus;
}
+ private QueuedContainersStatus getContainerQueueInfo() {
+ ContainerManagerImpl containerManager =
+ (ContainerManagerImpl) this.context.getContainerManager();
+ ContainersMonitor containersMonitor =
+ containerManager.getContainersMonitor();
+ return containersMonitor.getQueuedContainersStatus();
+ }
/**
* Get the aggregated utilization of the containers in this node.
* @return Resource utilization of all the containers.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
index 4d69dbf..e54e298 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
public interface ContainersMonitor extends Service,
EventHandler<ContainersMonitorEvent>, ResourceView {
public ResourceUtilization getContainersUtilization();
+ public QueuedContainersStatus getQueuedContainersStatus();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 446e7a1..e6c3642 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -84,6 +85,7 @@ public class ContainersMonitorImpl extends AbstractService implements
private ResourceUtilization containersUtilization;
private volatile boolean stopped = false;
+ private QueuedContainersStatus queuedContainersStatus;
public ContainersMonitorImpl(ContainerExecutor exec,
AsyncDispatcher dispatcher, Context context) {
@@ -96,6 +98,7 @@ public class ContainersMonitorImpl extends AbstractService implements
this.monitoringThread = new MonitoringThread();
this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
+ this.queuedContainersStatus = QueuedContainersStatus.newInstance();
}
@Override
@@ -697,6 +700,15 @@ public class ContainersMonitorImpl extends AbstractService implements
this.containersUtilization = utilization;
}
+ public QueuedContainersStatus getQueuedContainersStatus() {
+ return this.queuedContainersStatus;
+ }
+
+ public void setQueuedContainersStatus(QueuedContainersStatus
+ queuedContainersStatus) {
+ this.queuedContainersStatus = queuedContainersStatus;
+ }
+
@Override
@SuppressWarnings("unchecked")
public void handle(ContainersMonitorEvent monitoringEvent) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
new file mode 100644
index 0000000..4fd62d0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+import java.util.List;
+
+public interface ClusterMonitor {
+
+ void addNode(List<NMContainerStatus> containerStatuses, RMNode rmNode);
+
+ void removeNode(RMNode removedRMNode);
+
+ void nodeUpdate(RMNode rmNode);
+
+ void updateNodeResource(RMNode rmNode, ResourceOption resourceOption);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
deleted file mode 100644
index 5210f7f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
-import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
-
-
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.security
- .AMRMTokenSecretManager;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-
-public class DistributedSchedulingService extends ApplicationMasterService
- implements DistributedSchedulerProtocol {
-
- public DistributedSchedulingService(RMContext rmContext,
- YarnScheduler scheduler) {
- super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
- }
-
- @Override
- public Server getServer(YarnRPC rpc, Configuration serverConf,
- InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
- Server server = rpc.getServer(DistributedSchedulerProtocol.class, this,
- addr, serverConf, secretManager,
- serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
- // To support application running no NMs that DO NOT support
- // Dist Scheduling...
- ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
- ApplicationMasterProtocolPB.class,
- ApplicationMasterProtocolService.newReflectiveBlockingService(
- new ApplicationMasterProtocolPBServiceImpl(this)));
- return server;
- }
-
- @Override
- public RegisterApplicationMasterResponse registerApplicationMaster
- (RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
- return super.registerApplicationMaster(request);
- }
-
- @Override
- public FinishApplicationMasterResponse finishApplicationMaster
- (FinishApplicationMasterRequest request) throws YarnException,
- IOException {
- return super.finishApplicationMaster(request);
- }
-
- @Override
- public AllocateResponse allocate(AllocateRequest request) throws
- YarnException, IOException {
- return super.allocate(request);
- }
-
- @Override
- public DistSchedRegisterResponse
- registerApplicationMasterForDistributedScheduling(
- RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
- RegisterApplicationMasterResponse response =
- registerApplicationMaster(request);
- DistSchedRegisterResponse dsResp = recordFactory
- .newRecordInstance(DistSchedRegisterResponse.class);
- dsResp.setRegisterResponse(response);
- dsResp.setMinAllocatableCapabilty(
- Resource.newInstance(
- getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
- YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT),
- getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
- YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT)
- )
- );
- dsResp.setMaxAllocatableCapabilty(
- Resource.newInstance(
- getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
- YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT),
- getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
- YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT)
- )
- );
- dsResp.setIncrAllocatableCapabilty(
- Resource.newInstance(
- getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
- YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT),
- getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
- YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT)
- )
- );
- dsResp.setContainerTokenExpiryInterval(
- getConfig().getInt(
- YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
- YarnConfiguration.
- DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT));
- dsResp.setContainerIdStart(
- this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
-
- // Set nodes to be used for scheduling
- // TODO: The actual computation of the list will happen in YARN-4412
- // TODO: Till then, send the complete list
- dsResp.setNodesForScheduling(
- new ArrayList<>(this.rmContext.getRMNodes().keySet()));
- return dsResp;
- }
-
- @Override
- public DistSchedAllocateResponse allocateForDistributedScheduling
- (AllocateRequest request) throws YarnException, IOException {
- AllocateResponse response = allocate(request);
- DistSchedAllocateResponse dsResp = recordFactory.newRecordInstance
- (DistSchedAllocateResponse.class);
- dsResp.setAllocateResponse(response);
- dsResp.setNodesForScheduling(
- new ArrayList<>(this.rmContext.getRMNodes().keySet()));
- return dsResp;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java
new file mode 100644
index 0000000..d6a031c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodeSelector.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Simple node selector interface that contractually obligates the
+ * implementor to provide the caller with an ordered list of nodes. The
+ * ordering itself is defined by the implementation. Callers may optionally
+ * pass {@link SelectionHint}s to influence the selection.
+ */
+public interface NodeSelector {
+
+  /**
+   * SelectionHint allows callers to provide additional suggestions to be
+   * used for selection: a set of nodes plus the minimum number of those
+   * nodes that should be included in the returned list.
+   * Instances are immutable; the node ids are copied on the way in and on
+   * the way out.
+   */
+  class SelectionHint {
+
+    private final NodeId[] nodeIds;
+
+    // Minimum number of nodes to include from the hint in the returned list.
+    private final int minToInclude;
+
+    /**
+     * @param nodeIds nodes suggested by the caller; copied into a private
+     *                array, so later changes to the collection are not seen
+     * @param minNodesToInclude minimum number of nodes from
+     *                          {@code nodeIds} to include in the result
+     */
+    public SelectionHint(Collection<NodeId> nodeIds,
+        int minNodesToInclude) {
+      this.nodeIds = nodeIds.toArray(new NodeId[0]);
+      this.minToInclude = minNodesToInclude;
+    }
+
+    /**
+     * @return a copy of the suggested node ids; handing out a copy keeps
+     *         callers from modifying the hint through the returned array
+     */
+    public NodeId[] getNodeIds() {
+      return nodeIds.clone();
+    }
+
+    /**
+     * @return minimum number of nodes from this hint to include in the
+     *         returned list
+     */
+    public int getMinToInclude() {
+      return minToInclude;
+    }
+
+  }
+
+  /**
+   * Select an ordered list of nodes based on the implementation.
+   * @return ordered list of nodes
+   */
+  List<NodeId> selectNodes();
+
+  /**
+   * Select an ordered list of nodes based on the implementation. Also
+   * allows callers to specify hints in terms of a specific node or a list
+   * of nodes (as well as how many nodes from that list are needed).
+   * @param hints suggestions supplied by the caller
+   * @return ordered list of nodes
+   */
+  List<NodeId> selectNodes(Collection<SelectionHint> hints);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index b51f00d..ebf6027 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.ExitUtil;
@@ -59,6 +58,7 @@ import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
@@ -90,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.DistributedSchedulingService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -118,8 +119,6 @@ import java.security.PrivilegedExceptionAction;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
/**
* The ResourceManager is the main class that is a set of components.
@@ -370,7 +369,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
- return new SchedulerEventDispatcher(this.scheduler);
+ return new EventDispatcher(this.scheduler, "SchedulerEventDispatcher");
}
protected Dispatcher createDispatcher() {
@@ -725,104 +724,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
}
- @Private
- public static class SchedulerEventDispatcher extends AbstractService
- implements EventHandler<SchedulerEvent> {
-
- private final ResourceScheduler scheduler;
- private final BlockingQueue<SchedulerEvent> eventQueue =
- new LinkedBlockingQueue<SchedulerEvent>();
- private final Thread eventProcessor;
- private volatile boolean stopped = false;
- private boolean shouldExitOnError = false;
-
- public SchedulerEventDispatcher(ResourceScheduler scheduler) {
- super(SchedulerEventDispatcher.class.getName());
- this.scheduler = scheduler;
- this.eventProcessor = new Thread(new EventProcessor());
- this.eventProcessor.setName("ResourceManager Event Processor");
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- this.shouldExitOnError =
- conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
- Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
- super.serviceInit(conf);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- this.eventProcessor.start();
- super.serviceStart();
- }
-
- private final class EventProcessor implements Runnable {
- @Override
- public void run() {
-
- SchedulerEvent event;
-
- while (!stopped && !Thread.currentThread().isInterrupted()) {
- try {
- event = eventQueue.take();
- } catch (InterruptedException e) {
- LOG.error("Returning, interrupted : " + e);
- return; // TODO: Kill RM.
- }
-
- try {
- scheduler.handle(event);
- } catch (Throwable t) {
- // An error occurred, but we are shutting down anyway.
- // If it was an InterruptedException, the very act of
- // shutdown could have caused it and is probably harmless.
- if (stopped) {
- LOG.warn("Exception during shutdown: ", t);
- break;
- }
- LOG.fatal("Error in handling event type " + event.getType()
- + " to the scheduler", t);
- if (shouldExitOnError
- && !ShutdownHookManager.get().isShutdownInProgress()) {
- LOG.info("Exiting, bbye..");
- System.exit(-1);
- }
- }
- }
- }
- }
-
- @Override
- protected void serviceStop() throws Exception {
- this.stopped = true;
- this.eventProcessor.interrupt();
- try {
- this.eventProcessor.join();
- } catch (InterruptedException e) {
- throw new YarnRuntimeException(e);
- }
- super.serviceStop();
- }
-
- @Override
- public void handle(SchedulerEvent event) {
- try {
- int qSize = eventQueue.size();
- if (qSize !=0 && qSize %1000 == 0) {
- LOG.info("Size of scheduler event-queue is " + qSize);
- }
- int remCapacity = eventQueue.remainingCapacity();
- if (remCapacity < 1000) {
- LOG.info("Very low remaining capacity on scheduler event queue: "
- + remCapacity);
- }
- this.eventQueue.put(event);
- } catch (InterruptedException e) {
- LOG.info("Interrupted. Trying to exit gracefully.");
- }
- }
- }
@Private
public static class RMFatalEventDispatcher
@@ -1230,7 +1131,19 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (this.rmContext.getYarnConfiguration().getBoolean(
YarnConfiguration.DIST_SCHEDULING_ENABLED,
YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
- return new DistributedSchedulingService(this.rmContext, scheduler);
+ DistributedSchedulingService distributedSchedulingService = new
+ DistributedSchedulingService(this.rmContext, scheduler);
+ EventDispatcher distSchedulerEventDispatcher =
+ new EventDispatcher(distributedSchedulingService,
+ DistributedSchedulingService.class.getName());
+ // Add an event dispatcher for the DistributedSchedulingService
+ // to handle node updates, additions and removals.
+ // Since the SchedulerEvent is currently a superset of these,
+ // we register interest in it.
+ addService(distSchedulerEventDispatcher);
+ rmDispatcher.register(SchedulerEventType.class,
+ distSchedulerEventDispatcher);
+ return distributedSchedulingService;
}
return new ApplicationMasterService(this.rmContext, scheduler);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index d8df9f1..3bf9538 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
/**
* Node managers information on available resources
@@ -168,4 +169,7 @@ public interface RMNode {
NodeHeartbeatResponse response);
public List<Container> pullNewlyIncreasedContainers();
+
+ /**
+ * Returns the queued-containers status held for this node.
+ * NOTE(review): whether this can be null (for example before a node has
+ * reported any status) depends on the implementation - confirm against
+ * the implementing classes.
+ * @return the node's QueuedContainersStatus
+ */
+ public QueuedContainersStatus getQueuedContainersStatus();
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index f4e483b..cdfb91f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
@@ -122,6 +123,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
/* Resource utilization for the node. */
private ResourceUtilization nodeUtilization;
+ /* Container queue information for the node. Used by the distributed scheduler. */
+ private QueuedContainersStatus queuedContainersStatus;
+
private final ContainerAllocationExpirer containerAllocationExpirer;
/* set of containers that have just launched */
private final Set<ContainerId> launchedContainers =
@@ -1095,7 +1099,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
-
+ rmNode.setQueuedContainersStatus(statusEvent.getContainerQueueInfo());
NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents(
rmNode, statusEvent);
NodeState initialState = rmNode.getState();
@@ -1353,4 +1357,25 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
writeLock.unlock();
}
}
- }
+
+  /**
+   * Returns the queued-containers status currently stored on this node.
+   * The field is read while holding the read lock, pairing with the write
+   * lock taken by {@code setQueuedContainersStatus}.
+   * @return the stored QueuedContainersStatus (whatever was last set)
+   */
+  @Override
+  public QueuedContainersStatus getQueuedContainersStatus() {
+    this.readLock.lock();
+
+    try {
+      return this.queuedContainersStatus;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   * Stores the given queued-containers status on this node. The assignment
+   * is performed under the write lock so that readers going through
+   * {@code getQueuedContainersStatus} observe a consistent value.
+   * @param queuedContainersStatus the status to store
+   */
+  public void setQueuedContainersStatus(
+      QueuedContainersStatus queuedContainersStatus) {
+    writeLock.lock();
+    try {
+      this.queuedContainersStatus = queuedContainersStatus;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index ba6ac9b..5eeaabe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -79,6 +80,10 @@ public class RMNodeStatusEvent extends RMNodeEvent {
return this.logAggregationReportsForApps;
}
+  /**
+   * Exposes the queued-containers status carried by the node status that
+   * this event wraps.
+   * @return the result of {@code nodeStatus.getQueuedContainersStatus()}
+   */
+  public QueuedContainersStatus getContainerQueueInfo() {
+    return nodeStatus.getQueuedContainersStatus();
+  }
+
public void setLogAggregationReportsForApps(
List<LogAggregationReport> logAggregationReportsForApps) {
this.logAggregationReportsForApps = logAggregationReportsForApps;
@@ -89,4 +94,6 @@ public class RMNodeStatusEvent extends RMNodeEvent {
return this.nodeStatus.getIncreasedContainers() == null ?
Collections.EMPTY_LIST : this.nodeStatus.getIncreasedContainers();
}
+
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2340511f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java
new file mode 100644
index 0000000..f0235f7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/DistributedSchedulingService.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeSelector;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+ .NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+ .NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+ .NodeResourceUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+ .NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+ .SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.security
+ .AMRMTokenSecretManager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The DistributedSchedulingService is started instead of the
+ * ApplicationMasterService if DistributedScheduling is enabled for the YARN
+ * cluster.
+ * It extends the functionality of the ApplicationMasterService by servicing
+ * clients (AMs and AMRMProxy request interceptors) that understand the
+ * DistributedSchedulingProtocol.
+ */
+public class DistributedSchedulingService extends ApplicationMasterService
+ implements DistributedSchedulerProtocol, EventHandler<SchedulerEvent> {
+
+ private static final Log LOG =
+ LogFactory.getLog(DistributedSchedulingService.class);
+
+ private final ClusterMonitor clusterMonitor;
+ private final NodeSelector nodeSelector;
+
+ private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
+ new ConcurrentHashMap<>();
+
+  /**
+   * Creates the service and the TopKNodeSelector that backs it. The
+   * selector is configured from the YARN configuration of the given
+   * context (top-K size, recomputation interval and comparator name) and
+   * the same instance is used both as the ClusterMonitor and as the
+   * NodeSelector of this service.
+   * @param rmContext ResourceManager context, also the source of the
+   *                  configuration
+   * @param scheduler scheduler handed to the ApplicationMasterService
+   *                  super class
+   */
+  public DistributedSchedulingService(RMContext rmContext,
+      YarnScheduler scheduler) {
+    super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
+    final Configuration yarnConf = rmContext.getYarnConfiguration();
+    final int topK = yarnConf.getInt(
+        YarnConfiguration.DIST_SCHEDULING_TOP_K,
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
+    final long recomputeIntervalMs = yarnConf.getLong(
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS,
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT);
+    final String comparatorName = yarnConf.get(
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR,
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT);
+    // The configured name is resolved through valueOf before the selector
+    // is built.
+    final TopKNodeSelector.TopKComparator topKComparator =
+        TopKNodeSelector.TopKComparator.valueOf(comparatorName);
+    final TopKNodeSelector selector =
+        new TopKNodeSelector(topK, recomputeIntervalMs, topKComparator);
+    this.clusterMonitor = selector;
+    this.nodeSelector = selector;
+  }
+
+  /**
+   * Creates the RPC server for this service. The server is built for the
+   * DistributedSchedulerProtocol and the ApplicationMasterProtocol is then
+   * registered on the same server as an additional protocol.
+   * @param rpc factory used to create the server
+   * @param serverConf configuration for the server; also supplies the
+   *                   scheduler client thread count
+   * @param addr address passed to the RPC factory
+   * @param secretManager secret manager passed to the RPC factory
+   * @return the server serving both protocols
+   */
+  @Override
+  public Server getServer(YarnRPC rpc, Configuration serverConf,
+      InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
+    final int clientThreadCount = serverConf.getInt(
+        YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT);
+    final Server server = rpc.getServer(DistributedSchedulerProtocol.class,
+        this, addr, serverConf, secretManager, clientThreadCount);
+    // To support applications running on NMs that DO NOT support
+    // Dist Scheduling, the server multiplexes both the
+    // ApplicationMasterProtocol and the DistributedSchedulingProtocol.
+    final RPC.Server rpcServer = (RPC.Server) server;
+    rpcServer.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        ApplicationMasterProtocolPB.class,
+        ApplicationMasterProtocolService.newReflectiveBlockingService(
+            new ApplicationMasterProtocolPBServiceImpl(this)));
+    return server;
+  }
+
+ /**
+ * Registers an ApplicationMaster. This override adds no behavior of its
+ * own; it forwards the request to the super class implementation and
+ * returns its response unchanged.
+ */
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster
+ (RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return super.registerApplicationMaster(request);
+ }
+
+ /**
+ * Finishes an ApplicationMaster. This override adds no behavior of its
+ * own; it forwards the request to the super class implementation and
+ * returns its response unchanged.
+ */
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster
+ (FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return super.finishApplicationMaster(request);
+ }
+
+ /**
+ * Handles an allocate call. This override adds no behavior of its own; it
+ * forwards the request to the super class implementation and returns its
+ * response unchanged. It is also the call used internally by
+ * allocateForDistributedScheduling.
+ */
+ @Override
+ public AllocateResponse allocate(AllocateRequest request) throws
+ YarnException, IOException {
+ return super.allocate(request);
+ }
+
+  /**
+   * Registers an ApplicationMaster and wraps the regular registration
+   * response in a DistSchedRegisterResponse. On top of the wrapped
+   * response, the result carries the min / max / increment allocatable
+   * capabilities and the container token expiry interval read from the
+   * service configuration, the container id start derived from the RM
+   * epoch, and the nodes currently returned by the node selector.
+   * @param request the registration request
+   * @return the distributed scheduling registration response
+   */
+  @Override
+  public DistSchedRegisterResponse
+      registerApplicationMasterForDistributedScheduling(
+      RegisterApplicationMasterRequest request)
+      throws YarnException, IOException {
+    final RegisterApplicationMasterResponse amResponse =
+        registerApplicationMaster(request);
+    final DistSchedRegisterResponse dsResp =
+        recordFactory.newRecordInstance(DistSchedRegisterResponse.class);
+    dsResp.setRegisterResponse(amResponse);
+
+    dsResp.setMinAllocatableCapabilty(resourceFromConfig(
+        YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY,
+        YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT,
+        YarnConfiguration.DIST_SCHEDULING_MIN_VCORES,
+        YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT));
+    dsResp.setMaxAllocatableCapabilty(resourceFromConfig(
+        YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY,
+        YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT,
+        YarnConfiguration.DIST_SCHEDULING_MAX_VCORES,
+        YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT));
+    dsResp.setIncrAllocatableCapabilty(resourceFromConfig(
+        YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY,
+        YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY_DEFAULT,
+        YarnConfiguration.DIST_SCHEDULING_INCR_VCORES,
+        YarnConfiguration.DIST_SCHEDULING_INCR_VCORES_DEFAULT));
+
+    final int tokenExpiryMs = getConfig().getInt(
+        YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS,
+        YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT);
+    dsResp.setContainerTokenExpiryInterval(tokenExpiryMs);
+    dsResp.setContainerIdStart(
+        this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
+
+    // Set nodes to be used for scheduling
+    dsResp.setNodesForScheduling(
+        new ArrayList<>(this.nodeSelector.selectNodes()));
+    return dsResp;
+  }
+
+  /**
+   * Builds a Resource from two integer settings of the service
+   * configuration: the first key/default pair supplies the memory value,
+   * the second the vcores value.
+   */
+  private Resource resourceFromConfig(String memoryKey, int memoryDefault,
+      String vcoresKey, int vcoresDefault) {
+    return Resource.newInstance(
+        getConfig().getInt(memoryKey, memoryDefault),
+        getConfig().getInt(vcoresKey, vcoresDefault));
+  }
+
+  /**
+   * Performs a regular allocate call and wraps its response in a
+   * DistSchedAllocateResponse, together with the nodes returned by the
+   * node selector for the selection hints derived from the request.
+   * @param request the allocate request
+   * @return the distributed scheduling allocate response
+   */
+  @Override
+  public DistSchedAllocateResponse allocateForDistributedScheduling(
+      AllocateRequest request) throws YarnException, IOException {
+    final AllocateResponse amResponse = allocate(request);
+    final DistSchedAllocateResponse dsResp =
+        recordFactory.newRecordInstance(DistSchedAllocateResponse.class);
+    dsResp.setAllocateResponse(amResponse);
+    final Collection<NodeSelector.SelectionHint> hints =
+        createSelectionHints(request);
+    final List<NodeId> nodesForScheduling =
+        new ArrayList<>(this.nodeSelector.selectNodes(hints));
+    dsResp.setNodesForScheduling(nodesForScheduling);
+    return dsResp;
+  }
+
+  /**
+   * Create the selection hints for an allocate request. Only asks that have
+   * relax locality set to false are considered. If the resource name of
+   * such an ask is a known rack, all nodes of that rack are added as a hint
+   * whose minimum equals the number of containers asked for, so that the
+   * local RM can spread the containers across the returned nodes. If the
+   * resource name is a known host, all nodes on that host (multiple NMs may
+   * run on one host) are added as a hint with a minimum of 1.
+   * @param request the allocate request whose asks are inspected
+   * @return Collection of SelectionHint, empty if no ask qualifies
+   */
+  private Collection<NodeSelector.SelectionHint> createSelectionHints(
+      AllocateRequest request) {
+    List<NodeSelector.SelectionHint> retList = new ArrayList<>();
+    // TODO: Add support for node labels (support obtaining a list of nodes
+    // given a label expression)
+    for (ResourceRequest rr : request.getAskList()) {
+      if (rr.getRelaxLocality()) {
+        continue;
+      }
+      String resourceName = rr.getResourceName();
+      Set<NodeId> rackNodes = rackToNode.get(resourceName);
+      if (rackNodes != null) {
+        retList.add(newSelectionHint(rackNodes, rr.getNumContainers()));
+      }
+      Set<NodeId> hostNodes = hostToNode.get(resourceName);
+      if (hostNodes != null) {
+        retList.add(newSelectionHint(hostNodes, 1));
+      }
+    }
+    return retList;
+  }
+
+  /**
+   * Builds a hint from a snapshot of the given node set. The sets stored in
+   * the rack / host mappings are plain HashSets that addToMapping and
+   * removeFromMapping modify while holding the set's monitor, so the copy
+   * is taken under the same monitor instead of letting SelectionHint read
+   * the live set unsynchronized.
+   */
+  private static NodeSelector.SelectionHint newSelectionHint(
+      Set<NodeId> nodeIds, int minToInclude) {
+    List<NodeId> snapshot;
+    synchronized (nodeIds) {
+      snapshot = new ArrayList<>(nodeIds);
+    }
+    return new NodeSelector.SelectionHint(snapshot, minToInclude);
+  }
+
+  /**
+   * Records the node under the given key, creating the key's node set on
+   * first use. Despite its name, {@code rackName} is simply the map key:
+   * handle() passes rack names for the rack mapping and host names for the
+   * host mapping. A null key is ignored. The set is modified while holding
+   * its monitor.
+   */
+  private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
+      String rackName, NodeId nodeId) {
+    if (rackName == null) {
+      return;
+    }
+    Set<NodeId> fresh = new HashSet<NodeId>();
+    // putIfAbsent returns the set already mapped to the key, or null when
+    // the fresh set was installed.
+    Set<NodeId> existing = mapping.putIfAbsent(rackName, fresh);
+    Set<NodeId> target = (existing != null) ? existing : fresh;
+    synchronized (target) {
+      target.add(nodeId);
+    }
+  }
+
+  /**
+   * Removes the node from the set stored under the given key. Despite its
+   * name, {@code rackName} is simply the map key: handle() passes rack
+   * names for the rack mapping and host names for the host mapping.
+   * A null key, or a key that has no set in the mapping (for example a
+   * removal for a node that was never added), is ignored rather than
+   * failing with a NullPointerException. An emptied set is left in the
+   * mapping. The set is modified while holding its monitor.
+   */
+  private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
+      String rackName, NodeId nodeId) {
+    if (rackName == null) {
+      return;
+    }
+    Set<NodeId> nodeIds = mapping.get(rackName);
+    if (nodeIds == null) {
+      // Nothing was ever recorded under this key.
+      return;
+    }
+    synchronized (nodeIds) {
+      nodeIds.remove(nodeId);
+    }
+  }
+
+  /**
+   * Handles scheduler events. Node added / removed / update / resource
+   * update events are forwarded to the cluster monitor, and node additions
+   * and removals also maintain the rack and host mappings. Application,
+   * application attempt, container expiry and node label events are
+   * ignored. Any other event type is logged as an error.
+   * @param event the scheduler event to process
+   */
+  @Override
+  public void handle(SchedulerEvent event) {
+    switch (event.getType()) {
+    case NODE_ADDED:
+      NodeAddedSchedulerEvent added =
+          expectEvent(event, NodeAddedSchedulerEvent.class);
+      clusterMonitor.addNode(added.getContainerReports(),
+          added.getAddedRMNode());
+      addToMapping(rackToNode, added.getAddedRMNode().getRackName(),
+          added.getAddedRMNode().getNodeID());
+      addToMapping(hostToNode, added.getAddedRMNode().getHostName(),
+          added.getAddedRMNode().getNodeID());
+      break;
+    case NODE_REMOVED:
+      NodeRemovedSchedulerEvent removed =
+          expectEvent(event, NodeRemovedSchedulerEvent.class);
+      clusterMonitor.removeNode(removed.getRemovedRMNode());
+      removeFromMapping(rackToNode,
+          removed.getRemovedRMNode().getRackName(),
+          removed.getRemovedRMNode().getNodeID());
+      removeFromMapping(hostToNode,
+          removed.getRemovedRMNode().getHostName(),
+          removed.getRemovedRMNode().getNodeID());
+      break;
+    case NODE_UPDATE:
+      NodeUpdateSchedulerEvent updated =
+          expectEvent(event, NodeUpdateSchedulerEvent.class);
+      clusterMonitor.nodeUpdate(updated.getRMNode());
+      break;
+    case NODE_RESOURCE_UPDATE:
+      NodeResourceUpdateSchedulerEvent resourceUpdated =
+          expectEvent(event, NodeResourceUpdateSchedulerEvent.class);
+      clusterMonitor.updateNodeResource(resourceUpdated.getRMNode(),
+          resourceUpdated.getResourceOption());
+      break;
+
+    // <-- IGNORED EVENTS : START -->
+    case APP_ADDED:
+    case APP_REMOVED:
+    case APP_ATTEMPT_ADDED:
+    case APP_ATTEMPT_REMOVED:
+    case CONTAINER_EXPIRED:
+    case NODE_LABELS_UPDATE:
+      break;
+    // <-- IGNORED EVENTS : END -->
+    default:
+      LOG.error("Unknown event arrived at DistributedSchedulingService: "
+          + event.toString());
+    }
+  }
+
+  /**
+   * Returns the event cast to the expected class, or throws a
+   * RuntimeException when the event is not an instance of that class.
+   */
+  private static <T extends SchedulerEvent> T expectEvent(
+      SchedulerEvent event, Class<T> expectedType) {
+    if (!expectedType.isInstance(event)) {
+      throw new RuntimeException("Unexpected event type: " + event);
+    }
+    return expectedType.cast(event);
+  }
+
+}