You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/06/07 18:29:20 UTC
svn commit: r1601151 [2/5] - in
/hadoop/common/branches/HDFS-5442/hadoop-yarn-project: ./
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yar...
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Sat Jun 7 16:29:10 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.io.File;
+import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -27,14 +28,21 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import com.google.common.annotations.VisibleForTesting;
@@ -53,6 +61,7 @@ class LocalResourcesTrackerImpl implemen
.compile(RANDOM_DIR_REGEX);
private final String user;
+ private final ApplicationId appId;
private final Dispatcher dispatcher;
private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
private Configuration conf;
@@ -77,17 +86,22 @@ class LocalResourcesTrackerImpl implemen
* per APPLICATION, USER and PUBLIC cache.
*/
private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
+ private NMStateStoreService stateStore;
- public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
- boolean useLocalCacheDirectoryManager, Configuration conf) {
- this(user, dispatcher,
+ public LocalResourcesTrackerImpl(String user, ApplicationId appId,
+ Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
+ Configuration conf, NMStateStoreService stateStore) {
+ this(user, appId, dispatcher,
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
- useLocalCacheDirectoryManager, conf);
+ useLocalCacheDirectoryManager, conf, stateStore);
}
- LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+ LocalResourcesTrackerImpl(String user, ApplicationId appId,
+ Dispatcher dispatcher,
ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
- boolean useLocalCacheDirectoryManager, Configuration conf) {
+ boolean useLocalCacheDirectoryManager, Configuration conf,
+ NMStateStoreService stateStore) {
+ this.appId = appId;
this.user = user;
this.dispatcher = dispatcher;
this.localrsrc = localrsrc;
@@ -98,6 +112,7 @@ class LocalResourcesTrackerImpl implemen
new ConcurrentHashMap<LocalResourceRequest, Path>();
}
this.conf = conf;
+ this.stateStore = stateStore;
}
/*
@@ -119,8 +134,7 @@ class LocalResourcesTrackerImpl implemen
if (rsrc != null && (!isResourcePresent(rsrc))) {
LOG.info("Resource " + rsrc.getLocalPath()
+ " is missing, localizing it again");
- localrsrc.remove(req);
- decrementFileCountForLocalCacheDirectory(req, rsrc);
+ removeResource(req);
rsrc = null;
}
if (null == rsrc) {
@@ -141,15 +155,102 @@ class LocalResourcesTrackerImpl implemen
}
break;
case LOCALIZATION_FAILED:
- decrementFileCountForLocalCacheDirectory(req, null);
/*
* If resource localization fails then Localized resource will be
* removed from local cache.
*/
- localrsrc.remove(req);
+ removeResource(req);
+ break;
+ case RECOVERED:
+ if (rsrc != null) {
+ LOG.warn("Ignoring attempt to recover existing resource " + rsrc);
+ return;
+ }
+ rsrc = recoverResource(req, (ResourceRecoveredEvent) event);
+ localrsrc.put(req, rsrc);
break;
}
+
rsrc.handle(event);
+
+ if (event.getType() == ResourceEventType.LOCALIZED) {
+ if (rsrc.getLocalPath() != null) {
+ try {
+ stateStore.finishResourceLocalization(user, appId,
+ buildLocalizedResourceProto(rsrc));
+ } catch (IOException ioe) {
+ LOG.error("Error storing resource state for " + rsrc, ioe);
+ }
+ } else {
+ LOG.warn("Resource " + rsrc + " localized without a location");
+ }
+ }
+ }
+
+ private LocalizedResource recoverResource(LocalResourceRequest req,
+ ResourceRecoveredEvent event) {
+ // unique number for a resource is the directory of the resource
+ Path localDir = event.getLocalPath().getParent();
+ long rsrcId = Long.parseLong(localDir.getName());
+
+ // update ID generator to avoid conflicts with existing resources
+ while (true) {
+ long currentRsrcId = uniqueNumberGenerator.get();
+ long nextRsrcId = Math.max(currentRsrcId, rsrcId);
+ if (uniqueNumberGenerator.compareAndSet(currentRsrcId, nextRsrcId)) {
+ break;
+ }
+ }
+
+ incrementFileCountForLocalCacheDirectory(localDir.getParent());
+
+ return new LocalizedResource(req, dispatcher);
+ }
+
+ private LocalizedResourceProto buildLocalizedResourceProto(
+ LocalizedResource rsrc) {
+ return LocalizedResourceProto.newBuilder()
+ .setResource(buildLocalResourceProto(rsrc.getRequest()))
+ .setLocalPath(rsrc.getLocalPath().toString())
+ .setSize(rsrc.getSize())
+ .build();
+ }
+
+ private LocalResourceProto buildLocalResourceProto(LocalResource lr) {
+ LocalResourcePBImpl lrpb;
+ if (!(lr instanceof LocalResourcePBImpl)) {
+ lr = LocalResource.newInstance(lr.getResource(), lr.getType(),
+ lr.getVisibility(), lr.getSize(), lr.getTimestamp(),
+ lr.getPattern());
+ }
+ lrpb = (LocalResourcePBImpl) lr;
+ return lrpb.getProto();
+ }
+
+ public void incrementFileCountForLocalCacheDirectory(Path cacheDir) {
+ if (useLocalCacheDirectoryManager) {
+ Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot(
+ cacheDir);
+ if (cacheRoot != null) {
+ LocalCacheDirectoryManager dir = directoryManagers.get(cacheRoot);
+ if (dir == null) {
+ dir = new LocalCacheDirectoryManager(conf);
+ LocalCacheDirectoryManager otherDir =
+ directoryManagers.putIfAbsent(cacheRoot, dir);
+ if (otherDir != null) {
+ dir = otherDir;
+ }
+ }
+ if (cacheDir.equals(cacheRoot)) {
+ dir.incrementFileCountForPath("");
+ } else {
+ String dirStr = cacheDir.toUri().getRawPath();
+ String rootStr = cacheRoot.toUri().getRawPath();
+ dir.incrementFileCountForPath(
+ dirStr.substring(rootStr.length() + 1));
+ }
+ }
+ }
}
/*
@@ -217,11 +318,6 @@ class LocalResourcesTrackerImpl implemen
}
@Override
- public boolean contains(LocalResourceRequest resource) {
- return localrsrc.containsKey(resource);
- }
-
- @Override
public boolean remove(LocalizedResource rem, DeletionService delService) {
// current synchronization guaranteed by crude RLS event for cleanup
LocalizedResource rsrc = localrsrc.get(rem.getRequest());
@@ -237,16 +333,31 @@ class LocalResourcesTrackerImpl implemen
+ " with non-zero refcount");
return false;
} else { // ResourceState is LOCALIZED or INIT
- localrsrc.remove(rem.getRequest());
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
}
- decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
+ removeResource(rem.getRequest());
LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
return true;
}
}
+ private void removeResource(LocalResourceRequest req) {
+ LocalizedResource rsrc = localrsrc.remove(req);
+ decrementFileCountForLocalCacheDirectory(req, rsrc);
+ if (rsrc != null) {
+ Path localPath = rsrc.getLocalPath();
+ if (localPath != null) {
+ try {
+ stateStore.removeLocalizedResource(user, appId, localPath);
+ } catch (IOException e) {
+ LOG.error("Unable to remove resource " + rsrc + " from state store",
+ e);
+ }
+ }
+ }
+ }
+
/**
* Returns the path up to the random directory component.
*/
@@ -285,6 +396,7 @@ class LocalResourcesTrackerImpl implemen
@Override
public Path
getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
+ Path rPath = localDirPath;
if (useLocalCacheDirectoryManager && localDirPath != null) {
if (!directoryManagers.containsKey(localDirPath)) {
@@ -293,7 +405,7 @@ class LocalResourcesTrackerImpl implemen
}
LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
- Path rPath = localDirPath;
+ rPath = localDirPath;
String hierarchicalPath = dir.getRelativePathForLocalization();
// For most of the scenarios we will get root path only which
// is an empty string
@@ -301,21 +413,36 @@ class LocalResourcesTrackerImpl implemen
rPath = new Path(localDirPath, hierarchicalPath);
}
inProgressLocalResourcesMap.put(req, rPath);
- return rPath;
- } else {
- return localDirPath;
}
- }
- @Override
- public long nextUniqueNumber() {
- return uniqueNumberGenerator.incrementAndGet();
+ rPath = new Path(rPath,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
+ Path localPath = new Path(rPath, req.getPath().getName());
+ LocalizedResource rsrc = localrsrc.get(req);
+ rsrc.setLocalPath(localPath);
+ LocalResource lr = LocalResource.newInstance(req.getResource(),
+ req.getType(), req.getVisibility(), req.getSize(),
+ req.getTimestamp());
+ try {
+ stateStore.startResourceLocalization(user, appId,
+ ((LocalResourcePBImpl) lr).getProto(), localPath);
+ } catch (IOException e) {
+ LOG.error("Unable to record localization start for " + rsrc, e);
+ }
+ return rPath;
}
- @VisibleForTesting
- @Private
@Override
public LocalizedResource getLocalizedResource(LocalResourceRequest request) {
return localrsrc.get(request);
}
-}
\ No newline at end of file
+
+ @VisibleForTesting
+ LocalCacheDirectoryManager getDirectoryManager(Path localDirPath) {
+ LocalCacheDirectoryManager mgr = null;
+ if (useLocalCacheDirectoryManager) {
+ mgr = directoryManagers.get(localDirPath);
+ }
+ return mgr;
+ }
+}
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Sat Jun 7 16:29:10 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -54,8 +55,8 @@ public class LocalizedResource implement
private static final Log LOG = LogFactory.getLog(LocalizedResource.class);
- Path localPath;
- long size = -1;
+ volatile Path localPath;
+ volatile long size = -1;
final LocalResourceRequest rsrc;
final Dispatcher dispatcher;
final StateMachine<ResourceState,ResourceEventType,ResourceEvent>
@@ -76,6 +77,8 @@ public class LocalizedResource implement
// From INIT (ref == 0, awaiting req)
.addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
ResourceEventType.REQUEST, new FetchResourceTransition())
+ .addTransition(ResourceState.INIT, ResourceState.LOCALIZED,
+ ResourceEventType.RECOVERED, new RecoveredTransition())
// From DOWNLOADING (ref > 0, may be localizing)
.addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
@@ -157,6 +160,10 @@ public class LocalizedResource implement
return localPath;
}
+ public void setLocalPath(Path localPath) {
+ this.localPath = Path.getPathWithoutSchemeAndAuthority(localPath);
+ }
+
public long getTimestamp() {
return timestamp.get();
}
@@ -234,7 +241,8 @@ public class LocalizedResource implement
@Override
public void transition(LocalizedResource rsrc, ResourceEvent event) {
ResourceLocalizedEvent locEvent = (ResourceLocalizedEvent) event;
- rsrc.localPath = locEvent.getLocation();
+ rsrc.localPath =
+ Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation());
rsrc.size = locEvent.getSize();
for (ContainerId container : rsrc.ref) {
rsrc.dispatcher.getEventHandler().handle(
@@ -291,4 +299,13 @@ public class LocalizedResource implement
rsrc.release(relEvent.getContainer());
}
}
+
+ private static class RecoveredTransition extends ResourceTransition {
+ @Override
+ public void transition(LocalizedResource rsrc, ResourceEvent event) {
+ ResourceRecoveredEvent recoveredEvent = (ResourceRecoveredEvent) event;
+ rsrc.localPath = recoveredEvent.getLocalPath();
+ rsrc.size = recoveredEvent.getSize();
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Sat Jun 7 16:29:10 2014
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -81,6 +82,8 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
@@ -109,10 +112,15 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -142,6 +150,7 @@ public class ResourceLocalizationService
private RecordFactory recordFactory;
private final ScheduledExecutorService cacheCleanup;
private LocalizerTokenSecretManager secretManager;
+ private NMStateStoreService stateStore;
private LocalResourcesTracker publicRsrc;
@@ -163,7 +172,7 @@ public class ResourceLocalizationService
public ResourceLocalizationService(Dispatcher dispatcher,
ContainerExecutor exec, DeletionService delService,
- LocalDirsHandlerService dirsHandler) {
+ LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) {
super(ResourceLocalizationService.class.getName());
this.exec = exec;
@@ -175,6 +184,7 @@ public class ResourceLocalizationService
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
.build());
+ this.stateStore = stateStore;
}
FileContext getLocalFileContext(Configuration conf) {
@@ -203,15 +213,17 @@ public class ResourceLocalizationService
@Override
public void serviceInit(Configuration conf) throws Exception {
this.validateConf(conf);
- this.publicRsrc =
- new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
+ this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher,
+ true, conf, stateStore);
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try {
FileContext lfs = getLocalFileContext(conf);
lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
- cleanUpLocalDir(lfs,delService);
+ if (!stateStore.canRecover()) {
+ cleanUpLocalDir(lfs,delService);
+ }
List<String> localDirs = dirsHandler.getLocalDirs();
for (String localDir : localDirs) {
@@ -249,6 +261,74 @@ public class ResourceLocalizationService
super.serviceInit(conf);
}
+ //Recover localized resources after an NM restart
+ public void recoverLocalizedResources(RecoveredLocalizationState state)
+ throws URISyntaxException {
+ LocalResourceTrackerState trackerState = state.getPublicTrackerState();
+ recoverTrackerResources(publicRsrc, trackerState);
+
+ for (Map.Entry<String, RecoveredUserResources> userEntry :
+ state.getUserResources().entrySet()) {
+ String user = userEntry.getKey();
+ RecoveredUserResources userResources = userEntry.getValue();
+ trackerState = userResources.getPrivateTrackerState();
+ if (!trackerState.isEmpty()) {
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ null, dispatcher, true, super.getConfig(), stateStore);
+ LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
+ tracker);
+ if (oldTracker != null) {
+ tracker = oldTracker;
+ }
+ recoverTrackerResources(tracker, trackerState);
+ }
+
+ for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
+ userResources.getAppTrackerStates().entrySet()) {
+ trackerState = appEntry.getValue();
+ if (!trackerState.isEmpty()) {
+ ApplicationId appId = appEntry.getKey();
+ String appIdStr = ConverterUtils.toString(appId);
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ appId, dispatcher, false, super.getConfig(), stateStore);
+ LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
+ tracker);
+ if (oldTracker != null) {
+ tracker = oldTracker;
+ }
+ recoverTrackerResources(tracker, trackerState);
+ }
+ }
+ }
+ }
+
+ private void recoverTrackerResources(LocalResourcesTracker tracker,
+ LocalResourceTrackerState state) throws URISyntaxException {
+ for (LocalizedResourceProto proto : state.getLocalizedResources()) {
+ LocalResource rsrc = new LocalResourcePBImpl(proto.getResource());
+ LocalResourceRequest req = new LocalResourceRequest(rsrc);
+ LOG.info("Recovering localized resource " + req + " at "
+ + proto.getLocalPath());
+ tracker.handle(new ResourceRecoveredEvent(req,
+ new Path(proto.getLocalPath()), proto.getSize()));
+ }
+
+ for (Map.Entry<LocalResourceProto, Path> entry :
+ state.getInProgressResources().entrySet()) {
+ LocalResource rsrc = new LocalResourcePBImpl(entry.getKey());
+ LocalResourceRequest req = new LocalResourceRequest(rsrc);
+ Path localPath = entry.getValue();
+ tracker.handle(new ResourceRecoveredEvent(req, localPath, 0));
+
+ // delete any in-progress localizations, containers will request again
+ LOG.info("Deleting in-progress localization for " + req + " at "
+ + localPath);
+ tracker.remove(tracker.getLocalizedResource(req), delService);
+ }
+
+ // TODO: remove untracked directories in local filesystem
+ }
+
@Override
public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
return localizerTracker.processHeartbeat(status);
@@ -337,17 +417,10 @@ public class ResourceLocalizationService
// 0) Create application tracking structs
String userName = app.getUser();
privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
- dispatcher, true, super.getConfig()));
- if (null != appRsrc.putIfAbsent(
- ConverterUtils.toString(app.getAppId()),
- new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
- .getConfig()))) {
- LOG.warn("Initializing application " + app + " already present");
- assert false; // TODO: FIXME assert doesn't help
- // ^ The condition is benign. Tests should fail and it
- // should appear in logs, but it's an internal error
- // that should have no effect on applications
- }
+ null, dispatcher, true, super.getConfig(), stateStore));
+ String appIdStr = ConverterUtils.toString(app.getAppId());
+ appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(),
+ app.getAppId(), dispatcher, false, super.getConfig(), stateStore));
// 1) Signal container init
//
// This is handled by the ApplicationImpl state machine and allows
@@ -446,18 +519,28 @@ public class ResourceLocalizationService
@SuppressWarnings({"unchecked"})
private void handleDestroyApplicationResources(Application application) {
- String userName;
- String appIDStr;
+ String userName = application.getUser();
+ ApplicationId appId = application.getAppId();
+ String appIDStr = application.toString();
LocalResourcesTracker appLocalRsrcsTracker =
- appRsrc.remove(ConverterUtils.toString(application.getAppId()));
- if (null == appLocalRsrcsTracker) {
+ appRsrc.remove(ConverterUtils.toString(appId));
+ if (appLocalRsrcsTracker != null) {
+ for (LocalizedResource rsrc : appLocalRsrcsTracker ) {
+ Path localPath = rsrc.getLocalPath();
+ if (localPath != null) {
+ try {
+ stateStore.removeLocalizedResource(userName, appId, localPath);
+ } catch (IOException e) {
+ LOG.error("Unable to remove resource " + rsrc + " for " + appIDStr
+ + " from state store", e);
+ }
+ }
+ }
+ } else {
LOG.warn("Removing uninitialized application " + application);
}
- // TODO: What to do with appLocalRsrcsTracker?
// Delete the application directories
- userName = application.getUser();
- appIDStr = application.toString();
for (String localDir : dirsHandler.getLocalDirs()) {
// Delete the user-owned app-dir
@@ -668,19 +751,15 @@ public class ResourceLocalizationService
if (rsrc.getState().equals(ResourceState.DOWNLOADING)) {
LocalResource resource = request.getResource().getRequest();
try {
- Path publicDirDestPath =
+ Path publicRootPath =
dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
+ ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
- Path hierarchicalPath =
- publicRsrc.getPathForLocalization(key, publicDirDestPath);
- if (!hierarchicalPath.equals(publicDirDestPath)) {
- publicDirDestPath = hierarchicalPath;
+ Path publicDirDestPath =
+ publicRsrc.getPathForLocalization(key, publicRootPath);
+ if (!publicDirDestPath.getParent().equals(publicRootPath)) {
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
}
- publicDirDestPath =
- new Path(publicDirDestPath, Long.toString(publicRsrc
- .nextUniqueNumber()));
// explicitly synchronize pending here to avoid future task
// completing and being dequeued before pending updated
synchronized (pending) {
@@ -968,9 +1047,8 @@ public class ResourceLocalizationService
Path dirPath =
dirsHandler.getLocalPathForWrite(cacheDirectory,
ContainerLocalizer.getEstimatedSize(rsrc), false);
- dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
- dirPath);
- return new Path (dirPath, Long.toString(tracker.nextUniqueNumber()));
+ return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
+ dirPath);
}
@Override
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java Sat Jun 7 16:29:10 2014
@@ -31,5 +31,7 @@ public enum ResourceEventType {
/** See {@link ResourceReleaseEvent} */
RELEASE,
/** See {@link ResourceFailedLocalizationEvent} */
- LOCALIZATION_FAILED
+ LOCALIZATION_FAILED,
+ /** See {@link ResourceRecoveredEvent} */
+ RECOVERED
}
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Sat Jun 7 16:29:10 2014
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
public class DummyContainerManager extends ContainerManagerImpl {
@@ -75,7 +76,7 @@ public class DummyContainerManager exten
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext) {
return new ResourceLocalizationService(super.dispatcher, exec,
- deletionContext, super.dirsHandler) {
+ deletionContext, super.dirsHandler, new NMNullStateStoreService()) {
@Override
public void handle(LocalizationEvent event) {
switch (event.getType()) {
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Sat Jun 7 16:29:10 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -79,7 +80,8 @@ public class TestEventFlow {
YarnConfiguration conf = new YarnConfiguration();
Context context = new NMContext(new NMContainerTokenSecretManager(conf),
- new NMTokenSecretManagerInNM(), null, null) {
+ new NMTokenSecretManagerInNM(), null, null,
+ new NMNullStateStoreService()) {
@Override
public int getHttpPort() {
return 1234;
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java Sat Jun 7 16:29:10 2014
@@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -27,17 +30,18 @@ import java.util.concurrent.ConcurrentMa
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.junit.Assert;
-
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
@@ -46,6 +50,7 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -53,10 +58,12 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -185,6 +192,9 @@ public class TestNodeManagerResync {
TestNodeStatusUpdater.createContainerStatus(2, ContainerState.COMPLETE);
final Container container =
TestNodeStatusUpdater.getMockContainer(testCompleteContainer);
+ NMContainerStatus report =
+ createNMContainerStatus(2, ContainerState.COMPLETE);
+ when(container.getNMContainerStatus()).thenReturn(report);
NodeManager nm = new NodeManager() {
int registerCount = 0;
@@ -203,7 +213,7 @@ public class TestNodeManagerResync {
if (registerCount == 0) {
// first register, no containers info.
try {
- Assert.assertEquals(0, request.getContainerStatuses()
+ Assert.assertEquals(0, request.getNMContainerStatuses()
.size());
} catch (AssertionError error) {
error.printStackTrace();
@@ -214,8 +224,8 @@ public class TestNodeManagerResync {
testCompleteContainer.getContainerId(), container);
} else {
// second register contains the completed container info.
- List<ContainerStatus> statuses =
- request.getContainerStatuses();
+ List<NMContainerStatus> statuses =
+ request.getNMContainerStatuses();
try {
Assert.assertEquals(1, statuses.size());
Assert.assertEquals(testCompleteContainer.getContainerId(),
@@ -510,4 +520,16 @@ public class TestNodeManagerResync {
}
}
}}
+
+ public static NMContainerStatus createNMContainerStatus(int id,
+ ContainerState containerState) {
+ ApplicationId applicationId = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId applicationAttemptId =
+ ApplicationAttemptId.newInstance(applicationId, 1);
+ ContainerId containerId = ContainerId.newInstance(applicationAttemptId, id);
+ NMContainerStatus containerReport =
+ NMContainerStatus.newInstance(containerId, containerState,
+ Resource.newInstance(1024, 1), "recover container", 0);
+ return containerReport;
+ }
}
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java Sat Jun 7 16:29:10 2014
@@ -109,6 +109,36 @@ public class TestNodeManagerShutdown {
}
@Test
+ public void testStateStoreRemovalOnDecommission() throws IOException {
+ final File recoveryDir = new File(basedir, "nm-recovery");
+ nm = new TestNodeManager();
+ YarnConfiguration conf = createNMConfig();
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+ conf.set(YarnConfiguration.NM_RECOVERY_DIR, recoveryDir.getAbsolutePath());
+
+ // verify state store is not removed on normal shutdown
+ nm.init(conf);
+ nm.start();
+ Assert.assertTrue(recoveryDir.exists());
+ Assert.assertTrue(recoveryDir.isDirectory());
+ nm.stop();
+ nm = null;
+ Assert.assertTrue(recoveryDir.exists());
+ Assert.assertTrue(recoveryDir.isDirectory());
+
+ // verify state store is removed on decommissioned shutdown
+ nm = new TestNodeManager();
+ nm.init(conf);
+ nm.start();
+ Assert.assertTrue(recoveryDir.exists());
+ Assert.assertTrue(recoveryDir.isDirectory());
+ nm.getNMContext().setDecommissioned(true);
+ nm.stop();
+ nm = null;
+ Assert.assertFalse(recoveryDir.exists());
+ }
+
+ @Test
public void testKillContainersOnShutdown() throws IOException,
YarnException {
nm = new TestNodeManager();
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Sat Jun 7 16:29:10 2014
@@ -91,6 +91,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@SuppressWarnings("rawtypes")
public class TestNodeStatusUpdater {
@@ -1159,7 +1161,8 @@ public class TestNodeStatusUpdater {
@Override
protected NMContext createNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
- NMTokenSecretManagerInNM nmTokenSecretManager) {
+ NMTokenSecretManagerInNM nmTokenSecretManager,
+ NMStateStoreService store) {
return new MyNMContext(containerTokenSecretManager,
nmTokenSecretManager);
}
@@ -1268,7 +1271,8 @@ public class TestNodeStatusUpdater {
public MyNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager) {
- super(containerTokenSecretManager, nmTokenSecretManager, null, null);
+ super(containerTokenSecretManager, nmTokenSecretManager, null, null,
+ new NMNullStateStoreService());
}
@Override
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Sat Jun 7 16:29:10 2014
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -103,7 +104,8 @@ public abstract class BaseContainerManag
protected static final int HTTP_PORT = 5412;
protected Configuration conf = new YarnConfiguration();
protected Context context = new NMContext(new NMContainerTokenSecretManager(
- conf), new NMTokenSecretManagerInNM(), null, new ApplicationACLsManager(conf)) {
+ conf), new NMTokenSecretManagerInNM(), null,
+ new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
public int getHttpPort() {
return HTTP_PORT;
};
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java Sat Jun 7 16:29:10 2014
@@ -23,6 +23,7 @@ import org.junit.Assert;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheDirectoryManager.Directory;
import org.junit.Test;
public class TestLocalCacheDirectoryManager {
@@ -73,7 +74,7 @@ public class TestLocalCacheDirectoryMana
conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1");
Exception e = null;
ResourceLocalizationService service =
- new ResourceLocalizationService(null, null, null, null);
+ new ResourceLocalizationService(null, null, null, null, null);
try {
service.init(conf);
} catch (Exception e1) {
@@ -109,4 +110,49 @@ public class TestLocalCacheDirectoryMana
// first sub directory
Assert.assertEquals(firstSubDir, dir.getRelativePathForLocalization());
}
+
+ @Test
+ public void testDirectoryConversion() {
+ for (int i = 0; i < 10000; ++i) {
+ String path = Directory.getRelativePath(i);
+ Assert.assertEquals("Incorrect conversion for " + i, i,
+ Directory.getDirectoryNumber(path));
+ }
+ }
+
+ @Test
+ public void testIncrementFileCountForPath() {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY,
+ LocalCacheDirectoryManager.DIRECTORIES_PER_LEVEL + 2);
+ LocalCacheDirectoryManager mgr = new LocalCacheDirectoryManager(conf);
+ final String rootPath = "";
+ mgr.incrementFileCountForPath(rootPath);
+ Assert.assertEquals(rootPath, mgr.getRelativePathForLocalization());
+ Assert.assertFalse("root dir should be full",
+ rootPath.equals(mgr.getRelativePathForLocalization()));
+ // finish filling the other directory
+ mgr.getRelativePathForLocalization();
+ // free up space in the root dir
+ mgr.decrementFileCountForPath(rootPath);
+ mgr.decrementFileCountForPath(rootPath);
+ Assert.assertEquals(rootPath, mgr.getRelativePathForLocalization());
+ Assert.assertEquals(rootPath, mgr.getRelativePathForLocalization());
+ String otherDir = mgr.getRelativePathForLocalization();
+ Assert.assertFalse("root dir should be full", otherDir.equals(rootPath));
+
+ final String deepDir0 = "d/e/e/p/0";
+ final String deepDir1 = "d/e/e/p/1";
+ final String deepDir2 = "d/e/e/p/2";
+ final String deepDir3 = "d/e/e/p/3";
+ mgr.incrementFileCountForPath(deepDir0);
+ Assert.assertEquals(otherDir, mgr.getRelativePathForLocalization());
+ Assert.assertEquals(deepDir0, mgr.getRelativePathForLocalization());
+ Assert.assertEquals("total dir count incorrect after increment",
+ deepDir1, mgr.getRelativePathForLocalization());
+ mgr.incrementFileCountForPath(deepDir2);
+ mgr.incrementFileCountForPath(deepDir1);
+ mgr.incrementFileCountForPath(deepDir2);
+ Assert.assertEquals(deepDir3, mgr.getRelativePathForLocalization());
+ }
}
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Sat Jun 7 16:29:10 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
import static org.mockito.Mockito.any;
import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -34,13 +35,17 @@ import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
@@ -52,10 +57,14 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
public class TestLocalResourcesTrackerImpl {
@@ -92,8 +101,8 @@ public class TestLocalResourcesTrackerIm
localrsrc.put(req1, lr1);
localrsrc.put(req2, lr2);
LocalResourcesTracker tracker =
- new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, false,
- conf);
+ new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
+ false, conf, new NMNullStateStoreService());
ResourceEvent req11Event =
new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
@@ -176,7 +185,8 @@ public class TestLocalResourcesTrackerIm
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
localrsrc.put(req1, lr1);
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
- dispatcher, localrsrc, false, conf);
+ null, dispatcher, localrsrc, false, conf,
+ new NMNullStateStoreService());
ResourceEvent req11Event = new ResourceRequestEvent(req1,
LocalResourceVisibility.PUBLIC, lc1);
@@ -246,7 +256,8 @@ public class TestLocalResourcesTrackerIm
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
LocalResourcesTracker tracker =
- new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, true, conf);
+ new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
+ true, conf, new NMNullStateStoreService());
LocalResourceRequest lr =
createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
@@ -264,6 +275,7 @@ public class TestLocalResourcesTrackerIm
// Container-1 requesting local resource.
tracker.handle(reqEvent1);
+ dispatcher.await();
// New localized Resource should have been added to local resource map
// and the requesting container will be added to its waiting queue.
@@ -280,6 +292,7 @@ public class TestLocalResourcesTrackerIm
ResourceEvent reqEvent2 =
new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc2);
tracker.handle(reqEvent2);
+ dispatcher.await();
// Container 2 should have been added to the waiting queue of the local
// resource
@@ -295,6 +308,7 @@ public class TestLocalResourcesTrackerIm
LocalizedResource localizedResource = localrsrc.get(lr);
tracker.handle(resourceFailedEvent);
+ dispatcher.await();
// After receiving failed resource event; all waiting containers will be
// notified with Container Resource Failed Event.
@@ -308,6 +322,7 @@ public class TestLocalResourcesTrackerIm
// exception.
ResourceReleaseEvent relEvent1 = new ResourceReleaseEvent(lr, cId1);
tracker.handle(relEvent1);
+ dispatcher.await();
// Container-3 now requests for the same resource. This request call
// is coming prior to Container-2's release call.
@@ -316,6 +331,7 @@ public class TestLocalResourcesTrackerIm
ResourceEvent reqEvent3 =
new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc3);
tracker.handle(reqEvent3);
+ dispatcher.await();
// Local resource cache now should have the requested resource and the
// number of waiting containers should be 1.
@@ -327,6 +343,7 @@ public class TestLocalResourcesTrackerIm
// Container-2 Releases the resource
ResourceReleaseEvent relEvent2 = new ResourceReleaseEvent(lr, cId2);
tracker.handle(relEvent2);
+ dispatcher.await();
// Making sure that there is no change in the cache after the release.
Assert.assertEquals(1, localrsrc.size());
@@ -340,6 +357,7 @@ public class TestLocalResourcesTrackerIm
ResourceLocalizedEvent localizedEvent =
new ResourceLocalizedEvent(lr, localizedPath, 123L);
tracker.handle(localizedEvent);
+ dispatcher.await();
// Verifying ContainerResourceLocalizedEvent .
verify(containerEventHandler, times(1)).handle(
@@ -351,6 +369,7 @@ public class TestLocalResourcesTrackerIm
// Container-3 releasing the resource.
ResourceReleaseEvent relEvent3 = new ResourceReleaseEvent(lr, cId3);
tracker.handle(relEvent3);
+ dispatcher.await();
Assert.assertEquals(0, localrsrc.get(lr).getRefCount());
@@ -384,7 +403,8 @@ public class TestLocalResourcesTrackerIm
ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
- dispatcher, localrsrc, true, conf);
+ null, dispatcher, localrsrc, true, conf,
+ new NMNullStateStoreService());
// This is a random path. NO File creation will take place at this place.
Path localDir = new Path("/tmp");
@@ -401,7 +421,9 @@ public class TestLocalResourcesTrackerIm
tracker.handle(reqEvent1);
// Simulate the process of localization of lr1
- Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+ // NOTE: Localization path from tracker has resource ID at end
+ Path hierarchicalPath1 =
+ tracker.getPathForLocalization(lr1, localDir).getParent();
// Simulate lr1 getting localized
ResourceLocalizedEvent rle1 =
new ResourceLocalizedEvent(lr1,
@@ -417,7 +439,8 @@ public class TestLocalResourcesTrackerIm
new ResourceRequestEvent(lr2, LocalResourceVisibility.PUBLIC, lc1);
tracker.handle(reqEvent2);
- Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
+ Path hierarchicalPath2 =
+ tracker.getPathForLocalization(lr2, localDir).getParent();
// localization failed.
ResourceFailedLocalizationEvent rfe2 =
new ResourceFailedLocalizationEvent(
@@ -435,7 +458,8 @@ public class TestLocalResourcesTrackerIm
ResourceEvent reqEvent3 = new ResourceRequestEvent(lr3,
LocalResourceVisibility.PUBLIC, lc1);
tracker.handle(reqEvent3);
- Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir);
+ Path hierarchicalPath3 =
+ tracker.getPathForLocalization(lr3, localDir).getParent();
// localization successful
ResourceLocalizedEvent rle3 =
new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri()
@@ -479,6 +503,284 @@ public class TestLocalResourcesTrackerIm
}
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testStateStoreSuccessfulLocalization() throws Exception {
+ final String user = "someuser";
+ final ApplicationId appId = ApplicationId.newInstance(1, 1);
+ // This is a random path. NO File creation will take place at this place.
+ final Path localDir = new Path("/tmp");
+ Configuration conf = new YarnConfiguration();
+ DrainDispatcher dispatcher = null;
+ dispatcher = createDispatcher(conf);
+ EventHandler<LocalizerEvent> localizerEventHandler =
+ mock(EventHandler.class);
+ EventHandler<LocalizerEvent> containerEventHandler =
+ mock(EventHandler.class);
+ dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+ dispatcher.register(ContainerEventType.class, containerEventHandler);
+ DeletionService mockDelService = mock(DeletionService.class);
+ NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+ try {
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ appId, dispatcher, false, conf, stateStore);
+ // Container 1 needs lr1 resource
+ ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+ LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+ LocalResourceVisibility.APPLICATION);
+ LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+
+ // Container 1 requests lr1 to be localized
+ ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
+ LocalResourceVisibility.APPLICATION, lc1);
+ tracker.handle(reqEvent1);
+ dispatcher.await();
+
+ // Simulate the process of localization of lr1
+ Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+
+ ArgumentCaptor<LocalResourceProto> localResourceCaptor =
+ ArgumentCaptor.forClass(LocalResourceProto.class);
+ ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+ verify(stateStore).startResourceLocalization(eq(user), eq(appId),
+ localResourceCaptor.capture(), pathCaptor.capture());
+ LocalResourceProto lrProto = localResourceCaptor.getValue();
+ Path localizedPath1 = pathCaptor.getValue();
+ Assert.assertEquals(lr1,
+ new LocalResourceRequest(new LocalResourcePBImpl(lrProto)));
+ Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent());
+
+ // Simulate lr1 getting localized
+ ResourceLocalizedEvent rle1 =
+ new ResourceLocalizedEvent(lr1, pathCaptor.getValue(), 120);
+ tracker.handle(rle1);
+ dispatcher.await();
+
+ ArgumentCaptor<LocalizedResourceProto> localizedProtoCaptor =
+ ArgumentCaptor.forClass(LocalizedResourceProto.class);
+ verify(stateStore).finishResourceLocalization(eq(user), eq(appId),
+ localizedProtoCaptor.capture());
+ LocalizedResourceProto localizedProto = localizedProtoCaptor.getValue();
+ Assert.assertEquals(lr1, new LocalResourceRequest(
+ new LocalResourcePBImpl(localizedProto.getResource())));
+ Assert.assertEquals(localizedPath1.toString(),
+ localizedProto.getLocalPath());
+ LocalizedResource localizedRsrc1 = tracker.getLocalizedResource(lr1);
+ Assert.assertNotNull(localizedRsrc1);
+
+ // simulate release and retention processing
+ tracker.handle(new ResourceReleaseEvent(lr1, cId1));
+ dispatcher.await();
+ boolean removeResult = tracker.remove(localizedRsrc1, mockDelService);
+
+ Assert.assertTrue(removeResult);
+ verify(stateStore).removeLocalizedResource(eq(user), eq(appId),
+ eq(localizedPath1));
+ } finally {
+ if (dispatcher != null) {
+ dispatcher.stop();
+ }
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testStateStoreFailedLocalization() throws Exception {
+ final String user = "someuser";
+ final ApplicationId appId = ApplicationId.newInstance(1, 1);
+ // This is a random path. NO File creation will take place at this place.
+ final Path localDir = new Path("/tmp");
+ Configuration conf = new YarnConfiguration();
+ DrainDispatcher dispatcher = null;
+ dispatcher = createDispatcher(conf);
+ EventHandler<LocalizerEvent> localizerEventHandler =
+ mock(EventHandler.class);
+ EventHandler<LocalizerEvent> containerEventHandler =
+ mock(EventHandler.class);
+ dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+ dispatcher.register(ContainerEventType.class, containerEventHandler);
+ NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+ try {
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ appId, dispatcher, false, conf, stateStore);
+ // Container 1 needs lr1 resource
+ ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+ LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+ LocalResourceVisibility.APPLICATION);
+ LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+
+ // Container 1 requests lr1 to be localized
+ ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
+ LocalResourceVisibility.APPLICATION, lc1);
+ tracker.handle(reqEvent1);
+ dispatcher.await();
+
+ // Simulate the process of localization of lr1
+ Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+
+ ArgumentCaptor<LocalResourceProto> localResourceCaptor =
+ ArgumentCaptor.forClass(LocalResourceProto.class);
+ ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+ verify(stateStore).startResourceLocalization(eq(user), eq(appId),
+ localResourceCaptor.capture(), pathCaptor.capture());
+ LocalResourceProto lrProto = localResourceCaptor.getValue();
+ Path localizedPath1 = pathCaptor.getValue();
+ Assert.assertEquals(lr1,
+ new LocalResourceRequest(new LocalResourcePBImpl(lrProto)));
+ Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent());
+
+ ResourceFailedLocalizationEvent rfe1 =
+ new ResourceFailedLocalizationEvent(
+ lr1, new Exception("Test").toString());
+ tracker.handle(rfe1);
+ dispatcher.await();
+ verify(stateStore).removeLocalizedResource(eq(user), eq(appId),
+ eq(localizedPath1));
+ } finally {
+ if (dispatcher != null) {
+ dispatcher.stop();
+ }
+ }
+ }
+
+ /**
+ * Checks that a resource delivered through a ResourceRecoveredEvent is
+ * reported by the tracker afterwards, and that the next localization path
+ * handed out by the tracker is numbered one past the recovered directory id.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testRecoveredResource() throws Exception {
+ final String user = "someuser";
+ final ApplicationId appId = ApplicationId.newInstance(1, 1);
+ // Arbitrary location; this test does not create any file underneath it.
+ final Path localDir = new Path("/tmp/localdir");
+ Configuration conf = new YarnConfiguration();
+ DrainDispatcher dispatcher = createDispatcher(conf);
+ EventHandler<LocalizerEvent> localizerHandler = mock(EventHandler.class);
+ EventHandler<LocalizerEvent> containerHandler = mock(EventHandler.class);
+ dispatcher.register(LocalizerEventType.class, localizerHandler);
+ dispatcher.register(ContainerEventType.class, containerHandler);
+ NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+ try {
+ LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+ appId, dispatcher, false, conf, stateStore);
+
+ // The tracker must not know the resource before the recovery event.
+ LocalResourceRequest recoveredReq = createLocalResourceRequest(user, 1, 1,
+ LocalResourceVisibility.APPLICATION);
+ Assert.assertNull(tracker.getLocalizedResource(recoveredReq));
+
+ // Recover the resource at <localDir>/52/resource.jar.
+ final long recoveredId = 52;
+ Path recoveredFile = new Path(
+ new Path(localDir, Long.toString(recoveredId)), "resource.jar");
+ tracker.handle(
+ new ResourceRecoveredEvent(recoveredReq, recoveredFile, 120));
+ dispatcher.await();
+ Assert.assertNotNull(tracker.getLocalizedResource(recoveredReq));
+
+ // A request for a different resource must receive the directory id that
+ // immediately follows the recovered one.
+ ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
+ LocalResourceRequest freshReq = createLocalResourceRequest(user, 2, 2,
+ LocalResourceVisibility.APPLICATION);
+ tracker.handle(new ResourceRequestEvent(freshReq,
+ LocalResourceVisibility.APPLICATION,
+ new LocalizerContext(user, containerId, null)));
+ dispatcher.await();
+ String assignedDirName =
+ tracker.getPathForLocalization(freshReq, localDir).getName();
+ Assert.assertEquals(recoveredId + 1, Long.parseLong(assignedDirName));
+ } finally {
+ if (dispatcher != null) {
+ dispatcher.stop();
+ }
+ }
+ }
+
+ /**
+ * Recovers four resources into different sub-directories of one local
+ * directory root and checks, after each recovery, the per-directory counts
+ * reported by the tracker's directory manager for that root. The root
+ * directory itself is expected to stay at a count of zero throughout.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testRecoveredResourceWithDirCacheMgr() throws Exception {
+ final String user = "someuser";
+ final ApplicationId appId = ApplicationId.newInstance(1, 1);
+ // Arbitrary root; this test does not create any file underneath it.
+ final Path localDirRoot = new Path("/tmp/localdir");
+ Configuration conf = new YarnConfiguration();
+ DrainDispatcher dispatcher = createDispatcher(conf);
+ EventHandler<LocalizerEvent> localizerHandler = mock(EventHandler.class);
+ EventHandler<LocalizerEvent> containerHandler = mock(EventHandler.class);
+ dispatcher.register(LocalizerEventType.class, localizerHandler);
+ dispatcher.register(ContainerEventType.class, containerHandler);
+ NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+ try {
+ LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user,
+ appId, dispatcher, true, conf, stateStore);
+
+ // Resource A is recovered into <root>/4/2/52.
+ LocalResourceRequest reqA = createLocalResourceRequest(user, 1, 1,
+ LocalResourceVisibility.PUBLIC);
+ Assert.assertNull(tracker.getLocalizedResource(reqA));
+ final long idA = 52;
+ Path fileA = new Path(
+ new Path(localDirRoot + "/4/2", Long.toString(idA)), "resource.jar");
+ tracker.handle(new ResourceRecoveredEvent(reqA, fileA, 120));
+ dispatcher.await();
+ Assert.assertNotNull(tracker.getLocalizedResource(reqA));
+ LocalCacheDirectoryManager rootDirMgr =
+ tracker.getDirectoryManager(localDirRoot);
+ Assert.assertEquals(0, rootDirMgr.getDirectory("").getCount());
+ Assert.assertEquals(1, rootDirMgr.getDirectory("4/2").getCount());
+
+ // Resource B lands in the same sub-directory, <root>/4/2/53.
+ LocalResourceRequest reqB = createLocalResourceRequest(user, 2, 2,
+ LocalResourceVisibility.PUBLIC);
+ Assert.assertNull(tracker.getLocalizedResource(reqB));
+ final long idB = idA + 1;
+ Path fileB = new Path(
+ new Path(localDirRoot + "/4/2", Long.toString(idB)), "resource.jar");
+ tracker.handle(new ResourceRecoveredEvent(reqB, fileB, 120));
+ dispatcher.await();
+ Assert.assertNotNull(tracker.getLocalizedResource(reqB));
+ Assert.assertEquals(0, rootDirMgr.getDirectory("").getCount());
+ Assert.assertEquals(2, rootDirMgr.getDirectory("4/2").getCount());
+
+ // Resource C goes to a sibling sub-directory, <root>/4/3/128.
+ LocalResourceRequest reqC = createLocalResourceRequest(user, 3, 3,
+ LocalResourceVisibility.PUBLIC);
+ Assert.assertNull(tracker.getLocalizedResource(reqC));
+ final long idC = 128;
+ Path fileC = new Path(
+ new Path(localDirRoot + "/4/3", Long.toString(idC)), "resource.jar");
+ tracker.handle(new ResourceRecoveredEvent(reqC, fileC, 120));
+ dispatcher.await();
+ Assert.assertNotNull(tracker.getLocalizedResource(reqC));
+ Assert.assertEquals(0, rootDirMgr.getDirectory("").getCount());
+ Assert.assertEquals(2, rootDirMgr.getDirectory("4/2").getCount());
+ Assert.assertEquals(1, rootDirMgr.getDirectory("4/3").getCount());
+
+ // Resource D sits one level up, in <root>/4/256.
+ LocalResourceRequest reqD = createLocalResourceRequest(user, 4, 4,
+ LocalResourceVisibility.PUBLIC);
+ Assert.assertNull(tracker.getLocalizedResource(reqD));
+ final long idD = 256;
+ Path fileD = new Path(
+ new Path(localDirRoot + "/4", Long.toString(idD)), "resource.jar");
+ tracker.handle(new ResourceRecoveredEvent(reqD, fileD, 120));
+ dispatcher.await();
+ Assert.assertNotNull(tracker.getLocalizedResource(reqD));
+ Assert.assertEquals(0, rootDirMgr.getDirectory("").getCount());
+ Assert.assertEquals(1, rootDirMgr.getDirectory("4").getCount());
+ Assert.assertEquals(2, rootDirMgr.getDirectory("4/2").getCount());
+ Assert.assertEquals(1, rootDirMgr.getDirectory("4/3").getCount());
+ } finally {
+ if (dispatcher != null) {
+ dispatcher.stop();
+ }
+ }
+ }
+
private boolean createdummylocalizefile(Path path) {
boolean ret = false;
File file = new File(path.toUri().getRawPath().toString());