You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by eb...@apache.org on 2020/04/14 18:37:38 UTC

[hadoop] branch branch-2.10 updated: YARN-8680. YARN NM: Implement Iterable Abstraction for LocalResourceTracker state. Contributed by Pradeep Ambati.

This is an automated email from the ASF dual-hosted git repository.

ebadger pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 0e05b02  YARN-8680. YARN NM: Implement Iterable Abstraction for LocalResourceTracker state. Contributed by Pradeep Ambati.
0e05b02 is described below

commit 0e05b02136704c488fcf5991f835a62fd59ad50d
Author: Eric Badger <eb...@verizonmedia.com>
AuthorDate: Tue Apr 14 18:30:56 2020 +0000

    YARN-8680. YARN NM: Implement Iterable Abstraction for LocalResourceTracker
    state. Contributed by Pradeep Ambati.
---
 .../localizer/ResourceLocalizationService.java     |  87 +++----
 .../recovery/NMLeveldbStateStoreService.java       | 174 ++++++++-----
 .../nodemanager/recovery/NMStateStoreService.java  |  29 ++-
 .../recovery/NMMemoryStateStoreService.java        |  18 +-
 .../recovery/TestNMLeveldbStateStoreService.java   | 269 ++++++++++++++++++---
 5 files changed, 418 insertions(+), 159 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 25df843..92f3286 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -309,63 +309,66 @@ public class ResourceLocalizationService extends CompositeService
         String user = userEntry.getKey();
         RecoveredUserResources userResources = userEntry.getValue();
         trackerState = userResources.getPrivateTrackerState();
-        if (!trackerState.isEmpty()) {
-          LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-              null, dispatcher, true, super.getConfig(), stateStore,
-              dirsHandler);
-          LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
-              tracker);
-          if (oldTracker != null) {
-            tracker = oldTracker;
-          }
-          recoverTrackerResources(tracker, trackerState);
+        LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+            null, dispatcher, true, super.getConfig(), stateStore,
+            dirsHandler);
+        LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
+            tracker);
+        if (oldTracker != null) {
+          tracker = oldTracker;
         }
+        recoverTrackerResources(tracker, trackerState);
 
         for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
             userResources.getAppTrackerStates().entrySet()) {
           trackerState = appEntry.getValue();
-          if (!trackerState.isEmpty()) {
-            ApplicationId appId = appEntry.getKey();
-            String appIdStr = appId.toString();
-            LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-                appId, dispatcher, false, super.getConfig(), stateStore,
-                dirsHandler);
-            LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
-                tracker);
-            if (oldTracker != null) {
-              tracker = oldTracker;
-            }
-            recoverTrackerResources(tracker, trackerState);
+          ApplicationId appId = appEntry.getKey();
+          String appIdStr = appId.toString();
+          LocalResourcesTracker tracker1 = new LocalResourcesTrackerImpl(user,
+              appId, dispatcher, false, super.getConfig(), stateStore,
+              dirsHandler);
+          LocalResourcesTracker oldTracker1 = appRsrc.putIfAbsent(appIdStr,
+              tracker1);
+          if (oldTracker1 != null) {
+            tracker1 = oldTracker1;
           }
+          recoverTrackerResources(tracker1, trackerState);
         }
       }
     }
   }
 
   private void recoverTrackerResources(LocalResourcesTracker tracker,
-      LocalResourceTrackerState state) throws URISyntaxException {
-    for (LocalizedResourceProto proto : state.getLocalizedResources()) {
-      LocalResource rsrc = new LocalResourcePBImpl(proto.getResource());
-      LocalResourceRequest req = new LocalResourceRequest(rsrc);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Recovering localized resource " + req + " at "
-            + proto.getLocalPath());
+      LocalResourceTrackerState state) throws URISyntaxException, IOException {
+    try (RecoveryIterator<LocalizedResourceProto> it =
+             state.getCompletedResourcesIterator()) {
+      while (it != null && it.hasNext()) {
+        LocalizedResourceProto proto = it.next();
+        LocalResource rsrc = new LocalResourcePBImpl(proto.getResource());
+        LocalResourceRequest req = new LocalResourceRequest(rsrc);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovering localized resource " + req + " at "
+              + proto.getLocalPath());
+        }
+        tracker.handle(new ResourceRecoveredEvent(req,
+            new Path(proto.getLocalPath()), proto.getSize()));
       }
-      tracker.handle(new ResourceRecoveredEvent(req,
-          new Path(proto.getLocalPath()), proto.getSize()));
     }
 
