You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/12/04 06:23:32 UTC
hadoop git commit: YARN-7587. Skip dispatching opportunistic containers to nodes whose queue is already full. (Weiwei Yang via asuresh)
Repository: hadoop
Updated Branches:
refs/heads/trunk 81f6e46b2 -> 37ca41695
YARN-7587. Skip dispatching opportunistic containers to nodes whose queue is already full. (Weiwei Yang via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/37ca4169
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/37ca4169
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/37ca4169
Branch: refs/heads/trunk
Commit: 37ca4169508c3003dbe9044fefd37eb8cd8c0503
Parents: 81f6e46
Author: Arun Suresh <as...@apache.org>
Authored: Sun Dec 3 22:22:01 2017 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Sun Dec 3 22:22:01 2017 -0800
----------------------------------------------------------------------
.../records/OpportunisticContainersStatus.java | 19 +++++++++++++++++
.../pb/OpportunisticContainersStatusPBImpl.java | 13 ++++++++++++
.../main/proto/yarn_server_common_protos.proto | 1 +
.../scheduler/ContainerScheduler.java | 12 +++++++++++
.../distributed/NodeQueueLoadMonitor.java | 22 ++++++++++++++++++--
.../distributed/TestNodeQueueLoadMonitor.java | 21 +++++++++++++++++++
6 files changed, 86 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37ca4169/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java
index 732db2a..c8a81a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java
@@ -149,4 +149,23 @@ public abstract class OpportunisticContainersStatus {
@Unstable
public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
+
+ /**
+ * Gets the capacity (maximum length) of the opportunistic containers queue on the node.
+ *
+ * @return queue capacity, i.e. the maximum opportunistic queue length of the node.
+ */
+ @Private
+ @Unstable
+ public abstract int getOpportQueueCapacity();
+
+
+ /**
+ * Sets the capacity (maximum length) of the opportunistic containers queue on the node.
+ *
+ * @param queueCapacity queue capacity, i.e. the maximum opportunistic queue length.
+ */
+ @Private
+ @Unstable
+ public abstract void setOpportQueueCapacity(int queueCapacity);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37ca4169/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java
index 8399713..5d1005c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java
@@ -136,4 +136,17 @@ public class OpportunisticContainersStatusPBImpl
maybeInitBuilder();
builder.setEstimatedQueueWaitTime(queueWaitTime);
}
+
+ @Override
+ public int getOpportQueueCapacity() {
+ YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
+ viaProto ? proto : builder;
+ return p.getOpportQueueCapacity();
+ }
+
+ @Override
+ public void setOpportQueueCapacity(int queueCapacity) {
+ maybeInitBuilder();
+ builder.setOpportQueueCapacity(queueCapacity);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37ca4169/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 98b172d..8200808 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -49,6 +49,7 @@ message OpportunisticContainersStatusProto {
optional int32 queued_opport_containers = 4;
optional int32 wait_queue_length = 5;
optional int32 estimated_queue_wait_time = 6;
+ optional int32 opport_queue_capacity = 7; // maximum length of the node's opportunistic container queue
}
message MasterKeyProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37ca4169/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 76da37c..d9b713f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -68,6 +68,7 @@ public class ContainerScheduler extends AbstractService implements
LoggerFactory.getLogger(ContainerScheduler.class);
private final Context context;
+ // Capacity (maximum length) of the queue for opportunistic Containers; published in OpportunisticContainersStatus.
private final int maxOppQueueLength;
// Queue of Guaranteed Containers waiting for resources to run
@@ -258,6 +259,15 @@ public class ContainerScheduler extends AbstractService implements
+ this.queuedOpportunisticContainers.size();
}
+ /**
+ * Return the capacity (maximum length) of the queue for opportunistic
+ * containers on this node.
+ * @return queue capacity, i.e. the configured maxOppQueueLength.
+ */
+ public int getOpportunisticQueueCapacity() {
+ return this.maxOppQueueLength;
+ }
+
@VisibleForTesting
public int getNumQueuedGuaranteedContainers() {
return this.queuedGuaranteedContainers.size();
@@ -290,6 +300,8 @@ public class ContainerScheduler extends AbstractService implements
metrics.getAllocatedOpportunisticVCores());
this.opportunisticContainersStatus.setRunningOpportContainers(
metrics.getRunningOpportunisticContainers());
+ this.opportunisticContainersStatus.setOpportQueueCapacity(
+ getOpportunisticQueueCapacity());
return this.opportunisticContainersStatus;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37ca4169/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index ed0ee1e..e989099 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -75,6 +75,7 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
int queueWaitTime = -1;
double timestamp;
final NodeId nodeId;
+ private int queueCapacity = 0;
public ClusterNode(NodeId nodeId) {
this.nodeId = nodeId;
@@ -95,6 +96,16 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
this.timestamp = System.currentTimeMillis();
return this;
}
+
+ public ClusterNode setQueueCapacity(int capacity) {
+ this.queueCapacity = capacity;
+ return this;
+ }
+
+ public boolean isQueueFull() {
+ return this.queueCapacity > 0 &&
+ this.queueLength >= this.queueCapacity;
+ }
}
private final ScheduledExecutorService scheduledExecutor;
@@ -207,6 +218,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
opportunisticContainersStatus =
OpportunisticContainersStatus.newInstance();
}
+ int opportQueueCapacity =
+ opportunisticContainersStatus.getOpportQueueCapacity();
int estimatedQueueWaitTime =
opportunisticContainersStatus.getEstimatedQueueWaitTime();
int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength();
@@ -222,7 +235,8 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
this.clusterNodes.put(rmNode.getNodeID(),
new ClusterNode(rmNode.getNodeID())
.setQueueWaitTime(estimatedQueueWaitTime)
- .setQueueLength(waitQueueLength));
+ .setQueueLength(waitQueueLength)
+ .setQueueCapacity(opportQueueCapacity));
LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " +
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
"wait queue length [" + waitQueueLength + "]");
@@ -301,7 +315,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
// is what we ultimately care about.
Arrays.sort(nodes, (Comparator)comparator);
for (int j=0; j < nodes.length; j++) {
- retList.add(((ClusterNode)nodes[j]).nodeId);
+ ClusterNode cNode = (ClusterNode)nodes[j];
+ // Exclude nodes whose queue is already full (a node with capacity <= 0 is never considered full).
+ if (!cNode.isQueueFull()) {
+ retList.add(cNode.nodeId);
+ }
}
return retList;
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37ca4169/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
index dfd21ff..85eddaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
@@ -33,6 +33,8 @@ import java.util.List;
*/
public class TestNodeQueueLoadMonitor {
+ private final static int DEFAULT_MAX_QUEUE_LENGTH = 200;
+
static class FakeNodeId extends NodeId {
final String host;
final int port;
@@ -132,6 +134,17 @@ public class TestNodeQueueLoadMonitor {
Assert.assertEquals("h2:2", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+
+ // Now update h3 so its queue length equals its capacity; h3 must no longer be selected.
+ selector.updateNode(createRMNode("h3", 3, -1,
+ DEFAULT_MAX_QUEUE_LENGTH));
+ selector.computeTask.run();
+ nodeIds = selector.selectNodes();
+ System.out.println("4-> "+ nodeIds);
+ Assert.assertEquals(3, nodeIds.size());
+ Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+ Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+ Assert.assertEquals("h4:4", nodeIds.get(2).toString());
}
@Test
@@ -180,6 +193,12 @@ public class TestNodeQueueLoadMonitor {
private RMNode createRMNode(String host, int port,
int waitTime, int queueLength) {
+ return createRMNode(host, port, waitTime, queueLength,
+ DEFAULT_MAX_QUEUE_LENGTH);
+ }
+
+ private RMNode createRMNode(String host, int port,
+ int waitTime, int queueLength, int queueCapacity) {
RMNode node1 = Mockito.mock(RMNode.class);
NodeId nID1 = new FakeNodeId(host, port);
Mockito.when(node1.getNodeID()).thenReturn(nID1);
@@ -189,6 +208,8 @@ public class TestNodeQueueLoadMonitor {
.thenReturn(waitTime);
Mockito.when(status1.getWaitQueueLength())
.thenReturn(queueLength);
+ Mockito.when(status1.getOpportQueueCapacity())
+ .thenReturn(queueCapacity);
Mockito.when(node1.getOpportunisticContainersStatus()).thenReturn(status1);
return node1;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org