Posted to common-commits@hadoop.apache.org by zt...@apache.org on 2019/06/17 09:12:11 UTC

[hadoop] branch trunk updated: YARN-9608. DecommissioningNodesWatcher should get lists of running applications on node from RMNode. Contributed by Abhishek Modi.

This is an automated email from the ASF dual-hosted git repository.

ztang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 304a47e  YARN-9608. DecommissioningNodesWatcher should get lists of running applications on node from RMNode. Contributed by Abhishek Modi.
304a47e is described below

commit 304a47e22cb836cfde227803c853ecf4def870e1
Author: Zhankun Tang <zt...@apache.org>
AuthorDate: Mon Jun 17 17:08:23 2019 +0800

    YARN-9608. DecommissioningNodesWatcher should get lists of running applications on node from RMNode. Contributed by Abhishek Modi.
---
 .../DecommissioningNodesWatcher.java               |  47 ++--------
 .../TestDecommissioningNodesWatcher.java           | 101 +++++++++++++++++----
 2 files changed, 92 insertions(+), 56 deletions(-)
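In summary: the watcher previously built its own per-node set of application IDs from the heartbeat's keep-alive application list and from container statuses, and pruned entries whose RMApp had reached FINISHED, FAILED, or KILLED (removeCompletedApps). With this change it adopts the RMNode's own running-application list on every update, so the pruning logic can be deleted. A minimal, dependency-free sketch of the new flow follows; NodeView and the String application IDs are simplified stand-ins for RMNode and ApplicationId, not actual Hadoop types.

    import java.util.ArrayList;
    import java.util.List;

    public class DecommissioningTrackingSketch {

      // Simplified stand-in for the slice of RMNode the watcher now relies on.
      public interface NodeView {
        // The ResourceManager already tracks which applications run on the node.
        List<String> getRunningApps();
      }

      // Mirrors DecommissioningNodeContext: the per-node state kept by the watcher.
      public static class NodeContext {
        List<String> appIds = new ArrayList<>();
        int numActiveContainers;
      }

      // After YARN-9608: each heartbeat simply copies the RM's view of running
      // applications instead of accumulating and pruning app IDs locally.
      public static void update(NodeContext context, NodeView rmNode,
          int activeContainersFromHeartbeat) {
        context.appIds = rmNode.getRunningApps();
        context.numActiveContainers = activeContainersFromHeartbeat;
      }

      // The node is ready once no containers and no running applications remain.
      public static boolean readyToBeDecommissioned(NodeContext context) {
        return context.numActiveContainers == 0 && context.appIds.isEmpty();
      }
    }

The actual patch still counts active containers from the node status report; only the application tracking changes, as the diff below shows.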

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
index b0cec5a..c476c61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
@@ -36,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -58,13 +59,8 @@ import org.apache.hadoop.yarn.util.MonotonicClock;
  * a DECOMMISSIONING node will be DECOMMISSIONED no later than
  * DECOMMISSIONING_TIMEOUT regardless of running containers or applications.
  *
- * To be efficient, DecommissioningNodesWatcher skip tracking application
- * containers on a particular node before the node is in DECOMMISSIONING state.
- * It only tracks containers once the node is in DECOMMISSIONING state.
  * DecommissioningNodesWatcher basically is no cost when no node is
- * DECOMMISSIONING. This sacrifices the possibility that the node once
- * host containers of an application that is still running
- * (the affected map tasks will be rescheduled).
+ * DECOMMISSIONING.
  */
 public class DecommissioningNodesWatcher {
   private static final Logger LOG =
@@ -88,8 +84,8 @@ public class DecommissioningNodesWatcher {
     // number of running containers at the moment.
     private int numActiveContainers;
 
-    // All applications run on the node at or after decommissioningStartTime.
-    private Set<ApplicationId> appIds;
+    // All applications run on the node.
+    private List<ApplicationId> appIds;
 
     // First moment the node is observed in DECOMMISSIONED state.
     private long decommissionedTime;
@@ -102,7 +98,7 @@ public class DecommissioningNodesWatcher {
 
     public DecommissioningNodeContext(NodeId nodeId, int timeoutSec) {
       this.nodeId = nodeId;
-      this.appIds = new HashSet<ApplicationId>();
+      this.appIds = new ArrayList<>();
       this.decommissioningStartTime = mclock.getTime();
       this.timeoutMs = 1000L * timeoutSec;
     }
@@ -164,9 +160,7 @@ public class DecommissioningNodesWatcher {
       context.updateTimeout(rmNode.getDecommissioningTimeout());
       context.lastUpdateTime = now;
 
-      if (remoteNodeStatus.getKeepAliveApplications() != null) {
-        context.appIds.addAll(remoteNodeStatus.getKeepAliveApplications());
-      }
+      context.appIds = rmNode.getRunningApps();
 
       // Count number of active containers.
       int numActiveContainers = 0;
@@ -176,14 +170,7 @@ public class DecommissioningNodesWatcher {
             newState == ContainerState.NEW) {
           numActiveContainers++;
         }
-        context.numActiveContainers = numActiveContainers;
-        ApplicationId aid = cs.getContainerId()
-            .getApplicationAttemptId().getApplicationId();
-        if (!context.appIds.contains(aid)) {
-          context.appIds.add(aid);
-        }
       }
-
       context.numActiveContainers = numActiveContainers;
 
       // maintain lastContainerFinishTime.
@@ -254,7 +241,6 @@ public class DecommissioningNodesWatcher {
           DecommissioningNodeStatus.TIMEOUT;
     }
 
-    removeCompletedApps(context);
     if (context.appIds.size() == 0) {
       return DecommissioningNodeStatus.READY;
     } else {
@@ -336,25 +322,6 @@ public class DecommissioningNodesWatcher {
     return rmNode;
   }
 
-  private void removeCompletedApps(DecommissioningNodeContext context) {
-    Iterator<ApplicationId> it = context.appIds.iterator();
-    while (it.hasNext()) {
-      ApplicationId appId = it.next();
-      RMApp rmApp = rmContext.getRMApps().get(appId);
-      if (rmApp == null) {
-        LOG.debug("Consider non-existing app {} as completed", appId);
-        it.remove();
-        continue;
-      }
-      if (rmApp.getState() == RMAppState.FINISHED ||
-          rmApp.getState() == RMAppState.FAILED ||
-          rmApp.getState() == RMAppState.KILLED) {
-        LOG.debug("Remove {} app {}", rmApp.getState(), appId);
-        it.remove();
-      }
-    }
-  }
-
   // Time in second to be decommissioned.
   private int getTimeoutInSec(DecommissioningNodeContext context) {
     if (context.nodeState == NodeState.DECOMMISSIONED) {
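With removeCompletedApps gone, the status decision reduces to the timeout, the active-container count from the latest heartbeat, and the size of the running-application list taken from the RMNode. The sketch below is a hedged simplification of that decision order as the tests exercise it, not the verbatim checkDecommissioningStatus implementation (it omits, for instance, the handling of nodes that are not being tracked or are already decommissioned).

    // Hedged simplification of the decision order; the enum constants mirror
    // DecommissioningNodeStatus but the method itself is an illustration, not
    // the committed checkDecommissioningStatus.
    public class DecommissioningStatusSketch {

      public enum Status { WAIT_CONTAINER, WAIT_APP, READY, TIMEOUT }

      public static Status decide(int numActiveContainers, int runningApps,
          long elapsedMs, long timeoutMs) {
        if (elapsedMs >= timeoutMs) {
          // Hard deadline: the node will be decommissioned regardless of
          // remaining containers or applications.
          return Status.TIMEOUT;
        }
        if (numActiveContainers > 0) {
          // Containers are still running on the node.
          return Status.WAIT_CONTAINER;
        }
        if (runningApps > 0) {
          // No containers left, but an application that ran here is still
          // active, so the node keeps waiting for it to finish.
          return Status.WAIT_APP;
        }
        return Status.READY;
      }
    }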
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
index 4371156..0695689 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
@@ -19,11 +19,11 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -35,7 +35,8 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecommissioningNodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -58,38 +59,106 @@ public class TestDecommissioningNodesWatcher {
         new DecommissioningNodesWatcher(rm.getRMContext());
 
     MockNM nm1 = rm.registerNode("host1:1234", 10240);
-    RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNodeImpl node1 =
+        (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
     NodeId id1 = nm1.getNodeId();
 
     rm.waitForState(id1, NodeState.RUNNING);
-    Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
 
     RMApp app = rm.submitApp(2000);
     MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
 
+    NodeStatus nodeStatus = createNodeStatus(id1, app, 3);
+    node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+
     // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher.
     rm.sendNodeGracefulDecommission(nm1,
         YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
     rm.waitForState(id1, NodeState.DECOMMISSIONING);
 
     // Update status with decreasing number of running containers until 0.
-    watcher.update(node1, createNodeStatus(id1, app, 12));
-    watcher.update(node1, createNodeStatus(id1, app, 11));
+    nodeStatus = createNodeStatus(id1, app, 3);
+    node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+    watcher.update(node1, nodeStatus);
+
+    nodeStatus = createNodeStatus(id1, app, 2);
+    node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+    watcher.update(node1, nodeStatus);
     Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
 
-    watcher.update(node1, createNodeStatus(id1, app, 1));
+    nodeStatus = createNodeStatus(id1, app, 1);
+    node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+    watcher.update(node1, nodeStatus);
     Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER,
-                        watcher.checkDecommissioningStatus(id1));
+        watcher.checkDecommissioningStatus(id1));
+
+    nodeStatus = createNodeStatus(id1, app, 0);
+    watcher.update(node1, nodeStatus);
+    node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+    Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP,
+        watcher.checkDecommissioningStatus(id1));
+
+    // Set app to be FINISHED and verify that DecommissioningNodeStatus is READY.
+    MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
+    rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
+    watcher.update(node1, nodeStatus);
+    Assert.assertEquals(DecommissioningNodeStatus.READY,
+        watcher.checkDecommissioningStatus(id1));
+  }
+
+  @Test
+  public void testDecommissioningNodesWatcherWithPreviousRunningApps()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, "40");
+
+    rm = new MockRM(conf);
+    rm.start();
+
+    DecommissioningNodesWatcher watcher =
+        new DecommissioningNodesWatcher(rm.getRMContext());
+
+    MockNM nm1 = rm.registerNode("host1:1234", 10240);
+    RMNodeImpl node1 =
+        (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    NodeId id1 = nm1.getNodeId();
+
+    rm.waitForState(id1, NodeState.RUNNING);
+
+    RMApp app = rm.submitApp(2000);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
 
-    watcher.update(node1, createNodeStatus(id1, app, 0));
+    NodeStatus nodeStatus = createNodeStatus(id1, app, 3);
+    node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+
+    Assert.assertEquals(1, node1.getRunningApps().size());
+
+    // update node with 0 running containers
+    nodeStatus = createNodeStatus(id1, app, 0);
+    node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
+
+    Assert.assertEquals(1, node1.getRunningApps().size());
+
+    // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher. Right now
+    // there is no container running on the node.
+    rm.sendNodeGracefulDecommission(nm1,
+        YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
+    rm.waitForState(id1, NodeState.DECOMMISSIONING);
+
+    // We should still get WAIT_APP because a still-running app previously
+    // ran containers on this node.
+    watcher.update(node1, nodeStatus);
+    Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
     Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP,
-                        watcher.checkDecommissioningStatus(id1));
+        watcher.checkDecommissioningStatus(id1));
 
     // Set app to be FINISHED and verified DecommissioningNodeStatus is READY.
     MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
     rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
+    Assert.assertEquals(0, node1.getRunningApps().size());
+    watcher.update(node1, nodeStatus);
     Assert.assertEquals(DecommissioningNodeStatus.READY,
-                        watcher.checkDecommissioningStatus(id1));
+        watcher.checkDecommissioningStatus(id1));
   }
 
   @After
@@ -103,7 +172,7 @@ public class TestDecommissioningNodesWatcher {
       NodeId nodeId, RMApp app, int numRunningContainers) {
     return NodeStatus.newInstance(
         nodeId, 0, getContainerStatuses(app, numRunningContainers),
-        new ArrayList<ApplicationId>(),
+        Collections.emptyList(),
         NodeHealthStatus.newInstance(
             true,  "", System.currentTimeMillis() - 1000),
         null, null, null);
@@ -113,8 +182,8 @@ public class TestDecommissioningNodesWatcher {
   // where numRunningContainers are RUNNING.
   private List<ContainerStatus> getContainerStatuses(
       RMApp app, int numRunningContainers) {
-    // Total 12 containers
-    final int total = 12;
+    // Total 3 containers
+    final int total = 3;
     numRunningContainers = Math.min(total, numRunningContainers);
     List<ContainerStatus> output = new ArrayList<ContainerStatus>();
     for (int i = 0; i < total; i++) {
@@ -122,8 +191,8 @@ public class TestDecommissioningNodesWatcher {
           ContainerState.COMPLETE : ContainerState.RUNNING;
       output.add(ContainerStatus.newInstance(
           ContainerId.newContainerId(
-              ApplicationAttemptId.newInstance(app.getApplicationId(), i), 1),
-          cstate, "Dummy", 0));
+              ApplicationAttemptId.newInstance(app.getApplicationId(), 0), i),
+          cstate, "", 0));
     }
     return output;
   }
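The behavioral point the new test (testDecommissioningNodesWatcherWithPreviousRunningApps) pins down: even when every container finished before the node entered DECOMMISSIONING, the node must report WAIT_APP rather than READY while the application that ran those containers is still alive, because the watcher now sees that application through RMNode.getRunningApps(). The tests therefore drive RMNodeImpl directly with RMNodeStatusEvent so the RMNode's running-app list is populated before watcher.update() is called. Below is a small, dependency-free walk-through of that scenario reusing the decide() helper sketched above; the 40-second timeout comes from the test configuration, the rest is illustrative.

    public class PreviousRunningAppsScenario {

      public static void main(String[] args) {
        // RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT is set to 40 seconds in the test.
        long timeoutMs = 40_000L;

        // All containers completed before decommissioning began ...
        int activeContainers = 0;
        // ... but the application that ran them is still active on the cluster.
        int runningApps = 1;

        // Shortly after graceful decommission starts, the node is not ready yet.
        System.out.println(DecommissioningStatusSketch.decide(
            activeContainers, runningApps, 1_000L, timeoutMs));   // WAIT_APP

        // Once the application finishes, the RMNode drops it from its running
        // list and the watcher can report the node as ready.
        runningApps = 0;
        System.out.println(DecommissioningStatusSketch.decide(
            activeContainers, runningApps, 2_000L, timeoutMs));   // READY
      }
    }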

