You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2019/03/21 19:05:43 UTC

[hadoop] branch trunk updated: YARN-9402. Opportunistic containers should not be scheduled on Decommissioning nodes. Contributed by Abhishek Modi.

This is an automated email from the ASF dual-hosted git repository.

gifuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 548997d  YARN-9402. Opportunistic containers should not be scheduled on Decommissioning nodes. Contributed by Abhishek Modi.
548997d is described below

commit 548997d6c9c5a1b9734ee00d065ce48a189458e6
Author: Giovanni Matteo Fumarola <gi...@apache.org>
AuthorDate: Thu Mar 21 12:04:05 2019 -0700

    YARN-9402. Opportunistic containers should not be scheduled on Decommissioning nodes. Contributed by Abhishek Modi.
---
 .../distributed/NodeQueueLoadMonitor.java          | 11 +++--
 .../distributed/TestNodeQueueLoadMonitor.java      | 50 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index ca35886..e093b2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -230,8 +231,9 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
     try {
       ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
       if (currentNode == null) {
-        if (estimatedQueueWaitTime != -1
-            || comparator == LoadComparator.QUEUE_LENGTH) {
+        if (rmNode.getState() != NodeState.DECOMMISSIONING &&
+            (estimatedQueueWaitTime != -1 ||
+                comparator == LoadComparator.QUEUE_LENGTH)) {
           this.clusterNodes.put(rmNode.getNodeID(),
               new ClusterNode(rmNode.getNodeID())
                   .setQueueWaitTime(estimatedQueueWaitTime)
@@ -246,8 +248,9 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
               "wait queue length [" + waitQueueLength + "]");
         }
       } else {
-        if (estimatedQueueWaitTime != -1
-            || comparator == LoadComparator.QUEUE_LENGTH) {
+        if (rmNode.getState() != NodeState.DECOMMISSIONING &&
+            (estimatedQueueWaitTime != -1 ||
+                comparator == LoadComparator.QUEUE_LENGTH)) {
           currentNode
               .setQueueWaitTime(estimatedQueueWaitTime)
               .setQueueLength(waitQueueLength)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
index 85eddaa..bbc0086 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
 
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -99,6 +100,23 @@ public class TestNodeQueueLoadMonitor {
     Assert.assertEquals("h3:3", nodeIds.get(0).toString());
     Assert.assertEquals("h2:2", nodeIds.get(1).toString());
     Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+    // Now update node 2 to DECOMMISSIONING state
+    selector
+        .updateNode(createRMNode("h2", 2, 1, 10, NodeState.DECOMMISSIONING));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    Assert.assertEquals(2, nodeIds.size());
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+
+    // Now update node 2 back to RUNNING state
+    selector.updateNode(createRMNode("h2", 2, 1, 10, NodeState.RUNNING));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h3:3", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
   }
 
   @Test
@@ -145,6 +163,25 @@ public class TestNodeQueueLoadMonitor {
     Assert.assertEquals("h2:2", nodeIds.get(0).toString());
     Assert.assertEquals("h1:1", nodeIds.get(1).toString());
     Assert.assertEquals("h4:4", nodeIds.get(2).toString());
+
+    // Now update h2 to Decommissioning state
+    selector.updateNode(createRMNode("h2", 2, -1,
+        5, NodeState.DECOMMISSIONING));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    Assert.assertEquals(2, nodeIds.size());
+    Assert.assertEquals("h1:1", nodeIds.get(0).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(1).toString());
+
+    // Now update h2 back to Running state
+    selector.updateNode(createRMNode("h2", 2, -1,
+        5, NodeState.RUNNING));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    Assert.assertEquals(3, nodeIds.size());
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(2).toString());
   }
 
   @Test
@@ -198,10 +235,23 @@ public class TestNodeQueueLoadMonitor {
   }
 
   private RMNode createRMNode(String host, int port,
+      int waitTime, int queueLength, NodeState state) {
+    return createRMNode(host, port, waitTime, queueLength,
+        DEFAULT_MAX_QUEUE_LENGTH, state);
+  }
+
+  private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength, int queueCapacity) {
+    return createRMNode(host, port, waitTime, queueLength, queueCapacity,
+        NodeState.RUNNING);
+  }
+
+  private RMNode createRMNode(String host, int port,
+      int waitTime, int queueLength, int queueCapacity, NodeState state) {
     RMNode node1 = Mockito.mock(RMNode.class);
     NodeId nID1 = new FakeNodeId(host, port);
     Mockito.when(node1.getNodeID()).thenReturn(nID1);
+    Mockito.when(node1.getState()).thenReturn(state);
     OpportunisticContainersStatus status1 =
         Mockito.mock(OpportunisticContainersStatus.class);
     Mockito.when(status1.getEstimatedQueueWaitTime())


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org