You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/07 17:23:21 UTC

svn commit: r1180071 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodeman...

Author: vinodkv
Date: Fri Oct  7 15:23:20 2011
New Revision: 1180071

URL: http://svn.apache.org/viewvc?rev=1180071&view=rev
Log:
MAPREDUCE-2751. Modified NodeManager to stop leaving around local files after application finishes. Contributed by Siddharth Seth.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Oct  7 15:23:20 2011
@@ -1540,6 +1540,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3141. Fix the broken MRAppMaster to work over YARN in security
     mode.(vinodkv)
 
+    MAPREDUCE-2751. Modified NodeManager to stop leaving around local files
+    after application finishes. (Siddharth Seth via vinodkv)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Fri Oct  7 15:23:20 2011
@@ -199,13 +199,18 @@ public class DefaultContainerExecutor ex
       throws IOException, InterruptedException {
     if (baseDirs == null || baseDirs.length == 0) {
       LOG.info("Deleting absolute path : " + subDir);
-      lfs.delete(subDir, true);
+      if (!lfs.delete(subDir, true)) {
+        //Maybe retry
+        LOG.warn("delete returned false for path: [" + subDir + "]");
+      }
       return;
     }
     for (Path baseDir : baseDirs) {
       Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
       LOG.info("Deleting path : " + del);
-      lfs.delete(del, true);
+      if (!lfs.delete(del, true)) {
+        LOG.warn("delete returned false for path: [" + del + "]");
+      }
     }
   }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Fri Oct  7 15:23:20 2011
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import static java.util.concurrent.TimeUnit.*;
@@ -125,6 +124,7 @@ public class DeletionService extends Abs
         }
       } else {
         try {
+          LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]");
           exec.deleteAsUser(user, subDir, baseDirs);
         } catch (IOException e) {
           LOG.warn("Failed to delete as user " + user, e);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Fri Oct  7 15:23:20 2011
@@ -286,6 +286,8 @@ public class ContainerManagerImpl extend
     StartContainerResponse response =
         recordFactory.newRecordInstance(StartContainerResponse.class);
     response.addAllServiceResponse(auxiluaryServices.getMeta());
+    // TODO launchedContainer misplaced -> doesn't necessarily mean a container
+    // launch. A finished Application will not launch containers.
     metrics.launchedContainer();
     metrics.allocateContainer(launchContext.getResource());
     return response;

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Fri Oct  7 15:23:20 2011
@@ -152,6 +152,7 @@ public class ApplicationImpl implements 
   /**
    * Notify services of new application.
    */
+  @SuppressWarnings("unchecked")
   static class AppInitTransition implements
       SingleArcTransition<ApplicationImpl, ApplicationEvent> {
     @Override
@@ -180,6 +181,7 @@ public class ApplicationImpl implements 
     }
   }
 
+  @SuppressWarnings("unchecked")
   static class AppInitDoneTransition implements
       SingleArcTransition<ApplicationImpl, ApplicationEvent> {
     @Override
@@ -199,6 +201,7 @@ public class ApplicationImpl implements 
     }
   }
 
+  @SuppressWarnings("unchecked")
   static class DuplicateAppInitTransition implements
       SingleArcTransition<ApplicationImpl, ApplicationEvent> {
     @Override
@@ -229,6 +232,7 @@ public class ApplicationImpl implements 
     }
   }
 
