You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/02/12 20:33:36 UTC

hadoop git commit: YARN-2079. Recover NonAggregatingLogHandler state upon nodemanager restart. (Contributed by Jason Lowe)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 83be450ac -> 04f5ef18f


YARN-2079. Recover NonAggregatingLogHandler state upon nodemanager restart. (Contributed by Jason Lowe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/04f5ef18
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/04f5ef18
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/04f5ef18

Branch: refs/heads/trunk
Commit: 04f5ef18f7877ce30b12b1a3c1e851c420531b72
Parents: 83be450
Author: Junping Du <ju...@apache.org>
Authored: Thu Feb 12 11:46:47 2015 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Thu Feb 12 11:46:47 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../containermanager/ContainerManagerImpl.java  |  4 +-
 .../loghandler/NonAggregatingLogHandler.java    | 63 ++++++++++++++--
 .../recovery/NMLeveldbStateStoreService.java    | 67 ++++++++++++++++-
 .../recovery/NMNullStateStoreService.java       | 16 ++++
 .../recovery/NMStateStoreService.java           | 35 +++++++++
 .../yarn_server_nodemanager_recovery.proto      |  4 +
 .../TestNonAggregatingLogHandler.java           | 79 ++++++++++++++++++--
 .../recovery/NMMemoryStateStoreService.java     | 79 +++++++++++++-------
 .../TestNMLeveldbStateStoreService.java         | 51 +++++++++++++
 10 files changed, 362 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 47ccbe9..d1b684e 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -274,6 +274,9 @@ Release 2.7.0 - UNRELEASED
 
     YARN-3147. Clean up RM web proxy code. (Steve Loughran via xgong)
 
+    YARN-2079. Recover NonAggregatingLogHandler state upon nodemanager
+    restart. (Jason Lowe via junping_du) 
+
   OPTIMIZATIONS
 
     YARN-2990. FairScheduler's delay-scheduling always waits for node-local and 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index bb277d9..acac600 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -135,7 +135,6 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
-import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -362,7 +361,8 @@ public class ContainerManagerImpl extends CompositeService implements
           deletionService, dirsHandler);
     } else {
       return new NonAggregatingLogHandler(this.dispatcher, deletionService,
-                                          dirsHandler);
+                                          dirsHandler,
+                                          context.getNMStateStore());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
index 0422ef9..471e994 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -45,6 +46,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -62,15 +65,18 @@ public class NonAggregatingLogHandler extends AbstractService implements
   private final Map<ApplicationId, String> appOwners;
 
   private final LocalDirsHandlerService dirsHandler;
+  private final NMStateStoreService stateStore;
   private long deleteDelaySeconds;
   private ScheduledThreadPoolExecutor sched;
 
   public NonAggregatingLogHandler(Dispatcher dispatcher,
-      DeletionService delService, LocalDirsHandlerService dirsHandler) {
+      DeletionService delService, LocalDirsHandlerService dirsHandler,
+      NMStateStoreService stateStore) {
     super(NonAggregatingLogHandler.class.getName());
     this.dispatcher = dispatcher;
     this.delService = delService;
     this.dirsHandler = dirsHandler;
+    this.stateStore = stateStore;
     this.appOwners = new ConcurrentHashMap<ApplicationId, String>();
   }
 
@@ -82,6 +88,7 @@ public class NonAggregatingLogHandler extends AbstractService implements
                 YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
     sched = createScheduledThreadPoolExecutor(conf);
     super.serviceInit(conf);
+    recover();
   }
 
   @Override
@@ -110,6 +117,31 @@ public class NonAggregatingLogHandler extends AbstractService implements
     }
   }
 
+  private void recover() throws IOException {
+    if (stateStore.canRecover()) {
+      RecoveredLogDeleterState state = stateStore.loadLogDeleterState();
+      long now = System.currentTimeMillis();
+      for (Map.Entry<ApplicationId, LogDeleterProto> entry :
+        state.getLogDeleterMap().entrySet()) {
+        ApplicationId appId = entry.getKey();
+        LogDeleterProto proto = entry.getValue();
+        long deleteDelayMsec = proto.getDeletionTime() - now;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Scheduling deletion of " + appId + " logs in "
+              + deleteDelayMsec + " msec");
+        }
+        LogDeleterRunnable logDeleter =
+            new LogDeleterRunnable(proto.getUser(), appId);
+        try {
+          sched.schedule(logDeleter, deleteDelayMsec, TimeUnit.MILLISECONDS);
+        } catch (RejectedExecutionException e) {
+          // Handling this event in local thread before starting threads
+          // or after calling sched.shutdownNow().
+          logDeleter.run();
+        }
+      }
+    }
+  }
 
   @SuppressWarnings("unchecked")
   @Override
