You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/12/30 00:59:25 UTC
hadoop git commit: YARN-3480. Remove attempts that are beyond
max-attempt limit from state store. Contributed by Jun Gong
Repository: hadoop
Updated Branches:
refs/heads/trunk 84a814779 -> 527341341
YARN-3480. Remove attempts that are beyond max-attempt limit from state store. Contributed by Jun Gong
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52734134
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52734134
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52734134
Branch: refs/heads/trunk
Commit: 52734134116eb4b18686e308d00e71e7e903383e
Parents: 84a8147
Author: Jian He <ji...@apache.org>
Authored: Tue Dec 29 15:58:39 2015 -0800
Committer: Jian He <ji...@apache.org>
Committed: Tue Dec 29 15:58:39 2015 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 ++
.../server/resourcemanager/ResourceManager.java | 27 ++++++++++
.../resourcemanager/ResourceTrackerService.java | 5 ++
.../recovery/FileSystemRMStateStore.java | 12 +++++
.../recovery/LeveldbRMStateStore.java | 32 +++++++++++
.../recovery/MemoryRMStateStore.java | 13 +++++
.../recovery/NullRMStateStore.java | 6 +++
.../resourcemanager/recovery/RMStateStore.java | 53 ++++++++++++++++++
.../recovery/RMStateStoreEventType.java | 1 +
.../RMStateStoreRemoveAppAttemptEvent.java | 37 +++++++++++++
.../recovery/ZKRMStateStore.java | 16 ++++++
.../recovery/records/ApplicationStateData.java | 10 ++++
.../server/resourcemanager/rmapp/RMAppImpl.java | 25 ++++++++-
.../applicationsmanager/TestAMRestart.java | 6 +++
.../recovery/RMStateStoreTestBase.java | 57 +++++++++++++++++---
.../recovery/TestFSRMStateStore.java | 17 ++++++
.../recovery/TestLeveldbRMStateStore.java | 16 ++++++
.../recovery/TestZKRMStateStore.java | 12 +++++
18 files changed, 341 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index de17084..ecd7c9c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -79,6 +79,9 @@ Release 2.9.0 - UNRELEASED
YARN-4417. Make RM and Timeline-server REST APIs more consistent.
(wtan via jianhe)
+ YARN-3480. Remove attempts that are beyond max-attempt limit from state
+ store. (Jun Gong via jianhe)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index d392410..aada69f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -817,6 +817,33 @@ public class ResourceManager extends CompositeService implements Recoverable {
LOG.error("Error in handling event type " + event.getType()
+ " for applicationAttempt " + appAttemptId, t);
}
+ } else if (rmApp.getApplicationSubmissionContext() != null
+ && rmApp.getApplicationSubmissionContext()
+ .getKeepContainersAcrossApplicationAttempts()
+ && event.getType() == RMAppAttemptEventType.CONTAINER_FINISHED) {
+ // For work-preserving AM restart, failed attempts still capture
+ // CONTAINER_FINISHED events and record the finished containers,
+ // which will be used by the current attempt.
+ // We keep at most 'yarn.resourcemanager.am.max-attempts' attempts in
+ // RMStateStore. If the finished container's attempt is deleted, we
+ // use the first attempt in app.attempts to deal with these events.
+
+ RMAppAttempt previousFailedAttempt =
+ rmApp.getAppAttempts().values().iterator().next();
+ if (previousFailedAttempt != null) {
+ try {
+ LOG.debug("Event " + event.getType() + " handled by "
+ + previousFailedAttempt);
+ previousFailedAttempt.handle(event);
+ } catch (Throwable t) {
+ LOG.error("Error in handling event type " + event.getType()
+ + " for applicationAttempt " + appAttemptId
+ + " with " + previousFailedAttempt, t);
+ }
+ } else {
+ LOG.error("Event " + event.getType()
+ + " not handled, because previousFailedAttempt is null");
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index bd24b25..902244b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -230,6 +230,11 @@ public class ResourceTrackerService extends AbstractService implements
}
RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
+ if (rmAppAttempt == null) {
+ LOG.info("Ignoring not found attempt " + appAttemptId);
+ return;
+ }
+
Container masterContainer = rmAppAttempt.getMasterContainer();
if (masterContainer.getId().equals(containerStatus.getContainerId())
&& containerStatus.getContainerState() == ContainerState.COMPLETE) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index a1cebf5..021ca36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -482,6 +482,18 @@ public class FileSystemRMStateStore extends RMStateStore {
}
@Override
+ public synchronized void removeApplicationAttemptInternal(
+ ApplicationAttemptId appAttemptId)
+ throws Exception {
+ Path appDirPath =
+ getAppDir(rmAppRoot, appAttemptId.getApplicationId());
+ Path nodeRemovePath = getNodePath(appDirPath, appAttemptId.toString());
+ LOG.info("Removing info for attempt: " + appAttemptId + " at: "
+ + nodeRemovePath);
+ deleteFileWithRetries(nodeRemovePath);
+ }
+
+ @Override
public synchronized void removeApplicationStateInternal(
ApplicationStateData appState)
throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
index afc6721..7ec94fc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -500,6 +500,22 @@ public class LeveldbRMStateStore extends RMStateStore {
return createApplicationState(appId.toString(), data);
}
+ @VisibleForTesting
+ ApplicationAttemptStateData loadRMAppAttemptState(
+ ApplicationAttemptId attemptId) throws IOException {
+ String attemptKey = getApplicationAttemptNodeKey(attemptId);
+ byte[] data = null;
+ try {
+ data = db.get(bytes(attemptKey));
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ if (data == null) {
+ return null;
+ }
+ return createAttemptState(attemptId.toString(), data);
+ }
+
private ApplicationAttemptStateData createAttemptState(String itemName,
byte[] data) throws IOException {
ApplicationAttemptId attemptId =
@@ -575,6 +591,22 @@ public class LeveldbRMStateStore extends RMStateStore {
}
@Override
+ public synchronized void removeApplicationAttemptInternal(
+ ApplicationAttemptId attemptId)
+ throws IOException {
+ String attemptKey = getApplicationAttemptNodeKey(attemptId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing state for attempt " + attemptId + " at "
+ + attemptKey);
+ }
+ try {
+ db.delete(bytes(attemptKey));
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
protected void removeApplicationStateInternal(ApplicationStateData appState)
throws IOException {
ApplicationId appId =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index ce6addb..caaea7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -142,6 +142,19 @@ public class MemoryRMStateStore extends RMStateStore {
}
@Override
+ public synchronized void removeApplicationAttemptInternal(
+ ApplicationAttemptId appAttemptId) throws Exception {
+ ApplicationStateData appState =
+ state.getApplicationState().get(appAttemptId.getApplicationId());
+ ApplicationAttemptStateData attemptState =
+ appState.attempts.remove(appAttemptId);
+ LOG.info("Removing state for attempt: " + appAttemptId);
+ if (attemptState == null) {
+ throw new YarnRuntimeException("Application doesn't exist");
+ }
+ }
+
+ @Override
public synchronized void removeApplicationStateInternal(
ApplicationStateData appState) throws Exception {
ApplicationId appId =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
index 96f77f5..f6fd6fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
@@ -132,6 +132,12 @@ public class NullRMStateStore extends RMStateStore {
}
@Override
+ public synchronized void removeApplicationAttemptInternal(
+ ApplicationAttemptId attemptId) throws Exception {
+ // Do nothing
+ }
+
+ @Override
public void checkVersion() throws Exception {
// Do nothing
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index ec42cbe..ae17aaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -137,6 +137,10 @@ public abstract class RMStateStore extends AbstractService {
new UpdateAppAttemptTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
+ RMStateStoreEventType.REMOVE_APP_ATTEMPT,
+ new RemoveAppAttemptTransition())
+ .addTransition(RMStateStoreState.ACTIVE,
+ EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.STORE_MASTERKEY,
new StoreRMDTMasterKeyTransition())
.addTransition(RMStateStoreState.ACTIVE,
@@ -552,6 +556,32 @@ public abstract class RMStateStore extends AbstractService {
return isFenced ? RMStateStoreState.FENCED : RMStateStoreState.ACTIVE;
}
+ private static class RemoveAppAttemptTransition implements
+ MultipleArcTransition<RMStateStore, RMStateStoreEvent,
+ RMStateStoreState> {
+ @Override
+ public RMStateStoreState transition(RMStateStore store,
+ RMStateStoreEvent event) {
+ if (!(event instanceof RMStateStoreRemoveAppAttemptEvent)) {
+ // should never happen
+ LOG.error("Illegal event type: " + event.getClass());
+ return RMStateStoreState.ACTIVE;
+ }
+ boolean isFenced = false;
+ ApplicationAttemptId attemptId =
+ ((RMStateStoreRemoveAppAttemptEvent) event).getApplicationAttemptId();
+ ApplicationId appId = attemptId.getApplicationId();
+ LOG.info("Removing attempt " + attemptId + " from app: " + appId);
+ try {
+ store.removeApplicationAttemptInternal(attemptId);
+ } catch (Exception e) {
+ LOG.error("Error removing attempt: " + attemptId, e);
+ isFenced = store.notifyStoreOperationFailedInternal(e);
+ }
+ return finalState(isFenced);
+ }
+ }
+
public RMStateStore() {
super(RMStateStore.class.getName());
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -983,6 +1013,29 @@ public abstract class RMStateStore extends AbstractService {
protected abstract void removeApplicationStateInternal(
ApplicationStateData appState) throws Exception;
+ /**
+ * Non-blocking API
+ * ResourceManager services call this to remove an attempt from the state
+ * store
+ * This does not block the dispatcher threads
+ * There is no notification of completion for this operation.
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized void removeApplicationAttempt(
+ ApplicationAttemptId applicationAttemptId) {
+ dispatcher.getEventHandler().handle(
+ new RMStateStoreRemoveAppAttemptEvent(applicationAttemptId));
+ }
+
+ /**
+ * Blocking API
+ * Derived classes must implement this method to remove the state of the
+ * specified attempt.
+ */
+ protected abstract void removeApplicationAttemptInternal(
+ ApplicationAttemptId attemptId) throws Exception;
+
+
// TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See
// YARN-1779
public static final Text AM_RM_TOKEN_SERVICE = new Text(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
index 492826d..b34634d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
@@ -24,6 +24,7 @@ public enum RMStateStoreEventType {
UPDATE_APP,
UPDATE_APP_ATTEMPT,
REMOVE_APP,
+ REMOVE_APP_ATTEMPT,
FENCED,
// Below events should be called synchronously
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java
new file mode 100644
index 0000000..7455c39
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+
+/**
+ * An event used to remove an attempt.
+ */
+public class RMStateStoreRemoveAppAttemptEvent extends RMStateStoreEvent {
+ private ApplicationAttemptId applicationAttemptId;
+
+ RMStateStoreRemoveAppAttemptEvent(ApplicationAttemptId applicationAttemptId) {
+ super(RMStateStoreEventType.REMOVE_APP_ATTEMPT);
+ this.applicationAttemptId = applicationAttemptId;
+ }
+
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return applicationAttemptId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index ca0f4ac..ddb8a0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -659,6 +659,22 @@ public class ZKRMStateStore extends RMStateStore {
}
@Override
+ public synchronized void removeApplicationAttemptInternal(
+ ApplicationAttemptId appAttemptId)
+ throws Exception {
+ String appId = appAttemptId.getApplicationId().toString();
+ String appIdRemovePath = getNodePath(rmAppRoot, appId);
+ String attemptIdRemovePath = getNodePath(appIdRemovePath,
+ appAttemptId.toString());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing info for attempt: " + appAttemptId + " at: "
+ + attemptIdRemovePath);
+ }
+ safeDelete(attemptIdRemovePath);
+ }
+
+ @Override
public synchronized void removeApplicationStateInternal(
ApplicationStateData appState)
throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
index 1d199ed..2348380 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
@@ -80,6 +80,16 @@ public abstract class ApplicationStateData {
return attempts.get(attemptId);
}
+ public int getFirstAttemptId() {
+ int min = Integer.MAX_VALUE;
+ for(ApplicationAttemptId attemptId : attempts.keySet()) {
+ if (attemptId.getAttemptId() < min) {
+ min = attemptId.getAttemptId();
+ }
+ }
+ return min == Integer.MAX_VALUE ? 1 : min;
+ }
+
public abstract ApplicationStateDataProto getProto();
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index c4c8d2e..f1d55a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -149,6 +149,8 @@ public class RMAppImpl implements RMApp, Recoverable {
private long startTime;
private long finishTime = 0;
private long storedFinishTime = 0;
+ private int firstAttemptIdInStateStore = 1;
+ private int nextAttemptId = 1;
// This field isn't protected by readlock now.
private volatile RMAppAttempt currentAttempt;
private String queue;
@@ -809,6 +811,11 @@ public class RMAppImpl implements RMApp, Recoverable {
this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime();
this.callerContext = appState.getCallerContext();
+ // If interval > 0, some attempts might have been deleted.
+ if (submissionContext.getAttemptFailuresValidityInterval() > 0) {
+ this.firstAttemptIdInStateStore = appState.getFirstAttemptId();
+ this.nextAttemptId = firstAttemptIdInStateStore;
+ }
// send the ATS create Event
sendATSCreateEvent(this, this.startTime);
@@ -822,7 +829,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private void createNewAttempt() {
ApplicationAttemptId appAttemptId =
- ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
+ ApplicationAttemptId.newInstance(applicationId, nextAttemptId++);
BlacklistManager currentAMBlacklist;
if (currentAttempt != null) {
@@ -1304,6 +1311,9 @@ public class RMAppImpl implements RMApp, Recoverable {
+ app.attemptFailuresValidityInterval + " milliseconds " : " ")
+ "is " + numberOfFailure + ". The max attempts is "
+ app.maxAppAttempts);
+
+ removeExcessAttempts(app);
+
if (!app.submissionContext.getUnmanagedAM()
&& numberOfFailure < app.maxAppAttempts) {
if (initialState.equals(RMAppState.KILLING)) {
@@ -1340,6 +1350,19 @@ public class RMAppImpl implements RMApp, Recoverable {
return RMAppState.FINAL_SAVING;
}
}
+
+ private void removeExcessAttempts(RMAppImpl app) {
+ while (app.nextAttemptId - app.firstAttemptIdInStateStore
+ > app.maxAppAttempts) {
+ // attempts' first element is the oldest attempt because it is a
+ // LinkedHashMap
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
+ app.getApplicationId(), app.firstAttemptIdInStateStore);
+ app.firstAttemptIdInStateStore++;
+ LOG.info("Remove attempt from state store : " + attemptId);
+ app.rmContext.getStateStore().removeApplicationAttempt(attemptId);
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 6732039..f1fe1ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -859,6 +859,9 @@ public class TestAMRestart {
@SuppressWarnings("resource")
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
+ ApplicationStateData app1State =
+ memStore.getState().getApplicationState().get(app1.getApplicationId());
+ Assert.assertEquals(1, app1State.getFirstAttemptId());
// re-register the NM
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
@@ -871,6 +874,7 @@ public class TestAMRestart {
nm1.registerNode(Collections.singletonList(status), null);
rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED);
+ Assert.assertEquals(2, app1State.getAttemptCount());
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
@@ -884,6 +888,7 @@ public class TestAMRestart {
nm1
.nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am4.waitForState(RMAppAttemptState.FAILED);
+ Assert.assertEquals(2, app1State.getAttemptCount());
// can launch the 5th attempt successfully
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
@@ -897,6 +902,7 @@ public class TestAMRestart {
nm1
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
am5.waitForState(RMAppAttemptState.FAILED);
+ Assert.assertEquals(2, app1State.getAttemptCount());
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
rm1.stop();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 32824ef..2ddaec2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@@ -134,6 +135,7 @@ public class RMStateStoreTestBase {
void writeVersion(Version version) throws Exception;
Version getCurrentVersion() throws Exception;
boolean appExists(RMApp app) throws Exception;
+ boolean attemptExists(RMAppAttempt attempt) throws Exception;
}
void waitNotify(TestDispatcher dispatcher) {
@@ -172,7 +174,7 @@ public class RMStateStoreTestBase {
return mockApp;
}
- protected ContainerId storeAttempt(RMStateStore store,
+ protected RMAppAttempt storeAttempt(RMStateStore store,
ApplicationAttemptId attemptId,
String containerIdStr, Token<AMRMTokenIdentifier> appToken,
SecretKey clientTokenMasterKey, TestDispatcher dispatcher)
@@ -195,7 +197,7 @@ public class RMStateStoreTestBase {
dispatcher.attemptId = attemptId;
store.storeNewApplicationAttempt(mockAttempt);
waitNotify(dispatcher);
- return container.getId();
+ return mockAttempt;
}
void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
@@ -238,8 +240,9 @@ public class RMStateStoreTestBase {
clientToAMTokenMgr.createMasterKey(attemptId1);
ContainerId containerId1 = storeAttempt(store, attemptId1,
- "container_1352994193343_0001_01_000001",
- appAttemptToken1, clientTokenKey1, dispatcher);
+ "container_1352994193343_0001_01_000001",
+ appAttemptToken1, clientTokenKey1, dispatcher)
+ .getMasterContainer().getId();
String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
ApplicationAttemptId attemptId2 =
@@ -252,8 +255,9 @@ public class RMStateStoreTestBase {
clientToAMTokenMgr.createMasterKey(attemptId2);
ContainerId containerId2 = storeAttempt(store, attemptId2,
- "container_1352994193343_0001_02_000001",
- appAttemptToken2, clientTokenKey2, dispatcher);
+ "container_1352994193343_0001_02_000001",
+ appAttemptToken2, clientTokenKey2, dispatcher)
+ .getMasterContainer().getId();
ApplicationAttemptId attemptIdRemoved = ConverterUtils
.toApplicationAttemptId("appattempt_1352994193343_0002_000001");
@@ -633,6 +637,47 @@ public class RMStateStoreTestBase {
Assert.assertTrue(stateStoreHelper.appExists(rmApp2));
}
+ public void testRemoveAttempt(RMStateStoreHelper stateStoreHelper)
+ throws Exception {
+ RMStateStore store = stateStoreHelper.getRMStateStore();
+ TestDispatcher dispatcher = new TestDispatcher();
+ store.setRMDispatcher(dispatcher);
+
+ ApplicationId appId = ApplicationId.newInstance(1383183339, 6);
+ storeApp(store, appId, 123456, 564321);
+
+ ApplicationAttemptId attemptId1 =
+ ApplicationAttemptId.newInstance(appId, 1);
+ RMAppAttempt attempt1 = storeAttempt(store, attemptId1,
+ ContainerId.newContainerId(attemptId1, 1).toString(),
+ null, null, dispatcher);
+ ApplicationAttemptId attemptId2 =
+ ApplicationAttemptId.newInstance(appId, 2);
+ RMAppAttempt attempt2 = storeAttempt(store, attemptId2,
+ ContainerId.newContainerId(attemptId2, 1).toString(),
+ null, null, dispatcher);
+ store.removeApplicationAttemptInternal(attemptId1);
+ Assert.assertFalse(stateStoreHelper.attemptExists(attempt1));
+ Assert.assertTrue(stateStoreHelper.attemptExists(attempt2));
+
+ // let things settle down
+ Thread.sleep(1000);
+ store.close();
+
+ // load state
+ store = stateStoreHelper.getRMStateStore();
+ RMState state = store.loadState();
+ Map<ApplicationId, ApplicationStateData> rmAppState =
+ state.getApplicationState();
+
+ ApplicationStateData appState = rmAppState.get(appId);
+ // app is loaded
+ assertNotNull(appState);
+ assertEquals(2, appState.getFirstAttemptId());
+ assertNull(appState.getAttempt(attemptId1));
+ assertNotNull(appState.getAttempt(attemptId2));
+ }
+
protected void modifyAppState() throws Exception {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
index a2ff4b3..a51ccb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -88,6 +89,12 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
Path appDir = new Path(appRootDir, appId);
return appDir;
}
+
+ public Path getAttemptDir(String appId, String attemptId) {
+ Path appDir = getAppDir(appId);
+ Path attemptDir = new Path(appDir, attemptId);
+ return attemptDir;
+ }
}
public TestFSRMStateStoreTester(MiniDFSCluster cluster, boolean adminCheckEnable) throws Exception {
@@ -151,6 +158,15 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
store.getAppDir(app.getApplicationId().toString());
return fs.exists(nodePath);
}
+
+ public boolean attemptExists(RMAppAttempt attempt) throws IOException {
+ FileSystem fs = cluster.getFileSystem();
+ ApplicationAttemptId attemptId = attempt.getAppAttemptId();
+ Path nodePath =
+ store.getAttemptDir(attemptId.getApplicationId().toString(),
+ attemptId.toString());
+ return fs.exists(nodePath);
+ }
}
@Test(timeout = 60000)
@@ -185,6 +201,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
testAppDeletion(fsTester);
testDeleteStore(fsTester);
testRemoveApplication(fsTester);
+ testRemoveAttempt(fsTester);
testAMRMTokenSecretManagerStateStore(fsTester);
testReservationStateStore(fsTester);
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
index 4666142..ce186e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -97,6 +98,12 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
}
@Test(timeout = 60000)
+ public void testRemoveAttempt() throws Exception {
+ LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
+ testRemoveAttempt(tester);
+ }
+
+ @Test(timeout = 60000)
public void testAMTokens() throws Exception {
LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
testAMRMTokenSecretManagerStateStore(tester);
@@ -147,5 +154,14 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
}
return stateStore.loadRMAppState(app.getApplicationId()) != null;
}
+
+ @Override
+ public boolean attemptExists(RMAppAttempt attempt) throws Exception {
+ if (stateStore.isClosed()) {
+ getRMStateStore();
+ }
+ return stateStore.loadRMAppAttemptState(attempt.getAppAttemptId())
+ != null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52734134/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 66b023c..406fcf6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -123,6 +123,10 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
+ appId;
}
+ public String getAttemptNode(String appId, String attemptId) {
+ return getAppNode(appId) + "/" + attemptId;
+ }
+
/**
* Emulating retrying createRootDir not to raise NodeExist exception
* @throws Exception
@@ -165,6 +169,13 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
return null != curatorFramework.checkExists()
.forPath(store.getAppNode(app.getApplicationId().toString()));
}
+
+ public boolean attemptExists(RMAppAttempt attempt) throws Exception {
+ ApplicationAttemptId attemptId = attempt.getAppAttemptId();
+ return null != curatorFramework.checkExists()
+ .forPath(store.getAttemptNode(
+ attemptId.getApplicationId().toString(), attemptId.toString()));
+ }
}
@Test (timeout = 60000)
@@ -177,6 +188,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
testAppDeletion(zkTester);
testDeleteStore(zkTester);
testRemoveApplication(zkTester);
+ testRemoveAttempt(zkTester);
testAMRMTokenSecretManagerStateStore(zkTester);
testReservationStateStore(zkTester);
((TestZKRMStateStoreTester.TestZKRMStateStoreInternal)