You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/01/08 20:28:36 UTC

hadoop git commit: YARN-2997. Fixed NodeStatusUpdater to not send already-sent completed container statuses on heartbeat. Contributed by Chengbing Liu. (cherry picked from commit cc2a745f7e82c9fa6de03242952347c54c52dccc)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 501246e24 -> e7e617304


YARN-2997. Fixed NodeStatusUpdater to not send already-sent completed container statuses on heartbeat. Contributed by Chengbing Liu
(cherry picked from commit cc2a745f7e82c9fa6de03242952347c54c52dccc)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e7e61730
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e7e61730
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e7e61730

Branch: refs/heads/branch-2
Commit: e7e6173049adca2a2ae0e1231adcaca8168bec27
Parents: 501246e
Author: Jian He <ji...@apache.org>
Authored: Thu Jan 8 11:12:54 2015 -0800
Committer: Jian He <ji...@apache.org>
Committed: Thu Jan 8 11:28:24 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../nodemanager/NodeStatusUpdaterImpl.java      | 39 ++++++++++-----
 .../nodemanager/TestNodeStatusUpdater.java      | 52 +++++++++++++-------
 3 files changed, 64 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7e61730/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ee50cfe..8294853 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -304,6 +304,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2936. Changed YARNDelegationTokenIdentifier to set proto fields on
     getProto method. (Varun Saxena via jianhe)
 
+    YARN-2997. Fixed NodeStatusUpdater to not send alreay-sent completed
+    container statuses on heartbeat. (Chengbing Liu via jianhe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7e61730/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index f561dbb..6ddd7e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -106,6 +106,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   // the AM finishes it informs the RM to stop the may-be-already-completed
   // containers.
   private final Map<ContainerId, Long> recentlyStoppedContainers;
+  // Save the reported completed containers in case of lost heartbeat responses.
+  // These completed containers will be sent again till a successful response.
+  private final Map<ContainerId, ContainerStatus> pendingCompletedContainers;
   // Duration for which to track recently stopped container.
   private long durationToTrackStoppedContainers;
 
@@ -126,6 +129,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     this.metrics = metrics;
     this.recentlyStoppedContainers =
         new LinkedHashMap<ContainerId, Long>();
+    this.pendingCompletedContainers =
+        new HashMap<ContainerId, ContainerStatus>();
   }
 
   @Override
@@ -358,11 +363,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
     for (Container container : this.context.getContainers().values()) {
       ContainerId containerId = container.getContainerId();
-      ApplicationId applicationId = container.getContainerId()
-          .getApplicationAttemptId().getApplicationId();
+      ApplicationId applicationId = containerId.getApplicationAttemptId()
+          .getApplicationId();
       org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
           container.cloneAndGetContainerStatus();
-      containerStatuses.add(containerStatus);
       if (containerStatus.getState() == ContainerState.COMPLETE) {
         if (isApplicationStopped(applicationId)) {
           if (LOG.isDebugEnabled()) {
@@ -370,14 +374,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                 + containerId + " from NM context.");
           }
           context.getContainers().remove(containerId);
+          pendingCompletedContainers.put(containerId, containerStatus);
         } else {
-          // Adding to finished containers cache. Cache will keep it around at
-          // least for #durationToTrackStoppedContainers duration. In the
-          // subsequent call to stop container it will get removed from cache.
-          addCompletedContainer(container.getContainerId());
+          if (!isContainerRecentlyStopped(containerId)) {
+            pendingCompletedContainers.put(containerId, containerStatus);
+            // Adding to finished containers cache. Cache will keep it around at
+            // least for #durationToTrackStoppedContainers duration. In the
+            // subsequent call to stop container it will get removed from cache.
+            addCompletedContainer(containerId);
+          }
         }
+      } else {
+        containerStatuses.add(containerStatus);
       }
     }
+    containerStatuses.addAll(pendingCompletedContainers.values());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Sending out " + containerStatuses.size()
           + " container statuses: " + containerStatuses);
