You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2014/10/02 00:38:59 UTC
git commit: YARN-2630. Prevented previous AM container status from being acquired by the current restarted AM. Contributed by Jian He.
Repository: hadoop
Updated Branches:
refs/heads/trunk dd1b8f2ed -> 52bbe0f11
YARN-2630. Prevented previous AM container status from being acquired by the current restarted AM. Contributed by Jian He.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52bbe0f1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52bbe0f1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52bbe0f1
Branch: refs/heads/trunk
Commit: 52bbe0f11bc8e97df78a1ab9b63f4eff65fd7a76
Parents: dd1b8f2
Author: Zhijie Shen <zj...@apache.org>
Authored: Wed Oct 1 15:38:11 2014 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Wed Oct 1 15:38:11 2014 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../distributedshell/ApplicationMaster.java | 1 +
.../protocolrecords/NodeHeartbeatResponse.java | 9 +--
.../impl/pb/NodeHeartbeatResponsePBImpl.java | 45 ++++++-------
.../yarn_server_common_service_protos.proto | 2 +-
.../nodemanager/NodeStatusUpdaterImpl.java | 2 +-
.../nodemanager/TestNodeStatusUpdater.java | 2 +-
.../rmapp/attempt/RMAppAttemptImpl.java | 69 +++++++++++++-------
.../resourcemanager/rmnode/RMNodeImpl.java | 25 +++----
.../applicationsmanager/TestAMRestart.java | 24 ++-----
.../attempt/TestRMAppAttemptTransitions.java | 12 ++--
11 files changed, 108 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3878ca1..4d05757 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -503,6 +503,9 @@ Release 2.6.0 - UNRELEASED
YARN-2602. Fixed possible NPE in ApplicationHistoryManagerOnTimelineStore.
(Zhijie Shen via jianhe)
+ YARN-2630. Prevented previous AM container status from being acquired by the
+ current restarted AM. (Jian He via zjshen)
+
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 2451030..df9f34b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -668,6 +668,7 @@ public class ApplicationMaster {
+ ", completed=" + numCompletedContainers.get() + ", allocated="
+ numAllocatedContainers.get() + ", failed="
+ numFailedContainers.get();
+ LOG.info(appMessage);
success = false;
}
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 9887acc..12e1f54 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -30,7 +30,7 @@ public interface NodeHeartbeatResponse {
NodeAction getNodeAction();
List<ContainerId> getContainersToCleanup();
- List<ContainerId> getFinishedContainersPulledByAM();
+ List<ContainerId> getContainersToBeRemovedFromNM();
List<ApplicationId> getApplicationsToCleanup();
@@ -45,9 +45,10 @@ public interface NodeHeartbeatResponse {
void addAllContainersToCleanup(List<ContainerId> containers);
- // This tells NM to remove finished containers only after the AM
- // has actually received it in a previous allocate response
- void addFinishedContainersPulledByAM(List<ContainerId> containers);
+ // This tells NM to remove finished containers from its context. Currently, NM
+ // will remove finished containers from its context only after AM has actually
+ // received the finished containers in a previous allocate response
+ void addContainersToBeRemovedFromNM(List<ContainerId> containers);
void addAllApplicationsToCleanup(List<ApplicationId> applications);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index e9296f4..78979d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -40,13 +40,14 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
-public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse {
+public class NodeHeartbeatResponsePBImpl extends
+ ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse {
NodeHeartbeatResponseProto proto = NodeHeartbeatResponseProto.getDefaultInstance();
NodeHeartbeatResponseProto.Builder builder = null;
boolean viaProto = false;
private List<ContainerId> containersToCleanup = null;
- private List<ContainerId> finishedContainersPulledByAM = null;
+ private List<ContainerId> containersToBeRemovedFromNM = null;
private List<ApplicationId> applicationsToCleanup = null;
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
@@ -74,8 +75,8 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
if (this.applicationsToCleanup != null) {
addApplicationsToCleanupToProto();
}
- if (this.finishedContainersPulledByAM != null) {
- addFinishedContainersPulledByAMToProto();
+ if (this.containersToBeRemovedFromNM != null) {
+ addContainersToBeRemovedFromNMToProto();
}
if (this.containerTokenMasterKey != null) {
builder.setContainerTokenMasterKey(
@@ -204,9 +205,9 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
}
@Override
- public List<ContainerId> getFinishedContainersPulledByAM() {
- initFinishedContainersPulledByAM();
- return this.finishedContainersPulledByAM;
+ public List<ContainerId> getContainersToBeRemovedFromNM() {
+ initContainersToBeRemovedFromNM();
+ return this.containersToBeRemovedFromNM;
}
private void initContainersToCleanup() {
@@ -222,16 +223,16 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
}
}
- private void initFinishedContainersPulledByAM() {
- if (this.finishedContainersPulledByAM != null) {
+ private void initContainersToBeRemovedFromNM() {
+ if (this.containersToBeRemovedFromNM != null) {
return;
}
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
- List<ContainerIdProto> list = p.getFinishedContainersPulledByAmList();
- this.finishedContainersPulledByAM = new ArrayList<ContainerId>();
+ List<ContainerIdProto> list = p.getContainersToBeRemovedFromNmList();
+ this.containersToBeRemovedFromNM = new ArrayList<ContainerId>();
for (ContainerIdProto c : list) {
- this.finishedContainersPulledByAM.add(convertFromProtoFormat(c));
+ this.containersToBeRemovedFromNM.add(convertFromProtoFormat(c));
}
}
@@ -245,12 +246,12 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
}
@Override
- public void addFinishedContainersPulledByAM(
- final List<ContainerId> finishedContainersPulledByAM) {
- if (finishedContainersPulledByAM == null)
+ public void
+ addContainersToBeRemovedFromNM(final List<ContainerId> containers) {
+ if (containers == null)
return;
- initFinishedContainersPulledByAM();
- this.finishedContainersPulledByAM.addAll(finishedContainersPulledByAM);
+ initContainersToBeRemovedFromNM();
+ this.containersToBeRemovedFromNM.addAll(containers);
}
private void addContainersToCleanupToProto() {
@@ -288,10 +289,10 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
builder.addAllContainersToCleanup(iterable);
}
- private void addFinishedContainersPulledByAMToProto() {
+ private void addContainersToBeRemovedFromNMToProto() {
maybeInitBuilder();
- builder.clearFinishedContainersPulledByAm();
- if (finishedContainersPulledByAM == null)
+ builder.clearContainersToBeRemovedFromNm();
+ if (containersToBeRemovedFromNM == null)
return;
Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
@@ -299,7 +300,7 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
public Iterator<ContainerIdProto> iterator() {
return new Iterator<ContainerIdProto>() {
- Iterator<ContainerId> iter = finishedContainersPulledByAM.iterator();
+ Iterator<ContainerId> iter = containersToBeRemovedFromNM.iterator();
@Override
public boolean hasNext() {
@@ -320,7 +321,7 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
}
};
- builder.addAllFinishedContainersPulledByAm(iterable);
+ builder.addAllContainersToBeRemovedFromNm(iterable);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 600f54d..d0990fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -58,7 +58,7 @@ message NodeHeartbeatResponseProto {
repeated ApplicationIdProto applications_to_cleanup = 6;
optional int64 nextHeartBeatInterval = 7;
optional string diagnostics_message = 8;
- repeated ContainerIdProto finished_containers_pulled_by_am = 9;
+ repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
}
message NMContainerStatusProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index b4dcf1f..eecba39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -558,7 +558,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// when NM re-registers with RM.
// Only remove the cleanedup containers that are acked
removeCompletedContainersFromContext(response
- .getFinishedContainersPulledByAM());
+ .getContainersToBeRemovedFromNM());
lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 8fb51a3..7837846 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -692,7 +692,7 @@ public class TestNodeStatusUpdater {
NodeHeartbeatResponse nhResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
heartBeatNodeAction, null, null, null, null, 1000L);
- nhResponse.addFinishedContainersPulledByAM(finishedContainersPulledByAM);
+ nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM);
return nhResponse;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index d75a871..fbcb7d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -687,20 +687,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// A new allocate means the AM received the previously sent
// finishedContainers. We can ack this to NM now
- for (NodeId nodeId:finishedContainersSentToAM.keySet()) {
-
- // Clear and get current values
- List<ContainerStatus> currentSentContainers =
- finishedContainersSentToAM
- .put(nodeId, new ArrayList<ContainerStatus>());
- List<ContainerId> containerIdList = new ArrayList<ContainerId>
- (currentSentContainers.size());
- for (ContainerStatus containerStatus:currentSentContainers) {
- containerIdList.add(containerStatus.getContainerId());
- }
- eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(
- nodeId, containerIdList));
- }
+ sendFinishedContainersToNM();
// Mark every containerStatus as being sent to AM though we may return
// only the ones that belong to the current attempt
@@ -1592,14 +1579,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
- // Add all finished containers so that they can be acked to NM
- addJustFinishedContainer(appAttempt, containerFinishedEvent);
-
// Is this container the AmContainer? If the finished container is same as
// the AMContainer, AppAttempt fails
if (appAttempt.masterContainer != null
&& appAttempt.masterContainer.getId().equals(
containerStatus.getContainerId())) {
+ appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
// Remember the follow up transition and save the final attempt state.
appAttempt.rememberTargetTransitionsAndStoreState(event,
@@ -1607,10 +1592,46 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
return RMAppAttemptState.FINAL_SAVING;
}
+ // Add all finished containers so that they can be acked to NM
+ addJustFinishedContainer(appAttempt, containerFinishedEvent);
return this.currentState;
}
}
+
+ // Ack NM to remove finished containers from context.
+ private void sendFinishedContainersToNM() {
+ for (NodeId nodeId : finishedContainersSentToAM.keySet()) {
+
+ // Clear and get current values
+ List<ContainerStatus> currentSentContainers =
+ finishedContainersSentToAM.put(nodeId,
+ new ArrayList<ContainerStatus>());
+ List<ContainerId> containerIdList =
+ new ArrayList<ContainerId>(currentSentContainers.size());
+ for (ContainerStatus containerStatus : currentSentContainers) {
+ containerIdList.add(containerStatus.getContainerId());
+ }
+ eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
+ containerIdList));
+ }
+ }
+
+ // Add am container to the list so that am container instance will be
+ // removed from NMContext.
+ private void sendAMContainerToNM(RMAppAttemptImpl appAttempt,
+ RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
+ NodeId nodeId = containerFinishedEvent.getNodeId();
+ finishedContainersSentToAM.putIfAbsent(nodeId,
+ new ArrayList<ContainerStatus>());
+ appAttempt.finishedContainersSentToAM.get(nodeId).add(
+ containerFinishedEvent.getContainerStatus());
+ if (!appAttempt.getSubmissionContext()
+ .getKeepContainersAcrossApplicationAttempts()) {
+ appAttempt.sendFinishedContainersToNM();
+ }
+ }
+
private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent
@@ -1661,16 +1682,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
- // Add all finished containers so that they can be acked to NM.
- addJustFinishedContainer(appAttempt, containerFinishedEvent);
-
// Is this container the ApplicationMaster container?
if (appAttempt.masterContainer.getId().equals(
containerStatus.getContainerId())) {
new FinalTransition(RMAppAttemptState.FINISHED).transition(
appAttempt, containerFinishedEvent);
+ appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
return RMAppAttemptState.FINISHED;
}
+ // Add all finished containers so that they can be acked to NM.
+ addJustFinishedContainer(appAttempt, containerFinishedEvent);
return RMAppAttemptState.FINISHING;
}
@@ -1686,14 +1707,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
- // Add all finished containers so that they can be acked to NM.
- addJustFinishedContainer(appAttempt, containerFinishedEvent);
-
// If this is the AM container, it means the AM container is finished,
// but we are not yet acknowledged that the final state has been saved.
// Thus, we still return FINAL_SAVING state here.
if (appAttempt.masterContainer.getId().equals(
containerStatus.getContainerId())) {
+ appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
+
if (appAttempt.targetedFinalState.equals(RMAppAttemptState.FAILED)
|| appAttempt.targetedFinalState.equals(RMAppAttemptState.KILLED)) {
// ignore Container_Finished Event if we were supposed to reach
@@ -1708,6 +1728,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
return;
}
+
+ // Add all finished containers so that they can be acked to NM.
+ addJustFinishedContainer(appAttempt, containerFinishedEvent);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 1123a98..c960b50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -112,8 +112,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
new ContainerIdComparator());
- /* set of containers that were notified to AM about their completion */
- private final Set<ContainerId> finishedContainersPulledByAM =
+ /*
+ * set of containers to notify NM to remove them from its context. Currently,
+ * this includes containers that were notified to AM about their completion
+ */
+ private final Set<ContainerId> containersToBeRemovedFromNM =
new HashSet<ContainerId>();
/* the list of applications that have finished and need to be purged */
@@ -157,7 +160,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
- new FinishedContainersPulledByAMTransition())
+ new AddContainersToBeRemovedFromNMTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
@@ -174,7 +177,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
- new FinishedContainersPulledByAMTransition())
+ new AddContainersToBeRemovedFromNMTransition())
//Transitions from LOST state
.addTransition(NodeState.LOST, NodeState.LOST,
@@ -182,7 +185,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.LOST, NodeState.LOST,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
- new FinishedContainersPulledByAMTransition())
+ new AddContainersToBeRemovedFromNMTransition())
//Transitions from UNHEALTHY state
.addTransition(NodeState.UNHEALTHY,
@@ -208,7 +211,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
- new FinishedContainersPulledByAMTransition())
+ new AddContainersToBeRemovedFromNMTransition())
// create the topology tables
.installTopology();
@@ -382,11 +385,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
response.addAllContainersToCleanup(
new ArrayList<ContainerId>(this.containersToClean));
response.addAllApplicationsToCleanup(this.finishedApplications);
- response.addFinishedContainersPulledByAM(
- new ArrayList<ContainerId>(this.finishedContainersPulledByAM));
+ response.addContainersToBeRemovedFromNM(
+ new ArrayList<ContainerId>(this.containersToBeRemovedFromNM));
this.containersToClean.clear();
this.finishedApplications.clear();
- this.finishedContainersPulledByAM.clear();
+ this.containersToBeRemovedFromNM.clear();
} finally {
this.writeLock.unlock();
}
@@ -659,12 +662,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
}
- public static class FinishedContainersPulledByAMTransition implements
+ public static class AddContainersToBeRemovedFromNMTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
- rmNode.finishedContainersPulledByAM.addAll(((
+ rmNode.containersToBeRemovedFromNM.addAll(((
RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index ba592fc..fcb4e45 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -98,9 +98,6 @@ public class TestAMRestart {
Thread.sleep(200);
}
- ContainerId amContainerId = ContainerId.newInstance(am1
- .getApplicationAttemptId(), 1);
-
// launch the 2nd container, for testing running container transferred.
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
ContainerId containerId2 =
@@ -199,15 +196,11 @@ public class TestAMRestart {
// completed containerId4 is also transferred to the new attempt.
RMAppAttempt newAttempt =
app1.getRMAppAttempt(am2.getApplicationAttemptId());
- // 4 containers finished, acquired/allocated/reserved/completed + AM
- // container.
- waitForContainersToFinish(5, newAttempt);
+ // 4 containers finished, acquired/allocated/reserved/completed.
+ waitForContainersToFinish(4, newAttempt);
boolean container3Exists = false, container4Exists = false, container5Exists =
- false, container6Exists = false, amContainerExists = false;
+ false, container6Exists = false;
for(ContainerStatus status : newAttempt.getJustFinishedContainers()) {
- if(status.getContainerId().equals(amContainerId)) {
- amContainerExists = true;
- }
if(status.getContainerId().equals(containerId3)) {
// containerId3 is the container ran by previous attempt but finished by the
// new attempt.
@@ -227,11 +220,8 @@ public class TestAMRestart {
container6Exists = true;
}
}
- Assert.assertTrue(amContainerExists);
- Assert.assertTrue(container3Exists);
- Assert.assertTrue(container4Exists);
- Assert.assertTrue(container5Exists);
- Assert.assertTrue(container6Exists);
+ Assert.assertTrue(container3Exists && container4Exists && container5Exists
+ && container6Exists);
// New SchedulerApplicationAttempt also has the containers info.
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
@@ -250,14 +240,14 @@ public class TestAMRestart {
// all 4 normal containers finished.
System.out.println("New attempt's just finished containers: "
+ newAttempt.getJustFinishedContainers());
- waitForContainersToFinish(6, newAttempt);
+ waitForContainersToFinish(5, newAttempt);
rm1.stop();
}
private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
throws InterruptedException {
int count = 0;
- while (attempt.getJustFinishedContainers().size() < expectedNum
+ while (attempt.getJustFinishedContainers().size() != expectedNum
&& count < 500) {
Thread.sleep(100);
count++;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bbe0f1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index 15028f9..7f27f4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -965,7 +965,7 @@ public class TestRMAppAttemptTransitions {
sendAttemptUpdateSavedEvent(applicationAttempt);
assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState());
- assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
+ assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
assertEquals(amContainer, applicationAttempt.getMasterContainer());
assertEquals(0, application.getRanNodes().size());
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
@@ -1003,7 +1003,7 @@ public class TestRMAppAttemptTransitions {
sendAttemptUpdateSavedEvent(applicationAttempt);
assertEquals(RMAppAttemptState.KILLED,
applicationAttempt.getAppAttemptState());
- assertEquals(1,applicationAttempt.getJustFinishedContainers().size());
+ assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
assertEquals(amContainer, applicationAttempt.getMasterContainer());
assertEquals(0, application.getRanNodes().size());
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
@@ -1192,7 +1192,7 @@ public class TestRMAppAttemptTransitions {
BuilderUtils.newContainerStatus(amContainer.getId(),
ContainerState.COMPLETE, "", 0), anyNodeId));
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
- diagnostics, 1, false);
+ diagnostics, 0, false);
}
// While attempt is at FINAL_SAVING, Container_Finished event may come before
@@ -1225,7 +1225,7 @@ public class TestRMAppAttemptTransitions {
// send attempt_saved
sendAttemptUpdateSavedEvent(applicationAttempt);
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
- diagnostics, 1, false);
+ diagnostics, 0, false);
}
// While attempt is at FINAL_SAVING, Expire event may come before
@@ -1381,13 +1381,13 @@ public class TestRMAppAttemptTransitions {
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
// failed attempt captured the container finished event.
- assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
+ assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
ContainerStatus cs2 =
ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2),
ContainerState.COMPLETE, "", 0);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
appAttemptId, cs2, anyNodeId));
- assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
+ assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
boolean found = false;
for (ContainerStatus containerStatus:applicationAttempt
.getJustFinishedContainers()) {