+  @SuppressWarnings("unchecked")
   void handleAppFinishWithContainersCleanedup() {
     // Delete Application level resources
     this.dispatcher.getEventHandler().handle(
@@ -238,6 +242,7 @@ public class ApplicationImpl implements 
     // TODO: Trigger the LogsManager
   }
 
+  @SuppressWarnings("unchecked")
   static class AppFinishTriggeredTransition
       implements
       MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {
@@ -286,6 +291,7 @@ public class ApplicationImpl implements 
 
   }
 
+  @SuppressWarnings("unchecked")
   static class AppCompletelyDoneTransition implements
       SingleArcTransition<ApplicationImpl, ApplicationEvent> {
     @Override

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Fri Oct  7 15:23:20 2011
@@ -27,7 +27,6 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -124,19 +123,18 @@ public class ContainerLaunch implements 
       FileContext lfs = FileContext.getLocalFSFileContext();
       LocalDirAllocator lDirAllocator =
           new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); // TODO
+
       Path nmPrivateContainerScriptPath =
           lDirAllocator.getLocalPathForWrite(
-              ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
-                  + appIdStr + Path.SEPARATOR + containerIdStr
-                  + Path.SEPARATOR + CONTAINER_SCRIPT, this.conf);
+              getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+                  + CONTAINER_SCRIPT, this.conf);
       Path nmPrivateTokensPath =
           lDirAllocator.getLocalPathForWrite(
-              ResourceLocalizationService.NM_PRIVATE_DIR
-                  + Path.SEPARATOR
-                  + containerIdStr
+              getContainerPrivateDir(appIdStr, containerIdStr)
                   + Path.SEPARATOR
                   + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
                       containerIdStr), this.conf);
+
       DataOutputStream containerScriptOutStream = null;
       DataOutputStream tokensOutStream = null;
 
@@ -229,6 +227,16 @@ public class ContainerLaunch implements 
     return 0;
   }
 
