You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/11/09 17:54:24 UTC

[22/30] hadoop git commit: YARN-2825. Container leak on NM. Contributed by Jian He

YARN-2825. Container leak on NM. Contributed by Jian He


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c3d47507
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c3d47507
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c3d47507

Branch: refs/heads/HDFS-EC
Commit: c3d475070a1ec54c4b05923f4782cef204effd2c
Parents: 68db5b3
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Nov 7 23:16:37 2014 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Nov 7 23:16:37 2014 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  2 +
 .../nodemanager/NodeStatusUpdaterImpl.java      | 27 ++++++----
 .../nodemanager/TestNodeStatusUpdater.java      | 54 ++++++++++++++++++--
 3 files changed, 71 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d47507/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 6e5f59a..e4b116d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -906,6 +906,8 @@ Release 2.6.0 - UNRELEASED
     YARN-2753. Fixed a bunch of bugs in the NodeLabelsManager classes. (Zhihai xu
     via vinodkv)
 
+    YARN-2825. Container leak on NM (Jian He via jlowe)
+
 Release 2.5.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d47507/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index ebbe503..fcdd2c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -66,8 +66,8 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
@@ -115,6 +115,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private Runnable statusUpdaterRunnable;
   private Thread  statusUpdater;
   private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
+  Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
 
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -446,19 +447,27 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
   @VisibleForTesting
   @Private
-  public void removeCompletedContainersFromContext(
+  public void removeOrTrackCompletedContainersFromContext(
       List<ContainerId> containerIds) throws IOException {
     Set<ContainerId> removedContainers = new HashSet<ContainerId>();
 
-    // If the AM has pulled the completedContainer it can be removed
-    for (ContainerId containerId : containerIds) {
-      context.getContainers().remove(containerId);
-      removedContainers.add(containerId);
+    pendingContainersToRemove.addAll(containerIds);
+    Iterator<ContainerId> iter = pendingContainersToRemove.iterator();
+    while (iter.hasNext()) {
+      ContainerId containerId = iter.next();
+      // remove the container only if the container is at DONE state
+      Container nmContainer = context.getContainers().get(containerId);
+      if (nmContainer != null && nmContainer.getContainerState().equals(
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) {
+        context.getContainers().remove(containerId);
+        removedContainers.add(containerId);
+        iter.remove();
+      }
     }
 
     if (!removedContainers.isEmpty()) {
-      LOG.info("Removed completed containers from NM context: " +
-          removedContainers);
+      LOG.info("Removed completed containers from NM context: "
+          + removedContainers);
     }
   }
 
@@ -601,7 +610,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             // because these completed containers will be reported back to RM
             // when NM re-registers with RM.
             // Only remove the cleanedup containers that are acked
-            removeCompletedContainersFromContext(response
+            removeOrTrackCompletedContainersFromContext(response
                   .getContainersToBeRemovedFromNM());
 
             lastHeartBeatID = response.getResponseId();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d47507/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 5c2dd2c..925a249 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -30,9 +30,11 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
@@ -866,18 +868,57 @@ public class TestNodeStatusUpdater {
       public ContainerState getCurrentState() {
         return ContainerState.COMPLETE;
       }
+
+      @Override
+      public org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState getContainerState() {
+        return org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE;
+      }
     };
 
+    ContainerId runningContainerId =
+        ContainerId.newInstance(appAttemptId, 3);
+    Token runningContainerToken =
+        BuilderUtils.newContainerToken(runningContainerId, "anyHost",
+          1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123,
+          "password".getBytes(), 0);
+    Container runningContainer =
+        new ContainerImpl(conf, null, null, null, null, null,
+          BuilderUtils.newContainerTokenIdentifier(runningContainerToken)) {
+          @Override
+          public ContainerState getCurrentState() {
+            return ContainerState.RUNNING;
+          }
+
+          @Override
+          public org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState getContainerState() {
+            return org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.RUNNING;
+          }
+        };
+
     nm.getNMContext().getApplications().putIfAbsent(appId,
         mock(Application.class));
     nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
-    Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
+    nm.getNMContext().getContainers()
+      .put(runningContainerId, runningContainer);
+
+    Assert.assertEquals(2, nodeStatusUpdater.getContainerStatuses().size());
 
     List<ContainerId> ackedContainers = new ArrayList<ContainerId>();
     ackedContainers.add(cId);
+    ackedContainers.add(runningContainerId);
 
-    nodeStatusUpdater.removeCompletedContainersFromContext(ackedContainers);
-    Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().isEmpty());
+    nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(ackedContainers);
+
+    Set<ContainerId> containerIdSet = new HashSet<ContainerId>();
+    for (ContainerStatus status : nodeStatusUpdater.getContainerStatuses()) {
+      containerIdSet.add(status.getContainerId());
+    }
+
+    Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().size() == 1);
+    // completed container is removed;
+    Assert.assertFalse(containerIdSet.contains(anyCompletedContainer));
+    // running container is not removed;
+    Assert.assertTrue(containerIdSet.contains(runningContainerId));
   }
 
   @Test
@@ -1467,6 +1508,13 @@ public class TestNodeStatusUpdater {
     when(container.getCurrentState()).thenReturn(containerStatus.getState());
     when(container.getContainerId()).thenReturn(
       containerStatus.getContainerId());
+    if (containerStatus.getState().equals(ContainerState.COMPLETE)) {
+      when(container.getContainerState())
+        .thenReturn(org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE);
+    } else if (containerStatus.getState().equals(ContainerState.RUNNING)) {
+      when(container.getContainerState())
+      .thenReturn(org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.RUNNING);
+    }
     return container;
   }