You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2018/08/20 15:19:43 UTC

hadoop git commit: YARN-8242. YARN NM: OOM error while reading back the state store on recovery. Contributed by Pradeep Ambati and Kanwaljeet Sachdev

Repository: hadoop
Updated Branches:
  refs/heads/trunk 01ff81781 -> 65e746971


YARN-8242. YARN NM: OOM error while reading back the state store on recovery. Contributed by Pradeep Ambati and Kanwaljeet Sachdev


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/65e74697
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/65e74697
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/65e74697

Branch: refs/heads/trunk
Commit: 65e7469712be6cf393e29ef73cc94727eec81227
Parents: 01ff817
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Aug 20 10:14:40 2018 -0500
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Aug 20 10:14:40 2018 -0500

----------------------------------------------------------------------
 .../server/nodemanager/DeletionService.java     |  25 +-
 .../containermanager/ContainerManagerImpl.java  |  26 +-
 .../localizer/ResourceLocalizationService.java  |  56 +--
 .../recovery/NMLeveldbStateStoreService.java    | 412 ++++++++++++-------
 .../recovery/NMNullStateStoreService.java       |   2 +-
 .../recovery/NMStateStoreService.java           |  55 +--
 .../nodemanager/recovery/RecoveryIterator.java  |  41 ++
 .../security/NMContainerTokenSecretManager.java |  27 +-
 .../security/NMTokenSecretManagerInNM.java      |  15 +-
 .../recovery/NMMemoryStateStoreService.java     |  82 +++-
 .../TestNMLeveldbStateStoreService.java         | 216 +++++++---
 11 files changed, 647 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
index ae81dc1..e665c5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
@@ -19,13 +19,14 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+
+import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -96,16 +97,20 @@ public class DeletionService extends AbstractService {
 
   private void recover(NMStateStoreService.RecoveredDeletionServiceState state)
       throws IOException {
-    List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks();
     Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap =
-        new HashMap<>(taskProtos.size());
-    Set<Integer> successorTasks = new HashSet<>();
-    for (DeletionServiceDeleteTaskProto proto : taskProtos) {
-      DeletionTaskRecoveryInfo info =
-          NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this);
-      idToInfoMap.put(info.getTask().getTaskId(), info);
-      nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId()));
-      successorTasks.addAll(info.getSuccessorTaskIds());
+        new HashMap<Integer, DeletionTaskRecoveryInfo>();
+    Set<Integer> successorTasks = new HashSet<Integer>();
+
+    try (RecoveryIterator<DeletionServiceDeleteTaskProto> it =
+             state.getIterator()) {
+      while (it.hasNext()) {
+        DeletionServiceDeleteTaskProto proto = it.next();
+        DeletionTaskRecoveryInfo info =
+            NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this);
+        idToInfoMap.put(info.getTask().getTaskId(), info);
+        nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId()));
+        successorTasks.addAll(info.getSuccessorTaskIds());
+      }
     }
 
     // restore the task dependencies and schedule the deletion tasks that

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 8b35258..b89e2dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -23,6 +23,7 @@ import com.google.protobuf.ByteString;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -356,19 +357,26 @@ public class ContainerManagerImpl extends CompositeService implements
           stateStore.loadLocalizationState());
 
       RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
-      for (ContainerManagerApplicationProto proto :
-           appsState.getApplications()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Recovering application with state: " + proto.toString());
+      try (RecoveryIterator<ContainerManagerApplicationProto> rasIterator =
+               appsState.getIterator()) {
+        while (rasIterator.hasNext()) {
+          ContainerManagerApplicationProto proto = rasIterator.next();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Recovering application with state: " + proto.toString());
+          }
+          recoverApplication(proto);
         }
-        recoverApplication(proto);
       }
 
-      for (RecoveredContainerState rcs : stateStore.loadContainersState()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Recovering container with state: " + rcs);
+      try (RecoveryIterator<RecoveredContainerState> rcsIterator =
+               stateStore.getContainerStateIterator()) {
+        while (rcsIterator.hasNext()) {
+          RecoveredContainerState rcs = rcsIterator.next();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Recovering container with state: " + rcs);
+          }
+          recoverContainer(rcs);
         }
-        recoverContainer(rcs);
       }
 
       // Recovery AMRMProxy state after apps and containers are recovered

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 3834ece..2892d1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+
+import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -295,42 +297,46 @@ public class ResourceLocalizationService extends CompositeService
 
   //Recover localized resources after an NM restart
   public void recoverLocalizedResources(RecoveredLocalizationState state)
-      throws URISyntaxException {
+      throws URISyntaxException, IOException {
     LocalResourceTrackerState trackerState = state.getPublicTrackerState();
     recoverTrackerResources(publicRsrc, trackerState);
 
-    for (Map.Entry<String, RecoveredUserResources> userEntry :
-         state.getUserResources().entrySet()) {
-      String user = userEntry.getKey();
-      RecoveredUserResources userResources = userEntry.getValue();
-      trackerState = userResources.getPrivateTrackerState();
-      if (!trackerState.isEmpty()) {
-        LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-            null, dispatcher, true, super.getConfig(), stateStore, dirsHandler);
-        LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
-            tracker);
-        if (oldTracker != null) {
-          tracker = oldTracker;
-        }
-        recoverTrackerResources(tracker, trackerState);
-      }
-
-      for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
-           userResources.getAppTrackerStates().entrySet()) {
-        trackerState = appEntry.getValue();
+    try (RecoveryIterator<Map.Entry<String, RecoveredUserResources>> it
+             = state.getIterator()) {
+      while (it.hasNext()) {
+        Map.Entry<String, RecoveredUserResources> userEntry = it.next();
+        String user = userEntry.getKey();
+        RecoveredUserResources userResources = userEntry.getValue();
+        trackerState = userResources.getPrivateTrackerState();
         if (!trackerState.isEmpty()) {
-          ApplicationId appId = appEntry.getKey();
-          String appIdStr = appId.toString();
           LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-              appId, dispatcher, false, super.getConfig(), stateStore,
+              null, dispatcher, true, super.getConfig(), stateStore,
               dirsHandler);
-          LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
+          LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
               tracker);
           if (oldTracker != null) {
             tracker = oldTracker;
           }
           recoverTrackerResources(tracker, trackerState);
         }
+
+        for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
+            userResources.getAppTrackerStates().entrySet()) {
+          trackerState = appEntry.getValue();
+          if (!trackerState.isEmpty()) {
+            ApplicationId appId = appEntry.getKey();
+            String appIdStr = appId.toString();
+            LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+                appId, dispatcher, false, super.getConfig(), stateStore,
+                dirsHandler);
+            LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
+                tracker);
+            if (oldTracker != null) {
+              tracker = oldTracker;
+            }
+            recoverTrackerResources(tracker, trackerState);
+          }
+        }
       }
     }
   }
@@ -556,7 +562,7 @@ public class ResourceLocalizationService extends CompositeService
       rsrcCleanup.getResources();
     for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
          rsrcs.entrySet()) {
-      LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), 
+      LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
           c.getContainerId().getApplicationAttemptId()
           .getApplicationId());
       for (LocalResourceRequest req : e.getValue()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 67f642d..5d4253d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -73,6 +74,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -225,68 +227,119 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     return isHealthy;
   }
 
