You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was lost in this plain-text conversion.
Posted to yarn-commits@hadoop.apache.org by at...@apache.org on 2013/04/13 01:05:38 UTC
svn commit: r1467511 [3/6] - in
/hadoop/common/branches/HDFS-347/hadoop-yarn-project: ./
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Fri Apr 12 23:05:28 2013
@@ -21,17 +21,20 @@ import java.io.File;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
+
/**
* A collection of {@link LocalizedResource}s all of same
@@ -49,30 +52,72 @@ class LocalResourcesTrackerImpl implemen
private final String user;
private final Dispatcher dispatcher;
private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
+ private Configuration conf;
+ /*
+ * This flag controls whether this resource tracker uses hierarchical
+ * directories or not. For PRIVATE and PUBLIC resource trackers it
+ * will be set whereas for APPLICATION resource tracker it would
+ * be false.
+ */
+ private final boolean useLocalCacheDirectoryManager;
+ private ConcurrentHashMap<Path, LocalCacheDirectoryManager> directoryManagers;
+ /*
+ * It is used to keep track of resource into hierarchical directory
+ * while it is getting downloaded. It is useful for reference counting
+ * in case resource localization fails.
+ */
+ private ConcurrentHashMap<LocalResourceRequest, Path>
+ inProgressLocalResourcesMap;
+ /*
+ * starting with 10 to accommodate 0-9 directories created as a part of
+ * LocalCacheDirectoryManager. So there will be one unique number generator
+ * per APPLICATION, USER and PUBLIC cache.
+ */
+ private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
- public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher) {
+ public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+ boolean useLocalCacheDirectoryManager, Configuration conf) {
this(user, dispatcher,
- new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>());
+ new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
+ useLocalCacheDirectoryManager, conf);
}
LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
- ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc) {
+ ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
+ boolean useLocalCacheDirectoryManager, Configuration conf) {
this.user = user;
this.dispatcher = dispatcher;
this.localrsrc = localrsrc;
+ this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
+ if ( this.useLocalCacheDirectoryManager) {
+ directoryManagers = new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
+ inProgressLocalResourcesMap =
+ new ConcurrentHashMap<LocalResourceRequest, Path>();
+ }
+ this.conf = conf;
}
+ /*
+ * Synchronizing this method for avoiding races due to multiple ResourceEvent's
+ * coming to LocalResourcesTracker from Public/Private localizer and
+ * Resource Localization Service.
+ */
@Override
- public void handle(ResourceEvent event) {
+ public synchronized void handle(ResourceEvent event) {
LocalResourceRequest req = event.getLocalResourceRequest();
LocalizedResource rsrc = localrsrc.get(req);
switch (event.getType()) {
- case REQUEST:
case LOCALIZED:
+ if (useLocalCacheDirectoryManager) {
+ inProgressLocalResourcesMap.remove(req);
+ }
+ break;
+ case REQUEST:
if (rsrc != null && (!isResourcePresent(rsrc))) {
LOG.info("Resource " + rsrc.getLocalPath()
+ " is missing, localizing it again");
localrsrc.remove(req);
+ decrementFileCountForLocalCacheDirectory(req, rsrc);
rsrc = null;
}
if (null == rsrc) {
@@ -82,15 +127,74 @@ class LocalResourcesTrackerImpl implemen
break;
case RELEASE:
if (null == rsrc) {
- LOG.info("Release unknown rsrc null (discard)");
+ // The container sent a release event on a resource which
+ // 1) Failed
+ // 2) Removed for some reason (ex. disk is no longer accessible)
+ ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
+ LOG.info("Container " + relEvent.getContainer()
+ + " sent RELEASE event on a resource request " + req
+ + " not present in cache.");
return;
}
break;
+ case LOCALIZATION_FAILED:
+ decrementFileCountForLocalCacheDirectory(req, null);
+ /*
+ * If resource localization fails then Localized resource will be
+ * removed from local cache.
+ */
+ localrsrc.remove(req);
+ break;
}
rsrc.handle(event);
}
- /**
+ /*
+ * Update the file-count statistics for a local cache-directory.
+ * This will retrieve the localized path for the resource from
+ * 1) inProgressRsrcMap if the resource was under localization and it
+ * failed.
+ * 2) LocalizedResource if the resource is already localized.
+ * From this path it will identify the local directory under which the
+ * resource was localized. Then rest of the path will be used to decrement
+ * file count for the HierarchicalSubDirectory pointing to this relative
+ * path.
+ */
+ private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req,
+ LocalizedResource rsrc) {
+ if ( useLocalCacheDirectoryManager) {
+ Path rsrcPath = null;
+ if (inProgressLocalResourcesMap.containsKey(req)) {
+ // This happens when localization of a resource fails.
+ rsrcPath = inProgressLocalResourcesMap.remove(req);
+ } else if (rsrc != null && rsrc.getLocalPath() != null) {
+ rsrcPath = rsrc.getLocalPath().getParent().getParent();
+ }
+ if (rsrcPath != null) {
+ Path parentPath = new Path(rsrcPath.toUri().getRawPath());
+ while (!directoryManagers.containsKey(parentPath)) {
+ parentPath = parentPath.getParent();
+ if ( parentPath == null) {
+ return;
+ }
+ }
+ if ( parentPath != null) {
+ String parentDir = parentPath.toUri().getRawPath().toString();
+ LocalCacheDirectoryManager dir = directoryManagers.get(parentPath);
+ String rsrcDir = rsrcPath.toUri().getRawPath();
+ if (rsrcDir.equals(parentDir)) {
+ dir.decrementFileCountForPath("");
+ } else {
+ dir.decrementFileCountForPath(
+ rsrcDir.substring(
+ parentDir.length() + 1));
+ }
+ }
+ }
+ }
+ }
+
+/**
* This module checks if the resource which was localized is already present
* or not
*
@@ -100,7 +204,8 @@ class LocalResourcesTrackerImpl implemen
public boolean isResourcePresent(LocalizedResource rsrc) {
boolean ret = true;
if (rsrc.getState() == ResourceState.LOCALIZED) {
- File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString());
+ File file = new File(rsrc.getLocalPath().toUri().getRawPath().
+ toString());
if (!file.exists()) {
ret = false;
}
@@ -133,11 +238,11 @@ class LocalResourcesTrackerImpl implemen
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
}
+ decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
return true;
}
}
-
/**
* Returns the path up to the random directory component.
*/
@@ -163,4 +268,43 @@ class LocalResourcesTrackerImpl implemen
public Iterator<LocalizedResource> iterator() {
return localrsrc.values().iterator();
}
-}
+
+ /**
+ * @return {@link Path} absolute path for localization which includes local
+ * directory path and the relative hierarchical path (if use local
+ * cache directory manager is enabled)
+ *
+ * @param {@link LocalResourceRequest} Resource localization request to
+ * localize the resource.
+ * @param {@link Path} local directory path
+ */
+ @Override
+ public Path
+ getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
+ if (useLocalCacheDirectoryManager && localDirPath != null) {
+
+ if (!directoryManagers.containsKey(localDirPath)) {
+ directoryManagers.putIfAbsent(localDirPath,
+ new LocalCacheDirectoryManager(conf));
+ }
+ LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
+
+ Path rPath = localDirPath;
+ String hierarchicalPath = dir.getRelativePathForLocalization();
+ // For most of the scenarios we will get root path only which
+ // is an empty string
+ if (!hierarchicalPath.isEmpty()) {
+ rPath = new Path(localDirPath, hierarchicalPath);
+ }
+ inProgressLocalResourcesMap.put(req, rPath);
+ return rPath;
+ } else {
+ return localDirPath;
+ }
+ }
+
+ @Override
+ public long nextUniqueNumber() {
+ return uniqueNumberGenerator.incrementAndGet();
+ }
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Fri Apr 12 23:05:28 2013
@@ -32,10 +32,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
@@ -89,6 +91,8 @@ public class LocalizedResource implement
.addTransition(ResourceState.DOWNLOADING,
EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT),
ResourceEventType.RELEASE, new ReleasePendingTransition())
+ .addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED,
+ ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition())
// From LOCALIZED (ref >= 0, on disk)
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
@@ -126,12 +130,14 @@ public class LocalizedResource implement
}
private void release(ContainerId container) {
- if (!ref.remove(container)) {
- LOG.info("Attempt to release claim on " + this +
- " from unregistered container " + container);
- assert false; // TODO: FIX
+ if (ref.remove(container)) {
+ // updating the timestamp only in case of success.
+ timestamp.set(currentTime());
+ } else {
+ LOG.info("Container " + container
+ + " doesn't exist in the container list of the Resource " + this
+ + " to which it sent RELEASE event");
}
- timestamp.set(currentTime());
}
private long currentTime() {
@@ -251,6 +257,25 @@ public class LocalizedResource implement
}
/**
+ * Resource localization failed, notify waiting containers.
+ */
+ @SuppressWarnings("unchecked")
+ private static class FetchFailedTransition extends ResourceTransition {
+ @Override
+ public void transition(LocalizedResource rsrc, ResourceEvent event) {
+ ResourceFailedLocalizationEvent failedEvent =
+ (ResourceFailedLocalizationEvent) event;
+ Queue<ContainerId> containers = rsrc.ref;
+ Throwable failureCause = failedEvent.getCause();
+ for (ContainerId container : containers) {
+ rsrc.dispatcher.getEventHandler().handle(
+ new ContainerResourceFailedEvent(container, failedEvent
+ .getLocalResourceRequest(), failureCause));
+ }
+ }
+ }
+
+ /**
* Resource already localized, notify immediately.
*/
@SuppressWarnings("unchecked") // dispatcher not typed
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Fri Apr 12 23:05:28 2013
@@ -34,7 +34,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
@@ -64,6 +63,7 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -99,11 +100,13 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
+import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -130,7 +133,7 @@ public class ResourceLocalizationService
private RecordFactory recordFactory;
private final ScheduledExecutorService cacheCleanup;
- private final LocalResourcesTracker publicRsrc;
+ private LocalResourcesTracker publicRsrc;
private LocalDirsHandlerService dirsHandler;
@@ -158,7 +161,6 @@ public class ResourceLocalizationService
this.delService = delService;
this.dirsHandler = dirsHandler;
- this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
@@ -173,8 +175,26 @@ public class ResourceLocalizationService
}
}
+ private void validateConf(Configuration conf) {
+ int perDirFileLimit =
+ conf.getInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY,
+ YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY);
+ if (perDirFileLimit <= 36) {
+ LOG.error(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+ + " parameter is configured with very low value.");
+ throw new YarnException(
+ YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+ + " parameter is configured with a value less than 37.");
+ } else {
+ LOG.info("per directory file limit = " + perDirFileLimit);
+ }
+ }
+
@Override
public void init(Configuration conf) {
+ this.validateConf(conf);
+ this.publicRsrc =
+ new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try {
@@ -212,6 +232,7 @@ public class ResourceLocalizationService
YarnConfiguration.NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
+
localizerTracker = createLocalizerTracker(conf);
addService(localizerTracker);
dispatcher.register(LocalizerEventType.class, localizerTracker);
@@ -306,15 +327,17 @@ public class ResourceLocalizationService
private void handleInitApplicationResources(Application app) {
// 0) Create application tracking structs
String userName = app.getUser();
- privateRsrc.putIfAbsent(userName,
- new LocalResourcesTrackerImpl(userName, dispatcher));
- if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
- new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
+ privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
+ dispatcher, true, super.getConfig()));
+ if (null != appRsrc.putIfAbsent(
+ ConverterUtils.toString(app.getAppId()),
+ new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
+ .getConfig()))) {
LOG.warn("Initializing application " + app + " already present");
assert false; // TODO: FIXME assert doesn't help
// ^ The condition is benign. Tests should fail and it
- // should appear in logs, but it's an internal error
- // that should have no effect on applications
+ // should appear in logs, but it's an internal error
+ // that should have no effect on applications
}
// 1) Signal container init
//
@@ -455,6 +478,21 @@ public class ResourceLocalizationService
}
}
+ private String getUserFileCachePath(String user) {
+ String path =
+ "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
+ + user + Path.SEPARATOR + ContainerLocalizer.FILECACHE;
+ return path;
+ }
+
+ private String getUserAppCachePath(String user, String appId) {
+ String path =
+ "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
+ + user + Path.SEPARATOR + ContainerLocalizer.APPCACHE
+ + Path.SEPARATOR + appId;
+ return path;
+ }
+
/**
* Sub-component handling the spawning of {@link ContainerLocalizer}s
*/
@@ -620,8 +658,18 @@ public class ResourceLocalizationService
Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
"." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
+ Path hierarchicalPath =
+ publicRsrc.getPathForLocalization(key, publicDirDestPath);
+ if (!hierarchicalPath.equals(publicDirDestPath)) {
+ publicDirDestPath = hierarchicalPath;
+ DiskChecker.checkDir(
+ new File(publicDirDestPath.toUri().getPath()));
+ }
+ publicDirDestPath =
+ new Path(publicDirDestPath, Long.toString(publicRsrc
+ .nextUniqueNumber()));
pending.put(queue.submit(new FSDownload(
- lfs, null, conf, publicDirDestPath, resource, new Random())),
+ lfs, null, conf, publicDirDestPath, resource)),
request);
attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
} catch (IOException e) {
@@ -635,7 +683,6 @@ public class ResourceLocalizationService
}
@Override
- @SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
try {
// TODO shutdown, better error handling esp. DU
@@ -651,22 +698,19 @@ public class ResourceLocalizationService
return;
}
LocalResourceRequest key = assoc.getResource().getRequest();
- assoc.getResource().handle(
- new ResourceLocalizedEvent(key,
- local, FileUtil.getDU(new File(local.toUri()))));
+ publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil
+ .getDU(new File(local.toUri()))));
synchronized (attempts) {
attempts.remove(key);
}
} catch (ExecutionException e) {
LOG.info("Failed to download rsrc " + assoc.getResource(),
e.getCause());
- dispatcher.getEventHandler().handle(
- new ContainerResourceFailedEvent(
- assoc.getContext().getContainerId(),
- assoc.getResource().getRequest(), e.getCause()));
- List<LocalizerResourceRequestEvent> reqs;
+ LocalResourceRequest req = assoc.getResource().getRequest();
+ publicRsrc.handle(new ResourceFailedLocalizationEvent(req, e
+ .getCause()));
synchronized (attempts) {
- LocalResourceRequest req = assoc.getResource().getRequest();
+ List<LocalizerResourceRequestEvent> reqs;
reqs = attempts.get(req);
if (null == reqs) {
LOG.error("Missing pending list for " + req);
@@ -674,13 +718,6 @@ public class ResourceLocalizationService
}
attempts.remove(req);
}
- // let the other containers know about the localization failure
- for (LocalizerResourceRequestEvent reqEvent : reqs) {
- dispatcher.getEventHandler().handle(
- new ContainerResourceFailedEvent(
- reqEvent.getContext().getContainerId(),
- reqEvent.getResource().getRequest(), e.getCause()));
- }
} catch (CancellationException e) {
// ignore; shutting down
}
@@ -760,20 +797,34 @@ public class ResourceLocalizationService
return null;
}
- // TODO this sucks. Fix it later
- @SuppressWarnings("unchecked") // dispatcher not typed
LocalizerHeartbeatResponse update(
List<LocalResourceStatus> remoteResourceStatuses) {
LocalizerHeartbeatResponse response =
recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
+ String user = context.getUser();
+ ApplicationId applicationId =
+ context.getContainerId().getApplicationAttemptId().getApplicationId();
// The localizer has just spawned. Start giving it resources for
// remote-fetching.
if (remoteResourceStatuses.isEmpty()) {
LocalResource next = findNextResource();
if (next != null) {
response.setLocalizerAction(LocalizerAction.LIVE);
- response.addResource(next);
+ try {
+ ArrayList<ResourceLocalizationSpec> rsrcs =
+ new ArrayList<ResourceLocalizationSpec>();
+ ResourceLocalizationSpec rsrc =
+ NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+ getPathForLocalization(next));
+ rsrcs.add(rsrc);
+ response.setResourceSpecs(rsrcs);
+ } catch (IOException e) {
+ LOG.error("local path for PRIVATE localization could not be found."
+ + "Disks might have failed.", e);
+ } catch (URISyntaxException e) {
+ // TODO fail? Already translated several times...
+ }
} else if (pending.isEmpty()) {
// TODO: Synchronization
response.setLocalizerAction(LocalizerAction.DIE);
@@ -782,6 +833,12 @@ public class ResourceLocalizationService
}
return response;
}
+ ArrayList<ResourceLocalizationSpec> rsrcs =
+ new ArrayList<ResourceLocalizationSpec>();
+ /*
+ * TODO : It doesn't support multiple downloads per ContainerLocalizer
+ * at the same time. We need to think whether we should support this.
+ */
for (LocalResourceStatus stat : remoteResourceStatuses) {
LocalResource rsrc = stat.getResource();
@@ -801,10 +858,10 @@ public class ResourceLocalizationService
case FETCH_SUCCESS:
// notify resource
try {
- assoc.getResource().handle(
- new ResourceLocalizedEvent(req,
- ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
- stat.getLocalSize()));
+ getLocalResourcesTracker(req.getVisibility(), user, applicationId)
+ .handle(
+ new ResourceLocalizedEvent(req, ConverterUtils
+ .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize()));
} catch (URISyntaxException e) { }
if (pending.isEmpty()) {
// TODO: Synchronization
@@ -814,7 +871,17 @@ public class ResourceLocalizationService
response.setLocalizerAction(LocalizerAction.LIVE);
LocalResource next = findNextResource();
if (next != null) {
- response.addResource(next);
+ try {
+ ResourceLocalizationSpec resource =
+ NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+ getPathForLocalization(next));
+ rsrcs.add(resource);
+ } catch (IOException e) {
+ LOG.error("local path for PRIVATE localization could not be " +
+ "found. Disks might have failed.", e);
+ } catch (URISyntaxException e) {
+ //TODO fail? Already translated several times...
+ }
}
break;
case FETCH_PENDING:
@@ -824,24 +891,45 @@ public class ResourceLocalizationService
LOG.info("DEBUG: FAILED " + req, stat.getException());
assoc.getResource().unlock();
response.setLocalizerAction(LocalizerAction.DIE);
- // TODO: Why is this event going directly to the container. Why not
- // the resource itself? What happens to the resource? Is it removed?
- dispatcher.getEventHandler().handle(
- new ContainerResourceFailedEvent(context.getContainerId(),
- req, stat.getException()));
+ getLocalResourcesTracker(req.getVisibility(), user, applicationId)
+ .handle(
+ new ResourceFailedLocalizationEvent(req, stat.getException()));
break;
default:
LOG.info("Unknown status: " + stat.getStatus());
response.setLocalizerAction(LocalizerAction.DIE);
- dispatcher.getEventHandler().handle(
- new ContainerResourceFailedEvent(context.getContainerId(),
- req, stat.getException()));
+ getLocalResourcesTracker(req.getVisibility(), user, applicationId)
+ .handle(
+ new ResourceFailedLocalizationEvent(req, stat.getException()));
break;
}
}
+ response.setResourceSpecs(rsrcs);
return response;
}
+ private Path getPathForLocalization(LocalResource rsrc) throws IOException,
+ URISyntaxException {
+ String user = context.getUser();
+ ApplicationId appId =
+ context.getContainerId().getApplicationAttemptId().getApplicationId();
+ LocalResourceVisibility vis = rsrc.getVisibility();
+ LocalResourcesTracker tracker =
+ getLocalResourcesTracker(vis, user, appId);
+ String cacheDirectory = null;
+ if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
+ cacheDirectory = getUserFileCachePath(user);
+ } else {// APPLICATION ONLY
+ cacheDirectory = getUserAppCachePath(user, appId.toString());
+ }
+ Path dirPath =
+ dirsHandler.getLocalPathForWrite(cacheDirectory,
+ ContainerLocalizer.getEstimatedSize(rsrc), false);
+ dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
+ dirPath);
+ return new Path (dirPath, Long.toString(tracker.nextUniqueNumber()));
+ }
+
@Override
@SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java Fri Apr 12 23:05:28 2013
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.no
enum ResourceState {
INIT,
DOWNLOADING,
- LOCALIZED
+ LOCALIZED,
+ FAILED
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java Fri Apr 12 23:05:28 2013
@@ -29,5 +29,7 @@ public enum ResourceEventType {
/** See {@link ResourceLocalizedEvent} */
LOCALIZED,
/** See {@link ResourceReleaseEvent} */
- RELEASE
+ RELEASE,
+ /** See {@link ResourceFailedLocalizationEvent} */
+ LOCALIZATION_FAILED
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java Fri Apr 12 23:05:28 2013
@@ -72,7 +72,7 @@ public class ContainerInfo {
}
this.user = container.getUser();
- Resource res = container.getLaunchContext().getResource();
+ Resource res = container.getResource();
if (res != null) {
this.totalMemoryNeededMB = res.getMemory();
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto Fri Apr 12 23:05:28 2013
@@ -47,7 +47,12 @@ enum LocalizerActionProto {
DIE = 2;
}
+message ResourceLocalizationSpecProto {
+ optional LocalResourceProto resource = 1;
+ optional URLProto destination_directory = 2;
+}
+
message LocalizerHeartbeatResponseProto {
optional LocalizerActionProto action = 1;
- repeated LocalResourceProto resources = 2;
+ repeated ResourceLocalizationSpecProto resources = 2;
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java Fri Apr 12 23:05:28 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
/**
* This class allows a node manager to run without without communicating with a
@@ -73,9 +74,9 @@ public class MockNodeStatusUpdater exten
LOG.info("Got heartbeat number " + heartBeatID);
nodeStatus.setResponseId(heartBeatID++);
- NodeHeartbeatResponse nhResponse = recordFactory
- .newRecordInstance(NodeHeartbeatResponse.class);
- nhResponse.setResponseId(heartBeatID);
+ NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils
+ .newNodeHeartbeatResponse(heartBeatID, null, null,
+ null, null, 1000L);
return nhResponse;
}
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Fri Apr 12 23:05:28 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -43,6 +44,8 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.junit.Test;
+import static org.mockito.Mockito.*;
+
public class TestEventFlow {
@@ -117,12 +120,15 @@ public class TestEventFlow {
applicationAttemptId.setApplicationId(applicationId);
applicationAttemptId.setAttemptId(0);
cID.setApplicationAttemptId(applicationAttemptId);
- launchContext.setContainerId(cID);
+ Container mockContainer = mock(Container.class);
+ when(mockContainer.getId()).thenReturn(cID);
+ when(mockContainer.getResource()).thenReturn(recordFactory
+ .newRecordInstance(Resource.class));
launchContext.setUser("testing");
- launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
StartContainerRequest request =
recordFactory.newRecordInstance(StartContainerRequest.class);
request.setContainerLaunchContext(launchContext);
+ request.setContainer(mockContainer);
containerManager.startContainer(request);
BaseContainerManagerTest.waitForContainerState(containerManager, cID,
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java Fri Apr 12 23:05:28 2013
@@ -99,7 +99,9 @@ public class TestNodeManagerReboot {
Records.newRecord(ContainerLaunchContext.class);
// Construct the Container-id
ContainerId cId = createContainerId();
- containerLaunchContext.setContainerId(cId);
+ org.apache.hadoop.yarn.api.records.Container mockContainer =
+ mock(org.apache.hadoop.yarn.api.records.Container.class);
+ when(mockContainer.getId()).thenReturn(cId);
containerLaunchContext.setUser(user);
@@ -122,12 +124,13 @@ public class TestNodeManagerReboot {
containerLaunchContext.setUser(containerLaunchContext.getUser());
List<String> commands = new ArrayList<String>();
containerLaunchContext.setCommands(commands);
- containerLaunchContext.setResource(Records
- .newRecord(Resource.class));
- containerLaunchContext.getResource().setMemory(1024);
+ Resource resource = Records.newRecord(Resource.class);
+ resource.setMemory(1024);
+ when(mockContainer.getResource()).thenReturn(resource);
StartContainerRequest startRequest =
Records.newRecord(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
+ startRequest.setContainer(mockContainer);
containerManager.startContainer(startRequest);
GetContainerStatusRequest request =
@@ -160,7 +163,10 @@ public class TestNodeManagerReboot {
"container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
- nm.handle(new NodeManagerEvent(NodeManagerEventType.REBOOT));
+ // restart the NodeManager
+ nm.stop();
+ nm = new MyNodeManager();
+ nm.start();
numTries = 0;
while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
@@ -250,26 +256,6 @@ public class TestNodeManagerReboot {
return delService;
}
- // mimic part of reboot process
- @Override
- public void handle(NodeManagerEvent event) {
- switch (event.getType()) {
- case SHUTDOWN:
- this.stop();
- break;
- case REBOOT:
- this.stop();
- this.createNewMyNodeManager().start();
- break;
- default:
- LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
- }
- }
-
- private MyNodeManager createNewMyNodeManager() {
- return new MyNodeManager();
- }
-
private YarnConfiguration createNMConfig() {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java Fri Apr 12 23:05:28 2013
@@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@@ -28,6 +31,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
import junit.framework.Assert;
@@ -38,6 +44,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -49,9 +56,12 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Before;
@@ -71,6 +81,7 @@ public class TestNodeManagerShutdown {
.getRecordFactory(null);
static final String user = "nobody";
private FileContext localFS;
+ private CyclicBarrier syncBarrier = new CyclicBarrier(2);
@Before
public void setup() throws UnsupportedFileSystemException {
@@ -91,16 +102,69 @@ public class TestNodeManagerShutdown {
NodeManager nm = getNodeManager();
nm.init(createNMConfig());
nm.start();
+ startContainers(nm);
+
+ final int MAX_TRIES=20;
+ int numTries = 0;
+ while (!processStartFile.exists() && numTries < MAX_TRIES) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ex) {ex.printStackTrace();}
+ numTries++;
+ }
+
+ nm.stop();
+ // Now verify the contents of the file
+ // Script generates a message when it receives a sigterm
+ // so we look for that
+ BufferedReader reader =
+ new BufferedReader(new FileReader(processStartFile));
+
+ boolean foundSigTermMessage = false;
+ while (true) {
+ String line = reader.readLine();
+ if (line == null) {
+ break;
+ }
+ if (line.contains("SIGTERM")) {
+ foundSigTermMessage = true;
+ break;
+ }
+ }
+ Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
+ reader.close();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testKillContainersOnResync() throws IOException, InterruptedException {
+ NodeManager nm = new TestNodeManager();
+ YarnConfiguration conf = createNMConfig();
+ nm.init(conf);
+ nm.start();
+ startContainers(nm);
+
+ assert ((TestNodeManager) nm).getNMRegistrationCount() == 1;
+ nm.getNMDispatcher().getEventHandler().
+ handle( new NodeManagerEvent(NodeManagerEventType.RESYNC));
+ try {
+ syncBarrier.await();
+ } catch (BrokenBarrierException e) {
+ }
+ assert ((TestNodeManager) nm).getNMRegistrationCount() == 2;
+ }
+
+ private void startContainers(NodeManager nm) throws IOException {
ContainerManagerImpl containerManager = nm.getContainerManager();
File scriptFile = createUnhaltingScriptFile();
- ContainerLaunchContext containerLaunchContext =
+ ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
+ Container mockContainer = mock(Container.class);
// Construct the Container-id
ContainerId cId = createContainerId();
- containerLaunchContext.setContainerId(cId);
+ when(mockContainer.getId()).thenReturn(cId);
containerLaunchContext.setUser(user);
@@ -124,11 +188,12 @@ public class TestNodeManagerShutdown {
commands.add("/bin/bash");
commands.add(scriptFile.getAbsolutePath());
containerLaunchContext.setCommands(commands);
- containerLaunchContext.setResource(recordFactory
- .newRecordInstance(Resource.class));
- containerLaunchContext.getResource().setMemory(1024);
- StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
+ Resource resource = BuilderUtils.newResource(1024, 1);
+ when(mockContainer.getResource()).thenReturn(resource);
+ StartContainerRequest startRequest =
+ recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
+ startRequest.setContainer(mockContainer);
containerManager.startContainer(startRequest);
GetContainerStatusRequest request =
@@ -137,37 +202,6 @@ public class TestNodeManagerShutdown {
ContainerStatus containerStatus =
containerManager.getContainerStatus(request).getStatus();
Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
-
- final int MAX_TRIES=20;
- int numTries = 0;
- while (!processStartFile.exists() && numTries < MAX_TRIES) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException ex) {ex.printStackTrace();}
- numTries++;
- }
-
- nm.stop();
-
- // Now verify the contents of the file
- // Script generates a message when it receives a sigterm
- // so we look for that
- BufferedReader reader =
- new BufferedReader(new FileReader(processStartFile));
-
- boolean foundSigTermMessage = false;
- while (true) {
- String line = reader.readLine();
- if (line == null) {
- break;
- }
- if (line.contains("SIGTERM")) {
- foundSigTermMessage = true;
- break;
- }
- }
- Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
- reader.close();
}
private ContainerId createContainerId() {
@@ -226,4 +260,48 @@ public class TestNodeManagerShutdown {
}
};
}
+
+ class TestNodeManager extends NodeManager {
+
+ private int registrationCount = 0;
+
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ return new TestNodeStatusUpdaterImpl(context, dispatcher,
+ healthChecker, metrics);
+ }
+
+ public int getNMRegistrationCount() {
+ return registrationCount;
+ }
+
+ class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater {
+
+ public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ super(context, dispatcher, healthChecker, metrics);
+ }
+
+ @Override
+ protected void registerWithRM() throws YarnRemoteException {
+ super.registerWithRM();
+ registrationCount++;
+ }
+
+ @Override
+ protected void rebootNodeStatusUpdater() {
+ ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container> containers =
+ getNMContext().getContainers();
+ // ensure that containers are empty before restart nodeStatusUpdater
+ Assert.assertTrue(containers.isEmpty());
+ super.rebootNodeStatusUpdater();
+ try {
+ syncBarrier.await();
+ } catch (InterruptedException e) {
+ } catch (BrokenBarrierException e) {
+ }
+ }
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Fri Apr 12 23:05:28 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -29,6 +30,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -43,14 +46,17 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -58,12 +64,15 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -90,7 +99,8 @@ public class TestNodeStatusUpdater {
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
private final Configuration conf = createNMConfig();
private NodeManager nm;
- protected NodeManager rebootedNodeManager;
+ private boolean containerStatusBackupSuccessfully = true;
+ private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
@After
public void tearDown() {
@@ -159,9 +169,15 @@ public class TestNodeStatusUpdater {
throws YarnRemoteException {
NodeStatus nodeStatus = request.getNodeStatus();
LOG.info("Got heartbeat number " + heartBeatID);
+ NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class);
+ Dispatcher mockDispatcher = mock(Dispatcher.class);
+ EventHandler mockEventHandler = mock(EventHandler.class);
+ when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
nodeStatus.setResponseId(heartBeatID++);
Map<ApplicationId, List<ContainerStatus>> appToContainers =
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
+ org.apache.hadoop.yarn.api.records.Container mockContainer =
+ mock(org.apache.hadoop.yarn.api.records.Container.class);
if (heartBeatID == 1) {
Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
@@ -172,10 +188,12 @@ public class TestNodeStatusUpdater {
firstContainerID.setId(heartBeatID);
ContainerLaunchContext launchContext = recordFactory
.newRecordInstance(ContainerLaunchContext.class);
- launchContext.setContainerId(firstContainerID);
- launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
- launchContext.getResource().setMemory(2);
- Container container = new ContainerImpl(conf , null, launchContext, null, null);
+ when(mockContainer.getId()).thenReturn(firstContainerID);
+ Resource resource = BuilderUtils.newResource(2, 1);
+ when(mockContainer.getResource()).thenReturn(resource);
+ Container container =
+ new ContainerImpl(conf, mockDispatcher, launchContext,
+ mockContainer, null, mockMetrics);
this.context.getContainers().put(firstContainerID, container);
} else if (heartBeatID == 2) {
// Checks on the RM end
@@ -196,10 +214,12 @@ public class TestNodeStatusUpdater {
secondContainerID.setId(heartBeatID);
ContainerLaunchContext launchContext = recordFactory
.newRecordInstance(ContainerLaunchContext.class);
- launchContext.setContainerId(secondContainerID);
- launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
- launchContext.getResource().setMemory(3);
- Container container = new ContainerImpl(conf, null, launchContext, null, null);
+ when(mockContainer.getId()).thenReturn(secondContainerID);
+ Resource resource = BuilderUtils.newResource(3, 1);
+ when(mockContainer.getResource()).thenReturn(resource);
+ Container container =
+ new ContainerImpl(conf, mockDispatcher, launchContext,
+ mockContainer, null, mockMetrics);
this.context.getContainers().put(secondContainerID, container);
} else if (heartBeatID == 3) {
// Checks on the RM end
@@ -214,27 +234,43 @@ public class TestNodeStatusUpdater {
Assert.assertEquals(2, activeContainers.size());
}
- NodeHeartbeatResponse nhResponse = recordFactory
- .newRecordInstance(NodeHeartbeatResponse.class);
- nhResponse.setResponseId(heartBeatID);
+ NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
+ newNodeHeartbeatResponse(heartBeatID, null, null, null, null, 1000L);
return nhResponse;
}
}
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
- public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
+ public ResourceTracker resourceTracker;
private Context context;
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(context, dispatcher, healthChecker, metrics);
this.context = context;
+ resourceTracker = new MyResourceTracker(this.context);
+ }
+
+ @Override
+ protected ResourceTracker getRMClient() {
+ return resourceTracker;
+ }
+ }
+
+ private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl {
+ public ResourceTracker resourceTracker;
+
+ public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ super(context, dispatcher, healthChecker, metrics);
+ resourceTracker = new MyResourceTracker4(context);
}
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;
}
+
}
private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
@@ -289,6 +325,21 @@ public class TestNodeStatusUpdater {
}
}
+ private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl {
+ private ResourceTracker resourceTracker;
+
+ public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ super(context, dispatcher, healthChecker, metrics);
+ resourceTracker = new MyResourceTracker5();
+ }
+
+ @Override
+ protected ResourceTracker getRMClient() {
+ return resourceTracker;
+ }
+ }
+
private class MyNodeManager extends NodeManager {
private MyNodeStatusUpdater3 nodeStatusUpdater;
@@ -305,6 +356,32 @@ public class TestNodeStatusUpdater {
}
}
+ private class MyNodeManager2 extends NodeManager {
+ public boolean isStopped = false;
+ private NodeStatusUpdater nodeStatusUpdater;
+ private CyclicBarrier syncBarrier;
+ public MyNodeManager2 (CyclicBarrier syncBarrier) {
+ this.syncBarrier = syncBarrier;
+ }
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ nodeStatusUpdater =
+ new MyNodeStatusUpdater5(context, dispatcher, healthChecker,
+ metrics);
+ return nodeStatusUpdater;
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+ isStopped = true;
+ try {
+ syncBarrier.await();
+ } catch (Exception e) {
+ }
+ }
+ }
//
private class MyResourceTracker2 implements ResourceTracker {
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
@@ -325,10 +402,9 @@ public class TestNodeStatusUpdater {
NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++);
- NodeHeartbeatResponse nhResponse = recordFactory
- .newRecordInstance(NodeHeartbeatResponse.class);
- nhResponse.setResponseId(heartBeatID);
- nhResponse.setNodeAction(heartBeatNodeAction);
+ NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
+ newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
+ null, null, 1000L);
return nhResponse;
}
}
@@ -361,10 +437,9 @@ public class TestNodeStatusUpdater {
LOG.info("Got heartBeatId: [" + heartBeatID +"]");
NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++);
- NodeHeartbeatResponse nhResponse =
- recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
- nhResponse.setResponseId(heartBeatID);
- nhResponse.setNodeAction(heartBeatNodeAction);
+ NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
+ newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
+ null, null, 1000L);
if (nodeStatus.getKeepAliveApplications() != null
&& nodeStatus.getKeepAliveApplications().size() > 0) {
@@ -386,6 +461,124 @@ public class TestNodeStatusUpdater {
}
}
+ private class MyResourceTracker4 implements ResourceTracker {
+
+ public NodeAction registerNodeAction = NodeAction.NORMAL;
+ public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
+ private Context context;
+
+ public MyResourceTracker4(Context context) {
+ this.context = context;
+ }
+
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnRemoteException {
+ RegisterNodeManagerResponse response = recordFactory
+ .newRecordInstance(RegisterNodeManagerResponse.class);
+ response.setNodeAction(registerNodeAction);
+ return response;
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnRemoteException {
+ try {
+ if (heartBeatID == 0) {
+ Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
+ .size(), 0);
+ Assert.assertEquals(context.getContainers().size(), 0);
+ } else if (heartBeatID == 1) {
+ Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
+ .size(), 5);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(0).getState() == ContainerState.RUNNING
+ && request.getNodeStatus().getContainersStatuses().get(0)
+ .getContainerId().getId() == 1);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(1).getState() == ContainerState.RUNNING
+ && request.getNodeStatus().getContainersStatuses().get(1)
+ .getContainerId().getId() == 2);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(2).getState() == ContainerState.COMPLETE
+ && request.getNodeStatus().getContainersStatuses().get(2)
+ .getContainerId().getId() == 3);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(3).getState() == ContainerState.COMPLETE
+ && request.getNodeStatus().getContainersStatuses().get(3)
+ .getContainerId().getId() == 4);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(4).getState() == ContainerState.RUNNING
+ && request.getNodeStatus().getContainersStatuses().get(4)
+ .getContainerId().getId() == 5);
+ throw new YarnException("Lost the heartbeat response");
+ } else if (heartBeatID == 2) {
+ Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
+ .size(), 7);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(0).getState() == ContainerState.COMPLETE
+ && request.getNodeStatus().getContainersStatuses().get(0)
+ .getContainerId().getId() == 3);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(1).getState() == ContainerState.COMPLETE
+ && request.getNodeStatus().getContainersStatuses().get(1)
+ .getContainerId().getId() == 4);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(2).getState() == ContainerState.RUNNING
+ && request.getNodeStatus().getContainersStatuses().get(2)
+ .getContainerId().getId() == 1);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(3).getState() == ContainerState.RUNNING
+ && request.getNodeStatus().getContainersStatuses().get(3)
+ .getContainerId().getId() == 2);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(4).getState() == ContainerState.RUNNING
+ && request.getNodeStatus().getContainersStatuses().get(4)
+ .getContainerId().getId() == 5);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(5).getState() == ContainerState.RUNNING
+ && request.getNodeStatus().getContainersStatuses().get(5)
+ .getContainerId().getId() == 6);
+ Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+ .get(6).getState() == ContainerState.COMPLETE
+ && request.getNodeStatus().getContainersStatuses().get(6)
+ .getContainerId().getId() == 7);
+ }
+ } catch (AssertionError error) {
+ LOG.info(error);
+ containerStatusBackupSuccessfully = false;
+ } finally {
+ heartBeatID++;
+ }
+ NodeStatus nodeStatus = request.getNodeStatus();
+ nodeStatus.setResponseId(heartBeatID);
+ NodeHeartbeatResponse nhResponse =
+ YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
+ heartBeatNodeAction, null, null, null, 1000L);
+ return nhResponse;
+ }
+ }
+
+ private class MyResourceTracker5 implements ResourceTracker {
+ public NodeAction registerNodeAction = NodeAction.NORMAL;
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnRemoteException {
+
+ RegisterNodeManagerResponse response = recordFactory
+ .newRecordInstance(RegisterNodeManagerResponse.class);
+ response.setNodeAction(registerNodeAction );
+ return response;
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnRemoteException {
+ heartBeatID++;
+ throw RPCUtil.getRemoteException("NodeHeartbeat exception");
+ }
+ }
+
@Before
public void clearError() {
nmStartError = null;
@@ -473,8 +666,8 @@ public class TestNodeStatusUpdater {
}
@Override
- protected void cleanupContainers() {
- super.cleanupContainers();
+      protected void cleanupContainers(NodeManagerEventType eventType) {
+        // Forward the caller's event type instead of forcing SHUTDOWN, so the
+        // override only adds cleanup counting on top of the real behaviour.
+        super.cleanupContainers(eventType);
numCleanups.incrementAndGet();
}
};
@@ -528,50 +721,6 @@ public class TestNodeStatusUpdater {
}
@Test
- public void testNodeReboot() throws Exception {
- nm = getNodeManager(NodeAction.REBOOT);
- YarnConfiguration conf = createNMConfig();
- nm.init(conf);
- Assert.assertEquals(STATE.INITED, nm.getServiceState());
- nm.start();
-
- int waitCount = 0;
- while (heartBeatID < 1 && waitCount++ != 20) {
- Thread.sleep(500);
- }
- Assert.assertFalse(heartBeatID < 1);
-
- // NM takes a while to reach the STOPPED state.
- waitCount = 0;
- while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
- LOG.info("Waiting for NM to stop..");
- Thread.sleep(1000);
- }
- Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
-
- waitCount = 0;
- while (null == rebootedNodeManager && waitCount++ != 20) {
- LOG.info("Waiting for NM to reinitialize..");
- Thread.sleep(1000);
- }
-
- waitCount = 0;
- while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) {
- LOG.info("Waiting for NM to start..");
- Thread.sleep(1000);
- }
- Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState());
-
- rebootedNodeManager.stop();
- waitCount = 0;
- while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
- LOG.info("Waiting for NM to stop..");
- Thread.sleep(1000);
- }
- Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState());
- }
-
- @Test
public void testNMShutdownForRegistrationFailure() {
nm = new NodeManager() {
@@ -727,6 +876,151 @@ public class TestNodeStatusUpdater {
}
}
+  /**
+   * Verifies that statuses of completed containers are kept and re-sent to
+   * the ResourceManager when a heartbeat is lost.
+   */
+  @Test(timeout = 20000)
+  public void testCompletedContainerStatusBackup() throws Exception {
+    nm = new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+        MyNodeStatusUpdater2 myNodeStatusUpdater =
+            new MyNodeStatusUpdater2(context, dispatcher, healthChecker,
+                metrics);
+        return myNodeStatusUpdater;
+      }
+
+      @Override
+      protected NMContext createNMContext(
+          NMContainerTokenSecretManager containerTokenSecretManager) {
+        return new MyNMContext(containerTokenSecretManager);
+      }
+    };
+
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    nm.start();
+    try {
+      // Poll for up to ~10s until more than three heartbeats have been sent.
+      int waitCount = 0;
+      while (heartBeatID <= 3 && waitCount++ != 20) {
+        Thread.sleep(500);
+      }
+      // containerStatusBackupSuccessfully is set to false from the heartbeat
+      // handler, so first make sure the heartbeats actually happened;
+      // otherwise the check below could pass without verifying anything.
+      Assert.assertTrue("Expected more than 3 heartbeats but saw "
+          + heartBeatID, heartBeatID > 3);
+      Assert.assertTrue("ContainerStatus Backup failed",
+          containerStatusBackupSuccessfully);
+    } finally {
+      // Always stop the NM, even when an assertion above fails.
+      nm.stop();
+    }
+  }
+
+  /**
+   * Verifies that, with a short RM connect-wait and retry interval
+   * configured, the NodeManager ends up stopped after a bounded number of
+   * heartbeat attempts.
+   */
+  @Test(timeout = 20000)
+  public void testNodeStatusUpdaterRetryAndNMShutdown()
+      throws InterruptedException {
+    final long connectionWaitSecs = 1;
+    final long connectionRetryIntervalSecs = 1;
+    YarnConfiguration conf = createNMConfig();
+    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+        connectionWaitSecs);
+    conf.setLong(YarnConfiguration
+        .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+        connectionRetryIntervalSecs);
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    nm = new MyNodeManager2(syncBarrier);
+    nm.init(conf);
+    nm.start();
+    try {
+      // Block until the NodeManager side (constructed with the same barrier)
+      // also reaches it. InterruptedException propagates to fail the test.
+      syncBarrier.await();
+    } catch (java.util.concurrent.BrokenBarrierException e) {
+      Assert.fail("Barrier broken while waiting for NM shutdown: " + e);
+    }
+    Assert.assertTrue(((MyNodeManager2) nm).isStopped);
+    Assert.assertTrue("calculate heartBeatCount based on" +
+        " connectionWaitSecs and RetryIntervalSecs, but heartBeatID was "
+        + heartBeatID, heartBeatID == 2);
+  }
+
+  /**
+   * NM context whose container map is scripted by the current heartbeat id:
+   * <ul>
+   *   <li>0: the map is returned unchanged;</li>
+   *   <li>1: containers 1-5 are added (3 and 4 COMPLETE, the rest RUNNING);</li>
+   *   <li>2: container 6 (RUNNING) and 7 (COMPLETE) are added;</li>
+   *   <li>otherwise: the map is cleared.</li>
+   * </ul>
+   * Every COMPLETE status added is also appended to
+   * {@code completedContainerStatusList}.
+   */
+  private class MyNMContext extends NMContext {
+    ConcurrentMap<ContainerId, Container> containers =
+        new ConcurrentSkipListMap<ContainerId, Container>();
+
+    public MyNMContext(
+        NMContainerTokenSecretManager containerTokenSecretManager) {
+      super(containerTokenSecretManager);
+    }
+
+    @Override
+    public ConcurrentMap<ContainerId, Container> getContainers() {
+      if (heartBeatID == 0) {
+        // Nothing launched yet: hand back the map as-is.
+      } else if (heartBeatID == 1) {
+        putScriptedContainer(1, ContainerState.RUNNING);
+        putScriptedContainer(2, ContainerState.RUNNING);
+        putScriptedContainer(3, ContainerState.COMPLETE);
+        putScriptedContainer(4, ContainerState.COMPLETE);
+        putScriptedContainer(5, ContainerState.RUNNING);
+      } else if (heartBeatID == 2) {
+        putScriptedContainer(6, ContainerState.RUNNING);
+        putScriptedContainer(7, ContainerState.COMPLETE);
+      } else {
+        containers.clear();
+      }
+      return containers;
+    }
+
+    /**
+     * Creates a mocked container with the given id and state, stores it in
+     * {@link #containers}, and records its status as completed if its state
+     * is COMPLETE.
+     */
+    private void putScriptedContainer(int id, ContainerState state) {
+      ContainerStatus status = createContainerStatus(id, state);
+      Container container = getMockContainer(status);
+      containers.put(status.getContainerId(), container);
+      if (state == ContainerState.COMPLETE) {
+        completedContainerStatusList.add(status);
+      }
+    }
+
+    /** Builds a status whose application, attempt and container ids all use {@code id}. */
+    private ContainerStatus createContainerStatus(int id,
+        ContainerState containerState) {
+      ApplicationAttemptId attemptId = BuilderUtils.newApplicationAttemptId(
+          BuilderUtils.newApplicationId(System.currentTimeMillis(), id), id);
+      ContainerId containerId = BuilderUtils.newContainerId(attemptId, id);
+      return BuilderUtils.newContainerStatus(containerId, containerState,
+          "test_containerStatus: id=" + id + ", containerState: "
+              + containerState, 0);
+    }
+
+    /** Returns a Mockito container whose status clone is {@code containerStatus}. */
+    private Container getMockContainer(ContainerStatus containerStatus) {
+      Container mockContainer = mock(Container.class);
+      when(mockContainer.cloneAndGetContainerStatus())
+          .thenReturn(containerStatus);
+      return mockContainer;
+    }
+  }
+
private void verifyNodeStartFailure(String errMessage) {
YarnConfiguration conf = createNMConfig();
nm.init(conf);
@@ -773,12 +1067,6 @@ public class TestNodeStatusUpdater {
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
return myNodeStatusUpdater;
}
-
- @Override
- NodeManager createNewNodeManager() {
- rebootedNodeManager = getNodeManager(NodeAction.NORMAL);
- return rebootedNodeManager;
- }
};
}
}
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java?rev=1467511&r1=1467510&r2=1467511&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java Fri Apr 12 23:05:28 2013
@@ -17,6 +17,13 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
@@ -31,15 +38,14 @@ import org.apache.hadoop.yarn.ipc.RPCUti
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.util.ConverterUtils;
-
import org.junit.Test;
-import static org.junit.Assert.*;
public class TestPBRecordImpl {
@@ -54,9 +60,8 @@ public class TestPBRecordImpl {
static LocalResource createResource() {
LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
assertTrue(ret instanceof LocalResourcePBImpl);
- ret.setResource(
- ConverterUtils.getYarnUrlFromPath(
- new Path("hdfs://y.ak:8020/foo/bar")));
+ ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(
+ "hdfs://y.ak:8020/foo/bar")));
ret.setSize(4344L);
ret.setTimestamp(3141592653589793L);
ret.setVisibility(LocalResourceVisibility.PUBLIC);
@@ -90,16 +95,27 @@ public class TestPBRecordImpl {
return ret;
}
- static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse() {
+  /**
+   * Builds a LIVE localizer heartbeat response carrying a single
+   * {@link ResourceLocalizationSpec} that wraps {@link #createResource()}.
+   */
+  static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse()
+      throws URISyntaxException {
LocalizerHeartbeatResponse ret =
recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
assertTrue(ret instanceof LocalizerHeartbeatResponsePBImpl);
ret.setLocalizerAction(LocalizerAction.LIVE);
- ret.addResource(createResource());
+    LocalResource rsrc = createResource();
+    ArrayList<ResourceLocalizationSpec> rsrcs =
+        new ArrayList<ResourceLocalizationSpec>();
+    ResourceLocalizationSpec resource =
+        recordFactory.newRecordInstance(ResourceLocalizationSpec.class);
+    resource.setResource(rsrc);
+    // A unique sub-directory of /tmp; the visible SerDe test only
+    // round-trips this path through protobuf.
+    resource.setDestinationDirectory(ConverterUtils
+        .getYarnUrlFromPath(new Path("/tmp/" + System.currentTimeMillis())));
+    rsrcs.add(resource);
+    ret.setResourceSpecs(rsrcs);
return ret;
}
- @Test
+ @Test(timeout=10000)
public void testLocalResourceStatusSerDe() throws Exception {
LocalResourceStatus rsrcS = createLocalResourceStatus();
assertTrue(rsrcS instanceof LocalResourceStatusPBImpl);
@@ -119,7 +135,7 @@ public class TestPBRecordImpl {
assertEquals(createResource(), rsrcD.getResource());
}
- @Test
+ @Test(timeout=10000)
public void testLocalizerStatusSerDe() throws Exception {
LocalizerStatus rsrcS = createLocalizerStatus();
assertTrue(rsrcS instanceof LocalizerStatusPBImpl);
@@ -141,7 +157,7 @@ public class TestPBRecordImpl {
assertEquals(createLocalResourceStatus(), rsrcD.getResourceStatus(0));
}
- @Test
+ @Test(timeout=10000)
public void testLocalizerHeartbeatResponseSerDe() throws Exception {
LocalizerHeartbeatResponse rsrcS = createLocalizerHeartbeatResponse();
assertTrue(rsrcS instanceof LocalizerHeartbeatResponsePBImpl);
@@ -158,8 +174,8 @@ public class TestPBRecordImpl {
new LocalizerHeartbeatResponsePBImpl(rsrcPbD);
assertEquals(rsrcS, rsrcD);
- assertEquals(createResource(), rsrcS.getLocalResource(0));
- assertEquals(createResource(), rsrcD.getLocalResource(0));
+ assertEquals(createResource(), rsrcS.getResourceSpecs().get(0).getResource());
+ assertEquals(createResource(), rsrcD.getResourceSpecs().get(0).getResource());
}
}