+  private String getContainerPrivateDir(String appIdStr, String containerIdStr) {
+    return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr
+        + Path.SEPARATOR;
+  }
+
+  private String getAppPrivateDir(String appIdStr) {
+    return ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
+        + appIdStr;
+  }
+
   private static class ShellScriptBuilder {
     
     private final StringBuilder sb;

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Fri Oct  7 15:23:20 2011
@@ -20,9 +20,12 @@ package org.apache.hadoop.yarn.server.no
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -37,6 +40,9 @@ import org.apache.hadoop.yarn.server.nod
 class LocalResourcesTrackerImpl implements LocalResourcesTracker {
 
   static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
+  private static final String RANDOM_DIR_REGEX = "-?\\d+";
+  private static final Pattern RANDOM_DIR_PATTERN = Pattern
+      .compile(RANDOM_DIR_REGEX);
 
   private final String user;
   private final Dispatcher dispatcher;
@@ -83,28 +89,44 @@ class LocalResourcesTrackerImpl implemen
 
   @Override
   public boolean remove(LocalizedResource rem, DeletionService delService) {
-    // current synchronization guaranteed by crude RLS event for cleanup
+ // current synchronization guaranteed by crude RLS event for cleanup
     LocalizedResource rsrc = localrsrc.get(rem.getRequest());
     if (null == rsrc) {
-      LOG.error("Attempt to remove absent resource: " + rem.getRequest() +
-          " from " + getUser());
+      LOG.error("Attempt to remove absent resource: " + rem.getRequest()
+          + " from " + getUser());
       return true;
     }
     if (rsrc.getRefCount() > 0
-        || ResourceState.DOWNLOADING.equals(rsrc.getState())
-        || rsrc != rem) {
+        || ResourceState.DOWNLOADING.equals(rsrc.getState()) || rsrc != rem) {
       // internal error
-      LOG.error("Attempt to remove resource: " + rsrc + " with non-zero refcount");
+      LOG.error("Attempt to remove resource: " + rsrc
+          + " with non-zero refcount");
       assert false;
       return false;
     }
-    localrsrc.remove(rem.getRequest());
     if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
-      delService.delete(getUser(), rsrc.getLocalPath());
+      delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
     }
     return true;
   }
 
+
+  /**
+   * Returns the path upto the random directory component.
+   */
+  private Path getPathToDelete(Path localPath) {
+    Path delPath = localPath.getParent();
+    String name = delPath.getName();
+    Matcher matcher = RANDOM_DIR_PATTERN.matcher(name);
+    if (matcher.matches()) {
+      return delPath;
+    } else {
+      LOG.warn("Random directroy component did not match. " +
+      		"Deleting localized path only");
+      return localPath;
+    }
+  }
+
   @Override
   public String getUser() {
     return user;
@@ -114,5 +136,4 @@ class LocalResourcesTrackerImpl implemen
   public Iterator<LocalizedResource> iterator() {
     return localrsrc.values().iterator();
   }
-
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Fri Oct  7 15:23:20 2011
@@ -304,6 +304,7 @@ public class ResourceLocalizationService
         retain.addResources(t);
         LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
       }
+      //TODO Check if appRsrcs should also be added to the retention set.
       break;
     case CLEANUP_CONTAINER_RESOURCES:
       ContainerLocalizationCleanupEvent rsrcCleanup =
@@ -336,6 +337,7 @@ public class ResourceLocalizationService
         delService.delete(userName, containerDir, new Path[] {});
 
         // Delete the nmPrivate container-dir
+        
         Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
         Path appSysDir = new Path(sysDir, appIDStr);
         Path containerSysDir = new Path(appSysDir, containerIDStr);
@@ -762,14 +764,16 @@ public class ResourceLocalizationService
     @Override
     @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {
+      Path nmPrivateCTokensPath = null;
       try {
         // Use LocalDirAllocator to get nmPrivateDir
-        Path nmPrivateCTokensPath =
+        nmPrivateCTokensPath =
             localDirsSelector.getLocalPathForWrite(
                 NM_PRIVATE_DIR
                     + Path.SEPARATOR
                     + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
                         localizerId), getConfig());
+
         // 0) init queue, etc.
         // 1) write credentials to private dir
         DataOutputStream tokenOut = null;
@@ -811,6 +815,7 @@ public class ResourceLocalizationService
         for (LocalizerResourceRequestEvent event : scheduled.values()) {
           event.getResource().unlock();
         }
+        delService.delete(null, nmPrivateCTokensPath, new Path[] {});
       }
     }
 

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java?rev=1180071&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java Fri Oct  7 15:23:20 2011
@@ -0,0 +1,418 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+public class TestApplication {
+
+  /**
+   * All container start events before application running.
+   */
+  @Test
+  public void testApplicationInit1() {
+    WrappedApplication wa = null;
+    try {
+      wa = new WrappedApplication(1, 314159265358979L, "yak", 3);
+      wa.initApplication(1);
+      assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+      assertEquals(1, wa.app.getContainers().size());
+      wa.initApplication(0);
+      wa.initApplication(2);
+      assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+      assertEquals(3, wa.app.getContainers().size());
+      wa.applicationInited();
+      assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+
+      for (int i = 0; i < wa.containers.size(); i++) {
+        verify(wa.containerBus).handle(
+            argThat(new ContainerInitMatcher(wa.containers.get(i)
+                .getContainerID())));
+      }
+    } finally {
+      if (wa != null)
+        wa.finished();
+    }
+  }
+
+  /**
+   * Container start events after Application Running
+   */
+  @Test
+  public void testApplicationInit2() {
+    WrappedApplication wa = null;
+    try {
+      wa = new WrappedApplication(2, 314159265358979L, "yak", 3);
+      wa.initApplication(0);
+      assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+      assertEquals(1, wa.app.getContainers().size());
+
+      wa.applicationInited();
+      assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+      verify(wa.containerBus).handle(
+          argThat(new ContainerInitMatcher(wa.containers.get(0)
+              .getContainerID())));
+
+      wa.initApplication(1);
+      wa.initApplication(2);
+      assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+      assertEquals(3, wa.app.getContainers().size());
+
+      for (int i = 1; i < wa.containers.size(); i++) {
+        verify(wa.containerBus).handle(
+            argThat(new ContainerInitMatcher(wa.containers.get(i)
+                .getContainerID())));
+      }
+    } finally {
+      if (wa != null)
+        wa.finished();
+    }
+  }
+
+  /**
+   * App state RUNNING after all containers complete, before RM sends
+   * APP_FINISHED
+   */
+  @Test
+  public void testAppRunningAfterContainersComplete() {
+    WrappedApplication wa = null;
+    try {
+      wa = new WrappedApplication(3, 314159265358979L, "yak", 3);
+      wa.initApplication(-1);
+      assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+      wa.applicationInited();
+      assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+
+      wa.containerFinished(0);
+      assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+      assertEquals(2, wa.app.getContainers().size());
+
+      wa.containerFinished(1);
+      wa.containerFinished(2);
+      assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+      assertEquals(0, wa.app.getContainers().size());
+    } finally {
+      if (wa != null)
+        wa.finished();
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAppFinishedOnRunningContainers() {
+    WrappedApplication wa = null;
+    try {
+      wa = new WrappedApplication(4, 314159265358979L, "yak", 3);
+      wa.initApplication(-1);
+      assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+      wa.applicationInited();
+      assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+
+      wa.containerFinished(0);
+      assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+      assertEquals(2, wa.app.getContainers().size());
+
+      wa.appFinished();
+      assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
+          wa.app.getApplicationState());
+      assertEquals(2, wa.app.getContainers().size());
+
+      for (int i = 1; i < wa.containers.size(); i++) {
+        verify(wa.containerBus).handle(
+            argThat(new ContainerKillMatcher(wa.containers.get(i)
+                .getContainerID())));
+      }
+
+      wa.containerFinished(1);
+      assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
+          wa.app.getApplicationState());
+      assertEquals(1, wa.app.getContainers().size());
+
+      reset(wa.localizerBus);
+      wa.containerFinished(2);
+      // All containers finished. Cleanup should be called.
+      assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+          wa.app.getApplicationState());
+      assertEquals(0, wa.app.getContainers().size());
+
+      verify(wa.localizerBus).handle(
+          refEq(new ApplicationLocalizationEvent(
+              LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
+
+      wa.appResourcesCleanedup();
+      assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
+
+    } finally {
+      if (wa != null)
+        wa.finished();
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testAppFinishedOnCompletedContainers() {
+    WrappedApplication wa = null;
+    try {
+      wa = new WrappedApplication(5, 314159265358979L, "yak", 3);
+      wa.initApplication(-1);
+      assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+      wa.applicationInited();
+      assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+
+      reset(wa.localizerBus);
+      wa.containerFinished(0);
+      wa.containerFinished(1);
+      wa.containerFinished(2);
+      assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+      assertEquals(0, wa.app.getContainers().size());
+
+      wa.appFinished();
+      assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+          wa.app.getApplicationState());
+
+      verify(wa.localizerBus).handle(
+          refEq(new ApplicationLocalizationEvent(
+              LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
+
+      wa.appResourcesCleanedup();
+      assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
+    } finally {
+      if (wa != null)
+        wa.finished();
+    }
+  }
+
+//TODO Re-work after Application transitions are changed.
+//  @Test
+  @SuppressWarnings("unchecked")
+  public void testStartContainerAfterAppFinished() {
+    WrappedApplication wa = null;
+    try {
+      wa = new WrappedApplication(5, 314159265358979L, "yak", 3);
+      wa.initApplication(-1);
+      assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+      wa.applicationInited();
+      assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+
+      reset(wa.localizerBus);
+      wa.containerFinished(0);
+      wa.containerFinished(1);
+      wa.containerFinished(2);
+      assertEquals(ApplicationState.RUNNING, wa.app.getApplicationState());
+      assertEquals(0, wa.app.getContainers().size());
+
+      wa.appFinished();
+      assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+          wa.app.getApplicationState());
+      verify(wa.localizerBus).handle(
+          refEq(new ApplicationLocalizationEvent(
+              LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
+
+      wa.appResourcesCleanedup();
+      assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
+    } finally {
+      if (wa != null)
+        wa.finished();
+    }
+  }
+
+//TODO Re-work after Application transitions are changed.
+//  @Test
+  @SuppressWarnings("unchecked")
+  public void testAppFinishedOnIniting() {
+    // AM may send a startContainer() - AM APP_FINIHSED processed after
+    // APP_FINISHED on another NM
+    WrappedApplication wa = null;
+    try {
+      wa = new WrappedApplication(1, 314159265358979L, "yak", 3);
+      wa.initApplication(0);
+      assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+      assertEquals(1, wa.app.getContainers().size());
+
+      reset(wa.localizerBus);
+      wa.appFinished();
+
+      verify(wa.containerBus).handle(
+          argThat(new ContainerKillMatcher(wa.containers.get(0)
+              .getContainerID())));
+      assertEquals(ApplicationState.FINISHING_CONTAINERS_WAIT,
+          wa.app.getApplicationState());
+
+      wa.containerFinished(0);
+      assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+          wa.app.getApplicationState());
+      verify(wa.localizerBus).handle(
+          refEq(new ApplicationLocalizationEvent(
+              LocalizationEventType.DESTROY_APPLICATION_RESOURCES, wa.app)));
+
+      wa.initApplication(1);
+      assertEquals(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+          wa.app.getApplicationState());
+      assertEquals(0, wa.app.getContainers().size());
+
+      wa.appResourcesCleanedup();
+      assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
+    } finally {
+      if (wa != null)
+        wa.finished();
+    }
+  }
+
+  private class ContainerKillMatcher extends ArgumentMatcher<ContainerEvent> {
+    private ContainerId cId;
+
+    public ContainerKillMatcher(ContainerId cId) {
+      this.cId = cId;
+    }
+
+    @Override
+    public boolean matches(Object argument) {
+      if (argument instanceof ContainerKillEvent) {
+        ContainerKillEvent event = (ContainerKillEvent) argument;
+        return event.getContainerID().equals(cId);
+      }
+      return false;
+    }
+  }
+
+  private class ContainerInitMatcher extends ArgumentMatcher<ContainerEvent> {
+    private ContainerId cId;
+
+    public ContainerInitMatcher(ContainerId cId) {
+      this.cId = cId;
+    }
+
+    @Override
+    public boolean matches(Object argument) {
+      if (argument instanceof ContainerInitEvent) {
+        ContainerInitEvent event = (ContainerInitEvent) argument;
+        return event.getContainerID().equals(cId);
+      }
+      return false;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private class WrappedApplication {
+    final DrainDispatcher dispatcher;
+    final EventHandler<LocalizationEvent> localizerBus;
+    final EventHandler<ContainersLauncherEvent> launcherBus;
+    final EventHandler<ContainersMonitorEvent> monitorBus;
+    final EventHandler<AuxServicesEvent> auxBus;
+    final EventHandler<ContainerEvent> containerBus;
+    final EventHandler<LogAggregatorEvent> logAggregationBus;
+    final String user;
+    final List<Container> containers;
+
+    final ApplicationId appId;
+    final Application app;
+
+    WrappedApplication(int id, long timestamp, String user, int numContainers) {
+      dispatcher = new DrainDispatcher();
+      dispatcher.init(null);
+
+      localizerBus = mock(EventHandler.class);
+      launcherBus = mock(EventHandler.class);
+      monitorBus = mock(EventHandler.class);
+      auxBus = mock(EventHandler.class);
+      containerBus = mock(EventHandler.class);
+      logAggregationBus = mock(EventHandler.class);
+
+      dispatcher.register(LocalizationEventType.class, localizerBus);
+      dispatcher.register(ContainersLauncherEventType.class, launcherBus);
+      dispatcher.register(ContainersMonitorEventType.class, monitorBus);
+      dispatcher.register(AuxServicesEventType.class, auxBus);
+      dispatcher.register(ContainerEventType.class, containerBus);
+      dispatcher.register(LogAggregatorEventType.class, logAggregationBus);
+
+      this.user = user;
+      this.appId = BuilderUtils.newApplicationId(timestamp, id);
+
+      app = new ApplicationImpl(dispatcher, this.user, appId, null);
+      containers = new ArrayList<Container>();
+      for (int i = 0; i < numContainers; i++) {
+        containers.add(createMockedContainer(this.appId, i));
+      }
+
+      dispatcher.start();
+    }
+
+    private void drainDispatcherEvents() {
+      dispatcher.await();
+    }
+
+    public void finished() {
+      dispatcher.stop();
+    }
+
+    public void initApplication(int containerNum) {
+      if (containerNum == -1) {
+        for (int i = 0; i < containers.size(); i++) {
+          app.handle(new ApplicationInitEvent(containers.get(i)));
+        }
+      } else {
+        app.handle(new ApplicationInitEvent(containers.get(containerNum)));
+      }
+      drainDispatcherEvents();
+    }
+
+    public void containerFinished(int containerNum) {
+      app.handle(new ApplicationContainerFinishedEvent(containers.get(
+          containerNum).getContainerID()));
+      drainDispatcherEvents();
+    }
+
+    public void applicationInited() {
+      app.handle(new ApplicationInitedEvent(appId));
+      drainDispatcherEvents();
+    }
+
+    public void appFinished() {
+      app.handle(new ApplicationEvent(appId,
+          ApplicationEventType.FINISH_APPLICATION));
+      drainDispatcherEvents();
+    }
+
+    public void appResourcesCleanedup() {
+      app.handle(new ApplicationEvent(appId,
+          ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
+      drainDispatcherEvents();
+    }
+  }
+
+  private Container createMockedContainer(ApplicationId appId, int containerId) {
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(appId, 1);
+    ContainerId cId = BuilderUtils.newContainerId(appAttemptId, containerId);
+    Container c = mock(Container.class);
+    when(c.getContainerID()).thenReturn(cId);
+    return c;
+  }
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1180071&r1=1180070&r2=1180071&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Fri Oct  7 15:23:20 2011
@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.util.Conve
 import org.junit.Test;
 import static org.junit.Assert.*;
 
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 import static org.mockito.Mockito.*;
 
@@ -355,7 +356,8 @@ public class TestResourceLocalizationSer
     dispatcher.register(ContainerEventType.class, containerBus);
 
     ContainerExecutor exec = mock(ContainerExecutor.class);
-    DeletionService delService = new DeletionService(exec);
+    DeletionService delServiceReal = new DeletionService(exec);
+    DeletionService delService = spy(delServiceReal);
     delService.init(null);
     delService.start();
 
@@ -407,12 +409,14 @@ public class TestResourceLocalizationSer
       rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
       spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
       // Sigh. Thread init of private localizer not accessible
-      Thread.sleep(500);
+      Thread.sleep(1000);
       dispatcher.await();
       String appStr = ConverterUtils.toString(appId);
       String ctnrStr = c.getContainerID().toString();
-      verify(exec).startLocalizer(isA(Path.class), isA(InetSocketAddress.class),
-            eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class));
+      ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class);
+      verify(exec).startLocalizer(tokenPathCaptor.capture(), isA(InetSocketAddress.class),
+        eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class));
+      Path localizationTokenPath = tokenPathCaptor.getValue();
 
       // heartbeat from localizer
       LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class);
@@ -454,10 +458,13 @@ public class TestResourceLocalizationSer
         };
       dispatcher.await();
       verify(containerBus).handle(argThat(matchesContainerLoc));
+      
+      // Verify deletion of localization token.
+      verify(delService).delete((String)isNull(), eq(localizationTokenPath));
     } finally {
-      delService.stop();
-      dispatcher.stop();
       spyService.stop();
+      dispatcher.stop();
+      delService.stop();
     }
   }