You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in plain text.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2019/03/21 19:05:43 UTC
[hadoop] branch trunk updated: YARN-9402. Opportunistic containers should not be scheduled on Decommissioning nodes. Contributed by Abhishek Modi.
This is an automated email from the ASF dual-hosted git repository.
gifuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 548997d YARN-9402. Opportunistic containers should not be scheduled on Decommissioning nodes. Contributed by Abhishek Modi.
548997d is described below
commit 548997d6c9c5a1b9734ee00d065ce48a189458e6
Author: Giovanni Matteo Fumarola <gi...@apache.org>
AuthorDate: Thu Mar 21 12:04:05 2019 -0700
YARN-9402. Opportunistic containers should not be scheduled on Decommissioning nodes. Contributed by Abhishek Modi.
---
.../distributed/NodeQueueLoadMonitor.java | 11 +++--
.../distributed/TestNodeQueueLoadMonitor.java | 50 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 4 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index ca35886..e093b2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.api.records.NodeState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -230,8 +231,9 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
try {
ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
if (currentNode == null) {
- if (estimatedQueueWaitTime != -1
- || comparator == LoadComparator.QUEUE_LENGTH) {
+ if (rmNode.getState() != NodeState.DECOMMISSIONING &&
+ (estimatedQueueWaitTime != -1 ||
+ comparator == LoadComparator.QUEUE_LENGTH)) {
this.clusterNodes.put(rmNode.getNodeID(),
new ClusterNode(rmNode.getNodeID())
.setQueueWaitTime(estimatedQueueWaitTime)
@@ -246,8 +248,9 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
"wait queue length [" + waitQueueLength + "]");
}
} else {
- if (estimatedQueueWaitTime != -1
- || comparator == LoadComparator.QUEUE_LENGTH) {
+ if (rmNode.getState() != NodeState.DECOMMISSIONING &&
+ (estimatedQueueWaitTime != -1 ||
+ comparator == LoadComparator.QUEUE_LENGTH)) {
currentNode
.setQueueWaitTime(estimatedQueueWaitTime)
.setQueueLength(waitQueueLength)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
index 85eddaa..bbc0086 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -99,6 +100,23 @@ public class TestNodeQueueLoadMonitor {
Assert.assertEquals("h3:3", nodeIds.get(0).toString());
Assert.assertEquals("h2:2", nodeIds.get(1).toString());
Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+ // Now update node 2 to DECOMMISSIONING state
+ selector
+ .updateNode(createRMNode("h2", 2, 1, 10, NodeState.DECOMMISSIONING));
+ selector.computeTask.run();
+ nodeIds = selector.selectNodes();
+ Assert.assertEquals(2, nodeIds.size());
+ Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+ Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+
+ // Now update node 2 back to RUNNING state
+ selector.updateNode(createRMNode("h2", 2, 1, 10, NodeState.RUNNING));
+ selector.computeTask.run();
+ nodeIds = selector.selectNodes();
+ Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+ Assert.assertEquals("h3:3", nodeIds.get(1).toString());
+ Assert.assertEquals("h1:1", nodeIds.get(2).toString());
}
@Test
@@ -145,6 +163,25 @@ public class TestNodeQueueLoadMonitor {
Assert.assertEquals("h2:2", nodeIds.get(0).toString());
Assert.assertEquals("h1:1", nodeIds.get(1).toString());
Assert.assertEquals("h4:4", nodeIds.get(2).toString());
+
+ // Now update h2 to Decommissioning state
+ selector.updateNode(createRMNode("h2", 2, -1,
+ 5, NodeState.DECOMMISSIONING));
+ selector.computeTask.run();
+ nodeIds = selector.selectNodes();
+ Assert.assertEquals(2, nodeIds.size());
+ Assert.assertEquals("h1:1", nodeIds.get(0).toString());
+ Assert.assertEquals("h4:4", nodeIds.get(1).toString());
+
+ // Now update h2 back to Running state
+ selector.updateNode(createRMNode("h2", 2, -1,
+ 5, NodeState.RUNNING));
+ selector.computeTask.run();
+ nodeIds = selector.selectNodes();
+ Assert.assertEquals(3, nodeIds.size());
+ Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+ Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+ Assert.assertEquals("h4:4", nodeIds.get(2).toString());
}
@Test
@@ -198,10 +235,23 @@ public class TestNodeQueueLoadMonitor {
}
private RMNode createRMNode(String host, int port,
+ int waitTime, int queueLength, NodeState state) {
+ return createRMNode(host, port, waitTime, queueLength,
+ DEFAULT_MAX_QUEUE_LENGTH, state);
+ }
+
+ private RMNode createRMNode(String host, int port,
int waitTime, int queueLength, int queueCapacity) {
+ return createRMNode(host, port, waitTime, queueLength, queueCapacity,
+ NodeState.RUNNING);
+ }
+
+ private RMNode createRMNode(String host, int port,
+ int waitTime, int queueLength, int queueCapacity, NodeState state) {
RMNode node1 = Mockito.mock(RMNode.class);
NodeId nID1 = new FakeNodeId(host, port);
Mockito.when(node1.getNodeID()).thenReturn(nID1);
+ Mockito.when(node1.getState()).thenReturn(state);
OpportunisticContainersStatus status1 =
Mockito.mock(OpportunisticContainersStatus.class);
Mockito.when(status1.getEstimatedQueueWaitTime())
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org