You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original archive page) is not preserved in this text.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2018/08/20 15:19:43 UTC
hadoop git commit: YARN-8242. YARN NM: OOM error while reading back
the state store on recovery. Contributed by Pradeep Ambati and Kanwaljeet
Sachdev
Repository: hadoop
Updated Branches:
refs/heads/trunk 01ff81781 -> 65e746971
YARN-8242. YARN NM: OOM error while reading back the state store on recovery. Contributed by Pradeep Ambati and Kanwaljeet Sachdev
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/65e74697
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/65e74697
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/65e74697
Branch: refs/heads/trunk
Commit: 65e7469712be6cf393e29ef73cc94727eec81227
Parents: 01ff817
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Aug 20 10:14:40 2018 -0500
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Aug 20 10:14:40 2018 -0500
----------------------------------------------------------------------
.../server/nodemanager/DeletionService.java | 25 +-
.../containermanager/ContainerManagerImpl.java | 26 +-
.../localizer/ResourceLocalizationService.java | 56 +--
.../recovery/NMLeveldbStateStoreService.java | 412 ++++++++++++-------
.../recovery/NMNullStateStoreService.java | 2 +-
.../recovery/NMStateStoreService.java | 55 +--
.../nodemanager/recovery/RecoveryIterator.java | 41 ++
.../security/NMContainerTokenSecretManager.java | 27 +-
.../security/NMTokenSecretManagerInNM.java | 15 +-
.../recovery/NMMemoryStateStoreService.java | 82 +++-
.../TestNMLeveldbStateStoreService.java | 216 +++++++---
11 files changed, 647 insertions(+), 310 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
index ae81dc1..e665c5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
@@ -19,13 +19,14 @@
package org.apache.hadoop.yarn.server.nodemanager;
import static java.util.concurrent.TimeUnit.SECONDS;
+
+import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -96,16 +97,20 @@ public class DeletionService extends AbstractService {
private void recover(NMStateStoreService.RecoveredDeletionServiceState state)
throws IOException {
- List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks();
Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap =
- new HashMap<>(taskProtos.size());
- Set<Integer> successorTasks = new HashSet<>();
- for (DeletionServiceDeleteTaskProto proto : taskProtos) {
- DeletionTaskRecoveryInfo info =
- NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this);
- idToInfoMap.put(info.getTask().getTaskId(), info);
- nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId()));
- successorTasks.addAll(info.getSuccessorTaskIds());
+ new HashMap<Integer, DeletionTaskRecoveryInfo>();
+ Set<Integer> successorTasks = new HashSet<Integer>();
+
+ try (RecoveryIterator<DeletionServiceDeleteTaskProto> it =
+ state.getIterator()) {
+ while (it.hasNext()) {
+ DeletionServiceDeleteTaskProto proto = it.next();
+ DeletionTaskRecoveryInfo info =
+ NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this);
+ idToInfoMap.put(info.getTask().getTaskId(), info);
+ nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId()));
+ successorTasks.addAll(info.getSuccessorTaskIds());
+ }
}
// restore the task dependencies and schedule the deletion tasks that
http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 8b35258..b89e2dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -23,6 +23,7 @@ import com.google.protobuf.ByteString;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -356,19 +357,26 @@ public class ContainerManagerImpl extends CompositeService implements
stateStore.loadLocalizationState());
RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
- for (ContainerManagerApplicationProto proto :
- appsState.getApplications()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Recovering application with state: " + proto.toString());
+ try (RecoveryIterator<ContainerManagerApplicationProto> rasIterator =
+ appsState.getIterator()) {
+ while (rasIterator.hasNext()) {
+ ContainerManagerApplicationProto proto = rasIterator.next();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recovering application with state: " + proto.toString());
+ }
+ recoverApplication(proto);
}
- recoverApplication(proto);
}
- for (RecoveredContainerState rcs : stateStore.loadContainersState()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Recovering container with state: " + rcs);
+ try (RecoveryIterator<RecoveredContainerState> rcsIterator =
+ stateStore.getContainerStateIterator()) {
+ while (rcsIterator.hasNext()) {
+ RecoveredContainerState rcs = rcsIterator.next();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Recovering container with state: " + rcs);
+ }
+ recoverContainer(rcs);
}
- recoverContainer(rcs);
}
// Recovery AMRMProxy state after apps and containers are recovered
http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index 3834ece..2892d1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+
+import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -295,42 +297,46 @@ public class ResourceLocalizationService extends CompositeService
//Recover localized resources after an NM restart
public void recoverLocalizedResources(RecoveredLocalizationState state)
- throws URISyntaxException {
+ throws URISyntaxException, IOException {
LocalResourceTrackerState trackerState = state.getPublicTrackerState();
recoverTrackerResources(publicRsrc, trackerState);
- for (Map.Entry<String, RecoveredUserResources> userEntry :
- state.getUserResources().entrySet()) {
- String user = userEntry.getKey();
- RecoveredUserResources userResources = userEntry.getValue();
- trackerState = userResources.getPrivateTrackerState();
- if (!trackerState.isEmpty()) {
- LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
- null, dispatcher, true, super.getConfig(), stateStore, dirsHandler);
- LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
- tracker);
- if (oldTracker != null) {
- tracker = oldTracker;
- }
- recoverTrackerResources(tracker, trackerState);
- }
-
- for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
- userResources.getAppTrackerStates().entrySet()) {
- trackerState = appEntry.getValue();
+ try (RecoveryIterator<Map.Entry<String, RecoveredUserResources>> it
+ = state.getIterator()) {
+ while (it.hasNext()) {
+ Map.Entry<String, RecoveredUserResources> userEntry = it.next();
+ String user = userEntry.getKey();
+ RecoveredUserResources userResources = userEntry.getValue();
+ trackerState = userResources.getPrivateTrackerState();
if (!trackerState.isEmpty()) {
- ApplicationId appId = appEntry.getKey();
- String appIdStr = appId.toString();
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
- appId, dispatcher, false, super.getConfig(), stateStore,
+ null, dispatcher, true, super.getConfig(), stateStore,
dirsHandler);
- LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
+ LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
tracker);
if (oldTracker != null) {
tracker = oldTracker;
}
recoverTrackerResources(tracker, trackerState);
}
+
+ for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
+ userResources.getAppTrackerStates().entrySet()) {
+ trackerState = appEntry.getValue();
+ if (!trackerState.isEmpty()) {
+ ApplicationId appId = appEntry.getKey();
+ String appIdStr = appId.toString();
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ appId, dispatcher, false, super.getConfig(), stateStore,
+ dirsHandler);
+ LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
+ tracker);
+ if (oldTracker != null) {
+ tracker = oldTracker;
+ }
+ recoverTrackerResources(tracker, trackerState);
+ }
+ }
}
}
}
@@ -556,7 +562,7 @@ public class ResourceLocalizationService extends CompositeService
rsrcCleanup.getResources();
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
rsrcs.entrySet()) {
- LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
+ LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
c.getContainerId().getApplicationAttemptId()
.getApplicationId());
for (LocalResourceRequest req : e.getValue()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 67f642d..5d4253d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -73,6 +74,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -225,68 +227,119 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
return isHealthy;
}
- @Override
- public List<RecoveredContainerState> loadContainersState()
+ // LeveldbIterator starting at startkey
+ private LeveldbIterator getLevelDBIterator(String startKey)
throws IOException {
- ArrayList<RecoveredContainerState> containers =
- new ArrayList<RecoveredContainerState>();
- ArrayList<ContainerId> containersToRemove =
- new ArrayList<ContainerId>();
- LeveldbIterator iter = null;
try {
- iter = new LeveldbIterator(db);
- iter.seek(bytes(CONTAINERS_KEY_PREFIX));
+ LeveldbIterator it = new LeveldbIterator(db);
+ it.seek(bytes(startKey));
+ return it;
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
- while (iter.hasNext()) {
- Entry<byte[], byte[]> entry = iter.peekNext();
+ // Base Recovery Iterator
+ private abstract class BaseRecoveryIterator<T> implements
+ RecoveryIterator<T> {
+ LeveldbIterator it;
+ T nextItem;
+
+ BaseRecoveryIterator(String dbKey) throws IOException {
+ this.it = getLevelDBIterator(dbKey);
+ this.nextItem = null;
+ }
+
+ protected abstract T getNextItem(LeveldbIterator it) throws IOException;
+
+ @Override
+ public boolean hasNext() throws IOException {
+ if (nextItem == null) {
+ nextItem = getNextItem(it);
+ }
+ return (nextItem != null);
+ }
+
+ @Override
+ public T next() throws IOException, NoSuchElementException {
+ T tmp = nextItem;
+ if (tmp != null) {
+ nextItem = null;
+ return tmp;
+ } else {
+ tmp = getNextItem(it);
+ if (tmp == null) {
+ throw new NoSuchElementException();
+ }
+ return tmp;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (it != null) {
+ it.close();
+ }
+ }
+ }
+
+ // Container Recovery Iterator
+ private class ContainerStateIterator extends
+ BaseRecoveryIterator<RecoveredContainerState> {
+ ContainerStateIterator() throws IOException {
+ super(CONTAINERS_KEY_PREFIX);
+ }
+
+ @Override
+ protected RecoveredContainerState getNextItem(LeveldbIterator it)
+ throws IOException {
+ return getNextRecoveredContainer(it);
+ }
+ }
+
+ private RecoveredContainerState getNextRecoveredContainer(LeveldbIterator it)
+ throws IOException {
+ RecoveredContainerState rcs = null;
+ try {
+ while (it.hasNext()) {
+ Entry<byte[], byte[]> entry = it.peekNext();
String key = asString(entry.getKey());
if (!key.startsWith(CONTAINERS_KEY_PREFIX)) {
- break;
+ return null;
}
int idEndPos = key.indexOf('/', CONTAINERS_KEY_PREFIX.length());
if (idEndPos < 0) {
throw new IOException("Unable to determine container in key: " + key);
}
- ContainerId containerId = ContainerId.fromString(
- key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos));
- String keyPrefix = key.substring(0, idEndPos+1);
- RecoveredContainerState rcs = loadContainerState(containerId,
- iter, keyPrefix);
- // Don't load container without StartContainerRequest
+ String keyPrefix = key.substring(0, idEndPos + 1);
+ rcs = loadContainerState(it, keyPrefix);
if (rcs.startRequest != null) {
- containers.add(rcs);
+ break;
} else {
- containersToRemove.add(containerId);
+ removeContainer(rcs.getContainerId());
+ rcs = null;
}
}
} catch (DBException e) {
throw new IOException(e);
- } finally {
- if (iter != null) {
- iter.close();
- }
}
+ return rcs;
+ }
- // remove container without StartContainerRequest
- for (ContainerId containerId : containersToRemove) {
- LOG.warn("Remove container " + containerId +
- " with incomplete records");
- try {
- removeContainer(containerId);
- // TODO: kill and cleanup the leaked container
- } catch (IOException e) {
- LOG.error("Unable to remove container " + containerId +
- " in store", e);
- }
- }
- return containers;
+ @Override
+ public RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
+ throws IOException {
+ return new ContainerStateIterator();
}
- private RecoveredContainerState loadContainerState(ContainerId containerId,
- LeveldbIterator iter, String keyPrefix) throws IOException {
- RecoveredContainerState rcs = new RecoveredContainerState();
+ private RecoveredContainerState loadContainerState(LeveldbIterator iter,
+ String keyPrefix) throws IOException {
+ ContainerId containerId = ContainerId.fromString(
+ keyPrefix.substring(CONTAINERS_KEY_PREFIX.length(),
+ keyPrefix.length()-1));
+ RecoveredContainerState rcs = new RecoveredContainerState(containerId);
rcs.status = RecoveredContainerStatus.REQUESTED;
while (iter.hasNext()) {
Entry<byte[],byte[]> entry = iter.peekNext();
@@ -680,35 +733,45 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
- @Override
- public RecoveredApplicationsState loadApplicationsState()
- throws IOException {
- RecoveredApplicationsState state = new RecoveredApplicationsState();
- state.applications = new ArrayList<ContainerManagerApplicationProto>();
- String keyPrefix = APPLICATIONS_KEY_PREFIX;
- LeveldbIterator iter = null;
+ // Application Recovery Iterator
+ private class ApplicationStateIterator extends
+ BaseRecoveryIterator<ContainerManagerApplicationProto> {
+ ApplicationStateIterator() throws IOException {
+ super(APPLICATIONS_KEY_PREFIX);
+ }
+
+ @Override
+ protected ContainerManagerApplicationProto getNextItem(LeveldbIterator it)
+ throws IOException {
+ return getNextRecoveredApplication(it);
+ }
+ }
+
+ private ContainerManagerApplicationProto getNextRecoveredApplication(
+ LeveldbIterator it) throws IOException {
+ ContainerManagerApplicationProto applicationProto = null;
try {
- iter = new LeveldbIterator(db);
- iter.seek(bytes(keyPrefix));
- while (iter.hasNext()) {
- Entry<byte[], byte[]> entry = iter.next();
+ if (it.hasNext()) {
+ Entry<byte[], byte[]> entry = it.next();
String key = asString(entry.getKey());
- if (!key.startsWith(keyPrefix)) {
- break;
+ if (!key.startsWith(APPLICATIONS_KEY_PREFIX)) {
+ return null;
}
- state.applications.add(
- ContainerManagerApplicationProto.parseFrom(entry.getValue()));
+ applicationProto = ContainerManagerApplicationProto.parseFrom(
+ entry.getValue());
}
} catch (DBException e) {
throw new IOException(e);
- } finally {
- if (iter != null) {
- iter.close();
- }
}
+ return applicationProto;
+ }
+ @Override
+ public RecoveredApplicationsState loadApplicationsState()
+ throws IOException {
+ RecoveredApplicationsState state = new RecoveredApplicationsState();
+ state.it = new ApplicationStateIterator();
cleanupDeprecatedFinishedApps();
-
return state;
}
@@ -752,24 +815,29 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
- @Override
- public RecoveredLocalizationState loadLocalizationState()
- throws IOException {
- RecoveredLocalizationState state = new RecoveredLocalizationState();
+ // User Resource Recovery Iterator.
+ private class UserResourcesIterator extends
+ BaseRecoveryIterator<Entry<String, RecoveredUserResources>> {
+ UserResourcesIterator() throws IOException {
+ super(LOCALIZATION_PRIVATE_KEY_PREFIX);
+ }
- LeveldbIterator iter = null;
- try {
- iter = new LeveldbIterator(db);
- iter.seek(bytes(LOCALIZATION_PUBLIC_KEY_PREFIX));
- state.publicTrackerState = loadResourceTrackerState(iter,
- LOCALIZATION_PUBLIC_KEY_PREFIX);
+ @Override
+ protected Entry<String, RecoveredUserResources> getNextItem(
+ LeveldbIterator it) throws IOException {
+ return getNextRecoveredPrivateLocalizationEntry(it);
+ }
+ }
- iter.seek(bytes(LOCALIZATION_PRIVATE_KEY_PREFIX));
- while (iter.hasNext()) {
- Entry<byte[],byte[]> entry = iter.peekNext();
+ private Entry<String, RecoveredUserResources> getNextRecoveredPrivateLocalizationEntry(
+ LeveldbIterator it) throws IOException {
+ Entry<String, RecoveredUserResources> localEntry = null;
+ try {
+ if (it.hasNext()) {
+ Entry<byte[], byte[]> entry = it.peekNext();
String key = asString(entry.getKey());
if (!key.startsWith(LOCALIZATION_PRIVATE_KEY_PREFIX)) {
- break;
+ return null;
}
int userEndPos = key.indexOf('/',
@@ -780,17 +848,24 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
String user = key.substring(
LOCALIZATION_PRIVATE_KEY_PREFIX.length(), userEndPos);
- state.userResources.put(user, loadUserLocalizedResources(iter,
- key.substring(0, userEndPos+1)));
+ RecoveredUserResources val = loadUserLocalizedResources(it,
+ key.substring(0, userEndPos+1));
+ localEntry = new AbstractMap.SimpleEntry<>(user, val);
}
} catch (DBException e) {
throw new IOException(e);
- } finally {
- if (iter != null) {
- iter.close();
- }
}
+ return localEntry;
+ }
+ @Override
+ public RecoveredLocalizationState loadLocalizationState()
+ throws IOException {
+ RecoveredLocalizationState state = new RecoveredLocalizationState();
+ LeveldbIterator it = getLevelDBIterator(LOCALIZATION_PUBLIC_KEY_PREFIX);
+ state.publicTrackerState = loadResourceTrackerState(it,
+ LOCALIZATION_PUBLIC_KEY_PREFIX);
+ state.it = new UserResourcesIterator();
return state;
}
@@ -800,7 +875,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX;
LocalResourceTrackerState state = new LocalResourceTrackerState();
while (iter.hasNext()) {
- Entry<byte[],byte[]> entry = iter.peekNext();
+ Entry<byte[], byte[]> entry = iter.peekNext();
String key = asString(entry.getKey());
if (!key.startsWith(keyPrefix)) {
break;
@@ -981,32 +1056,44 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
+ LOCALIZATION_APPCACHE_SUFFIX + appId + "/";
}
+ // Deletion State Recovery Iterator.
+ private class DeletionStateIterator extends
+ BaseRecoveryIterator<DeletionServiceDeleteTaskProto> {
+ DeletionStateIterator() throws IOException {
+ super(DELETION_TASK_KEY_PREFIX);
+ }
- @Override
- public RecoveredDeletionServiceState loadDeletionServiceState()
- throws IOException {
- RecoveredDeletionServiceState state = new RecoveredDeletionServiceState();
- state.tasks = new ArrayList<DeletionServiceDeleteTaskProto>();
- LeveldbIterator iter = null;
+ @Override
+ protected DeletionServiceDeleteTaskProto getNextItem(LeveldbIterator it)
+ throws IOException {
+ return getNextRecoveredDeletionService(it);
+ }
+ }
+
+ private DeletionServiceDeleteTaskProto getNextRecoveredDeletionService(
+ LeveldbIterator it) throws IOException {
+ DeletionServiceDeleteTaskProto deleteProto = null;
try {
- iter = new LeveldbIterator(db);
- iter.seek(bytes(DELETION_TASK_KEY_PREFIX));
- while (iter.hasNext()) {
- Entry<byte[], byte[]> entry = iter.next();
+ if (it.hasNext()) {
+ Entry<byte[], byte[]> entry = it.next();
String key = asString(entry.getKey());
if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) {
- break;
+ return null;
}
- state.tasks.add(
- DeletionServiceDeleteTaskProto.parseFrom(entry.getValue()));
+ deleteProto = DeletionServiceDeleteTaskProto.parseFrom(
+ entry.getValue());
}
} catch (DBException e) {
throw new IOException(e);
- } finally {
- if (iter != null) {
- iter.close();
- }
}
+ return deleteProto;
+ }
+
+ @Override
+ public RecoveredDeletionServiceState loadDeletionServiceState()
+ throws IOException {
+ RecoveredDeletionServiceState state = new RecoveredDeletionServiceState();
+ state.it = new DeletionStateIterator();
return state;
}
@@ -1033,29 +1120,44 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
}
+ private MasterKey getMasterKey(String dbKey) throws IOException {
+ try{
+ byte[] data = db.get(bytes(dbKey));
+ if (data == null || data.length == 0) {
+ return null;
+ }
+ return parseMasterKey(data);
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
- @Override
- public RecoveredNMTokensState loadNMTokensState() throws IOException {
- RecoveredNMTokensState state = new RecoveredNMTokensState();
- state.applicationMasterKeys =
- new HashMap<ApplicationAttemptId, MasterKey>();
- LeveldbIterator iter = null;
+ // Recover NMTokens Iterator
+ private class NMTokensStateIterator extends
+ BaseRecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> {
+ NMTokensStateIterator() throws IOException {
+ super(NM_TOKENS_KEY_PREFIX);
+ }
+
+ @Override
+ protected Entry<ApplicationAttemptId, MasterKey> getNextItem(
+ LeveldbIterator it) throws IOException {
+ return getNextMasterKeyEntry(it);
+ }
+ }
+
+ private Entry<ApplicationAttemptId, MasterKey> getNextMasterKeyEntry(
+ LeveldbIterator it) throws IOException {
+ Entry<ApplicationAttemptId, MasterKey> masterKeyentry = null;
try {
- iter = new LeveldbIterator(db);
- iter.seek(bytes(NM_TOKENS_KEY_PREFIX));
- while (iter.hasNext()) {
- Entry<byte[], byte[]> entry = iter.next();
+ while (it.hasNext()) {
+ Entry<byte[], byte[]> entry = it.next();
String fullKey = asString(entry.getKey());
if (!fullKey.startsWith(NM_TOKENS_KEY_PREFIX)) {
break;
}
String key = fullKey.substring(NM_TOKENS_KEY_PREFIX.length());
- if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) {
- state.currentMasterKey = parseMasterKey(entry.getValue());
- } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) {
- state.previousMasterKey = parseMasterKey(entry.getValue());
- } else if (key.startsWith(
- ApplicationAttemptId.appAttemptIdStrPrefix)) {
+ if (key.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
ApplicationAttemptId attempt;
try {
attempt = ApplicationAttemptId.fromString(key);
@@ -1063,17 +1165,25 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
throw new IOException("Bad application master key state for "
+ fullKey, e);
}
- state.applicationMasterKeys.put(attempt,
+ masterKeyentry = new AbstractMap.SimpleEntry<>(attempt,
parseMasterKey(entry.getValue()));
+ break;
}
}
} catch (DBException e) {
throw new IOException(e);
- } finally {
- if (iter != null) {
- iter.close();
- }
}
+ return masterKeyentry;
+ }
+
+ @Override
+ public RecoveredNMTokensState loadNMTokensState() throws IOException {
+ RecoveredNMTokensState state = new RecoveredNMTokensState();
+ state.currentMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX
+ + CURRENT_MASTER_KEY_SUFFIX);
+ state.previousMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX
+ + PREV_MASTER_KEY_SUFFIX);
+ state.it = new NMTokensStateIterator();
return state;
}
@@ -1122,45 +1232,45 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
}
}
+ // Recover ContainersToken Iterator.
+ private class ContainerTokensStateIterator extends
+ BaseRecoveryIterator<Entry<ContainerId, Long>> {
+ ContainerTokensStateIterator() throws IOException {
+ super(CONTAINER_TOKENS_KEY_PREFIX);
+ }
- @Override
- public RecoveredContainerTokensState loadContainerTokensState()
+ @Override
+ protected Entry<ContainerId, Long> getNextItem(LeveldbIterator it)
+ throws IOException {
+ return getNextContainerToken(it);
+ }
+ }
+
+ private Entry<ContainerId, Long> getNextContainerToken(LeveldbIterator it)
throws IOException {
- RecoveredContainerTokensState state = new RecoveredContainerTokensState();
- state.activeTokens = new HashMap<ContainerId, Long>();
- LeveldbIterator iter = null;
+ Entry<ContainerId, Long> containerTokenEntry = null;
try {
- iter = new LeveldbIterator(db);
- iter.seek(bytes(CONTAINER_TOKENS_KEY_PREFIX));
- final int containerTokensKeyPrefixLength =
- CONTAINER_TOKENS_KEY_PREFIX.length();
- while (iter.hasNext()) {
- Entry<byte[], byte[]> entry = iter.next();
+ while (it.hasNext()) {
+ Entry<byte[], byte[]> entry = it.next();
String fullKey = asString(entry.getKey());
if (!fullKey.startsWith(CONTAINER_TOKENS_KEY_PREFIX)) {
break;
}
- String key = fullKey.substring(containerTokensKeyPrefixLength);
- if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) {
- state.currentMasterKey = parseMasterKey(entry.getValue());
- } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) {
- state.previousMasterKey = parseMasterKey(entry.getValue());
- } else if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
- loadContainerToken(state, fullKey, key, entry.getValue());
+ String key = fullKey.substring(CONTAINER_TOKENS_KEY_PREFIX.length());
+ if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
+ containerTokenEntry = loadContainerToken(fullKey, key,
+ entry.getValue());
+ break;
}
}
} catch (DBException e) {
throw new IOException(e);
- } finally {
- if (iter != null) {
- iter.close();
- }
}
- return state;
+ return containerTokenEntry;
}
- private static void loadContainerToken(RecoveredContainerTokensState state,
- String key, String containerIdStr, byte[] value) throws IOException {
+ private static Entry<ContainerId, Long> loadContainerToken(String key,
+ String containerIdStr, byte[] value) throws IOException {
ContainerId containerId;
Long expTime;
try {
@@ -1169,7 +1279,19 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
} catch (IllegalArgumentException e) {
throw new IOException("Bad container token state for " + key, e);
}
- state.activeTokens.put(containerId, expTime);
+ return new AbstractMap.SimpleEntry<>(containerId, expTime);
+ }
+
+ @Override
+ public RecoveredContainerTokensState loadContainerTokensState()
+ throws IOException {
+ RecoveredContainerTokensState state = new RecoveredContainerTokensState();
+ state.currentMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX
+ + CURRENT_MASTER_KEY_SUFFIX);
+ state.previousMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX
+ + PREV_MASTER_KEY_SUFFIX);
+ state.it = new ContainerTokensStateIterator();
+ return state;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index dfad9cf..3ae00f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -65,7 +65,7 @@ public class NMNullStateStoreService extends NMStateStoreService {
}
@Override
- public List<RecoveredContainerState> loadContainersState()
+ public RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
throws IOException {
throw new UnsupportedOperationException(
"Recovery not supported by this state store");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 70decdb..35caec9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -67,12 +68,11 @@ public abstract class NMStateStoreService extends AbstractService {
}
public static class RecoveredApplicationsState {
- List<ContainerManagerApplicationProto> applications;
+ RecoveryIterator<ContainerManagerApplicationProto> it = null;
- public List<ContainerManagerApplicationProto> getApplications() {
- return applications;
+ public RecoveryIterator<ContainerManagerApplicationProto> getIterator() {
+ return it;
}
-
}
/**
@@ -106,6 +106,15 @@ public abstract class NMStateStoreService extends AbstractService {
RecoveredContainerType.RECOVER;
private long startTime;
private ResourceMappings resMappings = new ResourceMappings();
+ private final ContainerId containerId;
+
+ RecoveredContainerState(ContainerId containerId){
+ this.containerId = containerId;
+ }
+
+ public ContainerId getContainerId() {
+ return containerId;
+ }
public RecoveredContainerStatus getStatus() {
return status;
@@ -248,30 +257,33 @@ public abstract class NMStateStoreService extends AbstractService {
public static class RecoveredLocalizationState {
LocalResourceTrackerState publicTrackerState =
new LocalResourceTrackerState();
- Map<String, RecoveredUserResources> userResources =
- new HashMap<String, RecoveredUserResources>();
+ RecoveryIterator<Entry<String, RecoveredUserResources>> it = null;
public LocalResourceTrackerState getPublicTrackerState() {
return publicTrackerState;
}
- public Map<String, RecoveredUserResources> getUserResources() {
- return userResources;
+ public RecoveryIterator<Entry<String, RecoveredUserResources>> getIterator() {
+ return it;
}
}
public static class RecoveredDeletionServiceState {
- List<DeletionServiceDeleteTaskProto> tasks;
+ RecoveryIterator<DeletionServiceDeleteTaskProto> it = null;
- public List<DeletionServiceDeleteTaskProto> getTasks() {
- return tasks;
+ public RecoveryIterator<DeletionServiceDeleteTaskProto> getIterator(){
+ return it;
}
}
public static class RecoveredNMTokensState {
MasterKey currentMasterKey;
MasterKey previousMasterKey;
- Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
+ RecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> it = null;
+
+ public RecoveryIterator<Entry<ApplicationAttemptId, MasterKey>> getIterator() {
+ return it;
+ }
public MasterKey getCurrentMasterKey() {
return currentMasterKey;
@@ -281,15 +293,16 @@ public abstract class NMStateStoreService extends AbstractService {
return previousMasterKey;
}
- public Map<ApplicationAttemptId, MasterKey> getApplicationMasterKeys() {
- return applicationMasterKeys;
- }
}
public static class RecoveredContainerTokensState {
MasterKey currentMasterKey;
MasterKey previousMasterKey;
- Map<ContainerId, Long> activeTokens;
+ RecoveryIterator<Entry<ContainerId, Long>> it = null;
+
+ public RecoveryIterator<Entry<ContainerId, Long>> getIterator() {
+ return it;
+ }
public MasterKey getCurrentMasterKey() {
return currentMasterKey;
@@ -299,9 +312,6 @@ public abstract class NMStateStoreService extends AbstractService {
return previousMasterKey;
}
- public Map<ContainerId, Long> getActiveTokens() {
- return activeTokens;
- }
}
public static class RecoveredLogDeleterState {
@@ -400,11 +410,10 @@ public abstract class NMStateStoreService extends AbstractService {
/**
- * Load the state of containers
- * @return recovered state for containers
- * @throws IOException
+ * get the Recovered Container State Iterator
+ * @return recovery iterator
*/
- public abstract List<RecoveredContainerState> loadContainersState()
+ public abstract RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java
new file mode 100644
index 0000000..0bb262a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.recovery;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * A wrapper for a Iterator to translate the raw RuntimeExceptions that
+ * can be thrown into IOException.
+ */
+public interface RecoveryIterator<T> extends Closeable {
+
+ /**
+ * Returns true if the iteration has more elements.
+ */
+ boolean hasNext() throws IOException;
+
+ /**
+ * Returns the next element in the iteration.
+ */
+ T next() throws IOException, NoSuchElementException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
index 256f649..b3df69b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
@@ -24,6 +24,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,17 +92,20 @@ public class NMContainerTokenSecretManager extends
super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
}
- for (Entry<ContainerId, Long> entry : state.getActiveTokens().entrySet()) {
- ContainerId containerId = entry.getKey();
- Long expTime = entry.getValue();
- List<ContainerId> containerList =
- recentlyStartedContainerTracker.get(expTime);
- if (containerList == null) {
- containerList = new ArrayList<ContainerId>();
- recentlyStartedContainerTracker.put(expTime, containerList);
- }
- if (!containerList.contains(containerId)) {
- containerList.add(containerId);
+ try (RecoveryIterator<Entry<ContainerId, Long>> it = state.getIterator()) {
+ while (it.hasNext()) {
+ Entry<ContainerId, Long> entry = it.next();
+ ContainerId containerId = entry.getKey();
+ Long expTime = entry.getValue();
+ List<ContainerId> containerList =
+ recentlyStartedContainerTracker.get(expTime);
+ if (containerList == null) {
+ containerList = new ArrayList<ContainerId>();
+ recentlyStartedContainerTracker.put(expTime, containerList);
+ }
+ if (!containerList.contains(containerId)) {
+ containerList.add(containerId);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
index 0956e77..f895791 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
@@ -23,6 +23,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
+import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,11 +89,14 @@ public class NMTokenSecretManagerInNM extends BaseNMTokenSecretManager {
super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
}
- for (Map.Entry<ApplicationAttemptId, MasterKey> entry :
- state.getApplicationMasterKeys().entrySet()) {
- key = entry.getValue();
- oldMasterKeys.put(entry.getKey(),
- new MasterKeyData(key, createSecretKey(key.getBytes().array())));
+ try (RecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>> it =
+ state.getIterator()) {
+ while (it.hasNext()) {
+ Map.Entry<ApplicationAttemptId, MasterKey> entry = it.next();
+ key = entry.getValue();
+ oldMasterKeys.put(entry.getKey(),
+ new MasterKeyData(key, createSecretKey(key.getBytes().array())));
+ }
}
// reconstruct app to app attempts map
http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index c5428d1..9658ecd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -56,6 +57,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
private RecoveredNMTokensState nmTokenState;
private RecoveredContainerTokensState containerTokenState;
+ private Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
+ private Map<ContainerId, Long> activeTokens;
private Map<ApplicationId, LogDeleterProto> logDeleterState;
private RecoveredAMRMProxyState amrmProxyState;
@@ -68,10 +71,9 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
containerStates = new HashMap<ContainerId, RecoveredContainerState>();
nmTokenState = new RecoveredNMTokensState();
- nmTokenState.applicationMasterKeys =
- new HashMap<ApplicationAttemptId, MasterKey>();
+ applicationMasterKeys = new HashMap<ApplicationAttemptId, MasterKey>();
containerTokenState = new RecoveredContainerTokensState();
- containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
+ activeTokens = new HashMap<ContainerId, Long>();
trackerStates = new HashMap<TrackerKey, TrackerState>();
deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
logDeleterState = new HashMap<ApplicationId, LogDeleterProto>();
@@ -86,13 +88,39 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
protected void closeStorage() {
}
+ // Recovery Iterator Implementation.
+ private class NMMemoryRecoveryIterator<T> implements RecoveryIterator<T> {
+
+ private Iterator<T> it;
+
+ NMMemoryRecoveryIterator(Iterator<T> it){
+ this.it = it;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override
+ public T next() throws IOException {
+ return it.next();
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+ }
@Override
public synchronized RecoveredApplicationsState loadApplicationsState()
throws IOException {
RecoveredApplicationsState state = new RecoveredApplicationsState();
- state.applications = new ArrayList<ContainerManagerApplicationProto>(
- apps.values());
+ List<ContainerManagerApplicationProto> containerList =
+ new ArrayList<ContainerManagerApplicationProto>(apps.values());
+ state.it = new NMMemoryRecoveryIterator<ContainerManagerApplicationProto>(
+ containerList.iterator());
return state;
}
@@ -111,13 +139,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
@Override
- public synchronized List<RecoveredContainerState> loadContainersState()
+ public RecoveryIterator<RecoveredContainerState> getContainerStateIterator()
throws IOException {
// return a copy so caller can't modify our state
List<RecoveredContainerState> result =
new ArrayList<RecoveredContainerState>(containerStates.size());
for (RecoveredContainerState rcs : containerStates.values()) {
- RecoveredContainerState rcsCopy = new RecoveredContainerState();
+ RecoveredContainerState rcsCopy = new RecoveredContainerState(rcs.getContainerId());
rcsCopy.status = rcs.status;
rcsCopy.exitCode = rcs.exitCode;
rcsCopy.killed = rcs.killed;
@@ -131,13 +159,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
rcsCopy.setResourceMappings(rcs.getResourceMappings());
result.add(rcsCopy);
}
- return result;
+ return new NMMemoryRecoveryIterator<RecoveredContainerState>(
+ result.iterator());
}
@Override
public synchronized void storeContainer(ContainerId containerId,
int version, long startTime, StartContainerRequest startRequest) {
- RecoveredContainerState rcs = new RecoveredContainerState();
+ RecoveredContainerState rcs = new RecoveredContainerState(containerId);
rcs.startRequest = startRequest;
rcs.version = version;
try {
@@ -284,6 +313,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
@Override
public synchronized RecoveredLocalizationState loadLocalizationState() {
RecoveredLocalizationState result = new RecoveredLocalizationState();
+ Map<String, RecoveredUserResources> userResources =
+ new HashMap<String, RecoveredUserResources>();
for (Map.Entry<TrackerKey, TrackerState> e : trackerStates.entrySet()) {
TrackerKey tk = e.getKey();
TrackerState ts = e.getValue();
@@ -294,10 +325,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
if (tk.user == null) {
result.publicTrackerState = loadTrackerState(ts);
} else {
- RecoveredUserResources rur = result.userResources.get(tk.user);
+ RecoveredUserResources rur = userResources.get(tk.user);
if (rur == null) {
rur = new RecoveredUserResources();
- result.userResources.put(tk.user, rur);
+ userResources.put(tk.user, rur);
}
if (tk.appId == null) {
rur.privateTrackerState = loadTrackerState(ts);
@@ -306,6 +337,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
}
}
}
+ result.it = new NMMemoryRecoveryIterator<Map.Entry<String, RecoveredUserResources>>(
+ userResources.entrySet().iterator());
return result;
}
@@ -341,8 +374,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
throws IOException {
RecoveredDeletionServiceState result =
new RecoveredDeletionServiceState();
- result.tasks = new ArrayList<DeletionServiceDeleteTaskProto>(
- deleteTasks.values());
+ List<DeletionServiceDeleteTaskProto> deleteTaskProtos =
+ new ArrayList<DeletionServiceDeleteTaskProto>(deleteTasks.values());
+ result.it = new NMMemoryRecoveryIterator<DeletionServiceDeleteTaskProto>(
+ deleteTaskProtos.iterator());
return result;
}
@@ -365,9 +400,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
RecoveredNMTokensState result = new RecoveredNMTokensState();
result.currentMasterKey = nmTokenState.currentMasterKey;
result.previousMasterKey = nmTokenState.previousMasterKey;
- result.applicationMasterKeys =
- new HashMap<ApplicationAttemptId, MasterKey>(
- nmTokenState.applicationMasterKeys);
+ Map<ApplicationAttemptId, MasterKey> masterKeysMap =
+ new HashMap<ApplicationAttemptId, MasterKey>(applicationMasterKeys);
+ result.it = new NMMemoryRecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>>(
+ masterKeysMap.entrySet().iterator());
return result;
}
@@ -389,14 +425,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
public synchronized void storeNMTokenApplicationMasterKey(
ApplicationAttemptId attempt, MasterKey key) throws IOException {
MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
- nmTokenState.applicationMasterKeys.put(attempt,
+ applicationMasterKeys.put(attempt,
new MasterKeyPBImpl(keypb.getProto()));
}
@Override
public synchronized void removeNMTokenApplicationMasterKey(
ApplicationAttemptId attempt) throws IOException {
- nmTokenState.applicationMasterKeys.remove(attempt);
+ applicationMasterKeys.remove(attempt);
}
@@ -408,8 +444,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
new RecoveredContainerTokensState();
result.currentMasterKey = containerTokenState.currentMasterKey;
result.previousMasterKey = containerTokenState.previousMasterKey;
- result.activeTokens =
- new HashMap<ContainerId, Long>(containerTokenState.activeTokens);
+ Map<ContainerId, Long> containersTokenMap =
+ new HashMap<ContainerId, Long>(activeTokens);
+ result.it = new NMMemoryRecoveryIterator<Map.Entry<ContainerId, Long>>(
+ containersTokenMap.entrySet().iterator());
return result;
}
@@ -432,13 +470,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
@Override
public synchronized void storeContainerToken(ContainerId containerId,
Long expirationTime) throws IOException {
- containerTokenState.activeTokens.put(containerId, expirationTime);
+ activeTokens.put(containerId, expirationTime);
}
@Override
public synchronized void removeContainerToken(ContainerId containerId)
throws IOException {
- containerTokenState.activeTokens.remove(containerId);
+ activeTokens.remove(containerId);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/65e74697/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 8a8cfa2..fcbbc52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -125,6 +125,73 @@ public class TestNMLeveldbStateStoreService {
FileUtil.fullyDelete(TMP_DIR);
}
+ private List<RecoveredContainerState> loadContainersState(
+ RecoveryIterator<RecoveredContainerState> it) throws IOException {
+ List<RecoveredContainerState> containers =
+ new ArrayList<RecoveredContainerState>();
+ while (it.hasNext()) {
+ RecoveredContainerState rcs = it.next();
+ containers.add(rcs);
+ }
+ return containers;
+ }
+
+ private List<ContainerManagerApplicationProto> loadApplicationProtos(
+ RecoveryIterator<ContainerManagerApplicationProto> it)
+ throws IOException {
+ List<ContainerManagerApplicationProto> applicationProtos =
+ new ArrayList<ContainerManagerApplicationProto>();
+ while (it.hasNext()) {
+ applicationProtos.add(it.next());
+ }
+ return applicationProtos;
+ }
+
+ private List<DeletionServiceDeleteTaskProto> loadDeletionTaskProtos(
+ RecoveryIterator<DeletionServiceDeleteTaskProto> it) throws IOException {
+ List<DeletionServiceDeleteTaskProto> deleteTaskProtos =
+ new ArrayList<DeletionServiceDeleteTaskProto>();
+ while (it.hasNext()) {
+ deleteTaskProtos.add(it.next());
+ }
+ return deleteTaskProtos;
+ }
+
+ private Map<String, RecoveredUserResources> loadUserResources(
+ RecoveryIterator<Map.Entry<String, RecoveredUserResources>> it)
+ throws IOException {
+ Map<String, RecoveredUserResources> userResources =
+ new HashMap<String, RecoveredUserResources>();
+ while (it.hasNext()) {
+ Map.Entry<String, RecoveredUserResources> entry = it.next();
+ userResources.put(entry.getKey(), entry.getValue());
+ }
+ return userResources;
+ }
+
+ private Map<ApplicationAttemptId, MasterKey> loadNMTokens(
+ RecoveryIterator<Map.Entry<ApplicationAttemptId, MasterKey>> it)
+ throws IOException {
+ Map<ApplicationAttemptId, MasterKey> nmTokens =
+ new HashMap<ApplicationAttemptId, MasterKey>();
+ while (it.hasNext()) {
+ Map.Entry<ApplicationAttemptId, MasterKey> entry = it.next();
+ nmTokens.put(entry.getKey(), entry.getValue());
+ }
+ return nmTokens;
+ }
+
+ private Map<ContainerId, Long> loadContainerTokens(
+ RecoveryIterator<Map.Entry<ContainerId, Long>> it) throws IOException {
+ Map<ContainerId, Long> containerTokens =
+ new HashMap<ContainerId, Long>();
+ while (it.hasNext()) {
+ Map.Entry<ContainerId, Long> entry = it.next();
+ containerTokens.put(entry.getKey(), entry.getValue());
+ }
+ return containerTokens;
+ }
+
private void restartStateStore() throws IOException {
// need to close so leveldb releases database lock
if (stateStore != null) {
@@ -142,7 +209,7 @@ public class TestNMLeveldbStateStoreService {
assertNotNull(pubts);
assertTrue(pubts.getLocalizedResources().isEmpty());
assertTrue(pubts.getInProgressResources().isEmpty());
- assertTrue(state.getUserResources().isEmpty());
+ assertTrue(loadUserResources(state.getIterator()).isEmpty());
}
@Test
@@ -183,7 +250,7 @@ public class TestNMLeveldbStateStoreService {
restartStateStore();
Assert.fail("Incompatible version, should expect fail here.");
} catch (ServiceStateException e) {
- Assert.assertTrue("Exception message mismatch",
+ Assert.assertTrue("Exception message mismatch",
e.getMessage().contains("Incompatible version for NM state:"));
}
}
@@ -192,7 +259,9 @@ public class TestNMLeveldbStateStoreService {
public void testApplicationStorage() throws IOException {
// test empty when no state
RecoveredApplicationsState state = stateStore.loadApplicationsState();
- assertTrue(state.getApplications().isEmpty());
+ List<ContainerManagerApplicationProto> apps =
+ loadApplicationProtos(state.getIterator());
+ assertTrue(apps.isEmpty());
// store an application and verify recovered
final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
@@ -204,8 +273,9 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeApplication(appId1, appProto1);
restartStateStore();
state = stateStore.loadApplicationsState();
- assertEquals(1, state.getApplications().size());
- assertEquals(appProto1, state.getApplications().get(0));
+ apps = loadApplicationProtos(state.getIterator());
+ assertEquals(1, apps.size());
+ assertEquals(appProto1, apps.get(0));
// add a new app
final ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
@@ -216,23 +286,25 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeApplication(appId2, appProto2);
restartStateStore();
state = stateStore.loadApplicationsState();
- assertEquals(2, state.getApplications().size());
- assertTrue(state.getApplications().contains(appProto1));
- assertTrue(state.getApplications().contains(appProto2));
+ apps = loadApplicationProtos(state.getIterator());
+ assertEquals(2, apps.size());
+ assertTrue(apps.contains(appProto1));
+ assertTrue(apps.contains(appProto2));
// test removing an application
stateStore.removeApplication(appId2);
restartStateStore();
state = stateStore.loadApplicationsState();
- assertEquals(1, state.getApplications().size());
- assertEquals(appProto1, state.getApplications().get(0));
+ apps = loadApplicationProtos(state.getIterator());
+ assertEquals(1, apps.size());
+ assertEquals(appProto1, apps.get(0));
}
@Test
public void testContainerStorage() throws IOException {
// test empty when no state
List<RecoveredContainerState> recoveredContainers =
- stateStore.loadContainersState();
+ loadContainersState(stateStore.getContainerStateIterator());
assertTrue(recoveredContainers.isEmpty());
// create a container request
@@ -254,7 +326,8 @@ public class TestNMLeveldbStateStoreService {
stateStore.getContainerVersionKey(containerId.toString()))));
restartStateStore();
- recoveredContainers = stateStore.loadContainersState();
+ recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(0, rcs.getVersion());
@@ -269,14 +342,16 @@ public class TestNMLeveldbStateStoreService {
// store a new container record without StartContainerRequest
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6);
stateStore.storeContainerLaunched(containerId1);
- recoveredContainers = stateStore.loadContainersState();
+ recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
// check whether the new container record is discarded
assertEquals(1, recoveredContainers.size());
// queue the container, and verify recovered
stateStore.storeContainerQueued(containerId);
restartStateStore();
- recoveredContainers = stateStore.loadContainersState();
+ recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus());
@@ -292,7 +367,8 @@ public class TestNMLeveldbStateStoreService {
diags.append("some diags for container");
stateStore.storeContainerDiagnostics(containerId, diags);
restartStateStore();
- recoveredContainers = stateStore.loadContainersState();
+ recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
@@ -305,7 +381,8 @@ public class TestNMLeveldbStateStoreService {
// pause the container, and verify recovered
stateStore.storeContainerPaused(containerId);
restartStateStore();
- recoveredContainers = stateStore.loadContainersState();
+ recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
@@ -316,7 +393,8 @@ public class TestNMLeveldbStateStoreService {
// Resume the container
stateStore.removeContainerPaused(containerId);
restartStateStore();
- recoveredContainers = stateStore.loadContainersState();
+ recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
// increase the container size, and verify recovered
@@ -328,7 +406,8 @@ public class TestNMLeveldbStateStoreService {
stateStore
.storeContainerUpdateToken(containerId, updateTokenIdentifier);
restartStateStore();
- recoveredContainers = stateStore.loadContainersState();
+ recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(0, rcs.getVersion());
@@ -342,7 +421,8 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerDiagnostics(containerId, diags);
stateStore.storeContainerKilled(containerId);
restartStateStore();
- recoveredContainers = stateStore.loadContainersState();
+ recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
@@ -358,7 +438,8 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerDiagnostics(containerId, diags);
stateStore.storeContainerCompleted(containerId, 21);
restartStateStore();
- recoveredContainers = stateStore.loadContainersState();
+ recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
@@ -371,7 +452,8 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerWorkDir(containerId, "/test/workdir");
stateStore.storeContainerLogDir(containerId, "/test/logdir");
restartStateStore();
- recoveredContainers = stateStore.loadContainersState();
+ recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(6, rcs.getRemainingRetryAttempts());
@@ -382,12 +464,13 @@ public class TestNMLeveldbStateStoreService {
// remove the container and verify not recovered
stateStore.removeContainer(containerId);
restartStateStore();
- recoveredContainers = stateStore.loadContainersState();
+ recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
assertTrue(recoveredContainers.isEmpty());
// recover again to check remove clears all containers
restartStateStore();
NMStateStoreService nmStoreSpy = spy(stateStore);
- nmStoreSpy.loadContainersState();
+ loadContainersState(nmStoreSpy.getContainerStateIterator());
verify(nmStoreSpy,times(0)).removeContainer(any(ContainerId.class));
}
@@ -399,7 +482,8 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerRestartTimes(containerId,
finishTimeForRetryAttempts);
restartStateStore();
- RecoveredContainerState rcs = stateStore.loadContainersState().get(0);
+ RecoveredContainerState rcs =
+ loadContainersState(stateStore.getContainerStateIterator()).get(0);
List<Long> recoveredRestartTimes = rcs.getRestartTimes();
assertEquals(1462700529039L, (long)recoveredRestartTimes.get(0));
assertEquals(1462700529050L, (long)recoveredRestartTimes.get(1));
@@ -481,7 +565,7 @@ public class TestNMLeveldbStateStoreService {
assertTrue(pubts.getLocalizedResources().isEmpty());
assertTrue(pubts.getInProgressResources().isEmpty());
Map<String, RecoveredUserResources> userResources =
- state.getUserResources();
+ loadUserResources(state.getIterator());
assertEquals(1, userResources.size());
RecoveredUserResources rur = userResources.get(user);
LocalResourceTrackerState privts = rur.getPrivateTrackerState();
@@ -535,7 +619,7 @@ public class TestNMLeveldbStateStoreService {
pubts.getInProgressResources().get(pubRsrcProto1));
assertEquals(pubRsrcLocalPath2,
pubts.getInProgressResources().get(pubRsrcProto2));
- userResources = state.getUserResources();
+ userResources = loadUserResources(state.getIterator());
assertEquals(1, userResources.size());
rur = userResources.get(user);
privts = rur.getPrivateTrackerState();
@@ -584,7 +668,7 @@ public class TestNMLeveldbStateStoreService {
assertTrue(pubts.getLocalizedResources().isEmpty());
assertTrue(pubts.getInProgressResources().isEmpty());
Map<String, RecoveredUserResources> userResources =
- state.getUserResources();
+ loadUserResources(state.getIterator());
assertEquals(1, userResources.size());
RecoveredUserResources rur = userResources.get(user);
LocalResourceTrackerState privts = rur.getPrivateTrackerState();
@@ -654,7 +738,7 @@ public class TestNMLeveldbStateStoreService {
assertEquals(1, pubts.getInProgressResources().size());
assertEquals(pubRsrcLocalPath2,
pubts.getInProgressResources().get(pubRsrcProto2));
- userResources = state.getUserResources();
+ userResources = loadUserResources(state.getIterator());
assertEquals(1, userResources.size());
rur = userResources.get(user);
privts = rur.getPrivateTrackerState();
@@ -762,7 +846,7 @@ public class TestNMLeveldbStateStoreService {
assertEquals(pubLocalizedProto1,
pubts.getLocalizedResources().iterator().next());
Map<String, RecoveredUserResources> userResources =
- state.getUserResources();
+ loadUserResources(state.getIterator());
assertTrue(userResources.isEmpty());
}
@@ -771,7 +855,9 @@ public class TestNMLeveldbStateStoreService {
// test empty when no state
RecoveredDeletionServiceState state =
stateStore.loadDeletionServiceState();
- assertTrue(state.getTasks().isEmpty());
+ List<DeletionServiceDeleteTaskProto> deleteTaskProtos =
+ loadDeletionTaskProtos(state.getIterator());
+ assertTrue(deleteTaskProtos.isEmpty());
// store a deletion task and verify recovered
DeletionServiceDeleteTaskProto proto =
@@ -788,8 +874,9 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeDeletionTask(proto.getId(), proto);
restartStateStore();
state = stateStore.loadDeletionServiceState();
- assertEquals(1, state.getTasks().size());
- assertEquals(proto, state.getTasks().get(0));
+ deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
+ assertEquals(1, deleteTaskProtos.size());
+ assertEquals(proto, deleteTaskProtos.get(0));
// store another deletion task
DeletionServiceDeleteTaskProto proto2 =
@@ -802,31 +889,36 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeDeletionTask(proto2.getId(), proto2);
restartStateStore();
state = stateStore.loadDeletionServiceState();
- assertEquals(2, state.getTasks().size());
- assertTrue(state.getTasks().contains(proto));
- assertTrue(state.getTasks().contains(proto2));
+ deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
+ assertEquals(2, deleteTaskProtos.size());
+ assertTrue(deleteTaskProtos.contains(proto));
+ assertTrue(deleteTaskProtos.contains(proto2));
+
// delete a task and verify gone after recovery
stateStore.removeDeletionTask(proto2.getId());
restartStateStore();
- state = stateStore.loadDeletionServiceState();
- assertEquals(1, state.getTasks().size());
- assertEquals(proto, state.getTasks().get(0));
+ state = stateStore.loadDeletionServiceState();
+ deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
+ assertEquals(1, deleteTaskProtos.size());
+ assertEquals(proto, deleteTaskProtos.get(0));
// delete the last task and verify none left
stateStore.removeDeletionTask(proto.getId());
restartStateStore();
state = stateStore.loadDeletionServiceState();
- assertTrue(state.getTasks().isEmpty());
- }
+ deleteTaskProtos = loadDeletionTaskProtos(state.getIterator());
+ assertTrue(deleteTaskProtos.isEmpty()); }
@Test
public void testNMTokenStorage() throws IOException {
// test empty when no state
RecoveredNMTokensState state = stateStore.loadNMTokensState();
+ Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
+ loadNMTokens(state.getIterator());
assertNull(state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
- assertTrue(state.getApplicationMasterKeys().isEmpty());
+ assertTrue(loadedAppKeys.isEmpty());
// store a master key and verify recovered
NMTokenSecretManagerForTest secretMgr = new NMTokenSecretManagerForTest();
@@ -834,18 +926,20 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeNMTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadNMTokensState();
+ loadedAppKeys = loadNMTokens(state.getIterator());
assertEquals(currentKey, state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
- assertTrue(state.getApplicationMasterKeys().isEmpty());
+ assertTrue(loadedAppKeys.isEmpty());
// store a previous key and verify recovered
MasterKey prevKey = secretMgr.generateKey();
stateStore.storeNMTokenPreviousMasterKey(prevKey);
restartStateStore();
state = stateStore.loadNMTokensState();
+ loadedAppKeys = loadNMTokens(state.getIterator());
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
- assertTrue(state.getApplicationMasterKeys().isEmpty());
+ assertTrue(loadedAppKeys.isEmpty());
// store a few application keys and verify recovered
ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance(
@@ -858,10 +952,9 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
restartStateStore();
state = stateStore.loadNMTokensState();
+ loadedAppKeys = loadNMTokens(state.getIterator());
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
- Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
- state.getApplicationMasterKeys();
assertEquals(2, loadedAppKeys.size());
assertEquals(attemptKey1, loadedAppKeys.get(attempt1));
assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
@@ -880,9 +973,9 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeNMTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadNMTokensState();
+ loadedAppKeys = loadNMTokens(state.getIterator());
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
- loadedAppKeys = state.getApplicationMasterKeys();
assertEquals(2, loadedAppKeys.size());
assertNull(loadedAppKeys.get(attempt1));
assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
@@ -894,9 +987,10 @@ public class TestNMLeveldbStateStoreService {
// test empty when no state
RecoveredContainerTokensState state =
stateStore.loadContainerTokensState();
+ Map<ContainerId, Long> loadedActiveTokens = loadContainerTokens(state.it);
assertNull(state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
- assertTrue(state.getActiveTokens().isEmpty());
+ assertTrue(loadedActiveTokens.isEmpty());
// store a master key and verify recovered
ContainerTokenKeyGeneratorForTest keygen =
@@ -905,18 +999,20 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadContainerTokensState();
+ loadedActiveTokens = loadContainerTokens(state.it);
assertEquals(currentKey, state.getCurrentMasterKey());
assertNull(state.getPreviousMasterKey());
- assertTrue(state.getActiveTokens().isEmpty());
+ assertTrue(loadedActiveTokens.isEmpty());
// store a previous key and verify recovered
MasterKey prevKey = keygen.generateKey();
stateStore.storeContainerTokenPreviousMasterKey(prevKey);
restartStateStore();
state = stateStore.loadContainerTokensState();
+ loadedActiveTokens = loadContainerTokens(state.it);
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
- assertTrue(state.getActiveTokens().isEmpty());
+ assertTrue(loadedActiveTokens.isEmpty());
// store a few container tokens and verify recovered
ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1);
@@ -927,10 +1023,9 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerToken(cid2, expTime2);
restartStateStore();
state = stateStore.loadContainerTokensState();
+ loadedActiveTokens = loadContainerTokens(state.it);
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
- Map<ContainerId, Long> loadedActiveTokens =
- state.getActiveTokens();
assertEquals(2, loadedActiveTokens.size());
assertEquals(expTime1, loadedActiveTokens.get(cid1));
assertEquals(expTime2, loadedActiveTokens.get(cid2));
@@ -948,9 +1043,9 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerTokenCurrentMasterKey(currentKey);
restartStateStore();
state = stateStore.loadContainerTokensState();
+ loadedActiveTokens = loadContainerTokens(state.it);
assertEquals(currentKey, state.getCurrentMasterKey());
assertEquals(prevKey, state.getPreviousMasterKey());
- loadedActiveTokens = state.getActiveTokens();
assertEquals(2, loadedActiveTokens.size());
assertNull(loadedActiveTokens.get(cid1));
assertEquals(expTime2, loadedActiveTokens.get(cid2));
@@ -1029,8 +1124,8 @@ public class TestNMLeveldbStateStoreService {
@Test
public void testUnexpectedKeyDoesntThrowException() throws IOException {
// test empty when no state
- List<RecoveredContainerState> recoveredContainers = stateStore
- .loadContainersState();
+ List<RecoveredContainerState> recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
assertTrue(recoveredContainers.isEmpty());
ApplicationId appId = ApplicationId.newInstance(1234, 3);
@@ -1045,7 +1140,8 @@ public class TestNMLeveldbStateStoreService {
+ containerId.toString() + "/invalidKey1234").getBytes();
stateStore.getDB().put(invalidKey, new byte[1]);
restartStateStore();
- recoveredContainers = stateStore.loadContainersState();
+ recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
@@ -1162,8 +1258,8 @@ public class TestNMLeveldbStateStoreService {
@Test
public void testStateStoreForResourceMapping() throws IOException {
// test empty when no state
- List<RecoveredContainerState> recoveredContainers = stateStore
- .loadContainersState();
+ List<RecoveredContainerState> recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
assertTrue(recoveredContainers.isEmpty());
ApplicationId appId = ApplicationId.newInstance(1234, 3);
@@ -1190,7 +1286,8 @@ public class TestNMLeveldbStateStoreService {
// add a invalid key
restartStateStore();
- recoveredContainers = stateStore.loadContainersState();
+ recoveredContainers =
+ loadContainersState(stateStore.getContainerStateIterator());
assertEquals(1, recoveredContainers.size());
RecoveredContainerState rcs = recoveredContainers.get(0);
List<Serializable> res = rcs.getResourceMappings()
@@ -1253,7 +1350,8 @@ public class TestNMLeveldbStateStoreService {
stateStore.storeContainerRestartTimes(containerId,
restartTimes);
restartStateStore();
- RecoveredContainerState rcs = stateStore.loadContainersState().get(0);
+ RecoveredContainerState rcs =
+ loadContainersState(stateStore.getContainerStateIterator()).get(0);
List<Long> recoveredRestartTimes = rcs.getRestartTimes();
assertTrue(recoveredRestartTimes.isEmpty());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org