@@ -397,8 +408,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         new ArrayList<NMContainerStatus>();
     for (Container container : this.context.getContainers().values()) {
       ContainerId containerId = container.getContainerId();
-      ApplicationId applicationId = container.getContainerId()
-          .getApplicationAttemptId().getApplicationId();
+      ApplicationId applicationId = containerId.getApplicationAttemptId()
+          .getApplicationId();
       if (!this.context.getApplications().containsKey(applicationId)) {
         context.getContainers().remove(containerId);
         continue;
@@ -410,7 +421,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         // Adding to finished containers cache. Cache will keep it around at
         // least for #durationToTrackStoppedContainers duration. In the
         // subsequent call to stop container it will get removed from cache.
-        addCompletedContainer(container.getContainerId());
+        addCompletedContainer(containerId);
       }
     }
     LOG.info("Sending out " + containerStatuses.size()
@@ -457,7 +468,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       ContainerId containerId = iter.next();
       // remove the container only if the container is at DONE state
       Container nmContainer = context.getContainers().get(containerId);
-      if (nmContainer != null && nmContainer.getContainerState().equals(
+      if (nmContainer == null) {
+        iter.remove();
+      } else if (nmContainer.getContainerState().equals(
         org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) {
         context.getContainers().remove(containerId);
         removedContainers.add(containerId);
@@ -469,6 +482,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       LOG.info("Removed completed containers from NM context: "
           + removedContainers);
     }
+    pendingCompletedContainers.clear();
   }
 
   private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@@ -507,7 +521,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       recentlyStoppedContainers.clear();
     }
   }
-  
+
   @Private
   @VisibleForTesting
   public void removeVeryOldStoppedContainersFromCache() {
@@ -605,6 +619,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                   ResourceManagerConstants.RM_INVALID_IDENTIFIER;
               dispatcher.getEventHandler().handle(
                   new NodeManagerEvent(NodeManagerEventType.RESYNC));
+              pendingCompletedContainers.clear();
               break;
             }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7e61730/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index e367085..46d7b10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -610,14 +610,14 @@ public class TestNodeStatusUpdater {
           <ContainerId>();
       try {
         if (heartBeatID == 0) {
-          Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
-            .size(), 0);
-          Assert.assertEquals(context.getContainers().size(), 0);
+          Assert.assertEquals(0, request.getNodeStatus().getContainersStatuses()
+            .size());
+          Assert.assertEquals(0, context.getContainers().size());
         } else if (heartBeatID == 1) {
           List<ContainerStatus> statuses =
               request.getNodeStatus().getContainersStatuses();
-          Assert.assertEquals(statuses.size(), 2);
-          Assert.assertEquals(context.getContainers().size(), 2);
+          Assert.assertEquals(2, statuses.size());
+          Assert.assertEquals(2, context.getContainers().size());
 
           boolean container2Exist = false, container3Exist = false;
           for (ContainerStatus status : statuses) {
@@ -643,8 +643,16 @@ public class TestNodeStatusUpdater {
         } else if (heartBeatID == 2 || heartBeatID == 3) {
           List<ContainerStatus> statuses =
               request.getNodeStatus().getContainersStatuses();
-          Assert.assertEquals(statuses.size(), 4);
-          Assert.assertEquals(context.getContainers().size(), 4);
+          if (heartBeatID == 2) {
+            // NM should send completed containers again, since the last
+            // heartbeat is lost.
+            Assert.assertEquals(4, statuses.size());
+          } else {
+            // NM should not send completed containers again, since the last
+            // heartbeat is successful.
+            Assert.assertEquals(2, statuses.size());
+          }
+          Assert.assertEquals(4, context.getContainers().size());
 
           boolean container2Exist = false, container3Exist = false,
               container4Exist = false, container5Exist = false;
@@ -674,8 +682,14 @@ public class TestNodeStatusUpdater {
               container5Exist = true;
             }
           }
-          Assert.assertTrue(container2Exist && container3Exist
-              && container4Exist && container5Exist);
+          if (heartBeatID == 2) {
+            Assert.assertTrue(container2Exist && container3Exist
+                && container4Exist && container5Exist);
+          } else {
+            // NM do not send completed containers again
+            Assert.assertTrue(container2Exist && !container3Exist
+                && container4Exist && !container5Exist);
+          }
 
           if (heartBeatID == 3) {
             finishedContainersPulledByAM.add(containerStatus3.getContainerId());
@@ -683,8 +697,9 @@ public class TestNodeStatusUpdater {
         } else if (heartBeatID == 4) {
           List<ContainerStatus> statuses =
               request.getNodeStatus().getContainersStatuses();
-          Assert.assertEquals(statuses.size(), 3);
-          Assert.assertEquals(context.getContainers().size(), 3);
+          Assert.assertEquals(2, statuses.size());
+          // Container 3 is acked by AM, hence removed from context
+          Assert.assertEquals(3, context.getContainers().size());
 
           boolean container3Exist = false;
           for (ContainerStatus status : statuses) {
@@ -917,13 +932,14 @@ public class TestNodeStatusUpdater {
     nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(ackedContainers);
 
     Set<ContainerId> containerIdSet = new HashSet<ContainerId>();
-    for (ContainerStatus status : nodeStatusUpdater.getContainerStatuses()) {
+    List<ContainerStatus> containerStatuses = nodeStatusUpdater.getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
       containerIdSet.add(status.getContainerId());
     }
 
-    Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().size() == 1);
+    Assert.assertEquals(1, containerStatuses.size());
     // completed container is removed;
-    Assert.assertFalse(containerIdSet.contains(anyCompletedContainer));
+    Assert.assertFalse(containerIdSet.contains(cId));
     // running container is not removed;
     Assert.assertTrue(containerIdSet.contains(runningContainerId));
   }
@@ -967,15 +983,15 @@ public class TestNodeStatusUpdater {
 
     when(application.getApplicationState()).thenReturn(
         ApplicationState.FINISHING_CONTAINERS_WAIT);
-    // The completed container will be sent one time. Then we will delete it.
+    // The completed container will be saved in case of lost heartbeat.
+    Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
     Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
-    Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
 
     nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
     nm.getNMContext().getApplications().remove(appId);
-    // The completed container will be sent one time. Then we will delete it.
+    // The completed container will be saved in case of lost heartbeat.
+    Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
     Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
-    Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
   }
 
   @Test