You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/01/08 20:28:36 UTC
hadoop git commit: YARN-2997. Fixed NodeStatusUpdater to not send
already-sent completed container statuses on heartbeat. Contributed by
Chengbing Liu (cherry picked from commit
cc2a745f7e82c9fa6de03242952347c54c52dccc)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 501246e24 -> e7e617304
YARN-2997. Fixed NodeStatusUpdater to not send already-sent completed container statuses on heartbeat. Contributed by Chengbing Liu
(cherry picked from commit cc2a745f7e82c9fa6de03242952347c54c52dccc)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e7e61730
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e7e61730
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e7e61730
Branch: refs/heads/branch-2
Commit: e7e6173049adca2a2ae0e1231adcaca8168bec27
Parents: 501246e
Author: Jian He <ji...@apache.org>
Authored: Thu Jan 8 11:12:54 2015 -0800
Committer: Jian He <ji...@apache.org>
Committed: Thu Jan 8 11:28:24 2015 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 ++
.../nodemanager/NodeStatusUpdaterImpl.java | 39 ++++++++++-----
.../nodemanager/TestNodeStatusUpdater.java | 52 +++++++++++++-------
3 files changed, 64 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7e61730/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ee50cfe..8294853 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -304,6 +304,9 @@ Release 2.7.0 - UNRELEASED
YARN-2936. Changed YARNDelegationTokenIdentifier to set proto fields on
getProto method. (Varun Saxena via jianhe)
+ YARN-2997. Fixed NodeStatusUpdater to not send already-sent completed
+ container statuses on heartbeat. (Chengbing Liu via jianhe)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7e61730/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index f561dbb..6ddd7e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -106,6 +106,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// the AM finishes it informs the RM to stop the may-be-already-completed
// containers.
private final Map<ContainerId, Long> recentlyStoppedContainers;
+ // Save the reported completed containers in case of lost heartbeat responses.
+ // These completed containers will be sent again till a successful response.
+ private final Map<ContainerId, ContainerStatus> pendingCompletedContainers;
// Duration for which to track recently stopped container.
private long durationToTrackStoppedContainers;
@@ -126,6 +129,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
this.metrics = metrics;
this.recentlyStoppedContainers =
new LinkedHashMap<ContainerId, Long>();
+ this.pendingCompletedContainers =
+ new HashMap<ContainerId, ContainerStatus>();
}
@Override
@@ -358,11 +363,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
for (Container container : this.context.getContainers().values()) {
ContainerId containerId = container.getContainerId();
- ApplicationId applicationId = container.getContainerId()
- .getApplicationAttemptId().getApplicationId();
+ ApplicationId applicationId = containerId.getApplicationAttemptId()
+ .getApplicationId();
org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
container.cloneAndGetContainerStatus();
- containerStatuses.add(containerStatus);
if (containerStatus.getState() == ContainerState.COMPLETE) {
if (isApplicationStopped(applicationId)) {
if (LOG.isDebugEnabled()) {
@@ -370,14 +374,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
+ containerId + " from NM context.");
}
context.getContainers().remove(containerId);
+ pendingCompletedContainers.put(containerId, containerStatus);
} else {
- // Adding to finished containers cache. Cache will keep it around at
- // least for #durationToTrackStoppedContainers duration. In the
- // subsequent call to stop container it will get removed from cache.
- addCompletedContainer(container.getContainerId());
+ if (!isContainerRecentlyStopped(containerId)) {
+ pendingCompletedContainers.put(containerId, containerStatus);
+ // Adding to finished containers cache. Cache will keep it around at
+ // least for #durationToTrackStoppedContainers duration. In the
+ // subsequent call to stop container it will get removed from cache.
+ addCompletedContainer(containerId);
+ }
}
+ } else {
+ containerStatuses.add(containerStatus);
}
}
+ containerStatuses.addAll(pendingCompletedContainers.values());
if (LOG.isDebugEnabled()) {
LOG.debug("Sending out " + containerStatuses.size()
+ " container statuses: " + containerStatuses);
@@ -397,8 +408,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
new ArrayList<NMContainerStatus>();
for (Container container : this.context.getContainers().values()) {
ContainerId containerId = container.getContainerId();
- ApplicationId applicationId = container.getContainerId()
- .getApplicationAttemptId().getApplicationId();
+ ApplicationId applicationId = containerId.getApplicationAttemptId()
+ .getApplicationId();
if (!this.context.getApplications().containsKey(applicationId)) {
context.getContainers().remove(containerId);
continue;
@@ -410,7 +421,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// Adding to finished containers cache. Cache will keep it around at
// least for #durationToTrackStoppedContainers duration. In the
// subsequent call to stop container it will get removed from cache.
- addCompletedContainer(container.getContainerId());
+ addCompletedContainer(containerId);
}
}
LOG.info("Sending out " + containerStatuses.size()
@@ -457,7 +468,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
ContainerId containerId = iter.next();
// remove the container only if the container is at DONE state
Container nmContainer = context.getContainers().get(containerId);
- if (nmContainer != null && nmContainer.getContainerState().equals(
+ if (nmContainer == null) {
+ iter.remove();
+ } else if (nmContainer.getContainerState().equals(
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) {
context.getContainers().remove(containerId);
removedContainers.add(containerId);
@@ -469,6 +482,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
LOG.info("Removed completed containers from NM context: "
+ removedContainers);
}
+ pendingCompletedContainers.clear();
}
private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@@ -507,7 +521,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
recentlyStoppedContainers.clear();
}
}
-
+
@Private
@VisibleForTesting
public void removeVeryOldStoppedContainersFromCache() {
@@ -605,6 +619,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
ResourceManagerConstants.RM_INVALID_IDENTIFIER;
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.RESYNC));
+ pendingCompletedContainers.clear();
break;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e7e61730/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index e367085..46d7b10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -610,14 +610,14 @@ public class TestNodeStatusUpdater {
<ContainerId>();
try {
if (heartBeatID == 0) {
- Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
- .size(), 0);
- Assert.assertEquals(context.getContainers().size(), 0);
+ Assert.assertEquals(0, request.getNodeStatus().getContainersStatuses()
+ .size());
+ Assert.assertEquals(0, context.getContainers().size());
} else if (heartBeatID == 1) {
List<ContainerStatus> statuses =
request.getNodeStatus().getContainersStatuses();
- Assert.assertEquals(statuses.size(), 2);
- Assert.assertEquals(context.getContainers().size(), 2);
+ Assert.assertEquals(2, statuses.size());
+ Assert.assertEquals(2, context.getContainers().size());
boolean container2Exist = false, container3Exist = false;
for (ContainerStatus status : statuses) {
@@ -643,8 +643,16 @@ public class TestNodeStatusUpdater {
} else if (heartBeatID == 2 || heartBeatID == 3) {
List<ContainerStatus> statuses =
request.getNodeStatus().getContainersStatuses();
- Assert.assertEquals(statuses.size(), 4);
- Assert.assertEquals(context.getContainers().size(), 4);
+ if (heartBeatID == 2) {
+ // NM should send completed containers again, since the last
+ // heartbeat is lost.
+ Assert.assertEquals(4, statuses.size());
+ } else {
+ // NM should not send completed containers again, since the last
+ // heartbeat is successful.
+ Assert.assertEquals(2, statuses.size());
+ }
+ Assert.assertEquals(4, context.getContainers().size());
boolean container2Exist = false, container3Exist = false,
container4Exist = false, container5Exist = false;
@@ -674,8 +682,14 @@ public class TestNodeStatusUpdater {
container5Exist = true;
}
}
- Assert.assertTrue(container2Exist && container3Exist
- && container4Exist && container5Exist);
+ if (heartBeatID == 2) {
+ Assert.assertTrue(container2Exist && container3Exist
+ && container4Exist && container5Exist);
+ } else {
+ // NM do not send completed containers again
+ Assert.assertTrue(container2Exist && !container3Exist
+ && container4Exist && !container5Exist);
+ }
if (heartBeatID == 3) {
finishedContainersPulledByAM.add(containerStatus3.getContainerId());
@@ -683,8 +697,9 @@ public class TestNodeStatusUpdater {
} else if (heartBeatID == 4) {
List<ContainerStatus> statuses =
request.getNodeStatus().getContainersStatuses();
- Assert.assertEquals(statuses.size(), 3);
- Assert.assertEquals(context.getContainers().size(), 3);
+ Assert.assertEquals(2, statuses.size());
+ // Container 3 is acked by AM, hence removed from context
+ Assert.assertEquals(3, context.getContainers().size());
boolean container3Exist = false;
for (ContainerStatus status : statuses) {
@@ -917,13 +932,14 @@ public class TestNodeStatusUpdater {
nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(ackedContainers);
Set<ContainerId> containerIdSet = new HashSet<ContainerId>();
- for (ContainerStatus status : nodeStatusUpdater.getContainerStatuses()) {
+ List<ContainerStatus> containerStatuses = nodeStatusUpdater.getContainerStatuses();
+ for (ContainerStatus status : containerStatuses) {
containerIdSet.add(status.getContainerId());
}
- Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().size() == 1);
+ Assert.assertEquals(1, containerStatuses.size());
// completed container is removed;
- Assert.assertFalse(containerIdSet.contains(anyCompletedContainer));
+ Assert.assertFalse(containerIdSet.contains(cId));
// running container is not removed;
Assert.assertTrue(containerIdSet.contains(runningContainerId));
}
@@ -967,15 +983,15 @@ public class TestNodeStatusUpdater {
when(application.getApplicationState()).thenReturn(
ApplicationState.FINISHING_CONTAINERS_WAIT);
- // The completed container will be sent one time. Then we will delete it.
+ // The completed container will be saved in case of lost heartbeat.
+ Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
- Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
nm.getNMContext().getApplications().remove(appId);
- // The completed container will be sent one time. Then we will delete it.
+ // The completed container will be saved in case of lost heartbeat.
+ Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
- Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
}
@Test