-  @Override
-  public List<RecoveredContainerState> loadContainersState()
+  // Opens a LeveldbIterator seeked to startKey; the caller must close it.
+  private LeveldbIterator getLevelDBIterator(String startKey)
       throws IOException {
-    ArrayList<RecoveredContainerState> containers =
-        new ArrayList<RecoveredContainerState>();
-    ArrayList<ContainerId> containersToRemove =
-              new ArrayList<ContainerId>();
-    LeveldbIterator iter = null;
     try {
-      iter = new LeveldbIterator(db);
-      iter.seek(bytes(CONTAINERS_KEY_PREFIX));
+      LeveldbIterator it = new LeveldbIterator(db);
+      it.seek(bytes(startKey));
+      return it;
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
 
-      while (iter.hasNext()) {
-        Entry<byte[], byte[]> entry = iter.peekNext();
+  // Base recovery iterator: lazily reads one item at a time from leveldb.
+  private abstract class BaseRecoveryIterator<T> implements
+      RecoveryIterator<T> {
+    LeveldbIterator it;
+    T nextItem;
+
+    BaseRecoveryIterator(String dbKey) throws IOException {
+      this.it = getLevelDBIterator(dbKey);
+      this.nextItem = null;
+    }
+
+    protected abstract T getNextItem(LeveldbIterator it) throws IOException;
+
+    @Override
+    public boolean hasNext() throws IOException {
+      if (nextItem == null) {
+        nextItem = getNextItem(it);
+      }
+      return (nextItem != null);
+    }
+
+    @Override
+    public T next() throws IOException, NoSuchElementException {
+      T tmp = nextItem;
+      if (tmp != null) {
+        nextItem = null;
+        return tmp;
+      } else {
+        tmp = getNextItem(it);
+        if (tmp == null) {
+          throw new NoSuchElementException();
+        }
+        return tmp;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (it != null) {
+        it.close();
+      }
+    }
+  }
+
+  // Container Recovery Iterator
+  private class ContainerStateIterator extends
+      BaseRecoveryIterator<RecoveredContainerState> {
+    ContainerStateIterator() throws IOException {
+      super(CONTAINERS_KEY_PREFIX);
+    }
+
+    @Override
+    protected RecoveredContainerState getNextItem(LeveldbIterator it)
+        throws IOException {
+      return getNextRecoveredContainer(it);
+    }
+  }
+
+  private RecoveredContainerState getNextRecoveredContainer(LeveldbIterator it)
+      throws IOException {
+    RecoveredContainerState rcs = null;
+    try {
+      while (it.hasNext()) {
+        Entry<byte[], byte[]> entry = it.peekNext();
         String key = asString(entry.getKey());
         if (!key.startsWith(CONTAINERS_KEY_PREFIX)) {
-          break;
+          return null;
         }
 
         int idEndPos = key.indexOf('/', CONTAINERS_KEY_PREFIX.length());
         if (idEndPos < 0) {
           throw new IOException("Unable to determine container in key: " + key);
         }
-        ContainerId containerId = ContainerId.fromString(
-            key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos));
-        String keyPrefix = key.substring(0, idEndPos+1);
-        RecoveredContainerState rcs = loadContainerState(containerId,
-            iter, keyPrefix);
-        // Don't load container without StartContainerRequest
+        String keyPrefix = key.substring(0, idEndPos + 1);
+        rcs = loadContainerState(it, keyPrefix);
         if (rcs.startRequest != null) {
-          containers.add(rcs);
+          break;
         } else {
-          containersToRemove.add(containerId);
+          removeContainer(rcs.getContainerId());
+          rcs = null;
         }
       }
     } catch (DBException e) {
       throw new IOException(e);
-    } finally {
-      if (iter != null) {
-        iter.close();
-      }
     }
+    return rcs;
+  }
 
-    // remove container without StartContainerRequest
-    for (ContainerId containerId : containersToRemove) {
-      LOG.warn("Remove container " + containerId +
-          " with incomplete records");
-      try {
-        removeContainer(containerId);
-        // TODO: kill and cleanup the leaked container
-      } catch (IOException e) {
-        LOG.error("Unable to remove container " + containerId +
-            " in store", e);
-      }
-    }
 
-    return containers;
+  @Override
+  public RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
+      throws IOException {
+    return new ContainerStateIterator();
   }
 
-  private RecoveredContainerState loadContainerState(ContainerId containerId,
-      LeveldbIterator iter, String keyPrefix) throws IOException {
-    RecoveredContainerState rcs = new RecoveredContainerState();
+  private RecoveredContainerState loadContainerState(LeveldbIterator iter,
+       String keyPrefix) throws IOException {
+    ContainerId containerId = ContainerId.fromString(
+        keyPrefix.substring(CONTAINERS_KEY_PREFIX.length(),
+            keyPrefix.length()-1));
+    RecoveredContainerState rcs = new RecoveredContainerState(containerId);
     rcs.status = RecoveredContainerStatus.REQUESTED;
     while (iter.hasNext()) {
       Entry<byte[],byte[]> entry = iter.peekNext();
@@ -680,35 +733,45 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   }
 
 
-  @Override
-  public RecoveredApplicationsState loadApplicationsState()
-      throws IOException {
-    RecoveredApplicationsState state = new RecoveredApplicationsState();
-    state.applications = new ArrayList<ContainerManagerApplicationProto>();
-    String keyPrefix = APPLICATIONS_KEY_PREFIX;
-    LeveldbIterator iter = null;
+  // Application Recovery Iterator
+  private class ApplicationStateIterator extends
+      BaseRecoveryIterator<ContainerManagerApplicationProto> {
+    ApplicationStateIterator() throws IOException {
+      super(APPLICATIONS_KEY_PREFIX);
+    }
+
+    @Override
+    protected ContainerManagerApplicationProto getNextItem(LeveldbIterator it)
+        throws IOException {
+      return getNextRecoveredApplication(it);
+    }
+  }
+
+  private ContainerManagerApplicationProto getNextRecoveredApplication(
+      LeveldbIterator it) throws IOException {
+    ContainerManagerApplicationProto applicationProto = null;
     try {
-      iter = new LeveldbIterator(db);
-      iter.seek(bytes(keyPrefix));
-      while (iter.hasNext()) {
-        Entry<byte[], byte[]> entry = iter.next();
+      if (it.hasNext()) {
+        Entry<byte[], byte[]> entry = it.next();
         String key = asString(entry.getKey());
-        if (!key.startsWith(keyPrefix)) {
-          break;
+        if (!key.startsWith(APPLICATIONS_KEY_PREFIX)) {
+          return null;
         }
-        state.applications.add(
-            ContainerManagerApplicationProto.parseFrom(entry.getValue()));
+        applicationProto = ContainerManagerApplicationProto.parseFrom(
+            entry.getValue());
       }
     } catch (DBException e) {
       throw new IOException(e);
-    } finally {
-      if (iter != null) {
-        iter.close();
-      }
     }
+    return applicationProto;
+  }
 
+  @Override
+  public RecoveredApplicationsState loadApplicationsState()
+      throws IOException {
+    RecoveredApplicationsState state = new RecoveredApplicationsState();
+    state.it = new ApplicationStateIterator();
     cleanupDeprecatedFinishedApps();
-
     return state;
   }
 
@@ -752,24 +815,29 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   }
 
 
-  @Override
-  public RecoveredLocalizationState loadLocalizationState()
-      throws IOException {
-    RecoveredLocalizationState state = new RecoveredLocalizationState();
+  // User Resource Recovery Iterator.
+  private class UserResourcesIterator extends
+      BaseRecoveryIterator<Entry<String, RecoveredUserResources>> {
+    UserResourcesIterator() throws IOException {
+      super(LOCALIZATION_PRIVATE_KEY_PREFIX);
+    }
 
-    LeveldbIterator iter = null;
-    try {
-      iter = new LeveldbIterator(db);
-      iter.seek(bytes(LOCALIZATION_PUBLIC_KEY_PREFIX));
-      state.publicTrackerState = loadResourceTrackerState(iter,
-          LOCALIZATION_PUBLIC_KEY_PREFIX);
+    @Override
+    protected Entry<String, RecoveredUserResources> getNextItem(
+        LeveldbIterator it) throws IOException {
+      return getNextRecoveredPrivateLocalizationEntry(it);
+    }
+  }
 
-      iter.seek(bytes(LOCALIZATION_PRIVATE_KEY_PREFIX));
-      while (iter.hasNext()) {
-        Entry<byte[],byte[]> entry = iter.peekNext();
+  private Entry<String, RecoveredUserResources> getNextRecoveredPrivateLocalizationEntry(
+      LeveldbIterator it) throws IOException {
+    Entry<String, RecoveredUserResources> localEntry = null;
+    try {
+      if (it.hasNext()) {
+        Entry<byte[], byte[]> entry = it.peekNext();
         String key = asString(entry.getKey());
         if (!key.startsWith(LOCALIZATION_PRIVATE_KEY_PREFIX)) {
-          break;
+          return null;
         }
 
         int userEndPos = key.indexOf('/',
@@ -780,17 +848,24 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         }
         String user = key.substring(
             LOCALIZATION_PRIVATE_KEY_PREFIX.length(), userEndPos);
-        state.userResources.put(user, loadUserLocalizedResources(iter,
-            key.substring(0, userEndPos+1)));
+        RecoveredUserResources val = loadUserLocalizedResources(it,
+            key.substring(0, userEndPos+1));
+        localEntry = new AbstractMap.SimpleEntry<>(user, val);
       }
     } catch (DBException e) {
       throw new IOException(e);
-    } finally {
-      if (iter != null) {
-        iter.close();
-      }
     }
+    return localEntry;
+  }
 
+  @Override
+  public RecoveredLocalizationState loadLocalizationState()
+      throws IOException {
+    RecoveredLocalizationState state = new RecoveredLocalizationState();
+    LeveldbIterator it = getLevelDBIterator(LOCALIZATION_PUBLIC_KEY_PREFIX);
+    state.publicTrackerState = loadResourceTrackerState(it,
+        LOCALIZATION_PUBLIC_KEY_PREFIX);
+    state.it = new UserResourcesIterator();
     return state;
   }
 
@@ -800,7 +875,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX;
     LocalResourceTrackerState state = new LocalResourceTrackerState();
     while (iter.hasNext()) {
-      Entry<byte[],byte[]> entry = iter.peekNext();
+      Entry<byte[], byte[]> entry = iter.peekNext();
       String key = asString(entry.getKey());
       if (!key.startsWith(keyPrefix)) {
         break;
@@ -981,32 +1056,44 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         + LOCALIZATION_APPCACHE_SUFFIX + appId + "/";
   }
 
+  // Deletion State Recovery Iterator.
+  private class DeletionStateIterator extends
+      BaseRecoveryIterator<DeletionServiceDeleteTaskProto> {
+    DeletionStateIterator() throws IOException {
+      super(DELETION_TASK_KEY_PREFIX);
+    }
 
-  @Override
-  public RecoveredDeletionServiceState loadDeletionServiceState()
-      throws IOException {
-    RecoveredDeletionServiceState state = new RecoveredDeletionServiceState();
-    state.tasks = new ArrayList<DeletionServiceDeleteTaskProto>();
-    LeveldbIterator iter = null;
+    @Override
+    protected DeletionServiceDeleteTaskProto getNextItem(LeveldbIterator it)
+        throws IOException {
+      return getNextRecoveredDeletionService(it);
+    }
+  }
+
+  private DeletionServiceDeleteTaskProto getNextRecoveredDeletionService(
+      LeveldbIterator it) throws IOException {
+    DeletionServiceDeleteTaskProto deleteProto = null;
     try {
-      iter = new LeveldbIterator(db);
-      iter.seek(bytes(DELETION_TASK_KEY_PREFIX));
-      while (iter.hasNext()) {
-        Entry<byte[], byte[]> entry = iter.next();
+      if (it.hasNext()) {
+        Entry<byte[], byte[]> entry = it.next();
         String key = asString(entry.getKey());
         if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) {
-          break;
+          return null;
         }
-        state.tasks.add(
-            DeletionServiceDeleteTaskProto.parseFrom(entry.getValue()));
+        deleteProto = DeletionServiceDeleteTaskProto.parseFrom(
+            entry.getValue());
       }
     } catch (DBException e) {
       throw new IOException(e);
-    } finally {
-      if (iter != null) {
-        iter.close();
-      }
     }
+    return deleteProto;
+  }
+
+  @Override
+  public RecoveredDeletionServiceState loadDeletionServiceState()
+      throws IOException {
+    RecoveredDeletionServiceState state = new RecoveredDeletionServiceState();
+    state.it = new DeletionStateIterator();
     return state;
   }
 
@@ -1033,29 +1120,44 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     }
   }
 
+  private MasterKey getMasterKey(String dbKey) throws IOException {
+    try{
+      byte[] data = db.get(bytes(dbKey));
+      if (data == null || data.length == 0) {
+        return null;
+      }
+      return parseMasterKey(data);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
 
-  @Override
-  public RecoveredNMTokensState loadNMTokensState() throws IOException {
-    RecoveredNMTokensState state = new RecoveredNMTokensState();
-    state.applicationMasterKeys =
-        new HashMap<ApplicationAttemptId, MasterKey>();
-    LeveldbIterator iter = null;
+  // NMTokens Recovery Iterator
+  private class NMTokensStateIterator extends
+      BaseRecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> {
+    NMTokensStateIterator() throws IOException {
+      super(NM_TOKENS_KEY_PREFIX);
+    }
+
+    @Override
+    protected Entry<ApplicationAttemptId, MasterKey> getNextItem(
+        LeveldbIterator it) throws IOException {
+      return getNextMasterKeyEntry(it);
+    }
+  }
+
+  private Entry<ApplicationAttemptId, MasterKey> getNextMasterKeyEntry(
+      LeveldbIterator it) throws IOException {
+    Entry<ApplicationAttemptId, MasterKey> masterKeyentry = null;
     try {
-      iter = new LeveldbIterator(db);
-      iter.seek(bytes(NM_TOKENS_KEY_PREFIX));
-      while (iter.hasNext()) {
-        Entry<byte[], byte[]> entry = iter.next();
+      while (it.hasNext()) {
+        Entry<byte[], byte[]> entry = it.next();
         String fullKey = asString(entry.getKey());
         if (!fullKey.startsWith(NM_TOKENS_KEY_PREFIX)) {
           break;
         }
         String key = fullKey.substring(NM_TOKENS_KEY_PREFIX.length());
-        if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) {
-          state.currentMasterKey = parseMasterKey(entry.getValue());
-        } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) {
-          state.previousMasterKey = parseMasterKey(entry.getValue());
-        } else if (key.startsWith(
-            ApplicationAttemptId.appAttemptIdStrPrefix)) {
+        if (key.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
           ApplicationAttemptId attempt;
           try {
             attempt = ApplicationAttemptId.fromString(key);
@@ -1063,17 +1165,25 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
             throw new IOException("Bad application master key state for "
                 + fullKey, e);
           }
-          state.applicationMasterKeys.put(attempt,
+          masterKeyentry = new AbstractMap.SimpleEntry<>(attempt,
               parseMasterKey(entry.getValue()));
+          break;
         }
       }
     } catch (DBException e) {
       throw new IOException(e);
-    } finally {
-      if (iter != null) {
-        iter.close();
-      }
     }
+    return masterKeyentry;
+  }
+
+  @Override
+  public RecoveredNMTokensState loadNMTokensState() throws IOException {
+    RecoveredNMTokensState state = new RecoveredNMTokensState();
+    state.currentMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX
+                                          + CURRENT_MASTER_KEY_SUFFIX);
+    state.previousMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX
+                                            + PREV_MASTER_KEY_SUFFIX);
+    state.it = new NMTokensStateIterator();
     return state;
   }
 
@@ -1122,45 +1232,45 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     }
   }
 
+  // Recover ContainersToken Iterator.
+  private class ContainerTokensStateIterator extends
+      BaseRecoveryIterator<Entry<ContainerId, Long>> {
+    ContainerTokensStateIterator() throws IOException {
+      super(CONTAINER_TOKENS_KEY_PREFIX);
+    }
 
-  @Override
-  public RecoveredContainerTokensState loadContainerTokensState()
+    @Override
+    protected Entry<ContainerId, Long> getNextItem(LeveldbIterator it)
+        throws IOException {
+      return getNextContainerToken(it);
+    }
+  }
+
+  private Entry<ContainerId, Long> getNextContainerToken(LeveldbIterator it)
       throws IOException {
-    RecoveredContainerTokensState state = new RecoveredContainerTokensState();
-    state.activeTokens = new HashMap<ContainerId, Long>();
-    LeveldbIterator iter = null;
+    Entry<ContainerId, Long> containerTokenEntry = null;
     try {
-      iter = new LeveldbIterator(db);
-      iter.seek(bytes(CONTAINER_TOKENS_KEY_PREFIX));
-      final int containerTokensKeyPrefixLength =
-          CONTAINER_TOKENS_KEY_PREFIX.length();
-      while (iter.hasNext()) {
-        Entry<byte[], byte[]> entry = iter.next();
+      while (it.hasNext()) {
+        Entry<byte[], byte[]> entry = it.next();
         String fullKey = asString(entry.getKey());
         if (!fullKey.startsWith(CONTAINER_TOKENS_KEY_PREFIX)) {
           break;
         }
-        String key = fullKey.substring(containerTokensKeyPrefixLength);
-        if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) {
-          state.currentMasterKey = parseMasterKey(entry.getValue());
-        } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) {
-          state.previousMasterKey = parseMasterKey(entry.getValue());
-        } else if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
-          loadContainerToken(state, fullKey, key, entry.getValue());
+        String key = fullKey.substring(CONTAINER_TOKENS_KEY_PREFIX.length());
+        if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
+          containerTokenEntry = loadContainerToken(fullKey, key,
+              entry.getValue());
+          break;
         }
       }
     } catch (DBException e) {
       throw new IOException(e);
-    } finally {
-      if (iter != null) {
-        iter.close();
-      }
     }
-    return state;
+    return containerTokenEntry;
   }
 
-  private static void loadContainerToken(RecoveredContainerTokensState state,
-      String key, String containerIdStr, byte[] value) throws IOException {
+  private static Entry<ContainerId, Long> loadContainerToken(String key,
+      String containerIdStr, byte[] value) throws IOException {
     ContainerId containerId;
     Long expTime;
     try {
@@ -1169,7 +1279,19 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     } catch (IllegalArgumentException e) {
       throw new IOException("Bad container token state for " + key, e);
     }
-    state.activeTokens.put(containerId, expTime);
+    return new AbstractMap.SimpleEntry<>(containerId, expTime);
+  }
+
+  @Override
+  public RecoveredContainerTokensState loadContainerTokensState()
+      throws IOException {
+    RecoveredContainerTokensState state = new RecoveredContainerTokensState();
+    state.currentMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX
+        + CURRENT_MASTER_KEY_SUFFIX);
+    state.previousMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX
+        + PREV_MASTER_KEY_SUFFIX);
+    state.it = new ContainerTokensStateIterator();
+    return state;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index dfad9cf..3ae00f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -65,7 +65,7 @@ public class NMNullStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public List<RecoveredContainerState> loadContainersState()
+  public RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
       throws IOException {
     throw new UnsupportedOperationException(
         "Recovery not supported by this state store");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 70decdb..35caec9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -67,12 +68,11 @@ public abstract class NMStateStoreService extends AbstractService {
   }
 
   public static class RecoveredApplicationsState {
-    List<ContainerManagerApplicationProto> applications;
+    RecoveryIterator<ContainerManagerApplicationProto> it = null;
 
-    public List<ContainerManagerApplicationProto> getApplications() {
-      return applications;
+    public RecoveryIterator<ContainerManagerApplicationProto> getIterator() {
+      return it;
     }
-
   }
 
   /**
@@ -106,6 +106,15 @@ public abstract class NMStateStoreService extends AbstractService {
         RecoveredContainerType.RECOVER;
     private long startTime;
     private ResourceMappings resMappings = new ResourceMappings();
+    private final ContainerId containerId;
+
+    RecoveredContainerState(ContainerId containerId){
+      this.containerId = containerId;
+    }
+
+    public ContainerId getContainerId() {
+      return containerId;
+    }
 
     public RecoveredContainerStatus getStatus() {
       return status;
@@ -248,30 +257,33 @@ public abstract class NMStateStoreService extends AbstractService {
   public static class RecoveredLocalizationState {
     LocalResourceTrackerState publicTrackerState =
         new LocalResourceTrackerState();
-    Map<String, RecoveredUserResources> userResources =
-        new HashMap<String, RecoveredUserResources>();
+    RecoveryIterator<Entry<String, RecoveredUserResources>> it = null;
 
     public LocalResourceTrackerState getPublicTrackerState() {
       return publicTrackerState;
     }
 
-    public Map<String, RecoveredUserResources> getUserResources() {
-      return userResources;
+    public RecoveryIterator<Entry<String, RecoveredUserResources>> getIterator() {
+      return it;
     }
   }
 
   public static class RecoveredDeletionServiceState {
-    List<DeletionServiceDeleteTaskProto> tasks;
+    RecoveryIterator<DeletionServiceDeleteTaskProto> it = null;
 
-    public List<DeletionServiceDeleteTaskProto> getTasks() {
-      return tasks;
+    public RecoveryIterator<DeletionServiceDeleteTaskProto> getIterator(){
+      return it;
     }
   }
 
   public static class RecoveredNMTokensState {
     MasterKey currentMasterKey;
     MasterKey previousMasterKey;
-    Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
+    RecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> it = null;
+
+    public RecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> getIterator() {
+      return it;
+    }
 
     public MasterKey getCurrentMasterKey() {
       return currentMasterKey;
@@ -281,15 +293,16 @@ public abstract class NMStateStoreService extends AbstractService {
       return previousMasterKey;
     }
 
-    public Map<ApplicationAttemptId, MasterKey> getApplicationMasterKeys() {
-      return applicationMasterKeys;
-    }
   }
 
   public static class RecoveredContainerTokensState {
     MasterKey currentMasterKey;
     MasterKey previousMasterKey;
-    Map<ContainerId, Long> activeTokens;
+    RecoveryIterator<Entry<ContainerId, Long>> it = null;
+
+    public RecoveryIterator<Entry<ContainerId, Long>> getIterator() {
+      return it;
+    }
 
     public MasterKey getCurrentMasterKey() {
       return currentMasterKey;
@@ -299,9 +312,6 @@ public abstract class NMStateStoreService extends AbstractService {
       return previousMasterKey;
     }
 
-    public Map<ContainerId, Long> getActiveTokens() {
-      return activeTokens;
-    }
   }
 
   public static class RecoveredLogDeleterState {
@@ -400,11 +410,10 @@ public abstract class NMStateStoreService extends AbstractService {
 
 
   /**
-   * Load the state of containers
-   * @return recovered state for containers
-   * @throws IOException
+   * Get the recovered container state iterator.
+   * @return recovery iterator
    */
-  public abstract List<RecoveredContainerState> loadContainersState()
+  public abstract RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java
new file mode 100644
index 0000000..0bb262a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.recovery;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * A wrapper for an Iterator to translate the raw RuntimeExceptions that
+ * can be thrown into IOException.
+ */
+public interface RecoveryIterator<T> extends Closeable {
+
+  /**
+   * Returns true if the iteration has more elements.
+   */
+  boolean hasNext() throws IOException;
+
+  /**
+   * Returns the next element in the iteration.
+   */
+  T next() throws IOException, NoSuchElementException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
index 256f649..b3df69b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
@@ -24,6 +24,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,17 +92,20 @@ public class NMContainerTokenSecretManager extends
       super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
     }
 
-    for (Entry<ContainerId, Long> entry : state.getActiveTokens().entrySet()) {
-      ContainerId containerId = entry.getKey();
-      Long expTime = entry.getValue();
-      List<ContainerId> containerList =
-          recentlyStartedContainerTracker.get(expTime);
-      if (containerList == null) {
-        containerList = new ArrayList<ContainerId>();
-        recentlyStartedContainerTracker.put(expTime, containerList);
-      }
-      if (!containerList.contains(containerId)) {
-        containerList.add(containerId);
+    try (RecoveryIterator<Entry<ContainerId, Long>> it = state.getIterator()) {
+      while (it.hasNext()) {
+        Entry<ContainerId, Long> entry = it.next();
+        ContainerId containerId = entry.getKey();
+        Long expTime = entry.getValue();
+        List<ContainerId> containerList =
+            recentlyStartedContainerTracker.get(expTime);
+        if (containerList == null) {
+          containerList = new ArrayList<ContainerId>();
+          recentlyStartedContainerTracker.put(expTime, containerList);
+        }
+        if (!containerList.contains(containerId)) {
+          containerList.add(containerId);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
index 0956e77..f895791 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
@@ -23,6 +23,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,11 +89,14 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
       super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
     }
 
-    for (Map.Entry<ApplicationAttemptId, MasterKey> entry :
-         state.getApplicationMasterKeys().entrySet()) {
-      key = entry.getValue();
-      oldMasterKeys.put(entry.getKey(),
-          new MasterKeyData(key, createSecretKey(key.getBytes().array())));
+    try (RecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>> it =
+             state.getIterator()) {
+      while (it.hasNext()) {
+        Map.Entry<ApplicationAttemptId, MasterKey> entry = it.next();
+        key = entry.getValue();
+        oldMasterKeys.put(entry.getKey(),
+            new MasterKeyData(key, createSecretKey(key.getBytes().array())));
+      }
     }
 
     // reconstruct app to app attempts map

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index c5428d1..9658ecd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -56,6 +57,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
   private RecoveredNMTokensState nmTokenState;
   private RecoveredContainerTokensState containerTokenState;
+  private Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
+  private Map<ContainerId, Long> activeTokens;
   private Map<ApplicationId, LogDeleterProto> logDeleterState;
   private RecoveredAMRMProxyState amrmProxyState;
 
@@ -68,10 +71,9 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
     apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
     containerStates = new HashMap<ContainerId, RecoveredContainerState>();
     nmTokenState = new RecoveredNMTokensState();
-    nmTokenState.applicationMasterKeys =
-        new HashMap<ApplicationAttemptId, MasterKey>();
+    applicationMasterKeys = new HashMap<ApplicationAttemptId, MasterKey>();
     containerTokenState = new RecoveredContainerTokensState();
-    containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
+    activeTokens = new HashMap<ContainerId, Long>();
     trackerStates = new HashMap<TrackerKey, TrackerState>();
     deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
     logDeleterState = new HashMap<ApplicationId, LogDeleterProto>();
@@ -86,13 +88,39 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   protected void closeStorage() {
   }
 
+  // Recovery Iterator Implementation.
+  private class NMMemoryRecoveryIterator<T> implements RecoveryIterator<T> {
+
+    private Iterator<T> it;
+
+    NMMemoryRecoveryIterator(Iterator<T> it){
+      this.it = it;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return it.hasNext();
+    }
+
+    @Override
+    public T next() throws IOException {
+      return it.next();
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+  }
 
   @Override
   public synchronized RecoveredApplicationsState loadApplicationsState()
       throws IOException {
     RecoveredApplicationsState state = new RecoveredApplicationsState();
-    state.applications = new ArrayList<ContainerManagerApplicationProto>(
-        apps.values());
+    List<ContainerManagerApplicationProto> containerList =
+        new ArrayList<ContainerManagerApplicationProto>(apps.values());
+    state.it = new NMMemoryRecoveryIterator<ContainerManagerApplicationProto>(
+        containerList.iterator());
     return state;
   }
 
@@ -111,13 +139,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public synchronized List<RecoveredContainerState> loadContainersState()
+  public RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
       throws IOException {
     // return a copy so caller can't modify our state
     List<RecoveredContainerState> result =
         new ArrayList<RecoveredContainerState>(containerStates.size());
     for (RecoveredContainerState rcs : containerStates.values()) {
-      RecoveredContainerState rcsCopy = new RecoveredContainerState();
+      RecoveredContainerState rcsCopy = new RecoveredContainerState(rcs.getContainerId());
       rcsCopy.status = rcs.status;
       rcsCopy.exitCode = rcs.exitCode;
       rcsCopy.killed = rcs.killed;
@@ -131,13 +159,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
       rcsCopy.setResourceMappings(rcs.getResourceMappings());
       result.add(rcsCopy);
     }
-    return result;
+    return new NMMemoryRecoveryIterator<RecoveredContainerState>(
+        result.iterator());
   }
 
   @Override
   public synchronized void storeContainer(ContainerId containerId,
       int version, long startTime, StartContainerRequest startRequest) {
-    RecoveredContainerState rcs = new RecoveredContainerState();
+    RecoveredContainerState rcs = new RecoveredContainerState(containerId);
     rcs.startRequest = startRequest;
     rcs.version = version;
     try {
@@ -284,6 +313,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   @Override
   public synchronized RecoveredLocalizationState loadLocalizationState() {
     RecoveredLocalizationState result = new RecoveredLocalizationState();
+    Map<String, RecoveredUserResources> userResources =
+        new HashMap<String, RecoveredUserResources>();
     for (Map.Entry<TrackerKey, TrackerState> e : trackerStates.entrySet()) {
       TrackerKey tk = e.getKey();
       TrackerState ts = e.getValue();
@@ -294,10 +325,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
       if (tk.user == null) {
         result.publicTrackerState = loadTrackerState(ts);
       } else {
-        RecoveredUserResources rur = result.userResources.get(tk.user);
+        RecoveredUserResources rur = userResources.get(tk.user);
         if (rur == null) {
           rur = new RecoveredUserResources();
-          result.userResources.put(tk.user, rur);
+          userResources.put(tk.user, rur);
         }
         if (tk.appId == null) {
           rur.privateTrackerState = loadTrackerState(ts);
@@ -306,6 +337,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
         }
       }
     }
+    result.it = new NMMemoryRecoveryIterator<Map.Entry<String, RecoveredUserResources>>(
+        userResources.entrySet().iterator());
     return result;
   }
 
@@ -341,8 +374,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
       throws IOException {
     RecoveredDeletionServiceState result =
         new RecoveredDeletionServiceState();
-    result.tasks = new ArrayList<DeletionServiceDeleteTaskProto>(
-        deleteTasks.values());
+    List<DeletionServiceDeleteTaskProto> deleteTaskProtos =
+        new ArrayList<DeletionServiceDeleteTaskProto>(deleteTasks.values());
+    result.it = new NMMemoryRecoveryIterator<DeletionServiceDeleteTaskProto>(
+        deleteTaskProtos.iterator());
     return result;
   }
 
@@ -365,9 +400,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
     RecoveredNMTokensState result = new RecoveredNMTokensState();
     result.currentMasterKey = nmTokenState.currentMasterKey;
     result.previousMasterKey = nmTokenState.previousMasterKey;
-    result.applicationMasterKeys =
-        new HashMap<ApplicationAttemptId, MasterKey>(
-            nmTokenState.applicationMasterKeys);
+    Map<ApplicationAttemptId, MasterKey> masterKeysMap =
+        new HashMap<ApplicationAttemptId, MasterKey>(applicationMasterKeys);
+    result.it = new NMMemoryRecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>>(
+        masterKeysMap.entrySet().iterator());
     return result;
   }
 
@@ -389,14 +425,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   public synchronized void storeNMTokenApplicationMasterKey(
       ApplicationAttemptId attempt, MasterKey key) throws IOException {
     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
-    nmTokenState.applicationMasterKeys.put(attempt,
+    applicationMasterKeys.put(attempt,
         new MasterKeyPBImpl(keypb.getProto()));
   }
 
   @Override
   public synchronized void removeNMTokenApplicationMasterKey(
       ApplicationAttemptId attempt) throws IOException {
-    nmTokenState.applicationMasterKeys.remove(attempt);
+    applicationMasterKeys.remove(attempt);
   }
 
 
@@ -408,8 +444,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
         new RecoveredContainerTokensState();
     result.currentMasterKey = containerTokenState.currentMasterKey;
     result.previousMasterKey = containerTokenState.previousMasterKey;
-    result.activeTokens =
-        new HashMap<ContainerId, Long>(containerTokenState.activeTokens);
+    Map<ContainerId, Long> containersTokenMap =
+        new HashMap<ContainerId, Long>(activeTokens);
+    result.it = new NMMemoryRecoveryIterator<Map.Entry<ContainerId, Long>>(
+        containersTokenMap.entrySet().iterator());
     return result;
   }
 
@@ -432,13 +470,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   @Override
   public synchronized void storeContainerToken(ContainerId containerId,
       Long expirationTime) throws IOException {
-    containerTokenState.activeTokens.put(containerId, expirationTime);
+    activeTokens.put(containerId, expirationTime);
   }
 
   @Override
   public synchronized void removeContainerToken(ContainerId containerId)
       throws IOException {
-    containerTokenState.activeTokens.remove(containerId);
+    activeTokens.remove(containerId);
   }
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 8a8cfa2..fcbbc52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -125,6 +125,73 @@ public class TestNMLeveldbStateStoreService {
     FileUtil.fullyDelete(TMP_DIR);
   }
 
+  private List<RecoveredContainerState> loadContainersState(
+      RecoveryIterator<RecoveredContainerState> it) throws IOException {
+    List<RecoveredContainerState> containers =
+        new ArrayList<RecoveredContainerState>();
+    while (it.hasNext()) {
+      RecoveredContainerState rcs = it.next();
+      containers.add(rcs);
+    }
+    return containers;
+  }
+
+  private List<ContainerManagerApplicationProto> loadApplicationProtos(
+      RecoveryIterator<ContainerManagerApplicationProto> it)
+      throws IOException {
+    List<ContainerManagerApplicationProto> applicationProtos =
+        new ArrayList<ContainerManagerApplicationProto>();
+    while (it.hasNext()) {
+      applicationProtos.add(it.next());
+    }
+    return applicationProtos;
+  }
+
+  private List<DeletionServiceDeleteTaskProto> loadDeletionTaskProtos(
+      RecoveryIterator<DeletionServiceDeleteTaskProto> it) throws IOException {
+    List<DeletionServiceDeleteTaskProto> deleteTaskProtos =
+        new ArrayList<DeletionServiceDeleteTaskProto>();
+    while (it.hasNext()) {
+      deleteTaskProtos.add(it.next());
+    }
+    return deleteTaskProtos;
+  }
+
+  private Map<String, RecoveredUserResources> loadUserResources(
+      RecoveryIterator<Map.Entry<String, RecoveredUserResources>> it)
+      throws IOException {
+    Map<String, RecoveredUserResources> userResources =
+        new HashMap<String, RecoveredUserResources>();
+    while (it.hasNext()) {
+      Map.Entry<String, RecoveredUserResources> entry = it.next();
+      userResources.put(entry.getKey(), entry.getValue());
+    }
+    return userResources;
+  }
+
+  private Map<ApplicationAttemptId, MasterKey> loadNMTokens(
+      RecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>> it)
+      throws IOException {
+    Map<ApplicationAttemptId, MasterKey> nmTokens =
+        new HashMap<ApplicationAttemptId, MasterKey>();
+    while (it.hasNext()) {
+      Map.Entry<ApplicationAttemptId, MasterKey> entry = it.next();
+      nmTokens.put(entry.getKey(), entry.getValue());
+    }
+    return nmTokens;
+  }
+
+  private Map<ContainerId, Long> loadContainerTokens(
+      RecoveryIterator<Map.Entry<ContainerId, Long>> it) throws IOException {
+    Map<ContainerId, Long> containerTokens =
+        new HashMap<ContainerId, Long>();
+    while (it.hasNext()) {
+      Map.Entry<ContainerId, Long> entry = it.next();
+      containerTokens.put(entry.getKey(), entry.getValue());
+    }
+    return containerTokens;
+  }
+
   private void restartStateStore() throws IOException {
     // need to close so leveldb releases database lock
     if (stateStore != null) {
@@ -142,7 +209,7 @@ public class TestNMLeveldbStateStoreService {
     assertNotNull(pubts);
     assertTrue(pubts.getLocalizedResources().isEmpty());
     assertTrue(pubts.getInProgressResources().isEmpty());
-    assertTrue(state.getUserResources().isEmpty());
+    assertTrue(loadUserResources(state.getIterator()).isEmpty());
   }
 
   @Test
@@ -183,7 +250,7 @@ public class TestNMLeveldbStateStoreService {
       restartStateStore();
       Assert.fail("Incompatible version, should expect fail here.");
     } catch (ServiceStateException e) {
-      Assert.assertTrue("Exception message mismatch", 
+      Assert.assertTrue("Exception message mismatch",
         e.getMessage().contains("Incompatible version for NM state:"));
     }
   }
@@ -192,7 +259,9 @@ public class TestNMLeveldbStateStoreService {
   public void testApplicationStorage() throws IOException {
     // test empty when no state
     RecoveredApplicationsState state = stateStore.loadApplicationsState();
-    assertTrue(state.getApplications().isEmpty());
+    List<ContainerManagerApplicationProto> apps =
+        loadApplicationProtos(state.getIterator());
+    assertTrue(apps.isEmpty());
 
     // store an application and verify recovered
     final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
@@ -204,8 +273,9 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeApplication(appId1, appProto1);
     restartStateStore();
     state = stateStore.loadApplicationsState();
-    assertEquals(1, state.getApplications().size());
-    assertEquals(appProto1, state.getApplications().get(0));
+    apps = loadApplicationProtos(state.getIterator());
+    assertEquals(1, apps.size());
+    assertEquals(appProto1, apps.get(0));
 
     // add a new app
     final ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
@@ -216,23 +286,25 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeApplication(appId2, appProto2);
     restartStateStore();
     state = stateStore.loadApplicationsState();
-    assertEquals(2, state.getApplications().size());
-    assertTrue(state.getApplications().contains(appProto1));
-    assertTrue(state.getApplications().contains(appProto2));
+    apps = loadApplicationProtos(state.getIterator());
+    assertEquals(2, apps.size());
+    assertTrue(apps.contains(appProto1));
+    assertTrue(apps.contains(appProto2));
 
     // test removing an application
     stateStore.removeApplication(appId2);
     restartStateStore();
     state = stateStore.loadApplicationsState();
-    assertEquals(1, state.getApplications().size());
-    assertEquals(appProto1, state.getApplications().get(0));
+    apps = loadApplicationProtos(state.getIterator());
+    assertEquals(1, apps.size());
+    assertEquals(appProto1, apps.get(0));
   }
 
   @Test
   public void testContainerStorage() throws IOException {
     // test empty when no state
     List<RecoveredContainerState> recoveredContainers =
-        stateStore.loadContainersState();
+        loadContainersState(stateStore.getContainerStateIterator());
     assertTrue(recoveredContainers.isEmpty());
 
     // create a container request
@@ -254,7 +326,8 @@ public class TestNMLeveldbStateStoreService {
         stateStore.getContainerVersionKey(containerId.toString()))));
 
     restartStateStore();
-    recoveredContainers = stateStore.loadContainersState();
+    recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
     RecoveredContainerState rcs = recoveredContainers.get(0);
     assertEquals(0, rcs.getVersion());
@@ -269,14 +342,16 @@ public class TestNMLeveldbStateStoreService {
     // store a new container record without StartContainerRequest
     ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
     stateStore.storeContainerLaunched(containerId1);
-    recoveredContainers = stateStore.loadContainersState();
+    recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     // check whether the new container record is discarded
     assertEquals(1, recoveredContainers.size());
 
     // queue the container, and verify recovered
     stateStore.storeContainerQueued(containerId);
     restartStateStore();
-    recoveredContainers = stateStore.loadContainersState();
+    recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
     rcs = recoveredContainers.get(0);
     assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus());
@@ -292,7 +367,8 @@ public class TestNMLeveldbStateStoreService {
     diags.append("some diags for container");
     stateStore.storeContainerDiagnostics(containerId, diags);
     restartStateStore();
-    recoveredContainers = stateStore.loadContainersState();
+    recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
     rcs = recoveredContainers.get(0);
     assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
@@ -305,7 +381,8 @@ public class TestNMLeveldbStateStoreService {
     // pause the container, and verify recovered
     stateStore.storeContainerPaused(containerId);
     restartStateStore();
-    recoveredContainers = stateStore.loadContainersState();
+    recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
     rcs = recoveredContainers.get(0);
     assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
@@ -316,7 +393,8 @@ public class TestNMLeveldbStateStoreService {
     // Resume the container
     stateStore.removeContainerPaused(containerId);
     restartStateStore();
-    recoveredContainers = stateStore.loadContainersState();
+    recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
 
     // increase the container size, and verify recovered
@@ -328,7 +406,8 @@ public class TestNMLeveldbStateStoreService {
     stateStore
         .storeContainerUpdateToken(containerId, updateTokenIdentifier);
     restartStateStore();
-    recoveredContainers = stateStore.loadContainersState();
+    recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
     rcs = recoveredContainers.get(0);
     assertEquals(0, rcs.getVersion());
@@ -342,7 +421,8 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeContainerDiagnostics(containerId, diags);
     stateStore.storeContainerKilled(containerId);
     restartStateStore();
-    recoveredContainers = stateStore.loadContainersState();
+    recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
     rcs = recoveredContainers.get(0);
     assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
@@ -358,7 +438,8 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeContainerDiagnostics(containerId, diags);
     stateStore.storeContainerCompleted(containerId, 21);
     restartStateStore();
-    recoveredContainers = stateStore.loadContainersState();
+    recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
     rcs = recoveredContainers.get(0);
     assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
@@ -371,7 +452,8 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeContainerWorkDir(containerId, "/test/workdir");
     stateStore.storeContainerLogDir(containerId, "/test/logdir");
     restartStateStore();
-    recoveredContainers = stateStore.loadContainersState();
+    recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
     rcs = recoveredContainers.get(0);
     assertEquals(6, rcs.getRemainingRetryAttempts());
@@ -382,12 +464,13 @@ public class TestNMLeveldbStateStoreService {
     // remove the container and verify not recovered
     stateStore.removeContainer(containerId);
     restartStateStore();
-    recoveredContainers = stateStore.loadContainersState();
+    recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     assertTrue(recoveredContainers.isEmpty());
     // recover again to check remove clears all containers
     restartStateStore();
     NMStateStoreService nmStoreSpy = spy(stateStore);
-    nmStoreSpy.loadContainersState();
+    loadContainersState(nmStoreSpy.getContainerStateIterator());
     verify(nmStoreSpy,times(0)).removeContainer(any(ContainerId.class));
   }
 
@@ -399,7 +482,8 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeContainerRestartTimes(containerId,
         finishTimeForRetryAttempts);
     restartStateStore();
-    RecoveredContainerState rcs = stateStore.loadContainersState().get(0);
+    RecoveredContainerState rcs =
+        loadContainersState(stateStore.getContainerStateIterator()).get(0);
     List<Long> recoveredRestartTimes = rcs.getRestartTimes();
     assertEquals(1462700529039L, (long)recoveredRestartTimes.get(0));
     assertEquals(1462700529050L, (long)recoveredRestartTimes.get(1));
@@ -481,7 +565,7 @@ public class TestNMLeveldbStateStoreService {
     assertTrue(pubts.getLocalizedResources().isEmpty());
     assertTrue(pubts.getInProgressResources().isEmpty());
     Map<String, RecoveredUserResources> userResources =
-        state.getUserResources();
+        loadUserResources(state.getIterator());
     assertEquals(1, userResources.size());
     RecoveredUserResources rur = userResources.get(user);
     LocalResourceTrackerState privts = rur.getPrivateTrackerState();
@@ -535,7 +619,7 @@ public class TestNMLeveldbStateStoreService {
         pubts.getInProgressResources().get(pubRsrcProto1));
     assertEquals(pubRsrcLocalPath2,
         pubts.getInProgressResources().get(pubRsrcProto2));
-    userResources = state.getUserResources();
+    userResources = loadUserResources(state.getIterator());
     assertEquals(1, userResources.size());
     rur = userResources.get(user);
     privts = rur.getPrivateTrackerState();
@@ -584,7 +668,7 @@ public class TestNMLeveldbStateStoreService {
     assertTrue(pubts.getLocalizedResources().isEmpty());
     assertTrue(pubts.getInProgressResources().isEmpty());
     Map<String, RecoveredUserResources> userResources =
-        state.getUserResources();
+        loadUserResources(state.getIterator());
     assertEquals(1, userResources.size());
     RecoveredUserResources rur = userResources.get(user);
     LocalResourceTrackerState privts = rur.getPrivateTrackerState();
@@ -654,7 +738,7 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(1, pubts.getInProgressResources().size());
     assertEquals(pubRsrcLocalPath2,
         pubts.getInProgressResources().get(pubRsrcProto2));
-    userResources = state.getUserResources();
+    userResources = loadUserResources(state.getIterator());
     assertEquals(1, userResources.size());
     rur = userResources.get(user);
     privts = rur.getPrivateTrackerState();
@@ -762,7 +846,7 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(pubLocalizedProto1,
         pubts.getLocalizedResources().iterator().next());
     Map<String, RecoveredUserResources> userResources =
-        state.getUserResources();
+        loadUserResources(state.getIterator());
     assertTrue(userResources.isEmpty());
   }
 
@@ -771,7 +855,9 @@ public class TestNMLeveldbStateStoreService {
     // test empty when no state
     RecoveredDeletionServiceState state =
         stateStore.loadDeletionServiceState();
-    assertTrue(state.getTasks().isEmpty());
+    List<DeletionServiceDeleteTaskProto> deleteTaskProtos =
+        loadDeletionTaskProtos(state.getIterator());
+    assertTrue(deleteTaskProtos.isEmpty());
 
     // store a deletion task and verify recovered
     DeletionServiceDeleteTaskProto proto =
@@ -788,8 +874,9 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeDeletionTask(proto.getId(), proto);
     restartStateStore();
     state = stateStore.loadDeletionServiceState();
-    assertEquals(1, state.getTasks().size());
-    assertEquals(proto, state.getTasks().get(0));
+    deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
+    assertEquals(1, deleteTaskProtos.size());
+    assertEquals(proto, deleteTaskProtos.get(0));
 
     // store another deletion task
     DeletionServiceDeleteTaskProto proto2 =
@@ -802,31 +889,36 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeDeletionTask(proto2.getId(), proto2);
     restartStateStore();
     state = stateStore.loadDeletionServiceState();
-    assertEquals(2, state.getTasks().size());
-    assertTrue(state.getTasks().contains(proto));
-    assertTrue(state.getTasks().contains(proto2));
+    deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
+    assertEquals(2, deleteTaskProtos.size());
+    assertTrue(deleteTaskProtos.contains(proto));
+    assertTrue(deleteTaskProtos.contains(proto2));
+
 
     // delete a task and verify gone after recovery
     stateStore.removeDeletionTask(proto2.getId());
     restartStateStore();
-    state = stateStore.loadDeletionServiceState();
-    assertEquals(1, state.getTasks().size());
-    assertEquals(proto, state.getTasks().get(0));
+    state =  stateStore.loadDeletionServiceState();
+    deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
+    assertEquals(1, deleteTaskProtos.size());
+    assertEquals(proto, deleteTaskProtos.get(0));
 
     // delete the last task and verify none left
     stateStore.removeDeletionTask(proto.getId());
     restartStateStore();
     state = stateStore.loadDeletionServiceState();
-    assertTrue(state.getTasks().isEmpty());
-  }
+    deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
+    assertTrue(deleteTaskProtos.isEmpty());  }
 
   @Test
   public void testNMTokenStorage() throws IOException {
     // test empty when no state
     RecoveredNMTokensState state = stateStore.loadNMTokensState();
+    Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
+        loadNMTokens(state.getIterator());
     assertNull(state.getCurrentMasterKey());
     assertNull(state.getPreviousMasterKey());
-    assertTrue(state.getApplicationMasterKeys().isEmpty());
+    assertTrue(loadedAppKeys.isEmpty());
 
     // store a master key and verify recovered
     NMTokenSecretManagerForTest secretMgr = new NMTokenSecretManagerForTest();
@@ -834,18 +926,20 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeNMTokenCurrentMasterKey(currentKey);
     restartStateStore();
     state = stateStore.loadNMTokensState();
+    loadedAppKeys = loadNMTokens(state.getIterator());
     assertEquals(currentKey, state.getCurrentMasterKey());
     assertNull(state.getPreviousMasterKey());
-    assertTrue(state.getApplicationMasterKeys().isEmpty());
+    assertTrue(loadedAppKeys.isEmpty());
 
     // store a previous key and verify recovered
     MasterKey prevKey = secretMgr.generateKey();
     stateStore.storeNMTokenPreviousMasterKey(prevKey);
     restartStateStore();
     state = stateStore.loadNMTokensState();
+    loadedAppKeys = loadNMTokens(state.getIterator());
     assertEquals(currentKey, state.getCurrentMasterKey());
     assertEquals(prevKey, state.getPreviousMasterKey());
-    assertTrue(state.getApplicationMasterKeys().isEmpty());
+    assertTrue(loadedAppKeys.isEmpty());
 
     // store a few application keys and verify recovered
     ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance(
@@ -858,10 +952,9 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
     restartStateStore();
     state = stateStore.loadNMTokensState();
+    loadedAppKeys = loadNMTokens(state.getIterator());
     assertEquals(currentKey, state.getCurrentMasterKey());
     assertEquals(prevKey, state.getPreviousMasterKey());
-    Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
-        state.getApplicationMasterKeys();
     assertEquals(2, loadedAppKeys.size());
     assertEquals(attemptKey1, loadedAppKeys.get(attempt1));
     assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
@@ -880,9 +973,9 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeNMTokenCurrentMasterKey(currentKey);
     restartStateStore();
     state = stateStore.loadNMTokensState();
+    loadedAppKeys = loadNMTokens(state.getIterator());
     assertEquals(currentKey, state.getCurrentMasterKey());
     assertEquals(prevKey, state.getPreviousMasterKey());
-    loadedAppKeys = state.getApplicationMasterKeys();
     assertEquals(2, loadedAppKeys.size());
     assertNull(loadedAppKeys.get(attempt1));
     assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
@@ -894,9 +987,10 @@ public class TestNMLeveldbStateStoreService {
     // test empty when no state
     RecoveredContainerTokensState state =
         stateStore.loadContainerTokensState();
+    Map<ContainerId, Long> loadedActiveTokens = loadContainerTokens(state.it);
     assertNull(state.getCurrentMasterKey());
     assertNull(state.getPreviousMasterKey());
-    assertTrue(state.getActiveTokens().isEmpty());
+    assertTrue(loadedActiveTokens.isEmpty());
 
     // store a master key and verify recovered
     ContainerTokenKeyGeneratorForTest keygen =
@@ -905,18 +999,20 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeContainerTokenCurrentMasterKey(currentKey);
     restartStateStore();
     state = stateStore.loadContainerTokensState();
+    loadedActiveTokens = loadContainerTokens(state.it);
     assertEquals(currentKey, state.getCurrentMasterKey());
     assertNull(state.getPreviousMasterKey());
-    assertTrue(state.getActiveTokens().isEmpty());
+    assertTrue(loadedActiveTokens.isEmpty());
 
     // store a previous key and verify recovered
     MasterKey prevKey = keygen.generateKey();
     stateStore.storeContainerTokenPreviousMasterKey(prevKey);
     restartStateStore();
     state = stateStore.loadContainerTokensState();
+    loadedActiveTokens = loadContainerTokens(state.it);
     assertEquals(currentKey, state.getCurrentMasterKey());
     assertEquals(prevKey, state.getPreviousMasterKey());
-    assertTrue(state.getActiveTokens().isEmpty());
+    assertTrue(loadedActiveTokens.isEmpty());
 
     // store a few container tokens and verify recovered
     ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1);
@@ -927,10 +1023,9 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeContainerToken(cid2, expTime2);
     restartStateStore();
     state = stateStore.loadContainerTokensState();
+    loadedActiveTokens = loadContainerTokens(state.it);
     assertEquals(currentKey, state.getCurrentMasterKey());
     assertEquals(prevKey, state.getPreviousMasterKey());
-    Map<ContainerId, Long> loadedActiveTokens =
-        state.getActiveTokens();
     assertEquals(2, loadedActiveTokens.size());
     assertEquals(expTime1, loadedActiveTokens.get(cid1));
     assertEquals(expTime2, loadedActiveTokens.get(cid2));
@@ -948,9 +1043,9 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeContainerTokenCurrentMasterKey(currentKey);
     restartStateStore();
     state = stateStore.loadContainerTokensState();
+    loadedActiveTokens = loadContainerTokens(state.it);
     assertEquals(currentKey, state.getCurrentMasterKey());
     assertEquals(prevKey, state.getPreviousMasterKey());
-    loadedActiveTokens = state.getActiveTokens();
     assertEquals(2, loadedActiveTokens.size());
     assertNull(loadedActiveTokens.get(cid1));
     assertEquals(expTime2, loadedActiveTokens.get(cid2));
@@ -1029,8 +1124,8 @@ public class TestNMLeveldbStateStoreService {
   @Test
   public void testUnexpectedKeyDoesntThrowException() throws IOException {
     // test empty when no state
-    List<RecoveredContainerState> recoveredContainers = stateStore
-        .loadContainersState();
+    List<RecoveredContainerState> recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     assertTrue(recoveredContainers.isEmpty());
 
     ApplicationId appId = ApplicationId.newInstance(1234, 3);
@@ -1045,7 +1140,8 @@ public class TestNMLeveldbStateStoreService {
     + containerId.toString() + "/invalidKey1234").getBytes();
     stateStore.getDB().put(invalidKey, new byte[1]);
     restartStateStore();
-    recoveredContainers = stateStore.loadContainersState();
+    recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
     RecoveredContainerState rcs = recoveredContainers.get(0);
     assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
@@ -1162,8 +1258,8 @@ public class TestNMLeveldbStateStoreService {
   @Test
   public void testStateStoreForResourceMapping() throws IOException {
     // test empty when no state
-    List<RecoveredContainerState> recoveredContainers = stateStore
-        .loadContainersState();
+    List<RecoveredContainerState> recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     assertTrue(recoveredContainers.isEmpty());
 
     ApplicationId appId = ApplicationId.newInstance(1234, 3);
@@ -1190,7 +1286,8 @@ public class TestNMLeveldbStateStoreService {
 
     // add a invalid key
     restartStateStore();
-    recoveredContainers = stateStore.loadContainersState();
+    recoveredContainers =
+        loadContainersState(stateStore.getContainerStateIterator());
     assertEquals(1, recoveredContainers.size());
     RecoveredContainerState rcs = recoveredContainers.get(0);
     List<Serializable> res = rcs.getResourceMappings()
@@ -1253,7 +1350,8 @@ public class TestNMLeveldbStateStoreService {
     stateStore.storeContainerRestartTimes(containerId,
         restartTimes);
     restartStateStore();
-    RecoveredContainerState rcs = stateStore.loadContainersState().get(0);
+    RecoveredContainerState rcs =
+        loadContainersState(stateStore.getContainerStateIterator()).get(0);
     List<Long> recoveredRestartTimes = rcs.getRestartTimes();
     assertTrue(recoveredRestartTimes.isEmpty());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org