You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by eb...@apache.org on 2020/04/14 18:37:38 UTC
[hadoop] branch branch-2.10 updated: YARN-8680. YARN NM: Implement
Iterable Abstraction for LocalResourceTracker state. Contributed by Pradeep
Ambati.
This is an automated email from the ASF dual-hosted git repository.
ebadger pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 0e05b02 YARN-8680. YARN NM: Implement Iterable Abstraction for LocalResourceTracker state. Contributed by Pradeep Ambati.
0e05b02 is described below
commit 0e05b02136704c488fcf5991f835a62fd59ad50d
Author: Eric Badger <eb...@verizonmedia.com>
AuthorDate: Tue Apr 14 18:30:56 2020 +0000
YARN-8680. YARN NM: Implement Iterable Abstraction for LocalResourceTracker
state. Contributed by Pradeep Ambati.
---
.../localizer/ResourceLocalizationService.java | 87 +++----
.../recovery/NMLeveldbStateStoreService.java | 174 ++++++++-----
.../nodemanager/recovery/NMStateStoreService.java | 29 ++-
.../recovery/NMMemoryStateStoreService.java | 18 +-
.../recovery/TestNMLeveldbStateStoreService.java | 269 ++++++++++++++++++---
5 files changed, 418 insertions(+), 159 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 25df843..92f3286 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -309,63 +309,66 @@ public class ResourceLocalizationService extends CompositeService
String user = userEntry.getKey();
RecoveredUserResources userResources = userEntry.getValue();
trackerState = userResources.getPrivateTrackerState();
- if (!trackerState.isEmpty()) {
- LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
- null, dispatcher, true, super.getConfig(), stateStore,
- dirsHandler);
- LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
- tracker);
- if (oldTracker != null) {
- tracker = oldTracker;
- }
- recoverTrackerResources(tracker, trackerState);
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ null, dispatcher, true, super.getConfig(), stateStore,
+ dirsHandler);
+ LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
+ tracker);
+ if (oldTracker != null) {
+ tracker = oldTracker;
}
+ recoverTrackerResources(tracker, trackerState);
for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
userResources.getAppTrackerStates().entrySet()) {
trackerState = appEntry.getValue();
- if (!trackerState.isEmpty()) {
- ApplicationId appId = appEntry.getKey();
- String appIdStr = appId.toString();
- LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
- appId, dispatcher, false, super.getConfig(), stateStore,
- dirsHandler);
- LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
- tracker);
- if (oldTracker != null) {
- tracker = oldTracker;
- }
- recoverTrackerResources(tracker, trackerState);
+ ApplicationId appId = appEntry.getKey();
+ String appIdStr = appId.toString();
+ LocalResourcesTracker tracker1 = new LocalResourcesTrackerImpl(user,
+ appId, dispatcher, false, super.getConfig(), stateStore,
+ dirsHandler);
+ LocalResourcesTracker oldTracker1 = appRsrc.putIfAbsent(appIdStr,
+ tracker1);
+ if (oldTracker1 != null) {
+ tracker1 = oldTracker1;
}
+ recoverTrackerResources(tracker1, trackerState);
}
}
}
}
private void recoverTrackerResources(LocalResourcesTracker tracker,
- LocalResourceTrackerState state) throws URISyntaxException {
- for (LocalizedResourceProto proto : state.getLocalizedResources()) {
- LocalResource rsrc = new LocalResourcePBImpl(proto.getResource());
- LocalResourceRequest req = new LocalResourceRequest(rsrc);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Recovering localized resource " + req + " at "
- + proto.getLocalPath());
+ LocalResourceTrackerState state) throws URISyntaxException, IOException {
+ try (RecoveryIterator<LocalizedResourceProto> it =
+ state.getCompletedResourcesIterator()) {
+ while (it != null && it.hasNext()) {
+ LocalizedResourceProto proto = it.next();
+ LocalResource rsrc = new LocalResourcePBImpl(proto.getResource());
+ LocalResourceRequest req = new LocalResourceRequest(rsrc);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recovering localized resource " + req + " at "
+ + proto.getLocalPath());
+ }
+ tracker.handle(new ResourceRecoveredEvent(req,
+ new Path(proto.getLocalPath()), proto.getSize()));
}
- tracker.handle(new ResourceRecoveredEvent(req,
- new Path(proto.getLocalPath()), proto.getSize()));
}
- for (Map.Entry<LocalResourceProto, Path> entry :
- state.getInProgressResources().entrySet()) {
- LocalResource rsrc = new LocalResourcePBImpl(entry.getKey());
- LocalResourceRequest req = new LocalResourceRequest(rsrc);
- Path localPath = entry.getValue();
- tracker.handle(new ResourceRecoveredEvent(req, localPath, 0));
-
- // delete any in-progress localizations, containers will request again
- LOG.info("Deleting in-progress localization for " + req + " at "
- + localPath);
- tracker.remove(tracker.getLocalizedResource(req), delService);
+ try (RecoveryIterator<Map.Entry<LocalResourceProto, Path>> it =
+ state.getStartedResourcesIterator()) {
+ while (it != null && it.hasNext()) {
+ Map.Entry<LocalResourceProto, Path> entry = it.next();
+ LocalResource rsrc = new LocalResourcePBImpl(entry.getKey());
+ LocalResourceRequest req = new LocalResourceRequest(rsrc);
+ Path localPath = entry.getValue();
+ tracker.handle(new ResourceRecoveredEvent(req, localPath, 0));
+
+ // delete any in-progress localizations, containers will request again
+ LOG.info("Deleting in-progress localization for " + req + " at "
+ + localPath);
+ tracker.remove(tracker.getLocalizedResource(req), delService);
+ }
}
// TODO: remove untracked directories in local filesystem
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index ee6bf6b..bcdcc4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -68,7 +68,7 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractMap;
-import java.util.ArrayList;
+import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -154,6 +154,16 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/";
+ /**
+ * The Local Tracker State DB key locations - "completed" and "started".
+ * To seek through the app tracker states in RecoveredUserResources,
+ * we move from one app tracker state to the next by seeking to key "zzz",
+ * which sorts after both "completed" and "started" lexicographically.
+ * Similarly, to move from one user to the next in RLS, we can seek to "zzz",
+ * as RecoveredUserResources uses only two keys: appcache and filecache.
+ */
+ private static final String BEYOND_ENTRIES_SUFFIX = "zzz/";
+
private static final String CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX =
"/assignedResources_";
@@ -832,112 +842,154 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
public RecoveredLocalizationState loadLocalizationState()
throws IOException {
RecoveredLocalizationState state = new RecoveredLocalizationState();
- LeveldbIterator it = getLevelDBIterator(LOCALIZATION_PUBLIC_KEY_PREFIX);
- state.publicTrackerState = loadResourceTrackerState(it,
+ state.publicTrackerState = loadResourceTrackerState(
LOCALIZATION_PUBLIC_KEY_PREFIX);
state.it = new UserResourcesIterator();
return state;
}
- private LocalResourceTrackerState loadResourceTrackerState(
- LeveldbIterator iter, String keyPrefix) throws IOException {
+ private LocalResourceTrackerState loadResourceTrackerState(String keyPrefix)
+ throws IOException {
final String completedPrefix = keyPrefix + LOCALIZATION_COMPLETED_SUFFIX;
final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX;
- LocalResourceTrackerState state = new LocalResourceTrackerState();
- while (iter.hasNext()) {
- Entry<byte[], byte[]> entry = iter.peekNext();
- String key = asString(entry.getKey());
- if (!key.startsWith(keyPrefix)) {
- break;
- }
- if (key.startsWith(completedPrefix)) {
- state.localizedResources = loadCompletedResources(iter,
- completedPrefix);
- } else if (key.startsWith(startedPrefix)) {
- state.inProgressResources = loadStartedResources(iter, startedPrefix);
- } else {
- throw new IOException("Unexpected key in resource tracker state: "
- + key);
- }
+ RecoveryIterator<LocalizedResourceProto> crIt =
+ new CompletedResourcesIterator(completedPrefix);
+ RecoveryIterator<Entry<LocalResourceProto, Path>> srIt =
+ new StartedResourcesIterator(startedPrefix);
+
+ return new LocalResourceTrackerState(crIt, srIt);
+ }
+
+ private class CompletedResourcesIterator extends
+ BaseRecoveryIterator<LocalizedResourceProto> {
+ private String startKey;
+ CompletedResourcesIterator(String startKey) throws IOException {
+ super(startKey);
+ this.startKey = startKey;
}
- return state;
+ @Override
+ protected LocalizedResourceProto getNextItem(LeveldbIterator it)
+ throws IOException {
+ return getNextCompletedResource(it, startKey);
+ }
}
- private List<LocalizedResourceProto> loadCompletedResources(
+ private LocalizedResourceProto getNextCompletedResource(
LeveldbIterator iter, String keyPrefix) throws IOException {
- List<LocalizedResourceProto> rsrcs =
- new ArrayList<LocalizedResourceProto>();
- while (iter.hasNext()) {
- Entry<byte[],byte[]> entry = iter.peekNext();
+ LocalizedResourceProto nextCompletedResource = null;
+ if (iter.hasNext()){
+ Entry<byte[], byte[]> entry = iter.next();
String key = asString(entry.getKey());
if (!key.startsWith(keyPrefix)) {
- break;
+ return null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Loading completed resource from " + key);
}
- rsrcs.add(LocalizedResourceProto.parseFrom(entry.getValue()));
- iter.next();
+ nextCompletedResource = LocalizedResourceProto.parseFrom(
+ entry.getValue());
}
+ return nextCompletedResource;
+ }
- return rsrcs;
+ private class StartedResourcesIterator extends
+ BaseRecoveryIterator<Entry<LocalResourceProto, Path>> {
+ private String startKey;
+ StartedResourcesIterator(String startKey) throws IOException {
+ super(startKey);
+ this.startKey = startKey;
+ }
+
+ @Override
+ protected Entry<LocalResourceProto, Path> getNextItem(LeveldbIterator it)
+ throws IOException {
+ return getNextStartedResource(it, startKey);
+ }
}
- private Map<LocalResourceProto, Path> loadStartedResources(
+ private Entry<LocalResourceProto, Path> getNextStartedResource(
LeveldbIterator iter, String keyPrefix) throws IOException {
- Map<LocalResourceProto, Path> rsrcs =
- new HashMap<LocalResourceProto, Path>();
- while (iter.hasNext()) {
- Entry<byte[],byte[]> entry = iter.peekNext();
+ Entry<LocalResourceProto, Path> nextStartedResource = null;
+ if (iter.hasNext()){
+ Entry<byte[], byte[]> entry = iter.next();
String key = asString(entry.getKey());
if (!key.startsWith(keyPrefix)) {
- break;
+ return null;
}
Path localPath = new Path(key.substring(keyPrefix.length()));
if (LOG.isDebugEnabled()) {
LOG.debug("Loading in-progress resource at " + localPath);
}
- rsrcs.put(LocalResourceProto.parseFrom(entry.getValue()), localPath);
- iter.next();
+ nextStartedResource = new SimpleEntry<LocalResourceProto, Path>(
+ LocalResourceProto.parseFrom(entry.getValue()), localPath);
}
+ return nextStartedResource;
+ }
- return rsrcs;
+ private void seekPastPrefix(LeveldbIterator iter, String keyPrefix)
+ throws IOException {
+ try{
+ iter.seek(bytes(keyPrefix + BEYOND_ENTRIES_SUFFIX));
+ while (iter.hasNext()) {
+ Entry<byte[], byte[]> entry = iter.peekNext();
+ String key = asString(entry.getKey());
+ if (key.startsWith(keyPrefix)) {
+ iter.next();
+ } else {
+ break;
+ }
+ }
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
}
private RecoveredUserResources loadUserLocalizedResources(
LeveldbIterator iter, String keyPrefix) throws IOException {
RecoveredUserResources userResources = new RecoveredUserResources();
+
+ // seek through App cache
+ String appCachePrefix = keyPrefix + LOCALIZATION_APPCACHE_SUFFIX;
+ iter.seek(bytes(appCachePrefix));
while (iter.hasNext()) {
- Entry<byte[],byte[]> entry = iter.peekNext();
+ Entry<byte[], byte[]> entry = iter.peekNext();
String key = asString(entry.getKey());
- if (!key.startsWith(keyPrefix)) {
+
+ if (!key.startsWith(appCachePrefix)) {
break;
}
- if (key.startsWith(LOCALIZATION_FILECACHE_SUFFIX, keyPrefix.length())) {
- userResources.privateTrackerState = loadResourceTrackerState(iter,
- keyPrefix + LOCALIZATION_FILECACHE_SUFFIX);
- } else if (key.startsWith(LOCALIZATION_APPCACHE_SUFFIX,
- keyPrefix.length())) {
- int appIdStartPos = keyPrefix.length() +
- LOCALIZATION_APPCACHE_SUFFIX.length();
- int appIdEndPos = key.indexOf('/', appIdStartPos);
- if (appIdEndPos < 0) {
- throw new IOException("Unable to determine appID in resource key: "
- + key);
- }
- ApplicationId appId = ApplicationId.fromString(
- key.substring(appIdStartPos, appIdEndPos));
- userResources.appTrackerStates.put(appId,
- loadResourceTrackerState(iter, key.substring(0, appIdEndPos+1)));
- } else {
- throw new IOException("Unexpected user resource key " + key);
+ int appIdStartPos = appCachePrefix.length();
+ int appIdEndPos = key.indexOf('/', appIdStartPos);
+ if (appIdEndPos < 0) {
+ throw new IOException("Unable to determine appID in resource key: "
+ + key);
}
- }
+ ApplicationId appId = ApplicationId.fromString(
+ key.substring(appIdStartPos, appIdEndPos));
+ String trackerStateKey = key.substring(0, appIdEndPos+1);
+ userResources.appTrackerStates.put(appId,
+ loadResourceTrackerState(trackerStateKey));
+ // Seek to next application
+ seekPastPrefix(iter, trackerStateKey);
+ }
+
+ // File Cache
+ String fileCachePrefix = keyPrefix + LOCALIZATION_FILECACHE_SUFFIX;
+ iter.seek(bytes(fileCachePrefix));
+ Entry<byte[], byte[]> entry = iter.peekNext();
+ String key = asString(entry.getKey());
+ if (key.startsWith(fileCachePrefix)) {
+ userResources.privateTrackerState =
+ loadResourceTrackerState(fileCachePrefix);
+ }
+
+ // seek to Next User.
+ seekPastPrefix(iter, keyPrefix);
return userResources;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 50eeaef..3831aaf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery;
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -209,27 +208,31 @@ public abstract class NMStateStoreService extends AbstractService {
}
public static class LocalResourceTrackerState {
- List<LocalizedResourceProto> localizedResources =
- new ArrayList<LocalizedResourceProto>();
- Map<LocalResourceProto, Path> inProgressResources =
- new HashMap<LocalResourceProto, Path>();
+ final private RecoveryIterator<LocalizedResourceProto>
+ completedResourcesIterator;
+ final private RecoveryIterator<Entry<LocalResourceProto, Path>>
+ startedResourcesIterator;
- public List<LocalizedResourceProto> getLocalizedResources() {
- return localizedResources;
+ LocalResourceTrackerState(RecoveryIterator<LocalizedResourceProto> crIt,
+ RecoveryIterator<Entry<LocalResourceProto, Path>> srIt) {
+ this.completedResourcesIterator = crIt;
+ this.startedResourcesIterator = srIt;
}
- public Map<LocalResourceProto, Path> getInProgressResources() {
- return inProgressResources;
+ public RecoveryIterator<LocalizedResourceProto>
+ getCompletedResourcesIterator() {
+ return completedResourcesIterator;
}
- public boolean isEmpty() {
- return localizedResources.isEmpty() && inProgressResources.isEmpty();
+ public RecoveryIterator<Entry<LocalResourceProto, Path>>
+ getStartedResourcesIterator() {
+ return startedResourcesIterator;
}
}
public static class RecoveredUserResources {
LocalResourceTrackerState privateTrackerState =
- new LocalResourceTrackerState();
+ new LocalResourceTrackerState(null, null);
Map<ApplicationId, LocalResourceTrackerState> appTrackerStates =
new HashMap<ApplicationId, LocalResourceTrackerState>();
@@ -245,7 +248,7 @@ public abstract class NMStateStoreService extends AbstractService {
public static class RecoveredLocalizationState {
LocalResourceTrackerState publicTrackerState =
- new LocalResourceTrackerState();
+ new LocalResourceTrackerState(null, null);
RecoveryIterator<Entry<String, RecoveredUserResources>> it = null;
public LocalResourceTrackerState getPublicTrackerState() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index 25d96da..7911309 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -283,13 +283,23 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
- LocalResourceTrackerState result = new LocalResourceTrackerState();
- result.localizedResources.addAll(ts.localizedResources.values());
+ List<LocalizedResourceProto> completedResources =
+ new ArrayList<LocalizedResourceProto>(ts.localizedResources.values());
+ RecoveryIterator<LocalizedResourceProto> crIt =
+ new NMMemoryRecoveryIterator<LocalizedResourceProto>(
+ completedResources.iterator());
+
+ Map<LocalResourceProto, Path> inProgressMap =
+ new HashMap<LocalResourceProto, Path>();
for (Map.Entry<Path, LocalResourceProto> entry :
ts.inProgressMap.entrySet()) {
- result.inProgressResources.put(entry.getValue(), entry.getKey());
+ inProgressMap.put(entry.getValue(), entry.getKey());
}
- return result;
+ RecoveryIterator<Map.Entry<LocalResourceProto, Path>> srIt =
+ new NMMemoryRecoveryIterator<Map.Entry<LocalResourceProto, Path>>(
+ inProgressMap.entrySet().iterator());
+
+ return new LocalResourceTrackerState(crIt, srIt);
}
private TrackerState getTrackerState(TrackerKey key) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 3f33584..c64d48f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -190,6 +190,28 @@ public class TestNMLeveldbStateStoreService {
return containerTokens;
}
+ private List<LocalizedResourceProto> loadCompletedResources(
+ RecoveryIterator<LocalizedResourceProto> it) throws IOException {
+ List<LocalizedResourceProto> completedResources =
+ new ArrayList<LocalizedResourceProto>();
+ while (it != null && it.hasNext()) {
+ completedResources.add(it.next());
+ }
+ return completedResources;
+ }
+
+ private Map<LocalResourceProto, Path> loadStartedResources(
+ RecoveryIterator <Map.Entry<LocalResourceProto, Path>> it)
+ throws IOException {
+ Map<LocalResourceProto, Path> startedResources =
+ new HashMap<LocalResourceProto, Path>();
+ while (it != null &&it.hasNext()) {
+ Map.Entry<LocalResourceProto, Path> entry = it.next();
+ startedResources.put(entry.getKey(), entry.getValue());
+ }
+ return startedResources;
+ }
+
private void restartStateStore() throws IOException {
// need to close so leveldb releases database lock
if (stateStore != null) {
@@ -205,8 +227,10 @@ public class TestNMLeveldbStateStoreService {
assertNotNull(state);
LocalResourceTrackerState pubts = state.getPublicTrackerState();
assertNotNull(pubts);
- assertTrue(pubts.getLocalizedResources().isEmpty());
- assertTrue(pubts.getInProgressResources().isEmpty());
+ assertTrue(loadCompletedResources(pubts.getCompletedResourcesIterator())
+ .isEmpty());
+ assertTrue(loadStartedResources(pubts.getStartedResourcesIterator())
+ .isEmpty());
assertTrue(loadUserResources(state.getIterator()).isEmpty());
}
@@ -518,6 +542,111 @@ public class TestNMLeveldbStateStoreService {
}
@Test
+ public void testLocalTrackerStateIterator() throws IOException {
+ String user1 = "somebody";
+ ApplicationId appId1 = ApplicationId.newInstance(1, 1);
+ ApplicationId appId2 = ApplicationId.newInstance(2, 2);
+
+ String user2 = "someone";
+ ApplicationId appId3 = ApplicationId.newInstance(3, 3);
+
+ // start and finish local resource for applications
+ Path appRsrcPath1 = new Path("hdfs://some/app/resource1");
+ LocalResourcePBImpl rsrcPb1 = (LocalResourcePBImpl)
+ LocalResource.newInstance(
+ URL.fromPath(appRsrcPath1),
+ LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
+ 123L, 456L);
+ LocalResourceProto appRsrcProto1 = rsrcPb1.getProto();
+ Path appRsrcLocalPath1 = new Path("/some/local/dir/for/apprsrc1");
+ Path appRsrcPath2 = new Path("hdfs://some/app/resource2");
+ LocalResourcePBImpl rsrcPb2 = (LocalResourcePBImpl)
+ LocalResource.newInstance(
+ URL.fromPath(appRsrcPath2),
+ LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
+ 123L, 456L);
+ LocalResourceProto appRsrcProto2 = rsrcPb2.getProto();
+ Path appRsrcLocalPath2 = new Path("/some/local/dir/for/apprsrc2");
+ Path appRsrcPath3 = new Path("hdfs://some/app/resource3");
+ LocalResourcePBImpl rsrcPb3 = (LocalResourcePBImpl)
+ LocalResource.newInstance(
+ URL.fromPath(appRsrcPath3),
+ LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION,
+ 123L, 456L);
+ LocalResourceProto appRsrcProto3 = rsrcPb3.getProto();
+ Path appRsrcLocalPath3 = new Path("/some/local/dir/for/apprsrc2");
+
+ stateStore.startResourceLocalization(user1, appId1, appRsrcProto1,
+ appRsrcLocalPath1);
+ stateStore.startResourceLocalization(user1, appId2, appRsrcProto2,
+ appRsrcLocalPath2);
+ stateStore.startResourceLocalization(user2, appId3, appRsrcProto3,
+ appRsrcLocalPath3);
+
+ LocalizedResourceProto appLocalizedProto1 =
+ LocalizedResourceProto.newBuilder()
+ .setResource(appRsrcProto1)
+ .setLocalPath(appRsrcLocalPath1.toString())
+ .setSize(1234567L)
+ .build();
+ LocalizedResourceProto appLocalizedProto2 =
+ LocalizedResourceProto.newBuilder()
+ .setResource(appRsrcProto2)
+ .setLocalPath(appRsrcLocalPath2.toString())
+ .setSize(1234567L)
+ .build();
+ LocalizedResourceProto appLocalizedProto3 =
+ LocalizedResourceProto.newBuilder()
+ .setResource(appRsrcProto3)
+ .setLocalPath(appRsrcLocalPath3.toString())
+ .setSize(1234567L)
+ .build();
+
+
+ stateStore.finishResourceLocalization(user1, appId1, appLocalizedProto1);
+ stateStore.finishResourceLocalization(user1, appId2, appLocalizedProto2);
+ stateStore.finishResourceLocalization(user2, appId3, appLocalizedProto3);
+
+
+ List<LocalizedResourceProto> completedResources =
+ new ArrayList<LocalizedResourceProto>();
+ Map<LocalResourceProto, Path> startedResources =
+ new HashMap<LocalResourceProto, Path>();
+
+ // restart and verify two users exist and two apps completed for user1.
+ restartStateStore();
+ RecoveredLocalizationState state = stateStore.loadLocalizationState();
+ Map<String, RecoveredUserResources> userResources =
+ loadUserResources(state.getIterator());
+ assertEquals(2, userResources.size());
+
+ RecoveredUserResources uResource = userResources.get(user1);
+ assertEquals(2, uResource.getAppTrackerStates().size());
+ LocalResourceTrackerState app1ts =
+ uResource.getAppTrackerStates().get(appId1);
+ assertNotNull(app1ts);
+ completedResources = loadCompletedResources(
+ app1ts.getCompletedResourcesIterator());
+ startedResources = loadStartedResources(
+ app1ts.getStartedResourcesIterator());
+ assertTrue(startedResources.isEmpty());
+ assertEquals(1, completedResources.size());
+ assertEquals(appLocalizedProto1,
+ completedResources.iterator().next());
+ LocalResourceTrackerState app2ts =
+ uResource.getAppTrackerStates().get(appId2);
+ assertNotNull(app2ts);
+ completedResources = loadCompletedResources(
+ app2ts.getCompletedResourcesIterator());
+ startedResources = loadStartedResources(
+ app2ts.getStartedResourcesIterator());
+ assertTrue(startedResources.isEmpty());
+ assertEquals(1, completedResources.size());
+ assertEquals(appLocalizedProto2,
+ completedResources.iterator().next());
+ }
+
+ @Test
public void testStartResourceLocalization() throws IOException {
String user = "somebody";
ApplicationId appId = ApplicationId.newInstance(1, 1);
@@ -534,27 +663,44 @@ public class TestNMLeveldbStateStoreService {
stateStore.startResourceLocalization(user, appId, appRsrcProto,
appRsrcLocalPath);
+ List<LocalizedResourceProto> completedResources =
+ new ArrayList<LocalizedResourceProto>();
+ Map<LocalResourceProto, Path> startedResources =
+ new HashMap<LocalResourceProto, Path>();
+
// restart and verify only app resource is marked in-progress
restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState();
LocalResourceTrackerState pubts = state.getPublicTrackerState();
- assertTrue(pubts.getLocalizedResources().isEmpty());
- assertTrue(pubts.getInProgressResources().isEmpty());
+ completedResources = loadCompletedResources(
+ pubts.getCompletedResourcesIterator());
+ startedResources = loadStartedResources(
+ pubts.getStartedResourcesIterator());
+ assertTrue(completedResources.isEmpty());
+ assertTrue(startedResources.isEmpty());
Map<String, RecoveredUserResources> userResources =
loadUserResources(state.getIterator());
assertEquals(1, userResources.size());
RecoveredUserResources rur = userResources.get(user);
LocalResourceTrackerState privts = rur.getPrivateTrackerState();
assertNotNull(privts);
- assertTrue(privts.getLocalizedResources().isEmpty());
- assertTrue(privts.getInProgressResources().isEmpty());
+ completedResources = loadCompletedResources(
+ privts.getCompletedResourcesIterator());
+ startedResources = loadStartedResources(
+ privts.getStartedResourcesIterator());
+ assertTrue(completedResources.isEmpty());
+ assertTrue(startedResources.isEmpty());
assertEquals(1, rur.getAppTrackerStates().size());
LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts);
- assertTrue(appts.getLocalizedResources().isEmpty());
- assertEquals(1, appts.getInProgressResources().size());
+ completedResources = loadCompletedResources(
+ appts.getCompletedResourcesIterator());
+ startedResources = loadStartedResources(
+ appts.getStartedResourcesIterator());
+ assertTrue(completedResources.isEmpty());
+ assertEquals(1, startedResources.size());
assertEquals(appRsrcLocalPath,
- appts.getInProgressResources().get(appRsrcProto));
+ startedResources.get(appRsrcProto));
// start some public and private resources
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
@@ -589,28 +735,40 @@ public class TestNMLeveldbStateStoreService {
restartStateStore();
state = stateStore.loadLocalizationState();
pubts = state.getPublicTrackerState();
- assertTrue(pubts.getLocalizedResources().isEmpty());
- assertEquals(2, pubts.getInProgressResources().size());
+ completedResources = loadCompletedResources(
+ pubts.getCompletedResourcesIterator());
+ startedResources = loadStartedResources(
+ pubts.getStartedResourcesIterator());
+ assertTrue(completedResources.isEmpty());
+ assertEquals(2, startedResources.size());
assertEquals(pubRsrcLocalPath1,
- pubts.getInProgressResources().get(pubRsrcProto1));
+ startedResources.get(pubRsrcProto1));
assertEquals(pubRsrcLocalPath2,
- pubts.getInProgressResources().get(pubRsrcProto2));
+ startedResources.get(pubRsrcProto2));
userResources = loadUserResources(state.getIterator());
assertEquals(1, userResources.size());
rur = userResources.get(user);
privts = rur.getPrivateTrackerState();
assertNotNull(privts);
- assertTrue(privts.getLocalizedResources().isEmpty());
- assertEquals(1, privts.getInProgressResources().size());
+ completedResources = loadCompletedResources(
+ privts.getCompletedResourcesIterator());
+ startedResources = loadStartedResources(
+ privts.getStartedResourcesIterator());
+ assertTrue(completedResources.isEmpty());
+ assertEquals(1, startedResources.size());
assertEquals(privRsrcLocalPath,
- privts.getInProgressResources().get(privRsrcProto));
+ startedResources.get(privRsrcProto));
assertEquals(1, rur.getAppTrackerStates().size());
appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts);
- assertTrue(appts.getLocalizedResources().isEmpty());
- assertEquals(1, appts.getInProgressResources().size());
+ completedResources = loadCompletedResources(
+ appts.getCompletedResourcesIterator());
+ startedResources = loadStartedResources(
+ appts.getStartedResourcesIterator());
+ assertTrue(completedResources.isEmpty());
+ assertEquals(1, startedResources.size());
assertEquals(appRsrcLocalPath,
- appts.getInProgressResources().get(appRsrcProto));
+ startedResources.get(appRsrcProto));
}
@Test
@@ -637,27 +795,44 @@ public class TestNMLeveldbStateStoreService {
.build();
stateStore.finishResourceLocalization(user, appId, appLocalizedProto);
+ List<LocalizedResourceProto> completedResources =
+ new ArrayList<LocalizedResourceProto>();
+ Map<LocalResourceProto, Path> startedResources =
+ new HashMap<LocalResourceProto, Path>();
+
// restart and verify only app resource is completed
restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState();
LocalResourceTrackerState pubts = state.getPublicTrackerState();
- assertTrue(pubts.getLocalizedResources().isEmpty());
- assertTrue(pubts.getInProgressResources().isEmpty());
+ completedResources = loadCompletedResources(
+ pubts.getCompletedResourcesIterator());
+ startedResources = loadStartedResources(
+ pubts.getStartedResourcesIterator());
+ assertTrue(completedResources.isEmpty());
+ assertTrue(startedResources.isEmpty());
Map<String, RecoveredUserResources> userResources =
loadUserResources(state.getIterator());
assertEquals(1, userResources.size());
RecoveredUserResources rur = userResources.get(user);
LocalResourceTrackerState privts = rur.getPrivateTrackerState();
assertNotNull(privts);
- assertTrue(privts.getLocalizedResources().isEmpty());
- assertTrue(privts.getInProgressResources().isEmpty());
+ completedResources = loadCompletedResources(
+ privts.getCompletedResourcesIterator());
+ startedResources = loadStartedResources(
+ privts.getStartedResourcesIterator());
+ assertTrue(completedResources.isEmpty());
+ assertTrue(startedResources.isEmpty());
assertEquals(1, rur.getAppTrackerStates().size());
LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts);
- assertTrue(appts.getInProgressResources().isEmpty());
- assertEquals(1, appts.getLocalizedResources().size());
+ completedResources = loadCompletedResources(
+ appts.getCompletedResourcesIterator());
+ startedResources = loadStartedResources(
+ appts.getStartedResourcesIterator());
+ assertTrue(startedResources.isEmpty());
+ assertEquals(1, completedResources.size());
assertEquals(appLocalizedProto,
- appts.getLocalizedResources().iterator().next());
+ completedResources.iterator().next());
// start some public and private resources
Path pubRsrcPath1 = new Path("hdfs://some/public/resource1");
@@ -708,28 +883,40 @@ public class TestNMLeveldbStateStoreService {
restartStateStore();
state = stateStore.loadLocalizationState();
pubts = state.getPublicTrackerState();
- assertEquals(1, pubts.getLocalizedResources().size());
+ completedResources = loadCompletedResources(
+ pubts.getCompletedResourcesIterator());
+ startedResources = loadStartedResources(
+ pubts.getStartedResourcesIterator());
+ assertEquals(1, completedResources.size());
assertEquals(pubLocalizedProto1,
- pubts.getLocalizedResources().iterator().next());
- assertEquals(1, pubts.getInProgressResources().size());
+ completedResources.iterator().next());
+ assertEquals(1, startedResources.size());
assertEquals(pubRsrcLocalPath2,
- pubts.getInProgressResources().get(pubRsrcProto2));
+ startedResources.get(pubRsrcProto2));
userResources = loadUserResources(state.getIterator());
assertEquals(1, userResources.size());
rur = userResources.get(user);
privts = rur.getPrivateTrackerState();
assertNotNull(privts);
- assertEquals(1, privts.getLocalizedResources().size());
+ completedResources = loadCompletedResources(
+ privts.getCompletedResourcesIterator());
+ startedResources = loadStartedResources(
+ privts.getStartedResourcesIterator());
+ assertEquals(1, completedResources.size());
assertEquals(privLocalizedProto,
- privts.getLocalizedResources().iterator().next());
- assertTrue(privts.getInProgressResources().isEmpty());
+ completedResources.iterator().next());
+ assertTrue(startedResources.isEmpty());
assertEquals(1, rur.getAppTrackerStates().size());
appts = rur.getAppTrackerStates().get(appId);
assertNotNull(appts);
- assertTrue(appts.getInProgressResources().isEmpty());
- assertEquals(1, appts.getLocalizedResources().size());
+ completedResources = loadCompletedResources(
+ appts.getCompletedResourcesIterator());
+ startedResources = loadStartedResources(
+ appts.getStartedResourcesIterator());
+ assertTrue(startedResources.isEmpty());
+ assertEquals(1, completedResources.size());
assertEquals(appLocalizedProto,
- appts.getLocalizedResources().iterator().next());
+ completedResources.iterator().next());
}
@Test
@@ -817,10 +1004,14 @@ public class TestNMLeveldbStateStoreService {
restartStateStore();
RecoveredLocalizationState state = stateStore.loadLocalizationState();
LocalResourceTrackerState pubts = state.getPublicTrackerState();
- assertTrue(pubts.getInProgressResources().isEmpty());
- assertEquals(1, pubts.getLocalizedResources().size());
+ List<LocalizedResourceProto> completedResources =
+ loadCompletedResources(pubts.getCompletedResourcesIterator());
+ Map<LocalResourceProto, Path> startedResources =
+ loadStartedResources(pubts.getStartedResourcesIterator());
+ assertTrue(startedResources.isEmpty());
+ assertEquals(1, completedResources.size());
assertEquals(pubLocalizedProto1,
- pubts.getLocalizedResources().iterator().next());
+ completedResources.iterator().next());
Map<String, RecoveredUserResources> userResources =
loadUserResources(state.getIterator());
assertTrue(userResources.isEmpty());
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org