You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/07 17:23:21 UTC
svn commit: r1180071 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodeman...
Author: vinodkv
Date: Fri Oct 7 15:23:20 2011
New Revision: 1180071
URL: http://svn.apache.org/viewvc?rev=1180071&view=rev
Log:
MAPREDUCE-2751. Modified NodeManager to stop leaving around local files after application finishes. Contributed by Siddharth Seth.
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Oct 7 15:23:20 2011
@@ -1540,6 +1540,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3141. Fix the broken MRAppMaster to work over YARN in security
mode.(vinodkv)
+ MAPREDUCE-2751. Modified NodeManager to stop leaving around local files
+ after application finishes. (Siddharth Seth via vinodkv)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Fri Oct 7 15:23:20 2011
@@ -199,13 +199,18 @@ public class DefaultContainerExecutor ex
throws IOException, InterruptedException {
if (baseDirs == null || baseDirs.length == 0) {
LOG.info("Deleting absolute path : " + subDir);
- lfs.delete(subDir, true);
+ if (!lfs.delete(subDir, true)) {
+ //Maybe retry
+ LOG.warn("delete returned false for path: [" + subDir + "]");
+ }
return;
}
for (Path baseDir : baseDirs) {
Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
LOG.info("Deleting path : " + del);
- lfs.delete(del, true);
+ if (!lfs.delete(del, true)) {
+ LOG.warn("delete returned false for path: [" + del + "]");
+ }
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Fri Oct 7 15:23:20 2011
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*;
@@ -125,6 +124,7 @@ public class DeletionService extends Abs
}
} else {
try {
+ LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]");
exec.deleteAsUser(user, subDir, baseDirs);
} catch (IOException e) {
LOG.warn("Failed to delete as user " + user, e);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Fri Oct 7 15:23:20 2011
@@ -286,6 +286,8 @@ public class ContainerManagerImpl extend
StartContainerResponse response =
recordFactory.newRecordInstance(StartContainerResponse.class);
response.addAllServiceResponse(auxiluaryServices.getMeta());
+ // TODO launchedContainer misplaced -> doesn't necessarily mean a container
+ // launch. A finished Application will not launch containers.
metrics.launchedContainer();
metrics.allocateContainer(launchContext.getResource());
return response;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Fri Oct 7 15:23:20 2011
@@ -152,6 +152,7 @@ public class ApplicationImpl implements
/**
* Notify services of new application.
*/
+ @SuppressWarnings("unchecked")
static class AppInitTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
@@ -180,6 +181,7 @@ public class ApplicationImpl implements
}
}
+ @SuppressWarnings("unchecked")
static class AppInitDoneTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
@@ -199,6 +201,7 @@ public class ApplicationImpl implements
}
}
+ @SuppressWarnings("unchecked")
static class DuplicateAppInitTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
@@ -229,6 +232,7 @@ public class ApplicationImpl implements
}
}
+ @SuppressWarnings("unchecked")
void handleAppFinishWithContainersCleanedup() {
// Delete Application level resources
this.dispatcher.getEventHandler().handle(
@@ -238,6 +242,7 @@ public class ApplicationImpl implements
// TODO: Trigger the LogsManager
}
+ @SuppressWarnings("unchecked")
static class AppFinishTriggeredTransition
implements
MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {
@@ -286,6 +291,7 @@ public class ApplicationImpl implements
}
+ @SuppressWarnings("unchecked")
static class AppCompletelyDoneTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Fri Oct 7 15:23:20 2011
@@ -27,7 +27,6 @@ import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -124,19 +123,18 @@ public class ContainerLaunch implements
FileContext lfs = FileContext.getLocalFSFileContext();
LocalDirAllocator lDirAllocator =
new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); // TODO
+
Path nmPrivateContainerScriptPath =
lDirAllocator.getLocalPathForWrite(
- ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
- + appIdStr + Path.SEPARATOR + containerIdStr
- + Path.SEPARATOR + CONTAINER_SCRIPT, this.conf);
+ getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ + CONTAINER_SCRIPT, this.conf);
Path nmPrivateTokensPath =
lDirAllocator.getLocalPathForWrite(
- ResourceLocalizationService.NM_PRIVATE_DIR
- + Path.SEPARATOR
- + containerIdStr
+ getContainerPrivateDir(appIdStr, containerIdStr)
+ Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
containerIdStr), this.conf);
+
DataOutputStream containerScriptOutStream = null;
DataOutputStream tokensOutStream = null;
@@ -229,6 +227,16 @@ public class ContainerLaunch implements
return 0;
}
+ private String getContainerPrivateDir(String appIdStr, String containerIdStr) {
+ return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr
+ + Path.SEPARATOR;
+ }
+
+ private String getAppPrivateDir(String appIdStr) {
+ return ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
+ + appIdStr;
+ }
+
private static class ShellScriptBuilder {
private final StringBuilder sb;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Fri Oct 7 15:23:20 2011
@@ -20,9 +20,12 @@ package org.apache.hadoop.yarn.server.no
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -37,6 +40,9 @@ import org.apache.hadoop.yarn.server.nod
class LocalResourcesTrackerImpl implements LocalResourcesTracker {
static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
+ private static final String RANDOM_DIR_REGEX = "-?\\d+";
+ private static final Pattern RANDOM_DIR_PATTERN = Pattern
+ .compile(RANDOM_DIR_REGEX);
private final String user;
private final Dispatcher dispatcher;
@@ -83,28 +89,44 @@ class LocalResourcesTrackerImpl implemen
@Override
public boolean remove(LocalizedResource rem, DeletionService delService) {
- // current synchronization guaranteed by crude RLS event for cleanup
+ // current synchronization guaranteed by crude RLS event for cleanup
LocalizedResource rsrc = localrsrc.get(rem.getRequest());
if (null == rsrc) {
- LOG.error("Attempt to remove absent resource: " + rem.getRequest() +
- " from " + getUser());
+ LOG.error("Attempt to remove absent resource: " + rem.getRequest()
+ + " from " + getUser());
return true;
}
if (rsrc.getRefCount() > 0
- || ResourceState.DOWNLOADING.equals(rsrc.getState())
- || rsrc != rem) {
+ || ResourceState.DOWNLOADING.equals(rsrc.getState()) || rsrc != rem) {
// internal error
- LOG.error("Attempt to remove resource: " + rsrc + " with non-zero refcount");
+ LOG.error("Attempt to remove resource: " + rsrc
+ + " with non-zero refcount");
assert false;
return false;
}
- localrsrc.remove(rem.getRequest());
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
- delService.delete(getUser(), rsrc.getLocalPath());
+ delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
}
return true;
}
+
+ /**
+ * Returns the path up to the random directory component.
+ */
+ private Path getPathToDelete(Path localPath) {
+ Path delPath = localPath.getParent();
+ String name = delPath.getName();
+ Matcher matcher = RANDOM_DIR_PATTERN.matcher(name);
+ if (matcher.matches()) {
+ return delPath;
+ } else {
+ LOG.warn("Random directroy component did not match. " +
+ "Deleting localized path only");
+ return localPath;
+ }
+ }
+
@Override
public String getUser() {
return user;
@@ -114,5 +136,4 @@ class LocalResourcesTrackerImpl implemen
public Iterator<LocalizedResource> iterator() {
return localrsrc.values().iterator();
}
-
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Fri Oct 7 15:23:20 2011
@@ -304,6 +304,7 @@ public class ResourceLocalizationService
retain.addResources(t);
LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
}
+ //TODO Check if appRsrcs should also be added to the retention set.
break;
case CLEANUP_CONTAINER_RESOURCES:
ContainerLocalizationCleanupEvent rsrcCleanup =
@@ -336,6 +337,7 @@ public class ResourceLocalizationService
delService.delete(userName, containerDir, new Path[] {});
// Delete the nmPrivate container-dir
+
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
Path appSysDir = new Path(sysDir, appIDStr);
Path containerSysDir = new Path(appSysDir, containerIDStr);
@@ -762,14 +764,16 @@ public class ResourceLocalizationService
@Override
@SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
+ Path nmPrivateCTokensPath = null;
try {
// Use LocalDirAllocator to get nmPrivateDir
- Path nmPrivateCTokensPath =
+ nmPrivateCTokensPath =
localDirsSelector.getLocalPathForWrite(
NM_PRIVATE_DIR
+ Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
localizerId), getConfig());
+
// 0) init queue, etc.
// 1) write credentials to private dir
DataOutputStream tokenOut = null;
@@ -811,6 +815,7 @@ public class ResourceLocalizationService
for (LocalizerResourceRequestEvent event : scheduled.values()) {
event.getResource().unlock();
}
+ delService.delete(null, nmPrivateCTokensPath, new Path[] {});
}
}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java?rev=1180071&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java Fri Oct 7 15:23:20 2011
@@ -0,0 +1,418 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+public class TestApplication {
+
+ /**
+ * All container start events before application running.
+ */
+ @Test
+ public void testApplicationInit1() {
+ WrappedApplication wa = null;
+ try {
+ wa = new WrappedApplication(1, 314159265358979L, "yak", 3);
+ wa.initApplication(1);
+ assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+ assertEquals(1, wa.app.getContainers().size());
+ wa.initApplication(0);
+ wa.initApplication(2);
+ assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+ assertEquals(3, wa.app.getContainers().size());
+ wa.applicationInited();
+ assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+
+ for (int i = 0; i < wa.containers.size(); i++) {
+ verify(wa.containerBus).handle(
+ argThat(new ContainerInitMatcher(wa.containers.get(i)
+ .getContainerID())));
+ }
+ } finally {
+ if (wa != null)
+ wa.finished();
+ }
+ }
+
+ /**
+ * Container start events after Application Running
+ */
+ @Test
+ public void testApplicationInit2() {
+ WrappedApplication wa = null;
+ try {
+ wa = new WrappedApplication(2, 314159265358979L, "yak", 3);
+ wa.initApplication(0);
+ assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+ assertEquals(1, wa.app.getContainers().size());
+
+ wa.applicationInited();
+ assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+ verify(wa.containerBus).handle(
+ argThat(new ContainerInitMatcher(wa.containers.get(0)
+ .getContainerID())));
+
+ wa.initApplication(1);
+ wa.initApplication(2);
+ assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+ assertEquals(3, wa.app.getContainers().size());
+
+ for (int i = 1; i < wa.containers.size(); i++) {
+ verify(wa.containerBus).handle(
+ argThat(new ContainerInitMatcher(wa.containers.get(i)
+ .getContainerID())));
+ }
+ } finally {
+ if (wa != null)
+ wa.finished();
+ }
+ }
+
+ /**
+ * App state RUNNING after all containers complete, before RM sends
+ * APP_FINISHED
+ */
+ @Test
+ public void testAppRunningAfterContainersComplete() {
+ WrappedApplication wa = null;
+ try {
+ wa = new WrappedApplication(3, 314159265358979L, "yak", 3);
+ wa.initApplication(-1);
+ assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+ wa.applicationInited();
+ assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+
+ wa.containerFinished(0);
+ assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+ assertEquals(2, wa.app.getContainers().size());
+
+ wa.containerFinished(1);
+ wa.containerFinished(2);
+ assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+ assertEquals(0, wa.app.getContainers().size());
+ } finally {
+ if (wa != null)
+ wa.finished();
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testAppFinishedOnRunningContainers() {
+ WrappedApplication wa = null;
+ try {
+ wa = new WrappedApplication(4, 314159265358979L, "yak", 3);
+ wa.initApplication(-1);
+ assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+ wa.applicationInited();
+ assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+
+ wa.containerFinished(0);
+ assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+ assertEquals(2, wa.app.getContainers().size());
+
+ wa.appFinished();
+ assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
+ wa.app.getApplicationState());
+ assertEquals(2, wa.app.getContainers().size());
+
+ for (int i = 1; i < wa.containers.size(); i++) {
+ verify(wa.containerBus).handle(
+ argThat(new ContainerKillMatcher(wa.containers.get(i)
+ .getContainerID())));
+ }
+
+ wa.containerFinished(1);
+ assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
+ wa.app.getApplicationState());
+ assertEquals(1, wa.app.getContainers().size());
+
+ reset(wa.localizerBus);
+ wa.containerFinished(2);
+ // All containers finished. Cleanup should be called.
+ assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+ wa.app.getApplicationState());
+ assertEquals(0, wa.app.getContainers().size());
+
+ verify(wa.localizerBus).handle(
+ refEq(new ApplicationLocalizationEvent(
+ LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
+
+ wa.appResourcesCleanedup();
+ assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
+
+ } finally {
+ if (wa != null)
+ wa.finished();
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testAppFinishedOnCompletedContainers() {
+ WrappedApplication wa = null;
+ try {
+ wa = new WrappedApplication(5, 314159265358979L, "yak", 3);
+ wa.initApplication(-1);
+ assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+ wa.applicationInited();
+ assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+
+ reset(wa.localizerBus);
+ wa.containerFinished(0);
+ wa.containerFinished(1);
+ wa.containerFinished(2);
+ assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+ assertEquals(0, wa.app.getContainers().size());
+
+ wa.appFinished();
+ assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+ wa.app.getApplicationState());
+
+ verify(wa.localizerBus).handle(
+ refEq(new ApplicationLocalizationEvent(
+ LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
+
+ wa.appResourcesCleanedup();
+ assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
+ } finally {
+ if (wa != null)
+ wa.finished();
+ }
+ }
+
+//TODO Re-work after Application transitions are changed.
+// @Test
+ @SuppressWarnings("unchecked")
+ public void testStartContainerAfterAppFinished() {
+ WrappedApplication wa = null;
+ try {
+ wa = new WrappedApplication(5, 314159265358979L, "yak", 3);
+ wa.initApplication(-1);
+ assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+ wa.applicationInited();
+ assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+
+ reset(wa.localizerBus);
+ wa.containerFinished(0);
+ wa.containerFinished(1);
+ wa.containerFinished(2);
+ assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+ assertEquals(0, wa.app.getContainers().size());
+
+ wa.appFinished();
+ assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+ wa.app.getApplicationState());
+ verify(wa.localizerBus).handle(
+ refEq(new ApplicationLocalizationEvent(
+ LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
+
+ wa.appResourcesCleanedup();
+ assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
+ } finally {
+ if (wa != null)
+ wa.finished();
+ }
+ }
+
+//TODO Re-work after Application transitions are changed.
+// @Test
+ @SuppressWarnings("unchecked")
+ public void testAppFinishedOnIniting() {
+ // AM may send a startContainer() - AM APP_FINISHED processed after
+ // APP_FINISHED on another NM
+ WrappedApplication wa = null;
+ try {
+ wa = new WrappedApplication(1, 314159265358979L, "yak", 3);
+ wa.initApplication(0);
+ assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+ assertEquals(1, wa.app.getContainers().size());
+
+ reset(wa.localizerBus);
+ wa.appFinished();
+
+ verify(wa.containerBus).handle(
+ argThat(new ContainerKillMatcher(wa.containers.get(0)
+ .getContainerID())));
+ assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
+ wa.app.getApplicationState());
+
+ wa.containerFinished(0);
+ assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+ wa.app.getApplicationState());
+ verify(wa.localizerBus).handle(
+ refEq(new ApplicationLocalizationEvent(
+ LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
+
+ wa.initApplication(1);
+ assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+ wa.app.getApplicationState());
+ assertEquals(0, wa.app.getContainers().size());
+
+ wa.appResourcesCleanedup();
+ assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
+ } finally {
+ if (wa != null)
+ wa.finished();
+ }
+ }
+
+ private class ContainerKillMatcher extends ArgumentMatcher<ContainerEvent> {
+ private ContainerId cId;
+
+ public ContainerKillMatcher(ContainerId cId) {
+ this.cId = cId;
+ }
+
+ @Override
+ public boolean matches(Object argument) {
+ if (argument instanceof ContainerKillEvent) {
+ ContainerKillEvent event = (ContainerKillEvent) argument;
+ return event.getContainerID().equals(cId);
+ }
+ return false;
+ }
+ }
+
+ private class ContainerInitMatcher extends ArgumentMatcher<ContainerEvent> {
+ private ContainerId cId;
+
+ public ContainerInitMatcher(ContainerId cId) {
+ this.cId = cId;
+ }
+
+ @Override
+ public boolean matches(Object argument) {
+ if (argument instanceof ContainerInitEvent) {
+ ContainerInitEvent event = (ContainerInitEvent) argument;
+ return event.getContainerID().equals(cId);
+ }
+ return false;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private class WrappedApplication {
+ final DrainDispatcher dispatcher;
+ final EventHandler<LocalizationEvent> localizerBus;
+ final EventHandler<ContainersLauncherEvent> launcherBus;
+ final EventHandler<ContainersMonitorEvent> monitorBus;
+ final EventHandler<AuxServicesEvent> auxBus;
+ final EventHandler<ContainerEvent> containerBus;
+ final EventHandler<LogAggregatorEvent> logAggregationBus;
+ final String user;
+ final List<Container> containers;
+
+ final ApplicationId appId;
+ final Application app;
+
+ WrappedApplication(int id, long timestamp, String user, int numContainers) {
+ dispatcher = new DrainDispatcher();
+ dispatcher.init(null);
+
+ localizerBus = mock(EventHandler.class);
+ launcherBus = mock(EventHandler.class);
+ monitorBus = mock(EventHandler.class);
+ auxBus = mock(EventHandler.class);
+ containerBus = mock(EventHandler.class);
+ logAggregationBus = mock(EventHandler.class);
+
+ dispatcher.register(LocalizationEventType.class, localizerBus);
+ dispatcher.register(ContainersLauncherEventType.class, launcherBus);
+ dispatcher.register(ContainersMonitorEventType.class, monitorBus);
+ dispatcher.register(AuxServicesEventType.class, auxBus);
+ dispatcher.register(ContainerEventType.class, containerBus);
+ dispatcher.register(LogAggregatorEventType.class, logAggregationBus);
+
+ this.user = user;
+ this.appId = BuilderUtils.newApplicationId(timestamp, id);
+
+ app = new ApplicationImpl(dispatcher, this.user, appId, null);
+ containers = new ArrayList<Container>();
+ for (int i = 0; i < numContainers; i++) {
+ containers.add(createMockedContainer(this.appId, i));
+ }
+
+ dispatcher.start();
+ }
+
+ private void drainDispatcherEvents() {
+ dispatcher.await();
+ }
+
+ public void finished() {
+ dispatcher.stop();
+ }
+
+ public void initApplication(int containerNum) {
+ if (containerNum == -1) {
+ for (int i = 0; i < containers.size(); i++) {
+ app.handle(new ApplicationInitEvent(containers.get(i)));
+ }
+ } else {
+ app.handle(new ApplicationInitEvent(containers.get(containerNum)));
+ }
+ drainDispatcherEvents();
+ }
+
+ public void containerFinished(int containerNum) {
+ app.handle(new ApplicationContainerFinishedEvent(containers.get(
+ containerNum).getContainerID()));
+ drainDispatcherEvents();
+ }
+
+ public void applicationInited() {
+ app.handle(new ApplicationInitedEvent(appId));
+ drainDispatcherEvents();
+ }
+
+ public void appFinished() {
+ app.handle(new ApplicationEvent(appId,
+ ApplicationEventType.FINISH_APPLICATION));
+ drainDispatcherEvents();
+ }
+
+ public void appResourcesCleanedup() {
+ app.handle(new ApplicationEvent(appId,
+ ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
+ drainDispatcherEvents();
+ }
+ }
+
+ private Container createMockedContainer(ApplicationId appId, int containerId) {
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 1);
+ ContainerId cId = BuilderUtils.newContainerId(appAttemptId, containerId);
+ Container c = mock(Container.class);
+ when(c.getContainerID()).thenReturn(cId);
+ return c;
+ }
+}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Fri Oct 7 15:23:20 2011
@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.util.Conve
import org.junit.Test;
import static org.junit.Assert.*;
+import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import static org.mockito.Mockito.*;
@@ -355,7 +356,8 @@ public class TestResourceLocalizationSer
dispatcher.register(ContainerEventType.class, containerBus);
ContainerExecutor exec = mock(ContainerExecutor.class);
- DeletionService delService = new DeletionService(exec);
+ DeletionService delServiceReal = new DeletionService(exec);
+ DeletionService delService = spy(delServiceReal);
delService.init(null);
delService.start();
@@ -407,12 +409,14 @@ public class TestResourceLocalizationSer
rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
// Sigh. Thread init of private localizer not accessible
- Thread.sleep(500);
+ Thread.sleep(1000);
dispatcher.await();
String appStr = ConverterUtils.toString(appId);
String ctnrStr = c.getContainerID().toString();
- verify(exec).startLocalizer(isA(Path.class), isA(InetSocketAddress.class),
- eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class));
+ ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class);
+ verify(exec).startLocalizer(tokenPathCaptor.capture(), isA(InetSocketAddress.class),
+ eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class));
+ Path localizationTokenPath = tokenPathCaptor.getValue();
// heartbeat from localizer
LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class);
@@ -454,10 +458,13 @@ public class TestResourceLocalizationSer
};
dispatcher.await();
verify(containerBus).handle(argThat(matchesContainerLoc));
+
+ // Verify deletion of localization token.
+ verify(delService).delete((String)isNull(), eq(localizationTokenPath));
} finally {
- delService.stop();
- dispatcher.stop();
spyService.stop();
+ dispatcher.stop();
+ delService.stop();
}
}