You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/04/24 21:14:04 UTC
hadoop git commit: YARN-3387. Previous AM's container completed
status couldn't pass to current AM if AM and RM restarted during the same
time. Contributed by Sandflee
Repository: hadoop
Updated Branches:
refs/heads/trunk c7d9ad68e -> d03dcb963
YARN-3387. Previous AM's container completed status couldn't pass to current AM if AM and RM restarted during the same time. Contributed by Sandflee
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d03dcb96
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d03dcb96
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d03dcb96
Branch: refs/heads/trunk
Commit: d03dcb9635dbd79a45d229d1cab5fd28e5e49f49
Parents: c7d9ad6
Author: Jian He <ji...@apache.org>
Authored: Fri Apr 24 12:12:28 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Fri Apr 24 12:13:29 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../server/resourcemanager/rmapp/RMAppImpl.java | 2 +-
.../rmapp/attempt/RMAppAttemptImpl.java | 9 ++-
.../TestWorkPreservingRMRestart.java | 60 ++++++++++++++++++++
4 files changed, 72 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d03dcb96/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3311a2e..19e3e27 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -262,6 +262,9 @@ Release 2.8.0 - UNRELEASED
YARN-3516. killing ContainerLocalizer action doesn't take effect when
private localizer receives FETCH_FAILURE status.(zhihai xu via xgong)
+ YARN-3387. Previous AM's container completed status couldn't pass to current
+ AM if AM and RM restarted during the same time. (sandflee via jianhe)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d03dcb96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index b4e4965..8abc478 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1273,7 +1273,7 @@ public class RMAppImpl implements RMApp, Recoverable {
// finished containers so that they can be acked to NM,
// but when pulling finished container we will check this flag again.
((RMAppAttemptImpl) app.currentAttempt)
- .transferStateFromPreviousAttempt(oldAttempt);
+ .transferStateFromAttempt(oldAttempt);
return initialState;
} else {
if (numberOfFailure >= app.maxAppAttempts) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d03dcb96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 913d06b..8abc65a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -845,7 +845,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
attemptState.getMemorySeconds(),attemptState.getVcoreSeconds());
}
- public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
+ public void transferStateFromAttempt(RMAppAttempt attempt) {
this.justFinishedContainers = attempt.getJustFinishedContainersReference();
this.finishedContainersSentToAM =
attempt.getFinishedContainersSentToAMReference();
@@ -1044,6 +1044,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.progress = 1.0f;
RMApp rmApp =appAttempt.rmContext.getRMApps().get(
appAttempt.getAppAttemptId().getApplicationId());
+
+ if (appAttempt.submissionContext
+ .getKeepContainersAcrossApplicationAttempts()
+ && !appAttempt.submissionContext.getUnmanagedAM()
+ && rmApp.getCurrentAppAttempt() != appAttempt) {
+ appAttempt.transferStateFromAttempt(rmApp.getCurrentAppAttempt());
+ }
// We will replay the final attempt only if last attempt is in final
// state but application is not in final state.
if (rmApp.getCurrentAppAttempt() == appAttempt
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d03dcb96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 0566f3d..c6fe371 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -1037,4 +1037,64 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
rm2.start();
}
+
+ @Test(timeout = 20000) // Checks that a container-completed status from the first AM attempt is visible on the current attempt after an AM failure followed by an RM restart.
+ public void testContainerCompleteMsgNotLostAfterAMFailedAndRMRestart() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ rm1 = new MockRM(conf, memStore);
+ rm1.start();
+
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // Submit app0 on rm1; per the original note this call enables keepContainersAcrossApplicationAttempts (which positional argument does so is not visible here -- confirm against MockRM.submitApp).
+ RMApp app0 = rm1.submitApp(200, "", UserGroupInformation.getCurrentUser()
+ .getShortUserName(), null, false, null, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS,
+ null, null, true, true, false, null, 0, null, true);
+ MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+ am0.allocate("127.0.0.1", 1000, 2, new ArrayList<ContainerId>());
+ nm1.nodeHeartbeat(true);
+ List<Container> conts = am0.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>()).getAllocatedContainers();
+ while (conts.size() == 0) { // keep heartbeating and polling until at least one container has been allocated
+ nm1.nodeHeartbeat(true);
+ conts.addAll(am0.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>()).getAllocatedContainers());
+ Thread.sleep(500);
+ }
+
+ // Fail the first AM: report container 1 of am0's attempt as COMPLETE, wait for the app to return to ACCEPTED, then launch and register a new AM.
+ nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+ MockAM am1 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+ // RM failover: start rm2 on the same state store and point nm1 at rm2's resource tracker service.
+ rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+ // nm1 re-registers with statuses for three containers of the first attempt: ids 1 and 3 RUNNING, id 2 COMPLETE.
+ NMContainerStatus amContainer =
+ TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 1,
+ ContainerState.RUNNING);
+ NMContainerStatus completedContainer=
+ TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 2,
+ ContainerState.COMPLETE);
+ NMContainerStatus runningContainer =
+ TestRMRestart.createNMContainerStatus(am0.getApplicationAttemptId(), 3,
+ ContainerState.RUNNING);
+ nm1.registerNode(Arrays.asList(amContainer, runningContainer,
+ completedContainer), null);
+ Thread.sleep(200); // presumably gives rm2 time to process the registration before asserting -- TODO confirm
+
+ // The app's current attempt as recovered by rm2 must expose exactly one just-finished container, i.e. the completion message was not lost.
+ RMApp recoveredApp0 =
+ rm2.getRMContext().getRMApps().get(app0.getApplicationId());
+ RMAppAttempt loadedAttempt1 = recoveredApp0.getCurrentAppAttempt();
+ assertEquals(1,loadedAttempt1.getJustFinishedContainers().size());
+ }
+
}