You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/02/12 20:33:36 UTC
hadoop git commit: YARN-2079. Recover NonAggregatingLogHandler state
upon nodemanager restart. (Contributed by Jason Lowe)
Repository: hadoop
Updated Branches:
refs/heads/trunk 83be450ac -> 04f5ef18f
YARN-2079. Recover NonAggregatingLogHandler state upon nodemanager restart. (Contributed by Jason Lowe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/04f5ef18
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/04f5ef18
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/04f5ef18
Branch: refs/heads/trunk
Commit: 04f5ef18f7877ce30b12b1a3c1e851c420531b72
Parents: 83be450
Author: Junping Du <ju...@apache.org>
Authored: Thu Feb 12 11:46:47 2015 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Thu Feb 12 11:46:47 2015 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../containermanager/ContainerManagerImpl.java | 4 +-
.../loghandler/NonAggregatingLogHandler.java | 63 ++++++++++++++--
.../recovery/NMLeveldbStateStoreService.java | 67 ++++++++++++++++-
.../recovery/NMNullStateStoreService.java | 16 ++++
.../recovery/NMStateStoreService.java | 35 +++++++++
.../yarn_server_nodemanager_recovery.proto | 4 +
.../TestNonAggregatingLogHandler.java | 79 ++++++++++++++++++--
.../recovery/NMMemoryStateStoreService.java | 79 +++++++++++++-------
.../TestNMLeveldbStateStoreService.java | 51 +++++++++++++
10 files changed, 362 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 47ccbe9..d1b684e 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -274,6 +274,9 @@ Release 2.7.0 - UNRELEASED
YARN-3147. Clean up RM web proxy code. (Steve Loughran via xgong)
+ YARN-2079. Recover NonAggregatingLogHandler state upon nodemanager
+ restart. (Jason Lowe via junping_du)
+
OPTIMIZATIONS
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index bb277d9..acac600 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -135,7 +135,6 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
-import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -362,7 +361,8 @@ public class ContainerManagerImpl extends CompositeService implements
deletionService, dirsHandler);
} else {
return new NonAggregatingLogHandler(this.dispatcher, deletionService,
- dirsHandler);
+ dirsHandler,
+ context.getNMStateStore());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
index 0422ef9..471e994 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -45,6 +46,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -62,15 +65,18 @@ public class NonAggregatingLogHandler extends AbstractService implements
private final Map<ApplicationId, String> appOwners;
private final LocalDirsHandlerService dirsHandler;
+ private final NMStateStoreService stateStore;
private long deleteDelaySeconds;
private ScheduledThreadPoolExecutor sched;
public NonAggregatingLogHandler(Dispatcher dispatcher,
- DeletionService delService, LocalDirsHandlerService dirsHandler) {
+ DeletionService delService, LocalDirsHandlerService dirsHandler,
+ NMStateStoreService stateStore) {
super(NonAggregatingLogHandler.class.getName());
this.dispatcher = dispatcher;
this.delService = delService;
this.dirsHandler = dirsHandler;
+ this.stateStore = stateStore;
this.appOwners = new ConcurrentHashMap<ApplicationId, String>();
}
@@ -82,6 +88,7 @@ public class NonAggregatingLogHandler extends AbstractService implements
YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
sched = createScheduledThreadPoolExecutor(conf);
super.serviceInit(conf);
+ recover();
}
@Override
@@ -110,6 +117,31 @@ public class NonAggregatingLogHandler extends AbstractService implements
}
}
+ private void recover() throws IOException {
+ if (stateStore.canRecover()) {
+ RecoveredLogDeleterState state = stateStore.loadLogDeleterState();
+ long now = System.currentTimeMillis();
+ for (Map.Entry<ApplicationId, LogDeleterProto> entry :
+ state.getLogDeleterMap().entrySet()) {
+ ApplicationId appId = entry.getKey();
+ LogDeleterProto proto = entry.getValue();
+ long deleteDelayMsec = proto.getDeletionTime() - now;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scheduling deletion of " + appId + " logs in "
+ + deleteDelayMsec + " msec");
+ }
+ LogDeleterRunnable logDeleter =
+ new LogDeleterRunnable(proto.getUser(), appId);
+ try {
+ sched.schedule(logDeleter, deleteDelayMsec, TimeUnit.MILLISECONDS);
+ } catch (RejectedExecutionException e) {
+ // Handling this event in local thread before starting threads
+ // or after calling sched.shutdownNow().
+ logDeleter.run();
+ }
+ }
+ }
+ }
@SuppressWarnings("unchecked")
@Override
@@ -130,13 +162,28 @@ public class NonAggregatingLogHandler extends AbstractService implements
case APPLICATION_FINISHED:
LogHandlerAppFinishedEvent appFinishedEvent =
(LogHandlerAppFinishedEvent) event;
+ ApplicationId appId = appFinishedEvent.getApplicationId();
// Schedule - so that logs are available on the UI till they're deleted.
LOG.info("Scheduling Log Deletion for application: "
- + appFinishedEvent.getApplicationId() + ", with delay of "
+ + appId + ", with delay of "
+ this.deleteDelaySeconds + " seconds");
- LogDeleterRunnable logDeleter =
- new LogDeleterRunnable(appOwners.remove(appFinishedEvent
- .getApplicationId()), appFinishedEvent.getApplicationId());
+ String user = appOwners.remove(appId);
+ if (user == null) {
+ LOG.error("Unable to locate user for " + appId);
+ break;
+ }
+ LogDeleterRunnable logDeleter = new LogDeleterRunnable(user, appId);
+ long deletionTimestamp = System.currentTimeMillis()
+ + this.deleteDelaySeconds * 1000;
+ LogDeleterProto deleterProto = LogDeleterProto.newBuilder()
+ .setUser(user)
+ .setDeletionTime(deletionTimestamp)
+ .build();
+ try {
+ stateStore.storeLogDeleter(appId, deleterProto);
+ } catch (IOException e) {
+ LOG.error("Unable to record log deleter state", e);
+ }
try {
sched.schedule(logDeleter, this.deleteDelaySeconds,
TimeUnit.SECONDS);
@@ -198,6 +245,12 @@ public class NonAggregatingLogHandler extends AbstractService implements
NonAggregatingLogHandler.this.delService.delete(user, null,
(Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
}
+ try {
+ NonAggregatingLogHandler.this.stateStore.removeLogDeleter(
+ this.applicationId);
+ } catch (IOException e) {
+ LOG.error("Error removing log deletion state", e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 5f349db..df58182 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@@ -115,6 +116,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String CONTAINER_TOKENS_PREV_MASTER_KEY =
CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
+ private static final String LOG_DELETER_KEY_PREFIX = "LogDeleters/";
+
private static final byte[] EMPTY_VALUE = new byte[0];
private DB db;
@@ -852,6 +855,69 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
@Override
+ public RecoveredLogDeleterState loadLogDeleterState() throws IOException {
+ RecoveredLogDeleterState state = new RecoveredLogDeleterState();
+ state.logDeleterMap = new HashMap<ApplicationId, LogDeleterProto>();
+ LeveldbIterator iter = null;
+ try {
+ iter = new LeveldbIterator(db);
+ iter.seek(bytes(LOG_DELETER_KEY_PREFIX));
+ final int logDeleterKeyPrefixLength = LOG_DELETER_KEY_PREFIX.length();
+ while (iter.hasNext()) {
+ Entry<byte[], byte[]> entry = iter.next();
+ String fullKey = asString(entry.getKey());
+ if (!fullKey.startsWith(LOG_DELETER_KEY_PREFIX)) {
+ break;
+ }
+
+ String appIdStr = fullKey.substring(logDeleterKeyPrefixLength);
+ ApplicationId appId = null;
+ try {
+ appId = ConverterUtils.toApplicationId(appIdStr);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Skipping unknown log deleter key " + fullKey);
+ continue;
+ }
+
+ LogDeleterProto proto = LogDeleterProto.parseFrom(entry.getValue());
+ state.logDeleterMap.put(appId, proto);
+ }
+ } catch (DBException e) {
+ throw new IOException(e);
+ } finally {
+ if (iter != null) {
+ iter.close();
+ }
+ }
+ return state;
+ }
+
+ @Override
+ public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto)
+ throws IOException {
+ String key = getLogDeleterKey(appId);
+ try {
+ db.put(bytes(key), proto.toByteArray());
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void removeLogDeleter(ApplicationId appId) throws IOException {
+ String key = getLogDeleterKey(appId);
+ try {
+ db.delete(bytes(key));
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private String getLogDeleterKey(ApplicationId appId) {
+ return LOG_DELETER_KEY_PREFIX + appId;
+ }
+
+ @Override
protected void initStorage(Configuration conf)
throws IOException {
Path storeRoot = createStorageDir(conf);
@@ -966,5 +1032,4 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
+ getCurrentVersion() + ", but loading version " + loadedVersion);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index 6646969..ab49543 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
// The state store to use when state isn't being stored
@@ -192,6 +193,21 @@ public class NMNullStateStoreService extends NMStateStoreService {
}
@Override
+ public RecoveredLogDeleterState loadLogDeleterState() throws IOException {
+ throw new UnsupportedOperationException(
+ "Recovery not supported by this state store");
+ }
+
+ @Override
+ public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto)
+ throws IOException {
+ }
+
+ @Override
+ public void removeLogDeleter(ApplicationId appId) throws IOException {
+ }
+
+ @Override
protected void initStorage(Configuration conf) throws IOException {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index b6ca336..fa66349 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
@Private
@@ -189,6 +190,14 @@ public abstract class NMStateStoreService extends AbstractService {
}
}
+ public static class RecoveredLogDeleterState {
+ Map<ApplicationId, LogDeleterProto> logDeleterMap;
+
+ public Map<ApplicationId, LogDeleterProto> getLogDeleterMap() {
+ return logDeleterMap;
+ }
+ }
+
/** Initialize the state storage */
@Override
public void serviceInit(Configuration conf) throws IOException {
@@ -459,6 +468,32 @@ public abstract class NMStateStoreService extends AbstractService {
throws IOException;
+ /**
+ * Load the state of log deleters
+ * @return recovered log deleter state
+ * @throws IOException
+ */
+ public abstract RecoveredLogDeleterState loadLogDeleterState()
+ throws IOException;
+
+ /**
+ * Store the state of a log deleter
+ * @param appId the application ID for the log deleter
+ * @param proto the serialized state of the log deleter
+ * @throws IOException
+ */
+ public abstract void storeLogDeleter(ApplicationId appId,
+ LogDeleterProto proto) throws IOException;
+
+ /**
+ * Remove the state of a log deleter
+ * @param appId the application ID for the log deleter
+ * @throws IOException
+ */
+ public abstract void removeLogDeleter(ApplicationId appId)
+ throws IOException;
+
+
protected abstract void initStorage(Configuration conf) throws IOException;
protected abstract void startStorage() throws IOException;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
index d8fdd8b..ade8c1a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
@@ -47,3 +47,7 @@ message LocalizedResourceProto {
optional int64 size = 3;
}
+message LogDeleterProto {
+ optional string user = 1;
+ optional int64 deletionTime = 2;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
index d0f6472..0bab5ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
@@ -18,10 +18,12 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -65,10 +67,14 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.mockito.exceptions.verification.WantedButNotInvoked;
@@ -123,7 +129,8 @@ public class TestNonAggregatingLogHandler {
dirsHandler.init(conf);
NonAggregatingLogHandler rawLogHandler =
- new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler);
+ new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler,
+ new NMNullStateStoreService());
NonAggregatingLogHandler logHandler = spy(rawLogHandler);
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
@@ -209,7 +216,8 @@ public class TestNonAggregatingLogHandler {
@Test
public void testStop() throws Exception {
NonAggregatingLogHandler aggregatingLogHandler =
- new NonAggregatingLogHandler(null, null, null);
+ new NonAggregatingLogHandler(null, null, null,
+ new NMNullStateStoreService());
// It should not throw NullPointerException
aggregatingLogHandler.stop();
@@ -232,7 +240,8 @@ public class TestNonAggregatingLogHandler {
NonAggregatingLogHandler aggregatingLogHandler =
new NonAggregatingLogHandler(new InlineDispatcher(),
delService,
- dirsHandler);
+ dirsHandler,
+ new NMNullStateStoreService());
dirsHandler.init(conf);
dirsHandler.start();
@@ -258,7 +267,13 @@ public class TestNonAggregatingLogHandler {
public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher,
DeletionService delService, LocalDirsHandlerService dirsHandler) {
- super(dispatcher, delService, dirsHandler);
+ this(dispatcher, delService, dirsHandler, new NMNullStateStoreService());
+ }
+
+ public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher,
+ DeletionService delService, LocalDirsHandlerService dirsHandler,
+ NMStateStoreService stateStore) {
+ super(dispatcher, delService, dirsHandler, stateStore);
}
@Override
@@ -303,7 +318,8 @@ public class TestNonAggregatingLogHandler {
LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class);
NonAggregatingLogHandler rawLogHandler =
- new NonAggregatingLogHandler(dispatcher, mockDelService, mockDirsHandler);
+ new NonAggregatingLogHandler(dispatcher, mockDelService,
+ mockDirsHandler, new NMNullStateStoreService());
NonAggregatingLogHandler logHandler = spy(rawLogHandler);
AbstractFileSystem spylfs =
spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
@@ -316,7 +332,58 @@ public class TestNonAggregatingLogHandler {
mockDirsHandler, conf, spylfs, lfs, localLogDirs);
logHandler.close();
}
-
+
+ @Test
+ public void testRecovery() throws Exception {
+ File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2);
+ String localLogDirsString =
+ localLogDirs[0].getAbsolutePath() + ","
+ + localLogDirs[1].getAbsolutePath();
+
+ conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
+
+ conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS,
+ YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
+
+ dirsHandler.init(conf);
+
+ NMStateStoreService stateStore = new NMMemoryStateStoreService();
+ stateStore.init(conf);
+ stateStore.start();
+ NonAggregatingLogHandlerWithMockExecutor logHandler =
+ new NonAggregatingLogHandlerWithMockExecutor(dispatcher, mockDelService,
+ dirsHandler, stateStore);
+ logHandler.init(conf);
+ logHandler.start();
+
+ logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
+ logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
+ logHandler.handle(new LogHandlerAppFinishedEvent(appId));
+
+ // simulate a restart and verify deletion is rescheduled
+ logHandler.close();
+ logHandler = new NonAggregatingLogHandlerWithMockExecutor(dispatcher,
+ mockDelService, dirsHandler, stateStore);
+ logHandler.init(conf);
+ logHandler.start();
+ ArgumentCaptor<Runnable> schedArg = ArgumentCaptor.forClass(Runnable.class);
+ verify(logHandler.mockSched).schedule(schedArg.capture(),
+ anyLong(), eq(TimeUnit.MILLISECONDS));
+
+ // execute the runnable and verify another restart has nothing scheduled
+ schedArg.getValue().run();
+ logHandler.close();
+ logHandler = new NonAggregatingLogHandlerWithMockExecutor(dispatcher,
+ mockDelService, dirsHandler, stateStore);
+ logHandler.init(conf);
+ logHandler.start();
+ verify(logHandler.mockSched, never()).schedule(any(Runnable.class),
+ anyLong(), any(TimeUnit.class));
+ logHandler.close();
+ }
+
/**
* Function to run a log handler with directories failing the getFileStatus
* call. The function accepts the log handler, setup the mocks to fail with
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index d404091..e0487e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@@ -48,6 +49,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
private RecoveredNMTokensState nmTokenState;
private RecoveredContainerTokensState containerTokenState;
+ private Map<ApplicationId, LogDeleterProto> logDeleterState;
public NMMemoryStateStoreService() {
super(NMMemoryStateStoreService.class.getName());
@@ -65,6 +67,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
trackerStates = new HashMap<TrackerKey, TrackerState>();
deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
+ logDeleterState = new HashMap<ApplicationId, LogDeleterProto>();
}
@Override
@@ -77,7 +80,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
@Override
- public RecoveredApplicationsState loadApplicationsState()
+ public synchronized RecoveredApplicationsState loadApplicationsState()
throws IOException {
RecoveredApplicationsState state = new RecoveredApplicationsState();
state.applications = new ArrayList<ContainerManagerApplicationProto>(
@@ -87,7 +90,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
- public void storeApplication(ApplicationId appId,
+ public synchronized void storeApplication(ApplicationId appId,
ContainerManagerApplicationProto proto) throws IOException {
ContainerManagerApplicationProto protoCopy =
ContainerManagerApplicationProto.parseFrom(proto.toByteString());
@@ -95,18 +98,19 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
- public void storeFinishedApplication(ApplicationId appId) {
+ public synchronized void storeFinishedApplication(ApplicationId appId) {
finishedApps.add(appId);
}
@Override
- public void removeApplication(ApplicationId appId) throws IOException {
+ public synchronized void removeApplication(ApplicationId appId)
+ throws IOException {
apps.remove(appId);
finishedApps.remove(appId);
}
@Override
- public List<RecoveredContainerState> loadContainersState()
+ public synchronized List<RecoveredContainerState> loadContainersState()
throws IOException {
// return a copy so caller can't modify our state
List<RecoveredContainerState> result =
@@ -124,7 +128,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
- public void storeContainer(ContainerId containerId,
+ public synchronized void storeContainer(ContainerId containerId,
StartContainerRequest startRequest) throws IOException {
RecoveredContainerState rcs = new RecoveredContainerState();
rcs.startRequest = startRequest;
@@ -132,14 +136,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
- public void storeContainerDiagnostics(ContainerId containerId,
+ public synchronized void storeContainerDiagnostics(ContainerId containerId,
StringBuilder diagnostics) throws IOException {
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
rcs.diagnostics = diagnostics.toString();
}
@Override
- public void storeContainerLaunched(ContainerId containerId)
+ public synchronized void storeContainerLaunched(ContainerId containerId)
throws IOException {
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
if (rcs.exitCode != ContainerExitStatus.INVALID) {
@@ -149,22 +153,23 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
- public void storeContainerKilled(ContainerId containerId)
+ public synchronized void storeContainerKilled(ContainerId containerId)
throws IOException {
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
rcs.killed = true;
}
@Override
- public void storeContainerCompleted(ContainerId containerId, int exitCode)
- throws IOException {
+ public synchronized void storeContainerCompleted(ContainerId containerId,
+ int exitCode) throws IOException {
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
rcs.status = RecoveredContainerStatus.COMPLETED;
rcs.exitCode = exitCode;
}
@Override
- public void removeContainer(ContainerId containerId) throws IOException {
+ public synchronized void removeContainer(ContainerId containerId)
+ throws IOException {
containerStates.remove(containerId);
}
@@ -252,7 +257,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
@Override
- public RecoveredDeletionServiceState loadDeletionServiceState()
+ public synchronized RecoveredDeletionServiceState loadDeletionServiceState()
throws IOException {
RecoveredDeletionServiceState result =
new RecoveredDeletionServiceState();
@@ -274,7 +279,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
@Override
- public RecoveredNMTokensState loadNMTokensState() throws IOException {
+ public synchronized RecoveredNMTokensState loadNMTokensState()
+ throws IOException {
// return a copy so caller can't modify our state
RecoveredNMTokensState result = new RecoveredNMTokensState();
result.currentMasterKey = nmTokenState.currentMasterKey;
@@ -286,36 +292,36 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
- public void storeNMTokenCurrentMasterKey(MasterKey key)
+ public synchronized void storeNMTokenCurrentMasterKey(MasterKey key)
throws IOException {
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
nmTokenState.currentMasterKey = new MasterKeyPBImpl(keypb.getProto());
}
@Override
- public void storeNMTokenPreviousMasterKey(MasterKey key)
+ public synchronized void storeNMTokenPreviousMasterKey(MasterKey key)
throws IOException {
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
nmTokenState.previousMasterKey = new MasterKeyPBImpl(keypb.getProto());
}
@Override
- public void storeNMTokenApplicationMasterKey(ApplicationAttemptId attempt,
- MasterKey key) throws IOException {
+ public synchronized void storeNMTokenApplicationMasterKey(
+ ApplicationAttemptId attempt, MasterKey key) throws IOException {
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
nmTokenState.applicationMasterKeys.put(attempt,
new MasterKeyPBImpl(keypb.getProto()));
}
@Override
- public void removeNMTokenApplicationMasterKey(ApplicationAttemptId attempt)
- throws IOException {
+ public synchronized void removeNMTokenApplicationMasterKey(
+ ApplicationAttemptId attempt) throws IOException {
nmTokenState.applicationMasterKeys.remove(attempt);
}
@Override
- public RecoveredContainerTokensState loadContainerTokensState()
+ public synchronized RecoveredContainerTokensState loadContainerTokensState()
throws IOException {
// return a copy so caller can't modify our state
RecoveredContainerTokensState result =
@@ -328,7 +334,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
- public void storeContainerTokenCurrentMasterKey(MasterKey key)
+ public synchronized void storeContainerTokenCurrentMasterKey(MasterKey key)
throws IOException {
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
containerTokenState.currentMasterKey =
@@ -336,7 +342,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
- public void storeContainerTokenPreviousMasterKey(MasterKey key)
+ public synchronized void storeContainerTokenPreviousMasterKey(MasterKey key)
throws IOException {
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
containerTokenState.previousMasterKey =
@@ -344,18 +350,41 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
- public void storeContainerToken(ContainerId containerId,
+ public synchronized void storeContainerToken(ContainerId containerId,
Long expirationTime) throws IOException {
containerTokenState.activeTokens.put(containerId, expirationTime);
}
@Override
- public void removeContainerToken(ContainerId containerId)
+ public synchronized void removeContainerToken(ContainerId containerId)
throws IOException {
containerTokenState.activeTokens.remove(containerId);
}
+ @Override
+ public synchronized RecoveredLogDeleterState loadLogDeleterState()
+ throws IOException {
+ RecoveredLogDeleterState state = new RecoveredLogDeleterState();
+ state.logDeleterMap = new HashMap<ApplicationId,LogDeleterProto>(
+ logDeleterState);
+ return state;
+ }
+
+ @Override
+ public synchronized void storeLogDeleter(ApplicationId appId,
+ LogDeleterProto proto)
+ throws IOException {
+ logDeleterState.put(appId, proto);
+ }
+
+ @Override
+ public synchronized void removeLogDeleter(ApplicationId appId)
+ throws IOException {
+ logDeleterState.remove(appId);
+ }
+
+
private static class TrackerState {
Map<Path, LocalResourceProto> inProgressMap =
new HashMap<Path, LocalResourceProto>();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index f7f43cc..1804424 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
@@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.records.Version;
@@ -831,6 +833,55 @@ public class TestNMLeveldbStateStoreService {
assertEquals(expTime3, loadedActiveTokens.get(cid3));
}
+ @Test
+ public void testLogDeleterStorage() throws IOException {
+ // test empty when no state
+ RecoveredLogDeleterState state = stateStore.loadLogDeleterState();
+ assertTrue(state.getLogDeleterMap().isEmpty());
+
+ // store log deleter state
+ final ApplicationId appId1 = ApplicationId.newInstance(1, 1);
+ LogDeleterProto proto1 = LogDeleterProto.newBuilder()
+ .setUser("user1")
+ .setDeletionTime(1234)
+ .build();
+ stateStore.storeLogDeleter(appId1, proto1);
+
+ // restart state store and verify recovered
+ restartStateStore();
+ state = stateStore.loadLogDeleterState();
+ assertEquals(1, state.getLogDeleterMap().size());
+ assertEquals(proto1, state.getLogDeleterMap().get(appId1));
+
+ // store another log deleter
+ final ApplicationId appId2 = ApplicationId.newInstance(2, 2);
+ LogDeleterProto proto2 = LogDeleterProto.newBuilder()
+ .setUser("user2")
+ .setDeletionTime(5678)
+ .build();
+ stateStore.storeLogDeleter(appId2, proto2);
+
+ // restart state store and verify recovered
+ restartStateStore();
+ state = stateStore.loadLogDeleterState();
+ assertEquals(2, state.getLogDeleterMap().size());
+ assertEquals(proto1, state.getLogDeleterMap().get(appId1));
+ assertEquals(proto2, state.getLogDeleterMap().get(appId2));
+
+ // remove a deleter and verify removed after restart and recovery
+ stateStore.removeLogDeleter(appId1);
+ restartStateStore();
+ state = stateStore.loadLogDeleterState();
+ assertEquals(1, state.getLogDeleterMap().size());
+ assertEquals(proto2, state.getLogDeleterMap().get(appId2));
+
+ // remove last deleter and verify empty after restart and recovery
+ stateStore.removeLogDeleter(appId2);
+ restartStateStore();
+ state = stateStore.loadLogDeleterState();
+ assertTrue(state.getLogDeleterMap().isEmpty());
+ }
+
private static class NMTokenSecretManagerForTest extends
BaseNMTokenSecretManager {
public MasterKey generateKey() {