You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/09/11 08:21:39 UTC
svn commit: r1167676 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/ya...
Author: vinodkv
Date: Sun Sep 11 06:21:39 2011
New Revision: 1167676
URL: http://svn.apache.org/viewvc?rev=1167676&view=rev
Log:
MAPREDUCE-2691. Finish up the cleanup of distributed cache file resources and related tests. Contributed by Siddharth Seth.
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationCleanupEvent.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1167676&r1=1167675&r2=1167676&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sun Sep 11 06:21:39 2011
@@ -1269,6 +1269,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2971. ant build mapreduce fails protected access jc.displayJobList
(jobs) (Thomas Graves via mahadev)
+ MAPREDUCE-2691. Finishing up the cleanup of distributed cache file resources
+ and related tests. (Siddharth Seth via vinodkv)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1167676&r1=1167675&r2=1167676&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Sun Sep 11 06:21:39 2011
@@ -711,7 +711,7 @@ public abstract class TaskAttemptImpl im
String linkName = name.toUri().getPath();
container.setLocalResource(
linkName,
- BuilderUtils.newLocalResource(recordFactory,
+ BuilderUtils.newLocalResource(
p.toUri(), type,
visibilities[i]
? LocalResourceVisibility.PUBLIC
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1167676&r1=1167675&r2=1167676&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java Sun Sep 11 06:21:39 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -86,12 +87,11 @@ public class BuilderUtils {
}
}
- public static LocalResource newLocalResource(RecordFactory recordFactory,
- URI uri, LocalResourceType type, LocalResourceVisibility visibility,
- long size, long timestamp) {
+ public static LocalResource newLocalResource(URL url, LocalResourceType type,
+ LocalResourceVisibility visibility, long size, long timestamp) {
LocalResource resource =
- recordFactory.newRecordInstance(LocalResource.class);
- resource.setResource(ConverterUtils.getYarnUrlFromURI(uri));
+ recordFactory.newRecordInstance(LocalResource.class);
+ resource.setResource(url);
resource.setType(type);
resource.setVisibility(visibility);
resource.setSize(size);
@@ -99,6 +99,13 @@ public class BuilderUtils {
return resource;
}
+ public static LocalResource newLocalResource(URI uri,
+ LocalResourceType type, LocalResourceVisibility visibility, long size,
+ long timestamp) {
+ return newLocalResource(ConverterUtils.getYarnUrlFromURI(uri), type,
+ visibility, size, timestamp);
+ }
+
public static ApplicationId newApplicationId(RecordFactory recordFactory,
long clustertimestamp, CharSequence id) {
ApplicationId applicationId =
@@ -125,6 +132,15 @@ public class BuilderUtils {
return applicationId;
}
+ public static ApplicationAttemptId newApplicationAttemptId(
+ ApplicationId appId, int attemptId) {
+ ApplicationAttemptId appAttemptId =
+ recordFactory.newRecordInstance(ApplicationAttemptId.class);
+ appAttemptId.setApplicationId(appId);
+ appAttemptId.setAttemptId(attemptId);
+ return appAttemptId;
+ }
+
public static ApplicationId convert(long clustertimestamp, CharSequence id) {
ApplicationId applicationId =
recordFactory.newRecordInstance(ApplicationId.class);
@@ -133,6 +149,24 @@ public class BuilderUtils {
return applicationId;
}
+ public static ContainerId newContainerId(ApplicationAttemptId appAttemptId,
+ int containerId) {
+ ContainerId id = recordFactory.newRecordInstance(ContainerId.class);
+ id.setAppId(appAttemptId.getApplicationId());
+ id.setId(containerId);
+ id.setAppAttemptId(appAttemptId);
+ return id;
+ }
+
+ public static ContainerId newContainerId(int appId, int appAttemptId,
+ long timestamp, int id) {
+ ApplicationId applicationId = newApplicationId(timestamp, appId);
+ ApplicationAttemptId applicationAttemptId = newApplicationAttemptId(
+ applicationId, appAttemptId);
+ ContainerId cId = newContainerId(applicationAttemptId, id);
+ return cId;
+ }
+
public static ContainerId newContainerId(RecordFactory recordFactory,
ApplicationId appId, ApplicationAttemptId appAttemptId,
int containerId) {
@@ -227,4 +261,20 @@ public class BuilderUtils {
report.setStartTime(startTime);
return report;
}
+
+ public static Resource newResource(int memory) {
+ Resource resource = recordFactory.newRecordInstance(Resource.class);
+ resource.setMemory(memory);
+ return resource;
+ }
+
+ public static URL newURL(String scheme, String host, int port, String file) {
+ URL url = recordFactory.newRecordInstance(URL.class);
+ url.setScheme(scheme);
+ url.setHost(host);
+ url.setPort(port);
+ url.setFile(file);
+ return url;
+ }
+
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1167676&r1=1167675&r2=1167676&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Sun Sep 11 06:21:39 2011
@@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.server.no
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -81,6 +84,12 @@ public class ContainerImpl implements Co
new HashMap<LocalResourceRequest,String>();
private final Map<Path,String> localizedResources =
new HashMap<Path,String>();
+ private final List<LocalResourceRequest> publicRsrcs =
+ new ArrayList<LocalResourceRequest>();
+ private final List<LocalResourceRequest> privateRsrcs =
+ new ArrayList<LocalResourceRequest>();
+ private final List<LocalResourceRequest> appRsrcs =
+ new ArrayList<LocalResourceRequest>();
public ContainerImpl(Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
@@ -361,7 +370,7 @@ public class ContainerImpl implements Co
}
}
- @SuppressWarnings("fallthrough")
+ @SuppressWarnings({"fallthrough", "unchecked"})
private void finished() {
switch (getContainerState()) {
case EXITED_WITH_SUCCESS:
@@ -404,6 +413,24 @@ public class ContainerImpl implements Co
containerID, exitCode));
}
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ public void cleanup() {
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ if (!publicRsrcs.isEmpty()) {
+ rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
+ }
+ if (!privateRsrcs.isEmpty()) {
+ rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
+ }
+ if (!appRsrcs.isEmpty()) {
+ rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
+ }
+ dispatcher.getEventHandler().handle(
+ new ContainerLocalizationCleanupEvent(this, rsrc));
+ }
+
static class ContainerTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@@ -439,12 +466,6 @@ public class ContainerImpl implements Co
// Send requests for public, private resources
Map<String,LocalResource> cntrRsrc = ctxt.getAllLocalResources();
if (!cntrRsrc.isEmpty()) {
- ArrayList<LocalResourceRequest> publicRsrc =
- new ArrayList<LocalResourceRequest>();
- ArrayList<LocalResourceRequest> privateRsrc =
- new ArrayList<LocalResourceRequest>();
- ArrayList<LocalResourceRequest> appRsrc =
- new ArrayList<LocalResourceRequest>();
try {
for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
try {
@@ -453,13 +474,13 @@ public class ContainerImpl implements Co
container.pendingResources.put(req, rsrc.getKey());
switch (rsrc.getValue().getVisibility()) {
case PUBLIC:
- publicRsrc.add(req);
+ container.publicRsrcs.add(req);
break;
case PRIVATE:
- privateRsrc.add(req);
+ container.privateRsrcs.add(req);
break;
case APPLICATION:
- appRsrc.add(req);
+ container.appRsrcs.add(req);
break;
}
} catch (URISyntaxException e) {
@@ -471,27 +492,25 @@ public class ContainerImpl implements Co
} catch (URISyntaxException e) {
// malformed resource; abort container launch
LOG.warn("Failed to parse resource-request", e);
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationEvent(
- LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ container.cleanup();
container.metrics.endInitingContainer();
return ContainerState.LOCALIZATION_FAILED;
}
- if (!publicRsrc.isEmpty()) {
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationRequestEvent(
- container, publicRsrc, LocalResourceVisibility.PUBLIC));
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ if (!container.publicRsrcs.isEmpty()) {
+ req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
}
- if (!privateRsrc.isEmpty()) {
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationRequestEvent(
- container, privateRsrc, LocalResourceVisibility.PRIVATE));
+ if (!container.privateRsrcs.isEmpty()) {
+ req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
}
- if (!appRsrc.isEmpty()) {
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationRequestEvent(
- container, appRsrc, LocalResourceVisibility.APPLICATION));
+ if (!container.appRsrcs.isEmpty()) {
+ req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
}
+
+ container.dispatcher.getEventHandler().handle(
+ new ContainerLocalizationRequestEvent(container, req));
return ContainerState.LOCALIZING;
} else {
container.dispatcher.getEventHandler().handle(
@@ -546,7 +565,6 @@ public class ContainerImpl implements Co
}
}
- @SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithSuccessTransition extends ContainerTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
@@ -554,13 +572,10 @@ public class ContainerImpl implements Co
// Inform the localizer to decrement reference counts and cleanup
// resources.
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationEvent(
- LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ container.cleanup();
}
}
- @SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithFailureTransition extends ContainerTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
@@ -572,13 +587,10 @@ public class ContainerImpl implements Co
// Inform the localizer to decrement reference counts and cleanup
// resources.
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationEvent(
- LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ container.cleanup();
}
}
- @SuppressWarnings("unchecked") // dispatcher not typed
static class ResourceFailedTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -592,30 +604,24 @@ public class ContainerImpl implements Co
// Inform the localizer to decrement reference counts and cleanup
// resources.
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationEvent(
- LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ container.cleanup();
container.metrics.endInitingContainer();
}
}
- @SuppressWarnings("unchecked") // dispatcher not typed
static class KillDuringLocalizationTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Inform the localizer to decrement reference counts and cleanup
// resources.
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationEvent(
- LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ container.cleanup();
container.metrics.endInitingContainer();
ContainerKillEvent killEvent = (ContainerKillEvent) event;
container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
}
}
- @SuppressWarnings("unchecked") // dispatcher not typed
static class LocalizedResourceDuringKillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -647,7 +653,6 @@ public class ContainerImpl implements Co
}
}
- @SuppressWarnings("unchecked") // dispatcher not typed
static class ContainerKilledTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -657,13 +662,10 @@ public class ContainerImpl implements Co
// The process/process-grp is killed. Decrement reference counts and
// cleanup resources
- container.dispatcher.getEventHandler().handle(
- new ContainerLocalizationEvent(
- LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ container.cleanup();
}
}
- @SuppressWarnings("unchecked") // dispatcher not typed
static class ContainerDoneTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -697,7 +699,8 @@ public class ContainerImpl implements Co
newState =
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
- LOG.warn("Can't handle this event at current state", e);
+ LOG.warn("Can't handle this event at current state: Current: ["
+ + oldState + "], eventType: [" + event.getType() + "]", e);
}
if (oldState != newState) {
LOG.info("Container " + containerID + " transitioned from "
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1167676&r1=1167675&r2=1167676&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Sun Sep 11 06:21:39 2011
@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -274,7 +273,7 @@ public class ContainerLocalizer {
stat.setLocalPath(
ConverterUtils.getYarnUrlFromPath(localPath));
stat.setLocalSize(
- FileUtil.getDU(new File(localPath.getParent().toString())));
+ FileUtil.getDU(new File(localPath.getParent().toUri())));
stat.setStatus(ResourceStatusType.FETCH_SUCCESS);
} catch (ExecutionException e) {
stat.setStatus(ResourceStatusType.FETCH_FAILURE);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1167676&r1=1167675&r2=1167676&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Sun Sep 11 06:21:39 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.nod
* {@link LocalResourceVisibility}.
*
*/
+
class LocalResourcesTrackerImpl implements LocalResourcesTracker {
static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
@@ -83,7 +84,7 @@ class LocalResourcesTrackerImpl implemen
@Override
public boolean remove(LocalizedResource rem, DeletionService delService) {
// current synchronization guaranteed by crude RLS event for cleanup
- LocalizedResource rsrc = localrsrc.remove(rem.getRequest());
+ LocalizedResource rsrc = localrsrc.get(rem.getRequest());
if (null == rsrc) {
LOG.error("Attempt to remove absent resource: " + rem.getRequest() +
" from " + getUser());
@@ -93,10 +94,11 @@ class LocalResourcesTrackerImpl implemen
|| ResourceState.DOWNLOADING.equals(rsrc.getState())
|| rsrc != rem) {
// internal error
- LOG.error("Attempt to remove resource with non-zero refcount");
+ LOG.error("Attempt to remove resource: " + rsrc + " with non-zero refcount");
assert false;
return false;
}
+ localrsrc.remove(rem.getRequest());
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), rsrc.getLocalPath());
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1167676&r1=1167675&r2=1167676&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Sun Sep 11 06:21:39 2011
@@ -120,7 +120,8 @@ public class LocalizedResource implement
for (ContainerId c : ref) {
sb.append("(").append(c.toString()).append(")");
}
- sb.append("],").append(getTimestamp()).append("}");
+ sb.append("],").append(getTimestamp()).append(",")
+ .append(getState()).append("}");
return sb.toString();
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1167676&r1=1167675&r2=1167676&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Sun Sep 11 06:21:39 2011
@@ -22,6 +22,7 @@ import java.io.File;
import java.net.URISyntaxException;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -43,6 +44,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -63,7 +65,6 @@ import org.apache.avro.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@@ -93,7 +94,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -101,6 +102,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
@@ -198,7 +200,7 @@ public class ResourceLocalizationService
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
localizationServerAddress = NetUtils.createSocketAddr(
conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
- localizerTracker = new LocalizerTracker(conf);
+ localizerTracker = createLocalizerTracker(conf);
dispatcher.register(LocalizerEventType.class, localizerTracker);
cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
@@ -218,6 +220,10 @@ public class ResourceLocalizationService
super.start();
}
+ LocalizerTracker createLocalizerTracker(Configuration conf) {
+ return new LocalizerTracker(conf);
+ }
+
Server createServer() {
YarnRPC rpc = YarnRPC.create(getConfig());
Configuration conf = new Configuration(getConfig()); // Clone to separate
@@ -252,6 +258,9 @@ public class ResourceLocalizationService
public void handle(LocalizationEvent event) {
String userName;
String appIDStr;
+ Container c;
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs;
+ LocalResourcesTracker tracker;
// TODO: create log dir as $logdir/$user/$appId
switch (event.getType()) {
case INIT_APPLICATION_RESOURCES:
@@ -276,28 +285,16 @@ public class ResourceLocalizationService
case INIT_CONTAINER_RESOURCES:
ContainerLocalizationRequestEvent rsrcReqs =
(ContainerLocalizationRequestEvent) event;
- Container c = rsrcReqs.getContainer();
+ c = rsrcReqs.getContainer();
LocalizerContext ctxt = new LocalizerContext(
c.getUser(), c.getContainerID(), c.getCredentials());
- final LocalResourcesTracker tracker;
- LocalResourceVisibility vis = rsrcReqs.getVisibility();
- switch (vis) {
- default:
- case PUBLIC:
- tracker = publicRsrc;
- break;
- case PRIVATE:
- tracker = privateRsrc.get(c.getUser());
- break;
- case APPLICATION:
- tracker =
- appRsrc.get(ConverterUtils.toString(c.getContainerID().getAppId()));
- break;
- }
- // We get separate events one each for all resources of one visibility. So
- // all the resources in this event are of the same visibility.
- for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) {
- tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
+ rsrcs = rsrcReqs.getRequestedResources();
+ for (LocalResourceVisibility vis : rsrcs.keySet()) {
+ tracker = getLocalResourcesTracker(vis, c.getUser(),
+ c.getContainerID().getAppId());
+ for (LocalResourceRequest req : rsrcs.get(vis)) {
+ tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
+ }
}
break;
case CACHE_CLEANUP:
@@ -311,14 +308,23 @@ public class ResourceLocalizationService
}
break;
case CLEANUP_CONTAINER_RESOURCES:
- Container container =
- ((ContainerLocalizationEvent)event).getContainer();
+ ContainerLocalizationCleanupEvent rsrcCleanup =
+ (ContainerLocalizationCleanupEvent) event;
+ c = rsrcCleanup.getContainer();
+ rsrcs = rsrcCleanup.getResources();
+ for (LocalResourceVisibility vis : rsrcs.keySet()) {
+ tracker = getLocalResourcesTracker(vis, c.getUser(),
+ c.getContainerID().getAppId());
+ for (LocalResourceRequest req : rsrcs.get(vis)) {
+ tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
+ }
+ }
// Delete the container directories
- userName = container.getUser();
- String containerIDStr = container.toString();
+ userName = c.getUser();
+ String containerIDStr = c.toString();
appIDStr =
- ConverterUtils.toString(container.getContainerID().getAppId());
+ ConverterUtils.toString(c.getContainerID().getAppId());
for (Path localDir : localDirs) {
// Delete the user-owned container-dir
@@ -336,8 +342,7 @@ public class ResourceLocalizationService
delService.delete(null, containerSysDir, new Path[] {});
}
- dispatcher.getEventHandler().handle(new ContainerEvent(
- container.getContainerID(),
+ dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
break;
case DESTROY_APPLICATION_RESOURCES:
@@ -379,6 +384,19 @@ public class ResourceLocalizationService
}
}
+ LocalResourcesTracker getLocalResourcesTracker(
+ LocalResourceVisibility visibility, String user, ApplicationId appId) {
+ switch (visibility) {
+ default:
+ case PUBLIC:
+ return publicRsrc;
+ case PRIVATE:
+ return privateRsrc.get(user);
+ case APPLICATION:
+ return appRsrc.get(ConverterUtils.toString(appId));
+ }
+ }
+
/**
* Sub-component handling the spawning of {@link ContainerLocalizer}s
*/
@@ -526,6 +544,7 @@ public class ResourceLocalizationService
}
@Override
+ @SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
try {
// TODO shutdown, better error handling esp. DU
@@ -651,6 +670,7 @@ public class ResourceLocalizationService
}
// TODO this sucks. Fix it later
+ @SuppressWarnings("unchecked") // dispatcher not typed
LocalizerHeartbeatResponse update(
List<LocalResourceStatus> remoteResourceStatuses) {
LocalizerHeartbeatResponse response =
@@ -795,6 +815,7 @@ public class ResourceLocalizationService
}
@Override
+ @SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
dispatcher.getEventHandler().handle(
new LocalizationEvent(LocalizationEventType.CACHE_CLEANUP));
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationCleanupEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationCleanupEvent.java?rev=1167676&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationCleanupEvent.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationCleanupEvent.java Sun Sep 11 06:21:39 2011
@@ -0,0 +1,49 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+public class ContainerLocalizationCleanupEvent extends
+ ContainerLocalizationEvent {
+
+ private final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
+ rsrc;
+
+ /**
+ * Event requesting the cleanup of the rsrc.
+ * @param c
+ * @param rsrc
+ */
+ public ContainerLocalizationCleanupEvent(Container c,
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {
+ super(LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, c);
+ this.rsrc = rsrc;
+ }
+
+ public
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
+ getResources() {
+ return rsrc;
+ }
+}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java?rev=1167676&r1=1167675&r2=1167676&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java Sun Sep 11 06:21:39 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import java.util.Collection;
+import java.util.Map;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -26,27 +27,23 @@ import org.apache.hadoop.yarn.server.nod
public class ContainerLocalizationRequestEvent extends
ContainerLocalizationEvent {
- private final LocalResourceVisibility vis;
- private final Collection<LocalResourceRequest> reqs;
+ private final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
+ rsrc;
/**
- * Event requesting the localization of the reqs all with visibility vis
+ * Event requesting the localization of the rsrc.
* @param c
- * @param reqs
- * @param vis
+ * @param rsrc
*/
public ContainerLocalizationRequestEvent(Container c,
- Collection<LocalResourceRequest> reqs, LocalResourceVisibility vis) {
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {
super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c);
- this.vis = vis;
- this.reqs = reqs;
+ this.rsrc = rsrc;
}
- public LocalResourceVisibility getVisibility() {
- return vis;
+ public
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
+ getRequestedResources() {
+ return rsrc;
}
-
- public Collection<LocalResourceRequest> getRequestedResources() {
- return reqs;
- }
-}
+}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java?rev=1167676&r1=1167675&r2=1167676&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java Sun Sep 11 06:21:39 2011
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
-import java.net.URISyntaxException;
-
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
@@ -26,8 +24,8 @@ public class ResourceReleaseEvent extend
private final ContainerId container;
- public ResourceReleaseEvent(LocalResourceRequest rsrc, ContainerId container)
- throws URISyntaxException {
+ public ResourceReleaseEvent(LocalResourceRequest rsrc,
+ ContainerId container) {
super(rsrc, ResourceEventType.RELEASE);
this.container = container;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1167676&r1=1167675&r2=1167676&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Sun Sep 11 06:21:39 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.no
import static org.junit.Assert.fail;
+import java.util.Collection;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -79,14 +81,17 @@ public class DummyContainerManager exten
ContainerLocalizationRequestEvent rsrcReqs =
(ContainerLocalizationRequestEvent) event;
// simulate localization of all requested resources
- for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) {
- LOG.info("DEBUG: " + req + ":" +
- rsrcReqs.getContainer().getContainerID());
- dispatcher.getEventHandler().handle(
- new ContainerResourceLocalizedEvent(
- rsrcReqs.getContainer().getContainerID(), req,
- new Path("file:///local" + req.getPath().toUri().getPath())));
- }
+ for (Collection<LocalResourceRequest> rc : rsrcReqs
+ .getRequestedResources().values()) {
+ for (LocalResourceRequest req : rc) {
+ LOG.info("DEBUG: " + req + ":"
+ + rsrcReqs.getContainer().getContainerID());
+ dispatcher.getEventHandler().handle(
+ new ContainerResourceLocalizedEvent(rsrcReqs.getContainer()
+ .getContainerID(), req, new Path("file:///local"
+ + req.getPath().toUri().getPath())));
+ }
+ }
break;
case CLEANUP_CONTAINER_RESOURCES:
Container container =
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1167676&r1=1167675&r2=1167676&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Sun Sep 11 06:21:39 2011
@@ -17,208 +17,203 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
-import java.net.URISyntaxException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.net.URISyntaxException;
import java.nio.ByteBuffer;
-
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Random;
import java.util.Map.Entry;
-import java.util.AbstractMap.SimpleEntry;
+import java.util.Random;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
-
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
-import static org.junit.Assert.*;
-
import org.mockito.ArgumentMatcher;
-import static org.mockito.Mockito.*;
public class TestContainer {
final NodeManagerMetrics metrics = NodeManagerMetrics.create();
+
/**
* Verify correct container request events sent to localizer.
*/
@Test
- @SuppressWarnings("unchecked") // mocked generic
public void testLocalizationRequest() throws Exception {
- DrainDispatcher dispatcher = new DrainDispatcher();
- dispatcher.init(null);
+ WrappedContainer wc = null;
try {
- dispatcher.start();
- EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class);
- dispatcher.register(LocalizationEventType.class, localizerBus);
- // null serviceData; no registered AuxServicesEventType handler
-
- ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
- ContainerId cId = getMockContainerId(7, 314159265358979L, 4344);
- when(ctxt.getUser()).thenReturn("yak");
- when(ctxt.getContainerId()).thenReturn(cId);
-
- Random r = new Random();
- long seed = r.nextLong();
- r.setSeed(seed);
- System.out.println("testLocalizationRequest seed: " + seed);
- final Map<String,LocalResource> localResources = createLocalResources(r);
- when(ctxt.getAllLocalResources()).thenReturn(localResources);
-
- final Container c = newContainer(dispatcher, ctxt);
- assertEquals(ContainerState.NEW, c.getContainerState());
+ wc = new WrappedContainer(7, 314159265358979L, 4344, "yak");
+ assertEquals(ContainerState.NEW, wc.c.getContainerState());
+ wc.initContainer();
// Verify request for public/private resources to localizer
- c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
- dispatcher.await();
- ContainerReqMatcher matchesPublicReq =
- new ContainerReqMatcher(localResources,
- EnumSet.of(LocalResourceVisibility.PUBLIC));
- ContainerReqMatcher matchesPrivateReq =
- new ContainerReqMatcher(localResources,
- EnumSet.of(LocalResourceVisibility.PRIVATE));
- ContainerReqMatcher matchesAppReq =
- new ContainerReqMatcher(localResources,
- EnumSet.of(LocalResourceVisibility.APPLICATION));
- verify(localizerBus).handle(argThat(matchesPublicReq));
- verify(localizerBus).handle(argThat(matchesPrivateReq));
- verify(localizerBus).handle(argThat(matchesAppReq));
- assertEquals(ContainerState.LOCALIZING, c.getContainerState());
- } finally {
- dispatcher.stop();
+ ResourcesRequestedMatcher matchesReq =
+ new ResourcesRequestedMatcher(wc.localResources, EnumSet.of(
+ LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
+ LocalResourceVisibility.APPLICATION));
+ verify(wc.localizerBus).handle(argThat(matchesReq));
+ assertEquals(ContainerState.LOCALIZING, wc.c.getContainerState());
}
+ finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
}
/**
* Verify container launch when all resources already cached.
*/
@Test
- @SuppressWarnings("unchecked") // mocked generic
public void testLocalizationLaunch() throws Exception {
- DrainDispatcher dispatcher = new DrainDispatcher();
- dispatcher.init(null);
+ WrappedContainer wc = null;
try {
- dispatcher.start();
- EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class);
- dispatcher.register(LocalizationEventType.class, localizerBus);
- EventHandler<ContainersLauncherEvent> launcherBus =
- mock(EventHandler.class);
- dispatcher.register(ContainersLauncherEventType.class, launcherBus);
- // null serviceData; no registered AuxServicesEventType handler
-
- ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
- ContainerId cId = getMockContainerId(8, 314159265358979L, 4344);
- when(ctxt.getUser()).thenReturn("yak");
- when(ctxt.getContainerId()).thenReturn(cId);
-
- Random r = new Random();
- long seed = r.nextLong();
- r.setSeed(seed);
- System.out.println("testLocalizationLaunch seed: " + seed);
- final Map<String,LocalResource> localResources = createLocalResources(r);
- when(ctxt.getAllLocalResources()).thenReturn(localResources);
- final Container c = newContainer(dispatcher, ctxt);
- assertEquals(ContainerState.NEW, c.getContainerState());
-
- c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
- dispatcher.await();
-
- // Container prepared for localization events
- Path cache = new Path("file:///cache");
- Map<Path,String> localPaths = new HashMap<Path,String>();
- for (Entry<String,LocalResource> rsrc : localResources.entrySet()) {
- assertEquals(ContainerState.LOCALIZING, c.getContainerState());
- LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
- Path p = new Path(cache, rsrc.getKey());
- localPaths.put(p, rsrc.getKey());
- // rsrc copied to p
- c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(), req, p));
- }
- dispatcher.await();
+ wc = new WrappedContainer(8, 314159265358979L, 4344, "yak");
+ assertEquals(ContainerState.NEW, wc.c.getContainerState());
+ wc.initContainer();
+ Map<Path, String> localPaths = wc.localizeResources();
// all resources should be localized
- assertEquals(ContainerState.LOCALIZED, c.getContainerState());
- for (Entry<Path,String> loc : c.getLocalizedResources().entrySet()) {
+ assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
+ for (Entry<Path,String> loc : wc.c.getLocalizedResources().entrySet()) {
assertEquals(localPaths.remove(loc.getKey()), loc.getValue());
}
assertTrue(localPaths.isEmpty());
+ final WrappedContainer wcf = wc;
// verify container launch
ArgumentMatcher<ContainersLauncherEvent> matchesContainerLaunch =
new ArgumentMatcher<ContainersLauncherEvent>() {
@Override
public boolean matches(Object o) {
ContainersLauncherEvent launchEvent = (ContainersLauncherEvent) o;
- return c == launchEvent.getContainer();
+ return wcf.c == launchEvent.getContainer();
}
};
- verify(launcherBus).handle(argThat(matchesContainerLaunch));
+ verify(wc.launcherBus).handle(argThat(matchesContainerLaunch));
} finally {
- dispatcher.stop();
+ if (wc != null) {
+ wc.finished();
+ }
}
}
+ @Test
+ @SuppressWarnings("unchecked") // mocked generic
+ public void testCleanupOnFailure() throws Exception {
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(10, 314159265358979L, 4344, "yak");
+ wc.initContainer();
+ wc.localizeResources();
+ wc.launchContainer();
+ reset(wc.localizerBus);
+ wc.containerFailed(ExitCode.KILLED.getExitCode());
+ assertEquals(ContainerState.EXITED_WITH_FAILURE,
+ wc.c.getContainerState());
+ verifyCleanupCall(wc);
+ }
+ finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked") // mocked generic
+ public void testCleanupOnSuccess() throws Exception {
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(11, 314159265358979L, 4344, "yak");
+ wc.initContainer();
+ wc.localizeResources();
+ wc.launchContainer();
+ reset(wc.localizerBus);
+ wc.containerSuccessful();
+ assertEquals(ContainerState.EXITED_WITH_SUCCESS,
+ wc.c.getContainerState());
+
+ verifyCleanupCall(wc);
+ }
+ finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked") // mocked generic
+ public void testCleanupOnKillRequest() throws Exception {
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(12, 314159265358979L, 4344, "yak");
+ wc.initContainer();
+ wc.localizeResources();
+ wc.launchContainer();
+ reset(wc.localizerBus);
+ wc.killContainer();
+ assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+ wc.containerKilledOnRequest();
+
+ verifyCleanupCall(wc);
+ } finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
/**
* Verify serviceData correctly sent.
*/
@Test
- @SuppressWarnings("unchecked") // mocked generic
public void testServiceData() throws Exception {
- DrainDispatcher dispatcher = new DrainDispatcher();
- dispatcher.init(null);
- dispatcher.start();
+ WrappedContainer wc = null;
try {
- EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class);
- dispatcher.register(LocalizationEventType.class, localizerBus);
- EventHandler<AuxServicesEvent> auxBus = mock(EventHandler.class);
- dispatcher.register(AuxServicesEventType.class, auxBus);
- EventHandler<ContainersLauncherEvent> launchBus = mock(EventHandler.class);
- dispatcher.register(ContainersLauncherEventType.class, launchBus);
-
- ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
- final ContainerId cId = getMockContainerId(9, 314159265358979L, 4344);
- when(ctxt.getUser()).thenReturn("yak");
- when(ctxt.getContainerId()).thenReturn(cId);
- when(ctxt.getAllLocalResources()).thenReturn(
- Collections.<String,LocalResource>emptyMap());
-
- Random r = new Random();
- long seed = r.nextLong();
- r.setSeed(seed);
- System.out.println("testServiceData seed: " + seed);
- final Map<String,ByteBuffer> serviceData = createServiceData(r);
- when(ctxt.getAllServiceData()).thenReturn(serviceData);
-
- final Container c = newContainer(dispatcher, ctxt);
- assertEquals(ContainerState.NEW, c.getContainerState());
-
- // Verify propagation of service data to AuxServices
- c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
- dispatcher.await();
- for (final Map.Entry<String,ByteBuffer> e : serviceData.entrySet()) {
+ wc = new WrappedContainer(9, 314159265358979L, 4344, "yak", false, true);
+ assertEquals(ContainerState.NEW, wc.c.getContainerState());
+ wc.initContainer();
+
+ for (final Map.Entry<String,ByteBuffer> e : wc.serviceData.entrySet()) {
ArgumentMatcher<AuxServicesEvent> matchesServiceReq =
new ArgumentMatcher<AuxServicesEvent>() {
@Override
@@ -228,9 +223,10 @@ public class TestContainer {
&& 0 == e.getValue().compareTo(evt.getServiceData());
}
};
- verify(auxBus).handle(argThat(matchesServiceReq));
+ verify(wc.auxBus).handle(argThat(matchesServiceReq));
}
+ final WrappedContainer wcf = wc;
// verify launch on empty resource request
ArgumentMatcher<ContainersLauncherEvent> matchesLaunchReq =
new ArgumentMatcher<ContainersLauncherEvent>() {
@@ -238,61 +234,103 @@ public class TestContainer {
public boolean matches(Object o) {
ContainersLauncherEvent evt = (ContainersLauncherEvent) o;
return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER
- && cId == evt.getContainer().getContainerID();
+ && wcf.cId == evt.getContainer().getContainerID();
}
};
- verify(launchBus).handle(argThat(matchesLaunchReq));
+ verify(wc.launcherBus).handle(argThat(matchesLaunchReq));
} finally {
- dispatcher.stop();
+ if (wc != null) {
+ wc.finished();
+ }
}
}
- // Accept iff the resource request payload matches.
- static class ContainerReqMatcher extends ArgumentMatcher<LocalizationEvent> {
+ private void verifyCleanupCall(WrappedContainer wc) throws Exception {
+ ResourcesReleasedMatcher matchesReq =
+ new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
+ LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
+ LocalResourceVisibility.APPLICATION));
+ verify(wc.localizerBus).handle(argThat(matchesReq));
+ }
+
+ private static class ResourcesReleasedMatcher extends
+ ArgumentMatcher<LocalizationEvent> {
final HashSet<LocalResourceRequest> resources =
- new HashSet<LocalResourceRequest>();
- ContainerReqMatcher(Map<String,LocalResource> allResources,
+ new HashSet<LocalResourceRequest>();
+
+ ResourcesReleasedMatcher(Map<String, LocalResource> allResources,
EnumSet<LocalResourceVisibility> vis) throws URISyntaxException {
- for (Entry<String,LocalResource> e : allResources.entrySet()) {
+ for (Entry<String, LocalResource> e : allResources.entrySet()) {
if (vis.contains(e.getValue().getVisibility())) {
resources.add(new LocalResourceRequest(e.getValue()));
}
}
}
+
@Override
public boolean matches(Object o) {
- ContainerLocalizationRequestEvent evt = (ContainerLocalizationRequestEvent) o;
+ if (!(o instanceof ContainerLocalizationCleanupEvent)) {
+ return false;
+ }
+ ContainerLocalizationCleanupEvent evt =
+ (ContainerLocalizationCleanupEvent) o;
final HashSet<LocalResourceRequest> expected =
- new HashSet<LocalResourceRequest>(resources);
- for (LocalResourceRequest rsrc : evt.getRequestedResources()) {
- if (!expected.remove(rsrc)) {
- return false;
+ new HashSet<LocalResourceRequest>(resources);
+ for (Collection<LocalResourceRequest> rc : evt.getResources().values()) {
+ for (LocalResourceRequest rsrc : rc) {
+ if (!expected.remove(rsrc)) {
+ return false;
+ }
}
}
return expected.isEmpty();
}
}
- static Entry<String,LocalResource> getMockRsrc(Random r,
- LocalResourceVisibility vis) {
- LocalResource rsrc = mock(LocalResource.class);
+ // Accept iff the resource payload matches.
+ private static class ResourcesRequestedMatcher extends
+ ArgumentMatcher<LocalizationEvent> {
+ final HashSet<LocalResourceRequest> resources =
+ new HashSet<LocalResourceRequest>();
- String name = Long.toHexString(r.nextLong());
- URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
- when(uri.getScheme()).thenReturn("file");
- when(uri.getHost()).thenReturn(null);
- when(uri.getFile()).thenReturn("/local/" + vis + "/" + name);
+ ResourcesRequestedMatcher(Map<String, LocalResource> allResources,
+ EnumSet<LocalResourceVisibility> vis) throws URISyntaxException {
+ for (Entry<String, LocalResource> e : allResources.entrySet()) {
+ if (vis.contains(e.getValue().getVisibility())) {
+ resources.add(new LocalResourceRequest(e.getValue()));
+ }
+ }
+ }
- when(rsrc.getResource()).thenReturn(uri);
- when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L);
- when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L);
- when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
- when(rsrc.getVisibility()).thenReturn(vis);
+ @Override
+ public boolean matches(Object o) {
+ ContainerLocalizationRequestEvent evt =
+ (ContainerLocalizationRequestEvent) o;
+ final HashSet<LocalResourceRequest> expected =
+ new HashSet<LocalResourceRequest>(resources);
+ for (Collection<LocalResourceRequest> rc : evt.getRequestedResources()
+ .values()) {
+ for (LocalResourceRequest rsrc : rc) {
+ if (!expected.remove(rsrc)) {
+ return false;
+ }
+ }
+ }
+ return expected.isEmpty();
+ }
+ }
- return new SimpleEntry<String,LocalResource>(name, rsrc);
+ private static Entry<String, LocalResource> getMockRsrc(Random r,
+ LocalResourceVisibility vis) {
+ String name = Long.toHexString(r.nextLong());
+ URL url = BuilderUtils.newURL("file", null, 0, "/local" + vis + "/" + name);
+ LocalResource rsrc =
+ BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
+ r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
+ return new SimpleEntry<String, LocalResource>(name, rsrc);
}
- static Map<String,LocalResource> createLocalResources(Random r) {
+ private static Map<String,LocalResource> createLocalResources(Random r) {
Map<String,LocalResource> localResources =
new HashMap<String,LocalResource>();
for (int i = r.nextInt(5) + 5; i >= 0; --i) {
@@ -313,17 +351,7 @@ public class TestContainer {
return localResources;
}
- static ContainerId getMockContainerId(int appId, long timestamp, int id) {
- ApplicationId aId = mock(ApplicationId.class);
- when(aId.getId()).thenReturn(appId);
- when(aId.getClusterTimestamp()).thenReturn(timestamp);
- ContainerId cId = mock(ContainerId.class);
- when(cId.getId()).thenReturn(id);
- when(cId.getAppId()).thenReturn(aId);
- return cId;
- }
-
- static Map<String,ByteBuffer> createServiceData(Random r) {
+ private static Map<String,ByteBuffer> createServiceData(Random r) {
Map<String,ByteBuffer> serviceData =
new HashMap<String,ByteBuffer>();
for (int i = r.nextInt(5) + 5; i >= 0; --i) {
@@ -335,7 +363,134 @@ public class TestContainer {
return serviceData;
}
- Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) {
+ private Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) {
return new ContainerImpl(disp, ctx, null, metrics);
}
+
+ @SuppressWarnings("unchecked")
+ private class WrappedContainer {
+ final DrainDispatcher dispatcher;
+ final EventHandler<LocalizationEvent> localizerBus;
+ final EventHandler<ContainersLauncherEvent> launcherBus;
+ final EventHandler<ContainersMonitorEvent> monitorBus;
+ final EventHandler<AuxServicesEvent> auxBus;
+
+ final ContainerLaunchContext ctxt;
+ final ContainerId cId;
+ final Container c;
+ final Map<String, LocalResource> localResources;
+ final Map<String, ByteBuffer> serviceData;
+ final String user;
+
+ WrappedContainer(int appId, long timestamp, int id, String user) {
+ this(appId, timestamp, id, user, true, false);
+ }
+
+ WrappedContainer(int appId, long timestamp, int id, String user,
+ boolean withLocalRes, boolean withServiceData) {
+ dispatcher = new DrainDispatcher();
+ dispatcher.init(null);
+
+ localizerBus = mock(EventHandler.class);
+ launcherBus = mock(EventHandler.class);
+ monitorBus = mock(EventHandler.class);
+ auxBus = mock(EventHandler.class);
+ dispatcher.register(LocalizationEventType.class, localizerBus);
+ dispatcher.register(ContainersLauncherEventType.class, launcherBus);
+ dispatcher.register(ContainersMonitorEventType.class, monitorBus);
+ dispatcher.register(AuxServicesEventType.class, auxBus);
+ this.user = user;
+
+ ctxt = mock(ContainerLaunchContext.class);
+ cId = BuilderUtils.newContainerId(appId, 1, timestamp, id);
+ when(ctxt.getUser()).thenReturn(this.user);
+ when(ctxt.getContainerId()).thenReturn(cId);
+
+ Resource resource = BuilderUtils.newResource(1024);
+ when(ctxt.getResource()).thenReturn(resource);
+
+ if (withLocalRes) {
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("WrappedContainerLocalResource seed: " + seed);
+ localResources = createLocalResources(r);
+ } else {
+ localResources = Collections.<String, LocalResource> emptyMap();
+ }
+ when(ctxt.getAllLocalResources()).thenReturn(localResources);
+
+ if (withServiceData) {
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ System.out.println("ServiceData seed: " + seed);
+ serviceData = createServiceData(r);
+ } else {
+ serviceData = Collections.<String, ByteBuffer> emptyMap();
+ }
+ when(ctxt.getAllServiceData()).thenReturn(serviceData);
+
+ c = newContainer(dispatcher, ctxt);
+ dispatcher.start();
+ }
+
+ private void drainDispatcherEvents() {
+ dispatcher.await();
+ }
+
+ public void finished() {
+ dispatcher.stop();
+ }
+
+ public void initContainer() {
+ c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
+ drainDispatcherEvents();
+ }
+
+ public Map<Path, String> localizeResources() throws URISyntaxException {
+ Path cache = new Path("file:///cache");
+ Map<Path, String> localPaths = new HashMap<Path, String>();
+ for (Entry<String, LocalResource> rsrc : localResources.entrySet()) {
+ assertEquals(ContainerState.LOCALIZING, c.getContainerState());
+ LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
+ Path p = new Path(cache, rsrc.getKey());
+ localPaths.put(p, rsrc.getKey());
+ // rsrc copied to p
+ c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(),
+ req, p));
+ }
+ drainDispatcherEvents();
+ return localPaths;
+ }
+
+ public void launchContainer() {
+ c.handle(new ContainerEvent(cId, ContainerEventType.CONTAINER_LAUNCHED));
+ drainDispatcherEvents();
+ }
+
+ public void containerSuccessful() {
+ c.handle(new ContainerEvent(cId,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
+ drainDispatcherEvents();
+ }
+
+ public void containerFailed(int exitCode) {
+ c.handle(new ContainerExitEvent(cId,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, exitCode));
+ drainDispatcherEvents();
+ }
+
+ public void killContainer() {
+ c.handle(new ContainerKillEvent(cId, "KillRequest"));
+ drainDispatcherEvents();
+ }
+
+ public void containerKilledOnRequest() {
+ c.handle(new ContainerExitEvent(cId,
+ ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.KILLED
+ .getExitCode()));
+ drainDispatcherEvents();
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1167676&r1=1167675&r2=1167676&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Sun Sep 11 06:21:39 2011
@@ -21,10 +21,17 @@ package org.apache.hadoop.yarn.server.no
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Random;
+import java.util.Set;
+
+import junit.framework.Assert;
import org.apache.avro.ipc.Server;
import org.apache.hadoop.conf.Configuration;
@@ -63,11 +70,15 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -134,6 +145,190 @@ public class TestResourceLocalizationSer
@Test
@SuppressWarnings("unchecked") // mocked generics
+ public void testResourceRelease() throws Exception {
+ Configuration conf = new Configuration();
+ AbstractFileSystem spylfs =
+ spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ final FileContext lfs = FileContext.getFileContext(spylfs, conf);
+ doNothing().when(spylfs).mkdir(
+ isA(Path.class), isA(FsPermission.class), anyBoolean());
+
+ List<Path> localDirs = new ArrayList<Path>();
+ String[] sDirs = new String[4];
+ for (int i = 0; i < 4; ++i) {
+ localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+ sDirs[i] = localDirs.get(i).toString();
+ }
+ conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+
+ Server ignore = mock(Server.class);
+ LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class);
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, applicationBus);
+ EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+ dispatcher.register(ContainerEventType.class, containerBus);
+ //Ignore actual localization
+ EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
+ dispatcher.register(LocalizerEventType.class, localizerBus);
+
+ ContainerExecutor exec = mock(ContainerExecutor.class);
+ DeletionService delService = new DeletionService(exec);
+ delService.init(null);
+ delService.start();
+
+ ResourceLocalizationService rawService =
+ new ResourceLocalizationService(dispatcher, exec, delService);
+ ResourceLocalizationService spyService = spy(rawService);
+ doReturn(ignore).when(spyService).createServer();
+ doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
+ isA(Configuration.class));
+ doReturn(lfs).when(spyService)
+ .getLocalFileContext(isA(Configuration.class));
+ try {
+ spyService.init(conf);
+ spyService.start();
+
+ final String user = "user0";
+ // init application
+ final Application app = mock(Application.class);
+ final ApplicationId appId =
+ BuilderUtils.newApplicationId(314159265358979L, 3);
+ when(app.getUser()).thenReturn(user);
+ when(app.getAppId()).thenReturn(appId);
+ spyService.handle(new ApplicationLocalizationEvent(
+ LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+ dispatcher.await();
+
+ //Get a handle on the trackers after they're setup with INIT_APP_RESOURCES
+ LocalResourcesTracker appTracker =
+ spyService.getLocalResourcesTracker(
+ LocalResourceVisibility.APPLICATION, user, appId);
+ LocalResourcesTracker privTracker =
+ spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
+ user, appId);
+ LocalResourcesTracker pubTracker =
+ spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+ user, appId);
+
+ // init container.
+ final Container c = getMockContainer(appId, 42);
+
+ // init resources
+ Random r = new Random();
+ long seed = r.nextLong();
+ System.out.println("SEED: " + seed);
+ r.setSeed(seed);
+
+ // Send localization requests for one resource of each type.
+ final LocalResource privResource = getPrivateMockedResource(r);
+ final LocalResourceRequest privReq =
+ new LocalResourceRequest(privResource);
+
+ final LocalResource pubResource = getPublicMockedResource(r);
+ final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
+ final LocalResource pubResource2 = getPublicMockedResource(r);
+ final LocalResourceRequest pubReq2 =
+ new LocalResourceRequest(pubResource2);
+
+ final LocalResource appResource = getAppMockedResource(r);
+ final LocalResourceRequest appReq = new LocalResourceRequest(appResource);
+
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ req.put(LocalResourceVisibility.PRIVATE,
+ Collections.singletonList(privReq));
+ req.put(LocalResourceVisibility.PUBLIC,
+ Collections.singletonList(pubReq));
+ req.put(LocalResourceVisibility.APPLICATION,
+ Collections.singletonList(appReq));
+
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ req2.put(LocalResourceVisibility.PRIVATE,
+ Collections.singletonList(privReq));
+ req2.put(LocalResourceVisibility.PUBLIC,
+ Collections.singletonList(pubReq2));
+
+ Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
+ pubRsrcs.add(pubReq);
+ pubRsrcs.add(pubReq2);
+
+ // Send Request event
+ spyService.handle(new ContainerLocalizationRequestEvent(c, req));
+ spyService.handle(new ContainerLocalizationRequestEvent(c, req2));
+ dispatcher.await();
+
+ int privRsrcCount = 0;
+ for (LocalizedResource lr : privTracker) {
+ privRsrcCount++;
+ Assert.assertEquals("Incorrect reference count", 2, lr.getRefCount());
+ Assert.assertEquals(privReq, lr.getRequest());
+ }
+ Assert.assertEquals(1, privRsrcCount);
+
+ int pubRsrcCount = 0;
+ for (LocalizedResource lr : pubTracker) {
+ pubRsrcCount++;
+ Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+ pubRsrcs.remove(lr.getRequest());
+ }
+ Assert.assertEquals(0, pubRsrcs.size());
+ Assert.assertEquals(2, pubRsrcCount);
+
+ int appRsrcCount = 0;
+ for (LocalizedResource lr : appTracker) {
+ appRsrcCount++;
+ Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+ Assert.assertEquals(appReq, lr.getRequest());
+ }
+ Assert.assertEquals(1, appRsrcCount);
+
+ //Send Cleanup Event
+ spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
+ req2.remove(LocalResourceVisibility.PRIVATE);
+ spyService.handle(new ContainerLocalizationCleanupEvent(c, req2));
+ dispatcher.await();
+
+ pubRsrcs.add(pubReq);
+ pubRsrcs.add(pubReq2);
+
+ privRsrcCount = 0;
+ for (LocalizedResource lr : privTracker) {
+ privRsrcCount++;
+ Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+ Assert.assertEquals(privReq, lr.getRequest());
+ }
+ Assert.assertEquals(1, privRsrcCount);
+
+ pubRsrcCount = 0;
+ for (LocalizedResource lr : pubTracker) {
+ pubRsrcCount++;
+ Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
+ pubRsrcs.remove(lr.getRequest());
+ }
+ Assert.assertEquals(0, pubRsrcs.size());
+ Assert.assertEquals(2, pubRsrcCount);
+
+ appRsrcCount = 0;
+ for (LocalizedResource lr : appTracker) {
+ appRsrcCount++;
+ Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
+ Assert.assertEquals(appReq, lr.getRequest());
+ }
+ Assert.assertEquals(1, appRsrcCount);
+ } finally {
+ dispatcher.stop();
+ delService.stop();
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked") // mocked generics
public void testLocalizationHeartbeat() throws Exception {
Configuration conf = new Configuration();
AbstractFileSystem spylfs =
@@ -175,9 +370,8 @@ public class TestResourceLocalizationSer
// init application
final Application app = mock(Application.class);
- final ApplicationId appId = mock(ApplicationId.class);
- when(appId.getClusterTimestamp()).thenReturn(314159265358979L);
- when(appId.getId()).thenReturn(3);
+ final ApplicationId appId =
+ BuilderUtils.newApplicationId(314159265358979L, 3);
when(app.getUser()).thenReturn("user0");
when(app.getAppId()).thenReturn(appId);
spyService.handle(new ApplicationLocalizationEvent(
@@ -205,11 +399,13 @@ public class TestResourceLocalizationSer
doReturn(out).when(spylfs).createInternal(isA(Path.class),
isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
anyLong(), isA(Progressable.class), anyInt(), anyBoolean());
- final LocalResource resource = getMockResource(r);
+ final LocalResource resource = getPrivateMockedResource(r);
final LocalResourceRequest req = new LocalResourceRequest(resource);
- spyService.handle(new ContainerLocalizationRequestEvent(
- c, Collections.singletonList(req),
- LocalResourceVisibility.PRIVATE));
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+ new HashMap<LocalResourceVisibility,
+ Collection<LocalResourceRequest>>();
+ rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
+ spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
// Sigh. Thread init of private localizer not accessible
Thread.sleep(500);
dispatcher.await();
@@ -265,42 +461,44 @@ public class TestResourceLocalizationSer
}
}
- static URL getPath(String path) {
- URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
- when(uri.getScheme()).thenReturn("file");
- when(uri.getHost()).thenReturn(null);
- when(uri.getFile()).thenReturn(path);
- return uri;
+ private static URL getPath(String path) {
+ URL url = BuilderUtils.newURL("file", null, 0, path);
+ return url;
}
- static LocalResource getMockResource(Random r) {
- LocalResource rsrc = mock(LocalResource.class);
-
+ private static LocalResource getMockedResource(Random r,
+ LocalResourceVisibility vis) {
String name = Long.toHexString(r.nextLong());
- URL uri = getPath("/local/PRIVATE/" + name);
-
- when(rsrc.getResource()).thenReturn(uri);
- when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L);
- when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L);
- when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
- when(rsrc.getVisibility()).thenReturn(LocalResourceVisibility.PRIVATE);
+ URL url = getPath("/local/PRIVATE/" + name);
+ LocalResource rsrc =
+ BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
+ r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
return rsrc;
}
+
+ private static LocalResource getAppMockedResource(Random r) {
+ return getMockedResource(r, LocalResourceVisibility.APPLICATION);
+ }
+
+ private static LocalResource getPublicMockedResource(Random r) {
+ return getMockedResource(r, LocalResourceVisibility.PUBLIC);
+ }
+
+ private static LocalResource getPrivateMockedResource(Random r) {
+ return getMockedResource(r, LocalResourceVisibility.PRIVATE);
+ }
- static Container getMockContainer(ApplicationId appId, int id) {
+ private static Container getMockContainer(ApplicationId appId, int id) {
Container c = mock(Container.class);
- ApplicationAttemptId appAttemptId = Records.newRecord(ApplicationAttemptId.class);
- appAttemptId.setApplicationId(appId);
- appAttemptId.setAttemptId(1);
- ContainerId cId = Records.newRecord(ContainerId.class);
- cId.setAppAttemptId(appAttemptId);
- cId.setAppId(appId);
- cId.setId(id);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 1);
+ ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
when(c.getUser()).thenReturn("user0");
when(c.getContainerID()).thenReturn(cId);
Credentials creds = new Credentials();
creds.addToken(new Text("tok" + id), getToken(id));
when(c.getCredentials()).thenReturn(creds);
+ when(c.toString()).thenReturn(cId.toString());
return c;
}