@@ -130,13 +162,28 @@ public class NonAggregatingLogHandler extends AbstractService implements
       case APPLICATION_FINISHED:
         LogHandlerAppFinishedEvent appFinishedEvent =
             (LogHandlerAppFinishedEvent) event;
+        ApplicationId appId = appFinishedEvent.getApplicationId();
         // Schedule - so that logs are available on the UI till they're deleted.
         LOG.info("Scheduling Log Deletion for application: "
-            + appFinishedEvent.getApplicationId() + ", with delay of "
+            + appId + ", with delay of "
             + this.deleteDelaySeconds + " seconds");
-        LogDeleterRunnable logDeleter =
-            new LogDeleterRunnable(appOwners.remove(appFinishedEvent
-                  .getApplicationId()), appFinishedEvent.getApplicationId());
+        String user = appOwners.remove(appId);
+        if (user == null) {
+          LOG.error("Unable to locate user for " + appId);
+          break;
+        }
+        LogDeleterRunnable logDeleter = new LogDeleterRunnable(user, appId);
+        long deletionTimestamp = System.currentTimeMillis()
+            + this.deleteDelaySeconds * 1000;
+        LogDeleterProto deleterProto = LogDeleterProto.newBuilder()
+            .setUser(user)
+            .setDeletionTime(deletionTimestamp)
+            .build();
+        try {
+          stateStore.storeLogDeleter(appId, deleterProto);
+        } catch (IOException e) {
+          LOG.error("Unable to record log deleter state", e);
+        }
         try {
           sched.schedule(logDeleter, this.deleteDelaySeconds,
               TimeUnit.SECONDS);
@@ -198,6 +245,12 @@ public class NonAggregatingLogHandler extends AbstractService implements
         NonAggregatingLogHandler.this.delService.delete(user, null,
           (Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
       }
+      try {
+        NonAggregatingLogHandler.this.stateStore.removeLogDeleter(
+            this.applicationId);
+      } catch (IOException e) {
+        LOG.error("Error removing log deletion state", e);
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 5f349db..df58182 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@@ -115,6 +116,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   private static final String CONTAINER_TOKENS_PREV_MASTER_KEY =
       CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
 
+  private static final String LOG_DELETER_KEY_PREFIX = "LogDeleters/";
+
   private static final byte[] EMPTY_VALUE = new byte[0];
 
   private DB db;
@@ -852,6 +855,69 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
 
 
   @Override
+  public RecoveredLogDeleterState loadLogDeleterState() throws IOException {
+    RecoveredLogDeleterState state = new RecoveredLogDeleterState();
+    state.logDeleterMap = new HashMap<ApplicationId, LogDeleterProto>();
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(LOG_DELETER_KEY_PREFIX));
+      final int logDeleterKeyPrefixLength = LOG_DELETER_KEY_PREFIX.length();
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> entry = iter.next();
+        String fullKey = asString(entry.getKey());
+        if (!fullKey.startsWith(LOG_DELETER_KEY_PREFIX)) {
+          break;
+        }
+
+        String appIdStr = fullKey.substring(logDeleterKeyPrefixLength);
+        ApplicationId appId = null;
+        try {
+          appId = ConverterUtils.toApplicationId(appIdStr);
+        } catch (IllegalArgumentException e) {
+          LOG.warn("Skipping unknown log deleter key " + fullKey);
+          continue;
+        }
+
+        LogDeleterProto proto = LogDeleterProto.parseFrom(entry.getValue());
+        state.logDeleterMap.put(appId, proto);
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+    return state;
+  }
+
+  @Override
+  public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto)
+      throws IOException {
+    String key = getLogDeleterKey(appId);
+    try {
+      db.put(bytes(key), proto.toByteArray());
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void removeLogDeleter(ApplicationId appId) throws IOException {
+    String key = getLogDeleterKey(appId);
+    try {
+      db.delete(bytes(key));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private String getLogDeleterKey(ApplicationId appId) {
+    return LOG_DELETER_KEY_PREFIX + appId;
+  }
+
+  @Override
   protected void initStorage(Configuration conf)
       throws IOException {
     Path storeRoot = createStorageDir(conf);
@@ -966,5 +1032,4 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
             + getCurrentVersion() + ", but loading version " + loadedVersion);
     }
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index 6646969..ab49543 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 
 // The state store to use when state isn't being stored
@@ -192,6 +193,21 @@ public class NMNullStateStoreService extends NMStateStoreService {
   }
 
   @Override
+  public RecoveredLogDeleterState loadLogDeleterState() throws IOException {
+    throw new UnsupportedOperationException(
+        "Recovery not supported by this state store");
+  }
+
+  @Override
+  public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto)
+      throws IOException {
+  }
+
+  @Override
+  public void removeLogDeleter(ApplicationId appId) throws IOException {
+  }
+
+  @Override
   protected void initStorage(Configuration conf) throws IOException {
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index b6ca336..fa66349 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 
 @Private
@@ -189,6 +190,14 @@ public abstract class NMStateStoreService extends AbstractService {
     }
   }
 
+  public static class RecoveredLogDeleterState {
+    Map<ApplicationId, LogDeleterProto> logDeleterMap;
+
+    public Map<ApplicationId, LogDeleterProto> getLogDeleterMap() {
+      return logDeleterMap;
+    }
+  }
+
   /** Initialize the state storage */
   @Override
   public void serviceInit(Configuration conf) throws IOException {
@@ -459,6 +468,32 @@ public abstract class NMStateStoreService extends AbstractService {
       throws IOException;
 
 
+  /**
+   * Load the state of log deleters
+   * @return recovered log deleter state
+   * @throws IOException
+   */
+  public abstract RecoveredLogDeleterState loadLogDeleterState()
+      throws IOException;
+
+  /**
+   * Store the state of a log deleter
+   * @param appId the application ID for the log deleter
+   * @param proto the serialized state of the log deleter
+   * @throws IOException
+   */
+  public abstract void storeLogDeleter(ApplicationId appId,
+      LogDeleterProto proto) throws IOException;
+
+  /**
+   * Remove the state of a log deleter
+   * @param appId the application ID for the log deleter
+   * @throws IOException
+   */
+  public abstract void removeLogDeleter(ApplicationId appId)
+      throws IOException;
+
+
   protected abstract void initStorage(Configuration conf) throws IOException;
 
   protected abstract void startStorage() throws IOException;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
index d8fdd8b..ade8c1a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
@@ -47,3 +47,7 @@ message LocalizedResourceProto {
   optional int64 size = 3;
 }
 
+message LogDeleterProto {
+  optional string user = 1;
+  optional int64 deletionTime = 2;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
index d0f6472..0bab5ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
@@ -18,10 +18,12 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
@@ -65,10 +67,14 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.mockito.exceptions.verification.WantedButNotInvoked;
@@ -123,7 +129,8 @@ public class TestNonAggregatingLogHandler {
     dirsHandler.init(conf);
 
     NonAggregatingLogHandler rawLogHandler =
-        new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler);
+        new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler,
+            new NMNullStateStoreService());
     NonAggregatingLogHandler logHandler = spy(rawLogHandler);
     AbstractFileSystem spylfs =
         spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
@@ -209,7 +216,8 @@ public class TestNonAggregatingLogHandler {
   @Test
   public void testStop() throws Exception {
     NonAggregatingLogHandler aggregatingLogHandler = 
-        new NonAggregatingLogHandler(null, null, null);
+        new NonAggregatingLogHandler(null, null, null,
+            new NMNullStateStoreService());
 
     // It should not throw NullPointerException
     aggregatingLogHandler.stop();
@@ -232,7 +240,8 @@ public class TestNonAggregatingLogHandler {
     NonAggregatingLogHandler aggregatingLogHandler =
         new NonAggregatingLogHandler(new InlineDispatcher(),
             delService,
-            dirsHandler);
+            dirsHandler,
+            new NMNullStateStoreService());
 
     dirsHandler.init(conf);
     dirsHandler.start();
@@ -258,7 +267,13 @@ public class TestNonAggregatingLogHandler {
 
     public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher,
         DeletionService delService, LocalDirsHandlerService dirsHandler) {
-      super(dispatcher, delService, dirsHandler);
+      this(dispatcher, delService, dirsHandler, new NMNullStateStoreService());
+    }
+
+    public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher,
+        DeletionService delService, LocalDirsHandlerService dirsHandler,
+        NMStateStoreService stateStore) {
+      super(dispatcher, delService, dirsHandler, stateStore);
     }
 
     @Override
@@ -303,7 +318,8 @@ public class TestNonAggregatingLogHandler {
     LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class);
 
     NonAggregatingLogHandler rawLogHandler =
-        new NonAggregatingLogHandler(dispatcher, mockDelService, mockDirsHandler);
+        new NonAggregatingLogHandler(dispatcher, mockDelService,
+            mockDirsHandler, new NMNullStateStoreService());
     NonAggregatingLogHandler logHandler = spy(rawLogHandler);
     AbstractFileSystem spylfs =
         spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
@@ -316,7 +332,58 @@ public class TestNonAggregatingLogHandler {
       mockDirsHandler, conf, spylfs, lfs, localLogDirs);
     logHandler.close();
   }
-  
+
+  @Test
+  public void testRecovery() throws Exception {
+    File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2);
+    String localLogDirsString =
+        localLogDirs[0].getAbsolutePath() + ","
+            + localLogDirs[1].getAbsolutePath();
+
+    conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
+
+    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS,
+            YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
+
+    dirsHandler.init(conf);
+
+    NMStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    NonAggregatingLogHandlerWithMockExecutor logHandler =
+        new NonAggregatingLogHandlerWithMockExecutor(dispatcher, mockDelService,
+                                                     dirsHandler, stateStore);
+    logHandler.init(conf);
+    logHandler.start();
+
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
+        ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
+    logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
+    logHandler.handle(new LogHandlerAppFinishedEvent(appId));
+
+    // simulate a restart and verify deletion is rescheduled
+    logHandler.close();
+    logHandler = new NonAggregatingLogHandlerWithMockExecutor(dispatcher,
+        mockDelService, dirsHandler, stateStore);
+    logHandler.init(conf);
+    logHandler.start();
+    ArgumentCaptor<Runnable> schedArg = ArgumentCaptor.forClass(Runnable.class);
+    verify(logHandler.mockSched).schedule(schedArg.capture(),
+        anyLong(), eq(TimeUnit.MILLISECONDS));
+
+    // execute the runnable and verify another restart has nothing scheduled
+    schedArg.getValue().run();
+    logHandler.close();
+    logHandler = new NonAggregatingLogHandlerWithMockExecutor(dispatcher,
+        mockDelService, dirsHandler, stateStore);
+    logHandler.init(conf);
+    logHandler.start();
+    verify(logHandler.mockSched, never()).schedule(any(Runnable.class),
+        anyLong(), any(TimeUnit.class));
+    logHandler.close();
+   }
+
   /**
    * Function to run a log handler with directories failing the getFileStatus
    * call. The function accepts the log handler, setup the mocks to fail with

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index d404091..e0487e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
@@ -48,6 +49,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
   private RecoveredNMTokensState nmTokenState;
   private RecoveredContainerTokensState containerTokenState;
+  private Map<ApplicationId, LogDeleterProto> logDeleterState;
 
   public NMMemoryStateStoreService() {
     super(NMMemoryStateStoreService.class.getName());
@@ -65,6 +67,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
     containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
     trackerStates = new HashMap<TrackerKey, TrackerState>();
     deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
+    logDeleterState = new HashMap<ApplicationId, LogDeleterProto>();
   }
 
   @Override
@@ -77,7 +80,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
 
 
   @Override
-  public RecoveredApplicationsState loadApplicationsState()
+  public synchronized RecoveredApplicationsState loadApplicationsState()
       throws IOException {
     RecoveredApplicationsState state = new RecoveredApplicationsState();
     state.applications = new ArrayList<ContainerManagerApplicationProto>(
@@ -87,7 +90,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeApplication(ApplicationId appId,
+  public synchronized void storeApplication(ApplicationId appId,
       ContainerManagerApplicationProto proto) throws IOException {
     ContainerManagerApplicationProto protoCopy =
         ContainerManagerApplicationProto.parseFrom(proto.toByteString());
@@ -95,18 +98,19 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeFinishedApplication(ApplicationId appId) {
+  public synchronized void storeFinishedApplication(ApplicationId appId) {
     finishedApps.add(appId);
   }
 
   @Override
-  public void removeApplication(ApplicationId appId) throws IOException {
+  public synchronized void removeApplication(ApplicationId appId)
+      throws IOException {
     apps.remove(appId);
     finishedApps.remove(appId);
   }
 
   @Override
-  public List<RecoveredContainerState> loadContainersState()
+  public synchronized List<RecoveredContainerState> loadContainersState()
       throws IOException {
     // return a copy so caller can't modify our state
     List<RecoveredContainerState> result =
@@ -124,7 +128,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeContainer(ContainerId containerId,
+  public synchronized void storeContainer(ContainerId containerId,
       StartContainerRequest startRequest) throws IOException {
     RecoveredContainerState rcs = new RecoveredContainerState();
     rcs.startRequest = startRequest;
@@ -132,14 +136,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeContainerDiagnostics(ContainerId containerId,
+  public synchronized void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {
     RecoveredContainerState rcs = getRecoveredContainerState(containerId);
     rcs.diagnostics = diagnostics.toString();
   }
 
   @Override
-  public void storeContainerLaunched(ContainerId containerId)
+  public synchronized void storeContainerLaunched(ContainerId containerId)
       throws IOException {
     RecoveredContainerState rcs = getRecoveredContainerState(containerId);
     if (rcs.exitCode != ContainerExitStatus.INVALID) {
@@ -149,22 +153,23 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeContainerKilled(ContainerId containerId)
+  public synchronized void storeContainerKilled(ContainerId containerId)
       throws IOException {
     RecoveredContainerState rcs = getRecoveredContainerState(containerId);
     rcs.killed = true;
   }
 
   @Override
-  public void storeContainerCompleted(ContainerId containerId, int exitCode)
-      throws IOException {
+  public synchronized void storeContainerCompleted(ContainerId containerId,
+      int exitCode) throws IOException {
     RecoveredContainerState rcs = getRecoveredContainerState(containerId);
     rcs.status = RecoveredContainerStatus.COMPLETED;
     rcs.exitCode = exitCode;
   }
 
   @Override
-  public void removeContainer(ContainerId containerId) throws IOException {
+  public synchronized void removeContainer(ContainerId containerId)
+      throws IOException {
     containerStates.remove(containerId);
   }
 
@@ -252,7 +257,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
 
 
   @Override
-  public RecoveredDeletionServiceState loadDeletionServiceState()
+  public synchronized RecoveredDeletionServiceState loadDeletionServiceState()
       throws IOException {
     RecoveredDeletionServiceState result =
         new RecoveredDeletionServiceState();
@@ -274,7 +279,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
 
 
   @Override
-  public RecoveredNMTokensState loadNMTokensState() throws IOException {
+  public synchronized RecoveredNMTokensState loadNMTokensState()
+      throws IOException {
     // return a copy so caller can't modify our state
     RecoveredNMTokensState result = new RecoveredNMTokensState();
     result.currentMasterKey = nmTokenState.currentMasterKey;
@@ -286,36 +292,36 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeNMTokenCurrentMasterKey(MasterKey key)
+  public synchronized void storeNMTokenCurrentMasterKey(MasterKey key)
       throws IOException {
     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
     nmTokenState.currentMasterKey = new MasterKeyPBImpl(keypb.getProto());
   }
 
   @Override
-  public void storeNMTokenPreviousMasterKey(MasterKey key)
+  public synchronized void storeNMTokenPreviousMasterKey(MasterKey key)
       throws IOException {
     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
     nmTokenState.previousMasterKey = new MasterKeyPBImpl(keypb.getProto());
   }
 
   @Override
-  public void storeNMTokenApplicationMasterKey(ApplicationAttemptId attempt,
-      MasterKey key) throws IOException {
+  public synchronized void storeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt, MasterKey key) throws IOException {
     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
     nmTokenState.applicationMasterKeys.put(attempt,
         new MasterKeyPBImpl(keypb.getProto()));
   }
 
   @Override
-  public void removeNMTokenApplicationMasterKey(ApplicationAttemptId attempt)
-      throws IOException {
+  public synchronized void removeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt) throws IOException {
     nmTokenState.applicationMasterKeys.remove(attempt);
   }
 
 
   @Override
-  public RecoveredContainerTokensState loadContainerTokensState()
+  public synchronized RecoveredContainerTokensState loadContainerTokensState()
       throws IOException {
     // return a copy so caller can't modify our state
     RecoveredContainerTokensState result =
@@ -328,7 +334,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeContainerTokenCurrentMasterKey(MasterKey key)
+  public synchronized void storeContainerTokenCurrentMasterKey(MasterKey key)
       throws IOException {
     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
     containerTokenState.currentMasterKey =
@@ -336,7 +342,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeContainerTokenPreviousMasterKey(MasterKey key)
+  public synchronized void storeContainerTokenPreviousMasterKey(MasterKey key)
       throws IOException {
     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
     containerTokenState.previousMasterKey =
@@ -344,18 +350,41 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeContainerToken(ContainerId containerId,
+  public synchronized void storeContainerToken(ContainerId containerId,
       Long expirationTime) throws IOException {
     containerTokenState.activeTokens.put(containerId, expirationTime);
   }
 
   @Override
-  public void removeContainerToken(ContainerId containerId)
+  public synchronized void removeContainerToken(ContainerId containerId)
       throws IOException {
     containerTokenState.activeTokens.remove(containerId);
   }
 
 
+  @Override
+  public synchronized RecoveredLogDeleterState loadLogDeleterState()
+      throws IOException {
+    RecoveredLogDeleterState result = new RecoveredLogDeleterState();
+    result.logDeleterMap =
+        new HashMap<ApplicationId, LogDeleterProto>(logDeleterState);
+    return result;  // holds a copy so the caller can't modify our state
+  }
+
+  @Override
+  public synchronized void storeLogDeleter(ApplicationId appId,
+      LogDeleterProto proto) throws IOException {
+    // replaces any deleter previously recorded for the same application
+    logDeleterState.put(appId, proto);
+  }
+
+  @Override
+  public synchronized void removeLogDeleter(ApplicationId appId)
+      throws IOException {
+    logDeleterState.remove(appId);  // no-op if nothing is stored for appId
+  }
+
+
   private static class TrackerState {
     Map<Path, LocalResourceProto> inProgressMap =
         new HashMap<Path, LocalResourceProto>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/04f5ef18/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index f7f43cc..1804424 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
@@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
 import org.apache.hadoop.yarn.server.records.Version;
@@ -831,6 +833,55 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(expTime3, loadedActiveTokens.get(cid3));
   }
 
+  /**
+   * Stores and removes log deleter records one at a time, restarting the
+   * state store after each change to check what is recovered.
+   */
+  @Test
+  public void testLogDeleterStorage() throws IOException {
+    // nothing has been stored yet, so the recovered map must be empty
+    RecoveredLogDeleterState recovered = stateStore.loadLogDeleterState();
+    assertTrue(recovered.getLogDeleterMap().isEmpty());
+
+    // persist a first log deleter
+    final ApplicationId firstApp = ApplicationId.newInstance(1, 1);
+    LogDeleterProto firstDeleter = LogDeleterProto.newBuilder()
+        .setUser("user1").setDeletionTime(1234).build();
+    stateStore.storeLogDeleter(firstApp, firstDeleter);
+
+    // it must be the only entry recovered after a restart
+    restartStateStore();
+    recovered = stateStore.loadLogDeleterState();
+    assertEquals(1, recovered.getLogDeleterMap().size());
+    assertEquals(firstDeleter, recovered.getLogDeleterMap().get(firstApp));
+
+    // persist a second log deleter
+    final ApplicationId secondApp = ApplicationId.newInstance(2, 2);
+    LogDeleterProto secondDeleter = LogDeleterProto.newBuilder()
+        .setUser("user2").setDeletionTime(5678).build();
+    stateStore.storeLogDeleter(secondApp, secondDeleter);
+
+    // both entries must be recovered after a restart
+    restartStateStore();
+    recovered = stateStore.loadLogDeleterState();
+    assertEquals(2, recovered.getLogDeleterMap().size());
+    assertEquals(firstDeleter, recovered.getLogDeleterMap().get(firstApp));
+    assertEquals(secondDeleter, recovered.getLogDeleterMap().get(secondApp));
+
+    // drop the first entry; only the second may survive a restart
+    stateStore.removeLogDeleter(firstApp);
+    restartStateStore();
+    recovered = stateStore.loadLogDeleterState();
+    assertEquals(1, recovered.getLogDeleterMap().size());
+    assertEquals(secondDeleter, recovered.getLogDeleterMap().get(secondApp));
+
+    // drop the remaining entry; a restart must then recover nothing
+    stateStore.removeLogDeleter(secondApp);
+    restartStateStore();
+    recovered = stateStore.loadLogDeleterState();
+    assertTrue(recovered.getLogDeleterMap().isEmpty());
+  }
+
   private static class NMTokenSecretManagerForTest extends
       BaseNMTokenSecretManager {
     public MasterKey generateKey() {