-    for (Map.Entry<LocalResourceProto, Path> entry :
-         state.getInProgressResources().entrySet()) {
-      LocalResource rsrc = new LocalResourcePBImpl(entry.getKey());
-      LocalResourceRequest req = new LocalResourceRequest(rsrc);
-      Path localPath = entry.getValue();
-      tracker.handle(new ResourceRecoveredEvent(req, localPath, 0));
-
-      // delete any in-progress localizations, containers will request again
-      LOG.info("Deleting in-progress localization for " + req + " at "
-          + localPath);
-      tracker.remove(tracker.getLocalizedResource(req), delService);
+    try (RecoveryIterator<Map.Entry<LocalResourceProto, Path>> it =
+             state.getStartedResourcesIterator()) {
+      while (it != null && it.hasNext()) {
+        Map.Entry<LocalResourceProto, Path> entry = it.next();
+        LocalResource rsrc = new LocalResourcePBImpl(entry.getKey());
+        LocalResourceRequest req = new LocalResourceRequest(rsrc);
+        Path localPath = entry.getValue();
+        tracker.handle(new ResourceRecoveredEvent(req, localPath, 0));
+
+        // delete any in-progress localizations, containers will request again
+        LOG.info("Deleting in-progress localization for " + req + " at "
+            + localPath);
+        tracker.remove(tracker.getLocalizedResource(req), delService);
+      }
     }
 
     // TODO: remove untracked directories in local filesystem
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index ee6bf6b..bcdcc4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -68,7 +68,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.AbstractMap;
-import java.util.ArrayList;
+import java.util.AbstractMap.SimpleEntry;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -154,6 +154,16 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
 
   private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/";
 
+  /**
+   * The Local Tracker State DB key locations - "completed" and "started".
+   * To seek through app tracker states in RecoveredUserResources
+   * we need to move from one app tracker state to another using key "zzz".
+   * zzz comes later in lexicographical order than started.
+   * Similarly to move one user to another in RLS,we can use "zzz",
+   * as RecoveredUserResources uses two keys appcache and filecache.
+   */
+  private static final String BEYOND_ENTRIES_SUFFIX = "zzz/";
+
   private static final String CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX =
       "/assignedResources_";
 
@@ -832,112 +842,154 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   public RecoveredLocalizationState loadLocalizationState()
       throws IOException {
     RecoveredLocalizationState state = new RecoveredLocalizationState();
-    LeveldbIterator it = getLevelDBIterator(LOCALIZATION_PUBLIC_KEY_PREFIX);
-    state.publicTrackerState = loadResourceTrackerState(it,
+    state.publicTrackerState = loadResourceTrackerState(
         LOCALIZATION_PUBLIC_KEY_PREFIX);
     state.it = new UserResourcesIterator();
     return state;
   }
 
-  private LocalResourceTrackerState loadResourceTrackerState(
-      LeveldbIterator iter, String keyPrefix) throws IOException {
+  private LocalResourceTrackerState loadResourceTrackerState(String keyPrefix)
+      throws IOException {
     final String completedPrefix = keyPrefix + LOCALIZATION_COMPLETED_SUFFIX;
     final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX;
-    LocalResourceTrackerState state = new LocalResourceTrackerState();
-    while (iter.hasNext()) {
-      Entry<byte[], byte[]> entry = iter.peekNext();
-      String key = asString(entry.getKey());
-      if (!key.startsWith(keyPrefix)) {
-        break;
-      }
 
-      if (key.startsWith(completedPrefix)) {
-        state.localizedResources = loadCompletedResources(iter,
-            completedPrefix);
-      } else if (key.startsWith(startedPrefix)) {
-        state.inProgressResources = loadStartedResources(iter, startedPrefix);
-      } else {
-        throw new IOException("Unexpected key in resource tracker state: "
-            + key);
-      }
+    RecoveryIterator<LocalizedResourceProto> crIt =
+        new CompletedResourcesIterator(completedPrefix);
+    RecoveryIterator<Entry<LocalResourceProto, Path>> srIt =
+        new StartedResourcesIterator(startedPrefix);
+
+    return new LocalResourceTrackerState(crIt, srIt);
+  }
+
+  private class CompletedResourcesIterator extends
+      BaseRecoveryIterator<LocalizedResourceProto> {
+    private String startKey;
+    CompletedResourcesIterator(String startKey) throws IOException {
+      super(startKey);
+      this.startKey = startKey;
     }
 
-    return state;
+    @Override
+    protected LocalizedResourceProto getNextItem(LeveldbIterator it)
+        throws IOException {
+      return getNextCompletedResource(it, startKey);
+    }
   }
 
-  private List<LocalizedResourceProto> loadCompletedResources(
+  private LocalizedResourceProto getNextCompletedResource(
       LeveldbIterator iter, String keyPrefix) throws IOException {
-    List<LocalizedResourceProto> rsrcs =
-        new ArrayList<LocalizedResourceProto>();
-    while (iter.hasNext()) {
-      Entry<byte[],byte[]> entry = iter.peekNext();
+    LocalizedResourceProto nextCompletedResource = null;
+    if (iter.hasNext()){
+      Entry<byte[], byte[]> entry = iter.next();
       String key = asString(entry.getKey());
       if (!key.startsWith(keyPrefix)) {
-        break;
+        return null;
       }
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("Loading completed resource from " + key);
       }
-      rsrcs.add(LocalizedResourceProto.parseFrom(entry.getValue()));
-      iter.next();
+      nextCompletedResource = LocalizedResourceProto.parseFrom(
+          entry.getValue());
     }
+    return nextCompletedResource;
+  }
 
-    return rsrcs;
+  private class StartedResourcesIterator extends
+      BaseRecoveryIterator<Entry<LocalResourceProto, Path>> {
+    private String startKey;
+    StartedResourcesIterator(String startKey) throws IOException {
+      super(startKey);
+      this.startKey = startKey;
+    }
+
+    @Override
+    protected Entry<LocalResourceProto, Path> getNextItem(LeveldbIterator it)
+        throws IOException {
+      return getNextStartedResource(it, startKey);
+    }
   }
 
-  private Map<LocalResourceProto, Path> loadStartedResources(
+  private Entry<LocalResourceProto, Path> getNextStartedResource(
       LeveldbIterator iter, String keyPrefix) throws IOException {
-    Map<LocalResourceProto, Path> rsrcs =
-        new HashMap<LocalResourceProto, Path>();
-    while (iter.hasNext()) {
-      Entry<byte[],byte[]> entry = iter.peekNext();
+    Entry<LocalResourceProto, Path> nextStartedResource = null;
+    if (iter.hasNext()){
+      Entry<byte[], byte[]> entry = iter.next();
       String key = asString(entry.getKey());
       if (!key.startsWith(keyPrefix)) {
-        break;
+        return null;
       }
 
       Path localPath = new Path(key.substring(keyPrefix.length()));
       if (LOG.isDebugEnabled()) {
         LOG.debug("Loading in-progress resource at " + localPath);
       }
-      rsrcs.put(LocalResourceProto.parseFrom(entry.getValue()), localPath);
-      iter.next();
+      nextStartedResource = new SimpleEntry<LocalResourceProto, Path>(
+          LocalResourceProto.parseFrom(entry.getValue()), localPath);
     }
+    return nextStartedResource;
+  }
 
-    return rsrcs;
+  private void seekPastPrefix(LeveldbIterator iter, String keyPrefix)
+      throws IOException {
+    try{
+      iter.seek(bytes(keyPrefix + BEYOND_ENTRIES_SUFFIX));
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> entry = iter.peekNext();
+        String key = asString(entry.getKey());
+        if (key.startsWith(keyPrefix)) {
+          iter.next();
+        } else {
+          break;
+        }
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
   }
 
   private RecoveredUserResources loadUserLocalizedResources(
       LeveldbIterator iter, String keyPrefix) throws IOException {
     RecoveredUserResources userResources = new RecoveredUserResources();
+
+    // seek through App cache
+    String appCachePrefix = keyPrefix + LOCALIZATION_APPCACHE_SUFFIX;
+    iter.seek(bytes(appCachePrefix));
     while (iter.hasNext()) {
-      Entry<byte[],byte[]> entry = iter.peekNext();
+      Entry<byte[], byte[]> entry = iter.peekNext();
       String key = asString(entry.getKey());
-      if (!key.startsWith(keyPrefix)) {
+
+      if (!key.startsWith(appCachePrefix)) {
         break;
       }
 
-      if (key.startsWith(LOCALIZATION_FILECACHE_SUFFIX, keyPrefix.length())) {
-        userResources.privateTrackerState = loadResourceTrackerState(iter,
-            keyPrefix + LOCALIZATION_FILECACHE_SUFFIX);
-      } else if (key.startsWith(LOCALIZATION_APPCACHE_SUFFIX,
-          keyPrefix.length())) {
-        int appIdStartPos = keyPrefix.length() +
-            LOCALIZATION_APPCACHE_SUFFIX.length();
-        int appIdEndPos = key.indexOf('/', appIdStartPos);
-        if (appIdEndPos < 0) {
-          throw new IOException("Unable to determine appID in resource key: "
-              + key);
-        }
-        ApplicationId appId = ApplicationId.fromString(
-            key.substring(appIdStartPos, appIdEndPos));
-        userResources.appTrackerStates.put(appId,
-            loadResourceTrackerState(iter, key.substring(0, appIdEndPos+1)));
-      } else {
-        throw new IOException("Unexpected user resource key " + key);
+      int appIdStartPos = appCachePrefix.length();
+      int appIdEndPos = key.indexOf('/', appIdStartPos);
+      if (appIdEndPos < 0) {
+        throw new IOException("Unable to determine appID in resource key: "
+            + key);
       }
-    }
+      ApplicationId appId = ApplicationId.fromString(
+          key.substring(appIdStartPos, appIdEndPos));
+      String trackerStateKey = key.substring(0, appIdEndPos+1);
+      userResources.appTrackerStates.put(appId,
+          loadResourceTrackerState(trackerStateKey));
+      // Seek to next application
+      seekPastPrefix(iter, trackerStateKey);
+    }
+
+    // File Cache
+    String fileCachePrefix = keyPrefix + LOCALIZATION_FILECACHE_SUFFIX;
+    iter.seek(bytes(fileCachePrefix));
+    Entry<byte[], byte[]> entry = iter.peekNext();
+    String key = asString(entry.getKey());
+    if (key.startsWith(fileCachePrefix)) {
+      userResources.privateTrackerState =
+          loadResourceTrackerState(fileCachePrefix);
+    }
+
+    // seek to Next User.
+    seekPastPrefix(iter, keyPrefix);
     return userResources;
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 50eeaef..3831aaf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -209,27 +208,31 @@ public abstract class NMStateStoreService extends AbstractService {
   }
 
   public static class LocalResourceTrackerState {
-    List<LocalizedResourceProto> localizedResources =
-        new ArrayList<LocalizedResourceProto>();
-    Map<LocalResourceProto, Path> inProgressResources =
-        new HashMap<LocalResourceProto, Path>();
+    final private RecoveryIterator<LocalizedResourceProto>
+        completedResourcesIterator;
+    final private RecoveryIterator<Entry<LocalResourceProto, Path>>
+        startedResourcesIterator;
 
-    public List<LocalizedResourceProto> getLocalizedResources() {
-      return localizedResources;
+    LocalResourceTrackerState(RecoveryIterator<LocalizedResourceProto> crIt,
+        RecoveryIterator<Entry<LocalResourceProto, Path>> srIt) {
+      this.completedResourcesIterator = crIt;
+      this.startedResourcesIterator = srIt;
     }
 
-    public Map<LocalResourceProto, Path> getInProgressResources() {
-      return inProgressResources;
+    public RecoveryIterator<LocalizedResourceProto>
+        getCompletedResourcesIterator() {
+      return completedResourcesIterator;
     }
 
-    public boolean isEmpty() {
-      return localizedResources.isEmpty() && inProgressResources.isEmpty();
+    public RecoveryIterator<Entry<LocalResourceProto, Path>>
+        getStartedResourcesIterator() {
+      return startedResourcesIterator;
     }
   }
 
   public static class RecoveredUserResources {
     LocalResourceTrackerState privateTrackerState =
-        new LocalResourceTrackerState();
+        new LocalResourceTrackerState(null, null);
     Map<ApplicationId, LocalResourceTrackerState> appTrackerStates =
         new HashMap<ApplicationId, LocalResourceTrackerState>();
 
@@ -245,7 +248,7 @@ public abstract class NMStateStoreService extends AbstractService {
 
   public static class RecoveredLocalizationState {
     LocalResourceTrackerState publicTrackerState =
-        new LocalResourceTrackerState();
+        new LocalResourceTrackerState(null, null);
     RecoveryIterator<Entry<String, RecoveredUserResources>> it = null;
 
     public LocalResourceTrackerState getPublicTrackerState() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index 25d96da..7911309 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -283,13 +283,23 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
-    LocalResourceTrackerState result = new LocalResourceTrackerState();
-    result.localizedResources.addAll(ts.localizedResources.values());
+    List<LocalizedResourceProto> completedResources =
+        new ArrayList<LocalizedResourceProto>(ts.localizedResources.values());
+    RecoveryIterator<LocalizedResourceProto> crIt =
+        new NMMemoryRecoveryIterator<LocalizedResourceProto>(
+            completedResources.iterator());
+
+    Map<LocalResourceProto, Path> inProgressMap =
+        new HashMap<LocalResourceProto, Path>();
     for (Map.Entry<Path, LocalResourceProto> entry :
          ts.inProgressMap.entrySet()) {
-      result.inProgressResources.put(entry.getValue(), entry.getKey());
+      inProgressMap.put(entry.getValue(), entry.getKey());
     }
-    return result;
+    RecoveryIterator<Map.Entry<LocalResourceProto, Path>> srIt =
+        new NMMemoryRecoveryIterator<Map.Entry<LocalResourceProto, Path>>(
+            inProgressMap.entrySet().iterator());
+
+    return new LocalResourceTrackerState(crIt, srIt);
   }
 
   private TrackerState getTrackerState(TrackerKey key) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 3f33584..c64d48f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -190,6 +190,28 @@ public class TestNMLeveldbStateStoreService {
     return containerTokens;
   }
 
+  private List<LocalizedResourceProto> loadCompletedResources(
+      RecoveryIterator<LocalizedResourceProto> it) throws IOException {
+    List<LocalizedResourceProto> completedResources =
+        new ArrayList<LocalizedResourceProto>();
+    while (it != null && it.hasNext()) {
+      completedResources.add(it.next());
+    }
+    return completedResources;
+  }
+
+  private Map<LocalResourceProto, Path> loadStartedResources(
+      RecoveryIterator <Map.Entry<LocalResourceProto, Path>> it)
+      throws IOException {
+    Map<LocalResourceProto, Path> startedResources =
+        new HashMap<LocalResourceProto, Path>();
+    while (it != null &&it.hasNext()) {
+      Map.Entry<LocalResourceProto, Path> entry = it.next();
+      startedResources.put(entry.getKey(), entry.getValue());
+    }
+    return startedResources;
+  }
+
   private void restartStateStore() throws IOException {
     // need to close so leveldb releases database lock
     if (stateStore != null) {
@@ -205,8 +227,10 @@ public class TestNMLeveldbStateStoreService {
     assertNotNull(state);
     LocalResourceTrackerState pubts = state.getPublicTrackerState();
     assertNotNull(pubts);
-    assertTrue(pubts.getLocalizedResources().isEmpty());
-    assertTrue(pubts.getInProgressResources().isEmpty());
+    assertTrue(loadCompletedResources(pubts.getCompletedResourcesIterator())
+        .isEmpty());
+    assertTrue(loadStartedResources(pubts.getStartedResourcesIterator())
+        .isEmpty());
     assertTrue(loadUserResources(state.getIterator()).isEmpty());
   }
 
@@ -518,6 +542,111 @@ public class TestNMLeveldbStateStoreService {
   }
 
   @Test
+  public void testLocalTrackerStateIterator() throws IOException {
+    String user1 = "somebody";
+    ApplicationId appId1 = ApplicationId.newInstance(1, 1);
+    ApplicationId appId2 = ApplicationId.newInstance(2, 2);
+
+    String user2 = "someone";
+    ApplicationId appId3 = ApplicationId.newInstance(3, 3);
+
+    // start and finish local resource for applications
+    Path appRsrcPath1 = new Path("hdfs://some/app/resource1");
+    LocalResourcePBImpl rsrcPb1 = (LocalResourcePBImpl)
+        LocalResource.newInstance(
+            URL.fromPath(appRsrcPath1),
+            LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
+            123L, 456L);
+    LocalResourceProto appRsrcProto1 = rsrcPb1.getProto();
+    Path appRsrcLocalPath1 = new Path("/some/local/dir/for/apprsrc1");
+    Path appRsrcPath2 = new Path("hdfs://some/app/resource2");
+    LocalResourcePBImpl rsrcPb2 = (LocalResourcePBImpl)
+        LocalResource.newInstance(
+            URL.fromPath(appRsrcPath2),
+            LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
+            123L, 456L);
+    LocalResourceProto appRsrcProto2 = rsrcPb2.getProto();
+    Path appRsrcLocalPath2 = new Path("/some/local/dir/for/apprsrc2");
+    Path appRsrcPath3 = new Path("hdfs://some/app/resource3");
+    LocalResourcePBImpl rsrcPb3 = (LocalResourcePBImpl)
+        LocalResource.newInstance(
+            URL.fromPath(appRsrcPath3),
+            LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
+            123L, 456L);
+    LocalResourceProto appRsrcProto3 = rsrcPb3.getProto();
+    Path appRsrcLocalPath3 = new Path("/some/local/dir/for/apprsrc2");
+
+    stateStore.startResourceLocalization(user1, appId1, appRsrcProto1,
+        appRsrcLocalPath1);
+    stateStore.startResourceLocalization(user1, appId2, appRsrcProto2,
+        appRsrcLocalPath2);
+    stateStore.startResourceLocalization(user2, appId3, appRsrcProto3,
+        appRsrcLocalPath3);
+
+    LocalizedResourceProto appLocalizedProto1 =
+        LocalizedResourceProto.newBuilder()
+            .setResource(appRsrcProto1)
+            .setLocalPath(appRsrcLocalPath1.toString())
+            .setSize(1234567L)
+            .build();
+    LocalizedResourceProto appLocalizedProto2 =
+        LocalizedResourceProto.newBuilder()
+            .setResource(appRsrcProto2)
+            .setLocalPath(appRsrcLocalPath2.toString())
+            .setSize(1234567L)
+            .build();
+    LocalizedResourceProto appLocalizedProto3 =
+        LocalizedResourceProto.newBuilder()
+            .setResource(appRsrcProto3)
+            .setLocalPath(appRsrcLocalPath3.toString())
+            .setSize(1234567L)
+            .build();
+
+
+    stateStore.finishResourceLocalization(user1, appId1, appLocalizedProto1);
+    stateStore.finishResourceLocalization(user1, appId2, appLocalizedProto2);
+    stateStore.finishResourceLocalization(user2, appId3, appLocalizedProto3);
+
+
+    List<LocalizedResourceProto> completedResources =
+        new ArrayList<LocalizedResourceProto>();
+    Map<LocalResourceProto, Path> startedResources =
+        new HashMap<LocalResourceProto, Path>();
+
+    // restart and verify two users exist and two apps completed for user1.
+    restartStateStore();
+    RecoveredLocalizationState state = stateStore.loadLocalizationState();
+    Map<String, RecoveredUserResources> userResources =
+        loadUserResources(state.getIterator());
+    assertEquals(2, userResources.size());
+
+    RecoveredUserResources uResource = userResources.get(user1);
+    assertEquals(2, uResource.getAppTrackerStates().size());
+    LocalResourceTrackerState app1ts =
+        uResource.getAppTrackerStates().get(appId1);
+    assertNotNull(app1ts);
+    completedResources = loadCompletedResources(
+        app1ts.getCompletedResourcesIterator());
+    startedResources = loadStartedResources(
+        app1ts.getStartedResourcesIterator());
+    assertTrue(startedResources.isEmpty());
+    assertEquals(1, completedResources.size());
+    assertEquals(appLocalizedProto1,
+        completedResources.iterator().next());
+    LocalResourceTrackerState app2ts =
+        uResource.getAppTrackerStates().get(appId2);
+    assertNotNull(app2ts);
+    completedResources = loadCompletedResources(
+        app2ts.getCompletedResourcesIterator());
+    startedResources = loadStartedResources(
+        app2ts.getStartedResourcesIterator());
+    assertTrue(startedResources.isEmpty());
+    assertEquals(1, completedResources.size());
+    assertEquals(appLocalizedProto2,
+        completedResources.iterator().next());
+  }
+
+  @Test
   public void testStartResourceLocalization() throws IOException {
     String user = "somebody";
     ApplicationId appId = ApplicationId.newInstance(1, 1);
@@ -534,27 +663,44 @@ public class TestNMLeveldbStateStoreService {
     stateStore.startResourceLocalization(user, appId, appRsrcProto,
         appRsrcLocalPath);
 
+    List<LocalizedResourceProto> completedResources =
+        new ArrayList<LocalizedResourceProto>();
+    Map<LocalResourceProto, Path> startedResources =
+        new HashMap<LocalResourceProto, Path>();
+
     // restart and verify only app resource is marked in-progress
     restartStateStore();
     RecoveredLocalizationState state = stateStore.loadLocalizationState();
     LocalResourceTrackerState pubts = state.getPublicTrackerState();
-    assertTrue(pubts.getLocalizedResources().isEmpty());
-    assertTrue(pubts.getInProgressResources().isEmpty());
+    completedResources = loadCompletedResources(
+        pubts.getCompletedResourcesIterator());
+    startedResources = loadStartedResources(
+        pubts.getStartedResourcesIterator());
+    assertTrue(completedResources.isEmpty());
+    assertTrue(startedResources.isEmpty());
     Map<String, RecoveredUserResources> userResources =
         loadUserResources(state.getIterator());
     assertEquals(1, userResources.size());
     RecoveredUserResources rur = userResources.get(user);
     LocalResourceTrackerState privts = rur.getPrivateTrackerState();
     assertNotNull(privts);
-    assertTrue(privts.getLocalizedResources().isEmpty());
-    assertTrue(privts.getInProgressResources().isEmpty());
+    completedResources = loadCompletedResources(
+        privts.getCompletedResourcesIterator());
+    startedResources = loadStartedResources(
+        privts.getStartedResourcesIterator());
+    assertTrue(completedResources.isEmpty());
+    assertTrue(startedResources.isEmpty());
     assertEquals(1, rur.getAppTrackerStates().size());
     LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
     assertNotNull(appts);
-    assertTrue(appts.getLocalizedResources().isEmpty());
-    assertEquals(1, appts.getInProgressResources().size());
+    completedResources = loadCompletedResources(
+        appts.getCompletedResourcesIterator());
+    startedResources = loadStartedResources(
+        appts.getStartedResourcesIterator());
+    assertTrue(completedResources.isEmpty());
+    assertEquals(1, startedResources.size());
     assertEquals(appRsrcLocalPath,
-        appts.getInProgressResources().get(appRsrcProto));
+        startedResources.get(appRsrcProto));
 
     // start some public and private resources
     Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
@@ -589,28 +735,40 @@ public class TestNMLeveldbStateStoreService {
     restartStateStore();
     state = stateStore.loadLocalizationState();
     pubts = state.getPublicTrackerState();
-    assertTrue(pubts.getLocalizedResources().isEmpty());
-    assertEquals(2, pubts.getInProgressResources().size());
+    completedResources = loadCompletedResources(
+        pubts.getCompletedResourcesIterator());
+    startedResources = loadStartedResources(
+        pubts.getStartedResourcesIterator());
+    assertTrue(completedResources.isEmpty());
+    assertEquals(2, startedResources.size());
     assertEquals(pubRsrcLocalPath1,
-        pubts.getInProgressResources().get(pubRsrcProto1));
+        startedResources.get(pubRsrcProto1));
     assertEquals(pubRsrcLocalPath2,
-        pubts.getInProgressResources().get(pubRsrcProto2));
+        startedResources.get(pubRsrcProto2));
     userResources = loadUserResources(state.getIterator());
     assertEquals(1, userResources.size());
     rur = userResources.get(user);
     privts = rur.getPrivateTrackerState();
     assertNotNull(privts);
-    assertTrue(privts.getLocalizedResources().isEmpty());
-    assertEquals(1, privts.getInProgressResources().size());
+    completedResources = loadCompletedResources(
+        privts.getCompletedResourcesIterator());
+    startedResources = loadStartedResources(
+        privts.getStartedResourcesIterator());
+    assertTrue(completedResources.isEmpty());
+    assertEquals(1, startedResources.size());
     assertEquals(privRsrcLocalPath,
-        privts.getInProgressResources().get(privRsrcProto));
+        startedResources.get(privRsrcProto));
     assertEquals(1, rur.getAppTrackerStates().size());
     appts = rur.getAppTrackerStates().get(appId);
     assertNotNull(appts);
-    assertTrue(appts.getLocalizedResources().isEmpty());
-    assertEquals(1, appts.getInProgressResources().size());
+    completedResources = loadCompletedResources(
+        appts.getCompletedResourcesIterator());
+    startedResources = loadStartedResources(
+        appts.getStartedResourcesIterator());
+    assertTrue(completedResources.isEmpty());
+    assertEquals(1, startedResources.size());
     assertEquals(appRsrcLocalPath,
-        appts.getInProgressResources().get(appRsrcProto));
+        startedResources.get(appRsrcProto));
   }
 
   @Test
@@ -637,27 +795,44 @@ public class TestNMLeveldbStateStoreService {
           .build();
     stateStore.finishResourceLocalization(user, appId, appLocalizedProto);
 
+    List<LocalizedResourceProto> completedResources =
+        new ArrayList<LocalizedResourceProto>();
+    Map<LocalResourceProto, Path> startedResources =
+        new HashMap<LocalResourceProto, Path>();
+
     // restart and verify only app resource is completed
     restartStateStore();
     RecoveredLocalizationState state = stateStore.loadLocalizationState();
     LocalResourceTrackerState pubts = state.getPublicTrackerState();
-    assertTrue(pubts.getLocalizedResources().isEmpty());
-    assertTrue(pubts.getInProgressResources().isEmpty());
+    completedResources = loadCompletedResources(
+        pubts.getCompletedResourcesIterator());
+    startedResources = loadStartedResources(
+        pubts.getStartedResourcesIterator());
+    assertTrue(completedResources.isEmpty());
+    assertTrue(startedResources.isEmpty());
     Map<String, RecoveredUserResources> userResources =
         loadUserResources(state.getIterator());
     assertEquals(1, userResources.size());
     RecoveredUserResources rur = userResources.get(user);
     LocalResourceTrackerState privts = rur.getPrivateTrackerState();
     assertNotNull(privts);
-    assertTrue(privts.getLocalizedResources().isEmpty());
-    assertTrue(privts.getInProgressResources().isEmpty());
+    completedResources = loadCompletedResources(
+        privts.getCompletedResourcesIterator());
+    startedResources = loadStartedResources(
+        privts.getStartedResourcesIterator());
+    assertTrue(completedResources.isEmpty());
+    assertTrue(startedResources.isEmpty());
     assertEquals(1, rur.getAppTrackerStates().size());
     LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
     assertNotNull(appts);
-    assertTrue(appts.getInProgressResources().isEmpty());
-    assertEquals(1, appts.getLocalizedResources().size());
+    completedResources = loadCompletedResources(
+        appts.getCompletedResourcesIterator());
+    startedResources = loadStartedResources(
+        appts.getStartedResourcesIterator());
+    assertTrue(startedResources.isEmpty());
+    assertEquals(1, completedResources.size());
     assertEquals(appLocalizedProto,
-        appts.getLocalizedResources().iterator().next());
+        completedResources.iterator().next());
 
     // start some public and private resources
     Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
@@ -708,28 +883,40 @@ public class TestNMLeveldbStateStoreService {
     restartStateStore();
     state = stateStore.loadLocalizationState();
     pubts = state.getPublicTrackerState();
-    assertEquals(1, pubts.getLocalizedResources().size());
+    completedResources = loadCompletedResources(
+        pubts.getCompletedResourcesIterator());
+    startedResources = loadStartedResources(
+        pubts.getStartedResourcesIterator());
+    assertEquals(1, completedResources.size());
     assertEquals(pubLocalizedProto1,
-        pubts.getLocalizedResources().iterator().next());
-    assertEquals(1, pubts.getInProgressResources().size());
+        completedResources.iterator().next());
+    assertEquals(1, startedResources.size());
     assertEquals(pubRsrcLocalPath2,
-        pubts.getInProgressResources().get(pubRsrcProto2));
+        startedResources.get(pubRsrcProto2));
     userResources = loadUserResources(state.getIterator());
     assertEquals(1, userResources.size());
     rur = userResources.get(user);
     privts = rur.getPrivateTrackerState();
     assertNotNull(privts);
-    assertEquals(1, privts.getLocalizedResources().size());
+    completedResources = loadCompletedResources(
+        privts.getCompletedResourcesIterator());
+    startedResources = loadStartedResources(
+        privts.getStartedResourcesIterator());
+    assertEquals(1, completedResources.size());
     assertEquals(privLocalizedProto,
-        privts.getLocalizedResources().iterator().next());
-    assertTrue(privts.getInProgressResources().isEmpty());
+        completedResources.iterator().next());
+    assertTrue(startedResources.isEmpty());
     assertEquals(1, rur.getAppTrackerStates().size());
     appts = rur.getAppTrackerStates().get(appId);
     assertNotNull(appts);
-    assertTrue(appts.getInProgressResources().isEmpty());
-    assertEquals(1, appts.getLocalizedResources().size());
+    completedResources = loadCompletedResources(
+        appts.getCompletedResourcesIterator());
+    startedResources = loadStartedResources(
+        appts.getStartedResourcesIterator());
+    assertTrue(startedResources.isEmpty());
+    assertEquals(1, completedResources.size());
     assertEquals(appLocalizedProto,
-        appts.getLocalizedResources().iterator().next());
+        completedResources.iterator().next());
   }
 
   @Test
@@ -817,10 +1004,14 @@ public class TestNMLeveldbStateStoreService {
     restartStateStore();
     RecoveredLocalizationState state = stateStore.loadLocalizationState();
     LocalResourceTrackerState pubts = state.getPublicTrackerState();
-    assertTrue(pubts.getInProgressResources().isEmpty());
-    assertEquals(1, pubts.getLocalizedResources().size());
+    List<LocalizedResourceProto> completedResources =
+        loadCompletedResources(pubts.getCompletedResourcesIterator());
+    Map<LocalResourceProto, Path> startedResources =
+        loadStartedResources(pubts.getStartedResourcesIterator());
+    assertTrue(startedResources.isEmpty());
+    assertEquals(1, completedResources.size());
     assertEquals(pubLocalizedProto1,
-        pubts.getLocalizedResources().iterator().next());
+        completedResources.iterator().next());
     Map<String, RecoveredUserResources> userResources =
         loadUserResources(state.getIterator());
     assertTrue(userResources.isEmpty());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org