You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/09/11 08:29:06 UTC

svn commit: r1167677 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-yarn/hadoop-yarn-common/src/main/java/org/a...

Author: vinodkv
Date: Sun Sep 11 06:29:05 2011
New Revision: 1167677

URL: http://svn.apache.org/viewvc?rev=1167677&view=rev
Log:
MAPREDUCE-2691. svn merge -c r1167676 --ignore-ancestry ../../trunk/

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationCleanupEvent.java
      - copied unchanged from r1167676, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationCleanupEvent.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1167677&r1=1167676&r2=1167677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Sun Sep 11 06:29:05 2011
@@ -1252,6 +1252,9 @@ Release 0.23.0 - Unreleased
    MAPREDUCE-2971. ant build mapreduce fails protected access jc.displayJobList
    (jobs) (Thomas Graves via mahadev)
 
+   MAPREDUCE-2691. Finishing up the cleanup of distributed cache file resources
+   and related tests. (Siddharth Seth via vinodkv)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1167677&r1=1167676&r2=1167677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Sun Sep 11 06:29:05 2011
@@ -711,7 +711,7 @@ public abstract class TaskAttemptImpl im
         String linkName = name.toUri().getPath();
         container.setLocalResource(
             linkName,
-            BuilderUtils.newLocalResource(recordFactory,
+            BuilderUtils.newLocalResource(
                 p.toUri(), type, 
                 visibilities[i]
                   ? LocalResourceVisibility.PUBLIC

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1167677&r1=1167676&r2=1167677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java Sun Sep 11 06:29:05 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
@@ -86,12 +87,11 @@ public class BuilderUtils {
     }
   }
 
-  public static LocalResource newLocalResource(RecordFactory recordFactory,
-      URI uri, LocalResourceType type, LocalResourceVisibility visibility,
-      long size, long timestamp) {
+  public static LocalResource newLocalResource(URL url, LocalResourceType type,
+      LocalResourceVisibility visibility, long size, long timestamp) {
     LocalResource resource =
-        recordFactory.newRecordInstance(LocalResource.class);
-    resource.setResource(ConverterUtils.getYarnUrlFromURI(uri));
+      recordFactory.newRecordInstance(LocalResource.class);
+    resource.setResource(url);
     resource.setType(type);
     resource.setVisibility(visibility);
     resource.setSize(size);
@@ -99,6 +99,13 @@ public class BuilderUtils {
     return resource;
   }
 
+  public static LocalResource newLocalResource(URI uri,
+      LocalResourceType type, LocalResourceVisibility visibility, long size,
+      long timestamp) {
+    return newLocalResource(ConverterUtils.getYarnUrlFromURI(uri), type,
+        visibility, size, timestamp);
+  }
+
   public static ApplicationId newApplicationId(RecordFactory recordFactory,
       long clustertimestamp, CharSequence id) {
     ApplicationId applicationId =
@@ -125,6 +132,15 @@ public class BuilderUtils {
     return applicationId;
   }
 
+  public static ApplicationAttemptId newApplicationAttemptId(
+      ApplicationId appId, int attemptId) {
+    ApplicationAttemptId appAttemptId =
+        recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    appAttemptId.setApplicationId(appId);
+    appAttemptId.setAttemptId(attemptId);
+    return appAttemptId;
+  }
+
   public static ApplicationId convert(long clustertimestamp, CharSequence id) {
     ApplicationId applicationId =
         recordFactory.newRecordInstance(ApplicationId.class);
@@ -133,6 +149,24 @@ public class BuilderUtils {
     return applicationId;
   }
 
+  public static ContainerId newContainerId(ApplicationAttemptId appAttemptId,
+      int containerId) {
+    ContainerId id = recordFactory.newRecordInstance(ContainerId.class);
+    id.setAppId(appAttemptId.getApplicationId());
+    id.setId(containerId);
+    id.setAppAttemptId(appAttemptId);
+    return id;
+  }
+
+  public static ContainerId newContainerId(int appId, int appAttemptId,
+      long timestamp, int id) {
+    ApplicationId applicationId = newApplicationId(timestamp, appId);
+    ApplicationAttemptId applicationAttemptId = newApplicationAttemptId(
+        applicationId, appAttemptId);
+    ContainerId cId = newContainerId(applicationAttemptId, id);
+    return cId;
+  }
+
   public static ContainerId newContainerId(RecordFactory recordFactory,
       ApplicationId appId, ApplicationAttemptId appAttemptId,
       int containerId) {
@@ -227,4 +261,20 @@ public class BuilderUtils {
     report.setStartTime(startTime);
     return report;
   }
+  
+  public static Resource newResource(int memory) {
+    Resource resource = recordFactory.newRecordInstance(Resource.class);
+    resource.setMemory(memory);
+    return resource;
+  }
+  
+  public static URL newURL(String scheme, String host, int port, String file) {
+    URL url = recordFactory.newRecordInstance(URL.class);
+    url.setScheme(scheme);
+    url.setHost(host);
+    url.setPort(port);
+    url.setFile(file);
+    return url;
+  }
+  
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1167677&r1=1167676&r2=1167677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Sun Sep 11 06:29:05 2011
@@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.server.no
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -81,6 +84,12 @@ public class ContainerImpl implements Co
     new HashMap<LocalResourceRequest,String>();
   private final Map<Path,String> localizedResources =
     new HashMap<Path,String>();
+  private final List<LocalResourceRequest> publicRsrcs =
+    new ArrayList<LocalResourceRequest>();
+  private final List<LocalResourceRequest> privateRsrcs =
+    new ArrayList<LocalResourceRequest>();
+  private final List<LocalResourceRequest> appRsrcs =
+    new ArrayList<LocalResourceRequest>();
 
   public ContainerImpl(Dispatcher dispatcher,
       ContainerLaunchContext launchContext, Credentials creds,
@@ -361,7 +370,7 @@ public class ContainerImpl implements Co
     }
   }
 
-  @SuppressWarnings("fallthrough")
+  @SuppressWarnings({"fallthrough", "unchecked"})
   private void finished() {
     switch (getContainerState()) {
       case EXITED_WITH_SUCCESS:
@@ -404,6 +413,24 @@ public class ContainerImpl implements Co
         containerID, exitCode));
   }
 
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  public void cleanup() {
+    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
+      new HashMap<LocalResourceVisibility, 
+                  Collection<LocalResourceRequest>>();
+    if (!publicRsrcs.isEmpty()) {
+      rsrc.put(LocalResourceVisibility.PUBLIC, publicRsrcs);
+    }
+    if (!privateRsrcs.isEmpty()) {
+      rsrc.put(LocalResourceVisibility.PRIVATE, privateRsrcs);
+    }
+    if (!appRsrcs.isEmpty()) {
+      rsrc.put(LocalResourceVisibility.APPLICATION, appRsrcs);
+    }
+    dispatcher.getEventHandler().handle(
+        new ContainerLocalizationCleanupEvent(this, rsrc));
+  }
+
   static class ContainerTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
 
@@ -439,12 +466,6 @@ public class ContainerImpl implements Co
       // Send requests for public, private resources
       Map<String,LocalResource> cntrRsrc = ctxt.getAllLocalResources();
       if (!cntrRsrc.isEmpty()) {
-        ArrayList<LocalResourceRequest> publicRsrc =
-          new ArrayList<LocalResourceRequest>();
-        ArrayList<LocalResourceRequest> privateRsrc =
-          new ArrayList<LocalResourceRequest>();
-        ArrayList<LocalResourceRequest> appRsrc =
-          new ArrayList<LocalResourceRequest>();
         try {
           for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
             try {
@@ -453,13 +474,13 @@ public class ContainerImpl implements Co
             container.pendingResources.put(req, rsrc.getKey());
             switch (rsrc.getValue().getVisibility()) {
             case PUBLIC:
-              publicRsrc.add(req);
+              container.publicRsrcs.add(req);
               break;
             case PRIVATE:
-              privateRsrc.add(req);
+              container.privateRsrcs.add(req);
               break;
             case APPLICATION:
-              appRsrc.add(req);
+              container.appRsrcs.add(req);
               break;
             }
             } catch (URISyntaxException e) {
@@ -471,27 +492,25 @@ public class ContainerImpl implements Co
         } catch (URISyntaxException e) {
           // malformed resource; abort container launch
           LOG.warn("Failed to parse resource-request", e);
-          container.dispatcher.getEventHandler().handle(
-              new ContainerLocalizationEvent(
-               LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+          container.cleanup();
           container.metrics.endInitingContainer();
           return ContainerState.LOCALIZATION_FAILED;
         }
-        if (!publicRsrc.isEmpty()) {
-          container.dispatcher.getEventHandler().handle(
-              new ContainerLocalizationRequestEvent(
-                container, publicRsrc, LocalResourceVisibility.PUBLIC));
+        Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+            new HashMap<LocalResourceVisibility, 
+                        Collection<LocalResourceRequest>>();
+        if (!container.publicRsrcs.isEmpty()) {
+          req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
         }
-        if (!privateRsrc.isEmpty()) {
-          container.dispatcher.getEventHandler().handle(
-              new ContainerLocalizationRequestEvent(
-                container, privateRsrc, LocalResourceVisibility.PRIVATE));
+        if (!container.privateRsrcs.isEmpty()) {
+          req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
         }
-        if (!appRsrc.isEmpty()) {
-          container.dispatcher.getEventHandler().handle(
-              new ContainerLocalizationRequestEvent(
-                container, appRsrc, LocalResourceVisibility.APPLICATION));
+        if (!container.appRsrcs.isEmpty()) {
+          req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
         }
+        
+        container.dispatcher.getEventHandler().handle(
+              new ContainerLocalizationRequestEvent(container, req));
         return ContainerState.LOCALIZING;
       } else {
         container.dispatcher.getEventHandler().handle(
@@ -546,7 +565,6 @@ public class ContainerImpl implements Co
     }
   }
 
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class ExitedWithSuccessTransition extends ContainerTransition {
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
@@ -554,13 +572,10 @@ public class ContainerImpl implements Co
 
       // Inform the localizer to decrement reference counts and cleanup
       // resources.
-      container.dispatcher.getEventHandler().handle(
-          new ContainerLocalizationEvent(
-            LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+      container.cleanup();
     }
   }
 
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class ExitedWithFailureTransition extends ContainerTransition {
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
@@ -572,13 +587,10 @@ public class ContainerImpl implements Co
 
       // Inform the localizer to decrement reference counts and cleanup
       // resources.
-      container.dispatcher.getEventHandler().handle(
-          new ContainerLocalizationEvent(
-            LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+      container.cleanup();
     }
   }
 
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class ResourceFailedTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
@@ -592,30 +604,24 @@ public class ContainerImpl implements Co
 
       // Inform the localizer to decrement reference counts and cleanup
       // resources.
-      container.dispatcher.getEventHandler().handle(
-          new ContainerLocalizationEvent(
-            LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+      container.cleanup();
       container.metrics.endInitingContainer();
     }
   }
   
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class KillDuringLocalizationTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       // Inform the localizer to decrement reference counts and cleanup
       // resources.
-      container.dispatcher.getEventHandler().handle(
-          new ContainerLocalizationEvent(
-            LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+      container.cleanup();
       container.metrics.endInitingContainer();
       ContainerKillEvent killEvent = (ContainerKillEvent) event;
       container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
     }
   }
 
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class LocalizedResourceDuringKillTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
@@ -647,7 +653,6 @@ public class ContainerImpl implements Co
     }
   }
 
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class ContainerKilledTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
@@ -657,13 +662,10 @@ public class ContainerImpl implements Co
 
       // The process/process-grp is killed. Decrement reference counts and
       // cleanup resources
-      container.dispatcher.getEventHandler().handle(
-          new ContainerLocalizationEvent(
-            LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+      container.cleanup();
     }
   }
 
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class ContainerDoneTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
@@ -697,7 +699,8 @@ public class ContainerImpl implements Co
         newState =
             stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.warn("Can't handle this event at current state", e);
+        LOG.warn("Can't handle this event at current state: Current: ["
+            + oldState + "], eventType: [" + event.getType() + "]", e);
       }
       if (oldState != newState) {
         LOG.info("Container " + containerID + " transitioned from "

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1167677&r1=1167676&r2=1167677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Sun Sep 11 06:29:05 2011
@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -274,7 +273,7 @@ public class ContainerLocalizer {
           stat.setLocalPath(
               ConverterUtils.getYarnUrlFromPath(localPath));
           stat.setLocalSize(
-              FileUtil.getDU(new File(localPath.getParent().toString())));
+              FileUtil.getDU(new File(localPath.getParent().toUri())));
           stat.setStatus(ResourceStatusType.FETCH_SUCCESS);
         } catch (ExecutionException e) {
           stat.setStatus(ResourceStatusType.FETCH_FAILURE);

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1167677&r1=1167676&r2=1167677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Sun Sep 11 06:29:05 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.nod
  * {@link LocalResourceVisibility}.
  * 
  */
+
 class LocalResourcesTrackerImpl implements LocalResourcesTracker {
 
   static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
@@ -83,7 +84,7 @@ class LocalResourcesTrackerImpl implemen
   @Override
   public boolean remove(LocalizedResource rem, DeletionService delService) {
     // current synchronization guaranteed by crude RLS event for cleanup
-    LocalizedResource rsrc = localrsrc.remove(rem.getRequest());
+    LocalizedResource rsrc = localrsrc.get(rem.getRequest());
     if (null == rsrc) {
       LOG.error("Attempt to remove absent resource: " + rem.getRequest() +
           " from " + getUser());
@@ -93,10 +94,11 @@ class LocalResourcesTrackerImpl implemen
         || ResourceState.DOWNLOADING.equals(rsrc.getState())
         || rsrc != rem) {
       // internal error
-      LOG.error("Attempt to remove resource with non-zero refcount");
+      LOG.error("Attempt to remove resource: " + rsrc + " with non-zero refcount");
       assert false;
       return false;
     }
+    localrsrc.remove(rem.getRequest());
     if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
       delService.delete(getUser(), rsrc.getLocalPath());
     }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1167677&r1=1167676&r2=1167677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Sun Sep 11 06:29:05 2011
@@ -120,7 +120,8 @@ public class LocalizedResource implement
     for (ContainerId c : ref) {
       sb.append("(").append(c.toString()).append(")");
     }
-    sb.append("],").append(getTimestamp()).append("}");
+    sb.append("],").append(getTimestamp()).append(",")
+      .append(getState()).append("}");
     return sb.toString();
   }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1167677&r1=1167676&r2=1167677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Sun Sep 11 06:29:05 2011
@@ -22,6 +22,7 @@ import java.io.File;
 
 import java.net.URISyntaxException;
 
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -43,6 +44,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -63,7 +65,6 @@ import org.apache.avro.ipc.Server;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -93,7 +94,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
@@ -101,6 +102,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
@@ -198,7 +200,7 @@ public class ResourceLocalizationService
       conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
     localizationServerAddress = NetUtils.createSocketAddr(
       conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
-    localizerTracker = new LocalizerTracker(conf);
+    localizerTracker = createLocalizerTracker(conf);
     dispatcher.register(LocalizerEventType.class, localizerTracker);
     cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
         cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
@@ -218,6 +220,10 @@ public class ResourceLocalizationService
     super.start();
   }
 
+  LocalizerTracker createLocalizerTracker(Configuration conf) {
+    return new LocalizerTracker(conf);
+  }
+
   Server createServer() {
     YarnRPC rpc = YarnRPC.create(getConfig());
     Configuration conf = new Configuration(getConfig()); // Clone to separate
@@ -252,6 +258,9 @@ public class ResourceLocalizationService
   public void handle(LocalizationEvent event) {
     String userName;
     String appIDStr;
+    Container c;
+    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs;
+    LocalResourcesTracker tracker;
     // TODO: create log dir as $logdir/$user/$appId
     switch (event.getType()) {
     case INIT_APPLICATION_RESOURCES:
@@ -276,28 +285,16 @@ public class ResourceLocalizationService
     case INIT_CONTAINER_RESOURCES:
       ContainerLocalizationRequestEvent rsrcReqs =
         (ContainerLocalizationRequestEvent) event;
-      Container c = rsrcReqs.getContainer();
+      c = rsrcReqs.getContainer();
       LocalizerContext ctxt = new LocalizerContext(
           c.getUser(), c.getContainerID(), c.getCredentials());
-      final LocalResourcesTracker tracker;
-      LocalResourceVisibility vis = rsrcReqs.getVisibility();
-      switch (vis) {
-      default:
-      case PUBLIC:
-        tracker = publicRsrc;
-        break;
-      case PRIVATE:
-        tracker = privateRsrc.get(c.getUser());
-        break;
-      case APPLICATION:
-        tracker =
-          appRsrc.get(ConverterUtils.toString(c.getContainerID().getAppId()));
-        break;
-      }
-      // We get separate events one each for all resources of one visibility. So
-      // all the resources in this event are of the same visibility.
-      for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) {
-        tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
+      rsrcs = rsrcReqs.getRequestedResources();
+      for (LocalResourceVisibility vis : rsrcs.keySet()) {
+        tracker = getLocalResourcesTracker(vis, c.getUser(), 
+            c.getContainerID().getAppId());
+        for (LocalResourceRequest req : rsrcs.get(vis)) {
+          tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
+        }
       }
       break;
     case CACHE_CLEANUP:
@@ -311,14 +308,23 @@ public class ResourceLocalizationService
       }
       break;
     case CLEANUP_CONTAINER_RESOURCES:
-      Container container =
-        ((ContainerLocalizationEvent)event).getContainer();
+      ContainerLocalizationCleanupEvent rsrcCleanup =
+        (ContainerLocalizationCleanupEvent) event;
+      c = rsrcCleanup.getContainer();
+      rsrcs = rsrcCleanup.getResources();
+      for (LocalResourceVisibility vis : rsrcs.keySet()) {
+        tracker = getLocalResourcesTracker(vis, c.getUser(), 
+            c.getContainerID().getAppId());
+        for (LocalResourceRequest req : rsrcs.get(vis)) {
+          tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
+        }
+      }
 
       // Delete the container directories
-      userName = container.getUser();
-      String containerIDStr = container.toString();
+      userName = c.getUser();
+      String containerIDStr = c.toString();
       appIDStr =
-        ConverterUtils.toString(container.getContainerID().getAppId());
+        ConverterUtils.toString(c.getContainerID().getAppId());
       for (Path localDir : localDirs) {
 
         // Delete the user-owned container-dir
@@ -336,8 +342,7 @@ public class ResourceLocalizationService
         delService.delete(null, containerSysDir,  new Path[] {});
       }
 
-      dispatcher.getEventHandler().handle(new ContainerEvent(
-            container.getContainerID(),
+      dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
             ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
       break;
     case DESTROY_APPLICATION_RESOURCES:
@@ -379,6 +384,19 @@ public class ResourceLocalizationService
     }
   }
 
+  LocalResourcesTracker getLocalResourcesTracker(
+      LocalResourceVisibility visibility, String user, ApplicationId appId) {
+    switch (visibility) {
+      default:
+      case PUBLIC:
+        return publicRsrc;
+      case PRIVATE:
+        return privateRsrc.get(user);
+      case APPLICATION:
+        return appRsrc.get(ConverterUtils.toString(appId));
+    }
+  }
+
   /**
    * Sub-component handling the spawning of {@link ContainerLocalizer}s
    */
@@ -526,6 +544,7 @@ public class ResourceLocalizationService
     }
 
     @Override
+    @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {
       try {
         // TODO shutdown, better error handling esp. DU
@@ -651,6 +670,7 @@ public class ResourceLocalizationService
     }
 
     // TODO this sucks. Fix it later
+    @SuppressWarnings("unchecked") // dispatcher not typed
     LocalizerHeartbeatResponse update(
         List<LocalResourceStatus> remoteResourceStatuses) {
       LocalizerHeartbeatResponse response =
@@ -795,6 +815,7 @@ public class ResourceLocalizationService
     }
 
     @Override
+    @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {
       dispatcher.getEventHandler().handle(
           new LocalizationEvent(LocalizationEventType.CACHE_CLEANUP));

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java?rev=1167677&r1=1167676&r2=1167677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java Sun Sep 11 06:29:05 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
 
 import java.util.Collection;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -26,27 +27,23 @@ import org.apache.hadoop.yarn.server.nod
 public class ContainerLocalizationRequestEvent extends
     ContainerLocalizationEvent {
 
-  private final LocalResourceVisibility vis;
-  private final Collection<LocalResourceRequest> reqs;
+  private final Map<LocalResourceVisibility, Collection<LocalResourceRequest>> 
+    rsrc;
 
   /**
-   * Event requesting the localization of the reqs all with visibility vis
+   * Event requesting localization of resources, grouped by visibility.
    * @param c
-   * @param reqs
-   * @param vis
+   * @param rsrc resources to localize, keyed by visibility
    */
   public ContainerLocalizationRequestEvent(Container c,
-      Collection<LocalResourceRequest> reqs, LocalResourceVisibility vis) {
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc) {
     super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c);
-    this.vis = vis;
-    this.reqs = reqs;
+    this.rsrc = rsrc;
   }
 
-  public LocalResourceVisibility getVisibility() {
-    return vis;
+  public
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
+      getRequestedResources() {
+    return rsrc;
   }
-
-  public Collection<LocalResourceRequest> getRequestedResources() {
-    return reqs;
-  }
-}
+}
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java?rev=1167677&r1=1167676&r2=1167677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceReleaseEvent.java Sun Sep 11 06:29:05 2011
@@ -17,8 +17,6 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
 
-import java.net.URISyntaxException;
-
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 
@@ -26,8 +24,8 @@ public class ResourceReleaseEvent extend
 
   private final ContainerId container;
 
-  public ResourceReleaseEvent(LocalResourceRequest rsrc, ContainerId container)
-      throws URISyntaxException {
+  public ResourceReleaseEvent(LocalResourceRequest rsrc, 
+      ContainerId container) {
     super(rsrc, ResourceEventType.RELEASE);
     this.container = container;
   }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1167677&r1=1167676&r2=1167677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Sun Sep 11 06:29:05 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.no
 
 import static org.junit.Assert.fail;
 
+import java.util.Collection;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -79,14 +81,17 @@ public class DummyContainerManager exten
           ContainerLocalizationRequestEvent rsrcReqs =
             (ContainerLocalizationRequestEvent) event;
           // simulate localization of all requested resources
-          for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) {
-            LOG.info("DEBUG: " + req + ":" +
-                rsrcReqs.getContainer().getContainerID());
-            dispatcher.getEventHandler().handle(
-                new ContainerResourceLocalizedEvent(
-                  rsrcReqs.getContainer().getContainerID(), req,
-                  new Path("file:///local" + req.getPath().toUri().getPath())));
-          }
+            for (Collection<LocalResourceRequest> rc : rsrcReqs
+                .getRequestedResources().values()) {
+              for (LocalResourceRequest req : rc) {
+                LOG.info("DEBUG: " + req + ":"
+                    + rsrcReqs.getContainer().getContainerID());
+                dispatcher.getEventHandler().handle(
+                    new ContainerResourceLocalizedEvent(rsrcReqs.getContainer()
+                        .getContainerID(), req, new Path("file:///local"
+                        + req.getPath().toUri().getPath())));
+              }
+            }
           break;
         case CLEANUP_CONTAINER_RESOURCES:
           Container container =

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1167677&r1=1167676&r2=1167677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Sun Sep 11 06:29:05 2011
@@ -17,208 +17,203 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
-import java.net.URISyntaxException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
-
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Random;
 import java.util.Map.Entry;
-import java.util.AbstractMap.SimpleEntry;
+import java.util.Random;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
-
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
-import static org.junit.Assert.*;
-
 import org.mockito.ArgumentMatcher;
-import static org.mockito.Mockito.*;
 
 public class TestContainer {
 
   final NodeManagerMetrics metrics = NodeManagerMetrics.create();
 
+  
   /**
    * Verify correct container request events sent to localizer.
    */
   @Test
-  @SuppressWarnings("unchecked") // mocked generic
   public void testLocalizationRequest() throws Exception {
-    DrainDispatcher dispatcher = new DrainDispatcher();
-    dispatcher.init(null);
+    WrappedContainer wc = null;
     try {
-      dispatcher.start();
-      EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class);
-      dispatcher.register(LocalizationEventType.class, localizerBus);
-      // null serviceData; no registered AuxServicesEventType handler
-
-      ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
-      ContainerId cId = getMockContainerId(7, 314159265358979L, 4344);
-      when(ctxt.getUser()).thenReturn("yak");
-      when(ctxt.getContainerId()).thenReturn(cId);
-
-      Random r = new Random();
-      long seed = r.nextLong();
-      r.setSeed(seed);
-      System.out.println("testLocalizationRequest seed: " + seed);
-      final Map<String,LocalResource> localResources = createLocalResources(r);
-      when(ctxt.getAllLocalResources()).thenReturn(localResources);
-
-      final Container c = newContainer(dispatcher, ctxt);
-      assertEquals(ContainerState.NEW, c.getContainerState());
+      wc = new WrappedContainer(7, 314159265358979L, 4344, "yak");
+      assertEquals(ContainerState.NEW, wc.c.getContainerState());
+      wc.initContainer();
 
       // Verify request for public/private resources to localizer
-      c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
-      dispatcher.await();
-      ContainerReqMatcher matchesPublicReq =
-        new ContainerReqMatcher(localResources,
-            EnumSet.of(LocalResourceVisibility.PUBLIC));
-      ContainerReqMatcher matchesPrivateReq =
-        new ContainerReqMatcher(localResources,
-            EnumSet.of(LocalResourceVisibility.PRIVATE));
-      ContainerReqMatcher matchesAppReq =
-        new ContainerReqMatcher(localResources,
-            EnumSet.of(LocalResourceVisibility.APPLICATION));
-      verify(localizerBus).handle(argThat(matchesPublicReq));
-      verify(localizerBus).handle(argThat(matchesPrivateReq));
-      verify(localizerBus).handle(argThat(matchesAppReq));
-      assertEquals(ContainerState.LOCALIZING, c.getContainerState());
-    } finally {
-      dispatcher.stop();
+      ResourcesRequestedMatcher matchesReq =
+          new ResourcesRequestedMatcher(wc.localResources, EnumSet.of(
+              LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
+              LocalResourceVisibility.APPLICATION));
+      verify(wc.localizerBus).handle(argThat(matchesReq));
+      assertEquals(ContainerState.LOCALIZING, wc.c.getContainerState());
     }
+    finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    } 
   }
 
   /**
    * Verify container launch when all resources already cached.
    */
   @Test
-  @SuppressWarnings("unchecked") // mocked generic
   public void testLocalizationLaunch() throws Exception {
-    DrainDispatcher dispatcher = new DrainDispatcher();
-    dispatcher.init(null);
+    WrappedContainer wc = null;
     try {
-      dispatcher.start();
-      EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class);
-      dispatcher.register(LocalizationEventType.class, localizerBus);
-      EventHandler<ContainersLauncherEvent> launcherBus =
-        mock(EventHandler.class);
-      dispatcher.register(ContainersLauncherEventType.class, launcherBus);
-      // null serviceData; no registered AuxServicesEventType handler
-
-      ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
-      ContainerId cId = getMockContainerId(8, 314159265358979L, 4344);
-      when(ctxt.getUser()).thenReturn("yak");
-      when(ctxt.getContainerId()).thenReturn(cId);
-
-      Random r = new Random();
-      long seed = r.nextLong();
-      r.setSeed(seed);
-      System.out.println("testLocalizationLaunch seed: " + seed);
-      final Map<String,LocalResource> localResources = createLocalResources(r);
-      when(ctxt.getAllLocalResources()).thenReturn(localResources);
-      final Container c = newContainer(dispatcher, ctxt);
-      assertEquals(ContainerState.NEW, c.getContainerState());
-
-      c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
-      dispatcher.await();
-
-      // Container prepared for localization events
-      Path cache = new Path("file:///cache");
-      Map<Path,String> localPaths = new HashMap<Path,String>();
-      for (Entry<String,LocalResource> rsrc : localResources.entrySet()) {
-        assertEquals(ContainerState.LOCALIZING, c.getContainerState());
-        LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
-        Path p = new Path(cache, rsrc.getKey());
-        localPaths.put(p, rsrc.getKey());
-        // rsrc copied to p
-        c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(), req, p));
-      }
-      dispatcher.await();
+      wc = new WrappedContainer(8, 314159265358979L, 4344, "yak");
+      assertEquals(ContainerState.NEW, wc.c.getContainerState());
+      wc.initContainer();
+      Map<Path, String> localPaths = wc.localizeResources();
 
       // all resources should be localized
-      assertEquals(ContainerState.LOCALIZED, c.getContainerState());
-      for (Entry<Path,String> loc : c.getLocalizedResources().entrySet()) {
+      assertEquals(ContainerState.LOCALIZED, wc.c.getContainerState());
+      for (Entry<Path,String> loc : wc.c.getLocalizedResources().entrySet()) {
         assertEquals(localPaths.remove(loc.getKey()), loc.getValue());
       }
       assertTrue(localPaths.isEmpty());
 
+      final WrappedContainer wcf = wc;
       // verify container launch
       ArgumentMatcher<ContainersLauncherEvent> matchesContainerLaunch =
         new ArgumentMatcher<ContainersLauncherEvent>() {
           @Override
           public boolean matches(Object o) {
             ContainersLauncherEvent launchEvent = (ContainersLauncherEvent) o;
-            return c == launchEvent.getContainer();
+            return wcf.c == launchEvent.getContainer();
           }
         };
-      verify(launcherBus).handle(argThat(matchesContainerLaunch));
+      verify(wc.launcherBus).handle(argThat(matchesContainerLaunch));
     } finally {
-      dispatcher.stop();
+      if (wc != null) {
+        wc.finished();
+      }
     }
   }
 
+  @Test
+  @SuppressWarnings("unchecked") // mocked generic
+  public void testCleanupOnFailure() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(10, 314159265358979L, 4344, "yak");
+      wc.initContainer();
+      wc.localizeResources();
+      wc.launchContainer();
+      reset(wc.localizerBus);
+      wc.containerFailed(ExitCode.KILLED.getExitCode());
+      assertEquals(ContainerState.EXITED_WITH_FAILURE, 
+          wc.c.getContainerState());
+      verifyCleanupCall(wc);
+    }
+    finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    } 
+  }
+  
+  @Test
+  @SuppressWarnings("unchecked") // mocked generic
+  public void testCleanupOnSuccess() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(11, 314159265358979L, 4344, "yak");
+      wc.initContainer();
+      wc.localizeResources();
+      wc.launchContainer();
+      reset(wc.localizerBus);
+      wc.containerSuccessful();
+      assertEquals(ContainerState.EXITED_WITH_SUCCESS,
+          wc.c.getContainerState());
+      
+      verifyCleanupCall(wc);
+    }
+    finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
+  
+  @Test
+  @SuppressWarnings("unchecked") // mocked generic
+  public void testCleanupOnKillRequest() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(12, 314159265358979L, 4344, "yak");
+      wc.initContainer();
+      wc.localizeResources();
+      wc.launchContainer();
+      reset(wc.localizerBus);
+      wc.killContainer();
+      assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      wc.containerKilledOnRequest();
+      
+      verifyCleanupCall(wc);
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
+  
   /**
    * Verify serviceData correctly sent.
    */
   @Test
-  @SuppressWarnings("unchecked") // mocked generic
   public void testServiceData() throws Exception {
-    DrainDispatcher dispatcher = new DrainDispatcher();
-    dispatcher.init(null);
-    dispatcher.start();
+    WrappedContainer wc = null;
     try {
-      EventHandler<LocalizationEvent> localizerBus = mock(EventHandler.class);
-      dispatcher.register(LocalizationEventType.class, localizerBus);
-      EventHandler<AuxServicesEvent> auxBus = mock(EventHandler.class);
-      dispatcher.register(AuxServicesEventType.class, auxBus);
-      EventHandler<ContainersLauncherEvent> launchBus = mock(EventHandler.class);
-      dispatcher.register(ContainersLauncherEventType.class, launchBus);
-
-      ContainerLaunchContext ctxt = mock(ContainerLaunchContext.class);
-      final ContainerId cId = getMockContainerId(9, 314159265358979L, 4344);
-      when(ctxt.getUser()).thenReturn("yak");
-      when(ctxt.getContainerId()).thenReturn(cId);
-      when(ctxt.getAllLocalResources()).thenReturn(
-          Collections.<String,LocalResource>emptyMap());
-
-      Random r = new Random();
-      long seed = r.nextLong();
-      r.setSeed(seed);
-      System.out.println("testServiceData seed: " + seed);
-      final Map<String,ByteBuffer> serviceData = createServiceData(r);
-      when(ctxt.getAllServiceData()).thenReturn(serviceData);
-
-      final Container c = newContainer(dispatcher, ctxt);
-      assertEquals(ContainerState.NEW, c.getContainerState());
-
-      // Verify propagation of service data to AuxServices
-      c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
-      dispatcher.await();
-      for (final Map.Entry<String,ByteBuffer> e : serviceData.entrySet()) {
+      wc = new WrappedContainer(9, 314159265358979L, 4344, "yak", false, true);
+      assertEquals(ContainerState.NEW, wc.c.getContainerState());
+      wc.initContainer();
+      
+      for (final Map.Entry<String,ByteBuffer> e : wc.serviceData.entrySet()) {
         ArgumentMatcher<AuxServicesEvent> matchesServiceReq =
           new ArgumentMatcher<AuxServicesEvent>() {
             @Override
@@ -228,9 +223,10 @@ public class TestContainer {
                 && 0 == e.getValue().compareTo(evt.getServiceData());
             }
           };
-        verify(auxBus).handle(argThat(matchesServiceReq));
+        verify(wc.auxBus).handle(argThat(matchesServiceReq));
       }
 
+      final WrappedContainer wcf = wc;
       // verify launch on empty resource request
       ArgumentMatcher<ContainersLauncherEvent> matchesLaunchReq =
         new ArgumentMatcher<ContainersLauncherEvent>() {
@@ -238,61 +234,103 @@ public class TestContainer {
           public boolean matches(Object o) {
             ContainersLauncherEvent evt = (ContainersLauncherEvent) o;
             return evt.getType() == ContainersLauncherEventType.LAUNCH_CONTAINER
-              && cId == evt.getContainer().getContainerID();
+              && wcf.cId == evt.getContainer().getContainerID();
           }
         };
-      verify(launchBus).handle(argThat(matchesLaunchReq));
+      verify(wc.launcherBus).handle(argThat(matchesLaunchReq));
     } finally {
-      dispatcher.stop();
+      if (wc != null) {
+        wc.finished();
+      }
     }
   }
 
-  // Accept iff the resource request payload matches.
-  static class ContainerReqMatcher extends ArgumentMatcher<LocalizationEvent> {
+  private void verifyCleanupCall(WrappedContainer wc) throws Exception {
+    ResourcesReleasedMatcher matchesReq =
+        new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
+            LocalResourceVisibility.PUBLIC, LocalResourceVisibility.PRIVATE,
+            LocalResourceVisibility.APPLICATION));
+    verify(wc.localizerBus).handle(argThat(matchesReq));
+  }
+
+  private static class ResourcesReleasedMatcher extends
+      ArgumentMatcher<LocalizationEvent> {
     final HashSet<LocalResourceRequest> resources =
-      new HashSet<LocalResourceRequest>();
-    ContainerReqMatcher(Map<String,LocalResource> allResources,
+        new HashSet<LocalResourceRequest>();
+
+    ResourcesReleasedMatcher(Map<String, LocalResource> allResources,
         EnumSet<LocalResourceVisibility> vis) throws URISyntaxException {
-      for (Entry<String,LocalResource> e : allResources.entrySet()) {
+      for (Entry<String, LocalResource> e : allResources.entrySet()) {
         if (vis.contains(e.getValue().getVisibility())) {
           resources.add(new LocalResourceRequest(e.getValue()));
         }
       }
     }
+
     @Override
     public boolean matches(Object o) {
-      ContainerLocalizationRequestEvent evt = (ContainerLocalizationRequestEvent) o;
+      if (!(o instanceof ContainerLocalizationCleanupEvent)) {
+        return false;
+      }
+      ContainerLocalizationCleanupEvent evt =
+          (ContainerLocalizationCleanupEvent) o;
       final HashSet<LocalResourceRequest> expected =
-        new HashSet<LocalResourceRequest>(resources);
-      for (LocalResourceRequest rsrc : evt.getRequestedResources()) {
-        if (!expected.remove(rsrc)) {
-          return false;
+          new HashSet<LocalResourceRequest>(resources);
+      for (Collection<LocalResourceRequest> rc : evt.getResources().values()) {
+        for (LocalResourceRequest rsrc : rc) {
+          if (!expected.remove(rsrc)) {
+            return false;
+          }
         }
       }
       return expected.isEmpty();
     }
   }
 
-  static Entry<String,LocalResource> getMockRsrc(Random r,
-      LocalResourceVisibility vis) {
-    LocalResource rsrc = mock(LocalResource.class);
+  // Accept iff the resource payload matches.
+  private static class ResourcesRequestedMatcher extends
+      ArgumentMatcher<LocalizationEvent> {
+    final HashSet<LocalResourceRequest> resources =
+        new HashSet<LocalResourceRequest>();
 
-    String name = Long.toHexString(r.nextLong());
-    URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
-    when(uri.getScheme()).thenReturn("file");
-    when(uri.getHost()).thenReturn(null);
-    when(uri.getFile()).thenReturn("/local/" + vis + "/" + name);
+    ResourcesRequestedMatcher(Map<String, LocalResource> allResources,
+        EnumSet<LocalResourceVisibility> vis) throws URISyntaxException {
+      for (Entry<String, LocalResource> e : allResources.entrySet()) {
+        if (vis.contains(e.getValue().getVisibility())) {
+          resources.add(new LocalResourceRequest(e.getValue()));
+        }
+      }
+    }
 
-    when(rsrc.getResource()).thenReturn(uri);
-    when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L);
-    when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L);
-    when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
-    when(rsrc.getVisibility()).thenReturn(vis);
+    @Override
+    public boolean matches(Object o) {
+      ContainerLocalizationRequestEvent evt =
+          (ContainerLocalizationRequestEvent) o;
+      final HashSet<LocalResourceRequest> expected =
+          new HashSet<LocalResourceRequest>(resources);
+      for (Collection<LocalResourceRequest> rc : evt.getRequestedResources()
+          .values()) {
+        for (LocalResourceRequest rsrc : rc) {
+          if (!expected.remove(rsrc)) {
+            return false;
+          }
+        }
+      }
+      return expected.isEmpty();
+    }
+  }
 
-    return new SimpleEntry<String,LocalResource>(name, rsrc);
+  private static Entry<String, LocalResource> getMockRsrc(Random r,
+      LocalResourceVisibility vis) {
+    String name = Long.toHexString(r.nextLong());
+    URL url = BuilderUtils.newURL("file", null, 0, "/local" + vis + "/" + name);
+    LocalResource rsrc =
+        BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
+            r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
+    return new SimpleEntry<String, LocalResource>(name, rsrc);
   }
 
-  static Map<String,LocalResource> createLocalResources(Random r) {
+  private static Map<String,LocalResource> createLocalResources(Random r) {
     Map<String,LocalResource> localResources =
       new HashMap<String,LocalResource>();
     for (int i = r.nextInt(5) + 5; i >= 0; --i) {
@@ -313,17 +351,7 @@ public class TestContainer {
     return localResources;
   }
 
-  static ContainerId getMockContainerId(int appId, long timestamp, int id) {
-    ApplicationId aId = mock(ApplicationId.class);
-    when(aId.getId()).thenReturn(appId);
-    when(aId.getClusterTimestamp()).thenReturn(timestamp);
-    ContainerId cId = mock(ContainerId.class);
-    when(cId.getId()).thenReturn(id);
-    when(cId.getAppId()).thenReturn(aId);
-    return cId;
-  }
-
-  static Map<String,ByteBuffer> createServiceData(Random r) {
+  private static Map<String,ByteBuffer> createServiceData(Random r) {
     Map<String,ByteBuffer> serviceData =
       new HashMap<String,ByteBuffer>();
     for (int i = r.nextInt(5) + 5; i >= 0; --i) {
@@ -335,7 +363,134 @@ public class TestContainer {
     return serviceData;
   }
 
-  Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) {
+  private Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) { // builds the ContainerImpl under test from disp, ctx and metrics; third ctor arg is null
     return new ContainerImpl(disp, ctx, null, metrics);
   }
+  
+  @SuppressWarnings("unchecked")
+  private class WrappedContainer { // test harness: one Container (from newContainer) on a DrainDispatcher with mocked event handlers
+    final DrainDispatcher dispatcher;
+    final EventHandler<LocalizationEvent> localizerBus;
+    final EventHandler<ContainersLauncherEvent> launcherBus;
+    final EventHandler<ContainersMonitorEvent> monitorBus;
+    final EventHandler<AuxServicesEvent> auxBus;
+
+    final ContainerLaunchContext ctxt;
+    final ContainerId cId;
+    final Container c;
+    final Map<String, LocalResource> localResources;
+    final Map<String, ByteBuffer> serviceData;
+    final String user;
+
+    WrappedContainer(int appId, long timestamp, int id, String user) {
+      this(appId, timestamp, id, user, true, false); // defaults: withLocalRes=true, withServiceData=false
+    }
+
+    WrappedContainer(int appId, long timestamp, int id, String user,
+        boolean withLocalRes, boolean withServiceData) {
+      dispatcher = new DrainDispatcher();
+      dispatcher.init(null);
+
+      localizerBus = mock(EventHandler.class); // raw-typed mocks assigned to generic fields
+      launcherBus = mock(EventHandler.class);
+      monitorBus = mock(EventHandler.class);
+      auxBus = mock(EventHandler.class);
+      dispatcher.register(LocalizationEventType.class, localizerBus); // each event type goes to its own mocked handler
+      dispatcher.register(ContainersLauncherEventType.class, launcherBus);
+      dispatcher.register(ContainersMonitorEventType.class, monitorBus);
+      dispatcher.register(AuxServicesEventType.class, auxBus);
+      this.user = user;
+
+      ctxt = mock(ContainerLaunchContext.class); // launch context is a mock stubbed field by field below
+      cId = BuilderUtils.newContainerId(appId, 1, timestamp, id); // NOTE(review): the literal 1 is presumably the app attempt id - confirm against BuilderUtils
+      when(ctxt.getUser()).thenReturn(this.user);
+      when(ctxt.getContainerId()).thenReturn(cId);
+
+      Resource resource = BuilderUtils.newResource(1024); // NOTE(review): unit of 1024 is not visible here - confirm against BuilderUtils
+      when(ctxt.getResource()).thenReturn(resource);
+
+      if (withLocalRes) {
+        Random r = new Random();
+        long seed = r.nextLong();
+        r.setSeed(seed);
+        System.out.println("WrappedContainerLocalResource seed: " + seed); // seed is printed, presumably so a failing run can be replayed
+        localResources = createLocalResources(r);
+      } else {
+        localResources = Collections.<String, LocalResource> emptyMap();
+      }
+      when(ctxt.getAllLocalResources()).thenReturn(localResources);
+
+      if (withServiceData) {
+        Random r = new Random();
+        long seed = r.nextLong();
+        r.setSeed(seed);
+        System.out.println("ServiceData seed: " + seed); // same seed-then-print pattern as for the local resources
+        serviceData = createServiceData(r);
+      } else {
+        serviceData = Collections.<String, ByteBuffer> emptyMap();
+      }
+      when(ctxt.getAllServiceData()).thenReturn(serviceData);
+
+      c = newContainer(dispatcher, ctxt);
+      dispatcher.start(); // started only after the container has been constructed
+    }
+
+    private void drainDispatcherEvents() {
+      dispatcher.await(); // NOTE(review): presumably returns once queued events were handled - confirm in DrainDispatcher
+    }
+
+    public void finished() {
+      dispatcher.stop();
+    }
+
+    public void initContainer() {
+      c.handle(new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
+      drainDispatcherEvents();
+    }
+
+    public Map<Path, String> localizeResources() throws URISyntaxException { // reports every local resource as localized; returns local path -> resource key
+      Path cache = new Path("file:///cache");
+      Map<Path, String> localPaths = new HashMap<Path, String>();
+      for (Entry<String, LocalResource> rsrc : localResources.entrySet()) {
+        assertEquals(ContainerState.LOCALIZING, c.getContainerState()); // checked before each localized event is delivered
+        LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
+        Path p = new Path(cache, rsrc.getKey());
+        localPaths.put(p, rsrc.getKey());
+        // rsrc copied to p
+        c.handle(new ContainerResourceLocalizedEvent(c.getContainerID(), 
+                 req, p));
+      }
+      drainDispatcherEvents(); // drained once, after all resources were reported
+      return localPaths;
+    }
+
+    public void launchContainer() {
+      c.handle(new ContainerEvent(cId, ContainerEventType.CONTAINER_LAUNCHED));
+      drainDispatcherEvents();
+    }
+
+    public void containerSuccessful() {
+      c.handle(new ContainerEvent(cId,
+          ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
+      drainDispatcherEvents();
+    }
+
+    public void containerFailed(int exitCode) {
+      c.handle(new ContainerExitEvent(cId,
+          ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, exitCode));
+      drainDispatcherEvents();
+    }
+
+    public void killContainer() {
+      c.handle(new ContainerKillEvent(cId, "KillRequest"));
+      drainDispatcherEvents();
+    }
+
+    public void containerKilledOnRequest() { // exit event carrying the ExitCode.KILLED exit code
+      c.handle(new ContainerExitEvent(cId,
+          ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.KILLED
+              .getExitCode()));
+      drainDispatcherEvents();
+    }
+  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1167677&r1=1167676&r2=1167677&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Sun Sep 11 06:29:05 2011
@@ -21,10 +21,17 @@ package org.apache.hadoop.yarn.server.no
 import java.net.InetSocketAddress;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
+import java.util.Set;
+
+import junit.framework.Assert;
 
 import org.apache.avro.ipc.Server;
 import org.apache.hadoop.conf.Configuration;
@@ -63,11 +70,15 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
 
 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -134,6 +145,190 @@ public class TestResourceLocalizationSer
 
   @Test
   @SuppressWarnings("unchecked") // mocked generics
+  public void testResourceRelease() throws Exception {
+    // Checks the reference counts held by the PRIVATE, PUBLIC and APPLICATION
+    // trackers after two localization requests from the same container, and
+    // again after the matching cleanup events.
+    Configuration conf = new Configuration();
+    AbstractFileSystem spylfs =
+      spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    final FileContext lfs = FileContext.getFileContext(spylfs, conf);
+    doNothing().when(spylfs).mkdir(
+        isA(Path.class), isA(FsPermission.class), anyBoolean());
+
+    List<Path> localDirs = new ArrayList<Path>();
+    String[] sDirs = new String[4];
+    for (int i = 0; i < 4; ++i) {
+      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+      sDirs[i] = localDirs.get(i).toString();
+    }
+    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+
+    Server ignore = mock(Server.class);
+    LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, applicationBus);
+    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+    dispatcher.register(ContainerEventType.class, containerBus);
+    // Ignore actual localization: localizer events go to a mock handler.
+    EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerBus);
+
+    ContainerExecutor exec = mock(ContainerExecutor.class);
+    DeletionService delService = new DeletionService(exec);
+    delService.init(null);
+    delService.start();
+
+    ResourceLocalizationService rawService =
+      new ResourceLocalizationService(dispatcher, exec, delService);
+    ResourceLocalizationService spyService = spy(rawService);
+    // Replace the server, the localizer tracker and the file context with
+    // the doubles created above.
+    doReturn(ignore).when(spyService).createServer();
+    doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
+        isA(Configuration.class));
+    doReturn(lfs).when(spyService)
+        .getLocalFileContext(isA(Configuration.class));
+    try {
+      spyService.init(conf);
+      spyService.start();
+
+      final String user = "user0";
+      // init application
+      final Application app = mock(Application.class);
+      final ApplicationId appId =
+          BuilderUtils.newApplicationId(314159265358979L, 3);
+      when(app.getUser()).thenReturn(user);
+      when(app.getAppId()).thenReturn(appId);
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+      dispatcher.await();
+
+      // Get a handle on the trackers after they are set up by
+      // INIT_APPLICATION_RESOURCES.
+      LocalResourcesTracker appTracker =
+          spyService.getLocalResourcesTracker(
+              LocalResourceVisibility.APPLICATION, user, appId);
+      LocalResourcesTracker privTracker =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
+              user, appId);
+      LocalResourcesTracker pubTracker =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+              user, appId);
+
+      // init container.
+      final Container c = getMockContainer(appId, 42);
+
+      // init resources
+      Random r = new Random();
+      long seed = r.nextLong();
+      System.out.println("SEED: " + seed);
+      r.setSeed(seed);
+
+      // One private, two public and one application resource.
+      final LocalResource privResource = getPrivateMockedResource(r);
+      final LocalResourceRequest privReq =
+          new LocalResourceRequest(privResource);
+
+      final LocalResource pubResource = getPublicMockedResource(r);
+      final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
+      final LocalResource pubResource2 = getPublicMockedResource(r);
+      final LocalResourceRequest pubReq2 =
+          new LocalResourceRequest(pubResource2);
+
+      final LocalResource appResource = getAppMockedResource(r);
+      final LocalResourceRequest appReq = new LocalResourceRequest(appResource);
+
+      // First request: private + first public + application resource.
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+          new HashMap<LocalResourceVisibility, 
+                      Collection<LocalResourceRequest>>();
+      req.put(LocalResourceVisibility.PRIVATE,
+          Collections.singletonList(privReq));
+      req.put(LocalResourceVisibility.PUBLIC,
+          Collections.singletonList(pubReq));
+      req.put(LocalResourceVisibility.APPLICATION,
+          Collections.singletonList(appReq));
+
+      // Second request: the same private resource + second public resource.
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
+        new HashMap<LocalResourceVisibility, 
+                    Collection<LocalResourceRequest>>();
+      req2.put(LocalResourceVisibility.PRIVATE,
+          Collections.singletonList(privReq));
+      req2.put(LocalResourceVisibility.PUBLIC,
+          Collections.singletonList(pubReq2));
+
+      Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
+      pubRsrcs.add(pubReq);
+      pubRsrcs.add(pubReq2);
+
+      // Send Request event
+      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
+      spyService.handle(new ContainerLocalizationRequestEvent(c, req2));
+      dispatcher.await();
+
+      // The private resource was requested twice, everything else once.
+      int privRsrcCount = 0;
+      for (LocalizedResource lr : privTracker) {
+        privRsrcCount++;
+        Assert.assertEquals("Incorrect reference count", 2, lr.getRefCount());
+        Assert.assertEquals(privReq, lr.getRequest());
+      }
+      Assert.assertEquals(1, privRsrcCount);
+
+      int pubRsrcCount = 0;
+      for (LocalizedResource lr : pubTracker) {
+        pubRsrcCount++;
+        Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+        pubRsrcs.remove(lr.getRequest());
+      }
+      Assert.assertEquals(0, pubRsrcs.size());
+      Assert.assertEquals(2, pubRsrcCount);
+
+      int appRsrcCount = 0;
+      for (LocalizedResource lr : appTracker) {
+        appRsrcCount++;
+        Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+        Assert.assertEquals(appReq, lr.getRequest());
+      }
+      Assert.assertEquals(1, appRsrcCount);
+
+      // Send Cleanup Event. The private resource is released only once, so
+      // one reference to it must remain afterwards.
+      spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
+      req2.remove(LocalResourceVisibility.PRIVATE);
+      spyService.handle(new ContainerLocalizationCleanupEvent(c, req2));
+      dispatcher.await();
+
+      // Refill the set that the first verification pass emptied.
+      pubRsrcs.add(pubReq);
+      pubRsrcs.add(pubReq2);
+
+      privRsrcCount = 0;
+      for (LocalizedResource lr : privTracker) {
+        privRsrcCount++;
+        Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+        Assert.assertEquals(privReq, lr.getRequest());
+      }
+      Assert.assertEquals(1, privRsrcCount);
+
+      // Released resources are still tracked, now with a zero count.
+      pubRsrcCount = 0;
+      for (LocalizedResource lr : pubTracker) {
+        pubRsrcCount++;
+        Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
+        pubRsrcs.remove(lr.getRequest());
+      }
+      Assert.assertEquals(0, pubRsrcs.size());
+      Assert.assertEquals(2, pubRsrcCount);
+
+      appRsrcCount = 0;
+      for (LocalizedResource lr : appTracker) {
+        appRsrcCount++;
+        Assert.assertEquals("Incorrect reference count", 0, lr.getRefCount());
+        Assert.assertEquals(appReq, lr.getRequest());
+      }
+      Assert.assertEquals(1, appRsrcCount);
+    } finally {
+      dispatcher.stop();
+      delService.stop();
+    }
+  }
+  
+  @Test
+  @SuppressWarnings("unchecked") // mocked generics
   public void testLocalizationHeartbeat() throws Exception {
     Configuration conf = new Configuration();
     AbstractFileSystem spylfs =
@@ -175,9 +370,8 @@ public class TestResourceLocalizationSer
 
       // init application
       final Application app = mock(Application.class);
-      final ApplicationId appId = mock(ApplicationId.class);
-      when(appId.getClusterTimestamp()).thenReturn(314159265358979L);
-      when(appId.getId()).thenReturn(3);
+      final ApplicationId appId =
+          BuilderUtils.newApplicationId(314159265358979L, 3);
       when(app.getUser()).thenReturn("user0");
       when(app.getAppId()).thenReturn(appId);
       spyService.handle(new ApplicationLocalizationEvent(
@@ -205,11 +399,13 @@ public class TestResourceLocalizationSer
       doReturn(out).when(spylfs).createInternal(isA(Path.class),
           isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
           anyLong(), isA(Progressable.class), anyInt(), anyBoolean());
-      final LocalResource resource = getMockResource(r);
+      final LocalResource resource = getPrivateMockedResource(r);
       final LocalResourceRequest req = new LocalResourceRequest(resource);
-      spyService.handle(new ContainerLocalizationRequestEvent(
-                c, Collections.singletonList(req),
-                LocalResourceVisibility.PRIVATE));
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+        new HashMap<LocalResourceVisibility, 
+                    Collection<LocalResourceRequest>>();
+      rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
+      spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
       // Sigh. Thread init of private localizer not accessible
       Thread.sleep(500);
       dispatcher.await();
@@ -265,42 +461,44 @@ public class TestResourceLocalizationSer
     }
   }
 
-  static URL getPath(String path) {
-    URL uri = mock(org.apache.hadoop.yarn.api.records.URL.class);
-    when(uri.getScheme()).thenReturn("file");
-    when(uri.getHost()).thenReturn(null);
-    when(uri.getFile()).thenReturn(path);
-    return uri;
+  // Wraps path in a "file"-scheme URL record built by BuilderUtils.
+  private static URL getPath(String path) {
+    return BuilderUtils.newURL("file", null, 0, path);
   }
 
-  static LocalResource getMockResource(Random r) {
-    LocalResource rsrc = mock(LocalResource.class);
-
+  // Builds a FILE resource of the requested visibility located at
+  // /local/<vis>/<random hex name>. The two trailing numeric arguments are
+  // drawn from r (1024..2047 and 2048..3071).
+  private static LocalResource getMockedResource(Random r,
+      LocalResourceVisibility vis) {
     String name = Long.toHexString(r.nextLong());
-    URL uri = getPath("/local/PRIVATE/" + name);
-
-    when(rsrc.getResource()).thenReturn(uri);
-    when(rsrc.getSize()).thenReturn(r.nextInt(1024) + 1024L);
-    when(rsrc.getTimestamp()).thenReturn(r.nextInt(1024) + 2048L);
-    when(rsrc.getType()).thenReturn(LocalResourceType.FILE);
-    when(rsrc.getVisibility()).thenReturn(LocalResourceVisibility.PRIVATE);
+    // Name the path after the actual visibility rather than a fixed
+    // "PRIVATE"; for PRIVATE resources the path is the same as before.
+    URL url = getPath("/local/" + vis + "/" + name);
+    LocalResource rsrc =
+        BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
+            r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L);
     return rsrc;
   }
+  
+  private static LocalResource getAppMockedResource(Random r) { // shorthand: getMockedResource with APPLICATION visibility
+    return getMockedResource(r, LocalResourceVisibility.APPLICATION);
+  }
+  
+  private static LocalResource getPublicMockedResource(Random r) { // shorthand: getMockedResource with PUBLIC visibility
+    return getMockedResource(r, LocalResourceVisibility.PUBLIC);
+  }
+  
+  private static LocalResource getPrivateMockedResource(Random r) { // shorthand: getMockedResource with PRIVATE visibility
+    return getMockedResource(r, LocalResourceVisibility.PRIVATE);
+  }
 
-  static Container getMockContainer(ApplicationId appId, int id) {
+  private static Container getMockContainer(ApplicationId appId, int id) { // mocked Container stubbed with user "user0", a BuilderUtils-built ContainerId and one token
     Container c = mock(Container.class);
-    ApplicationAttemptId appAttemptId = Records.newRecord(ApplicationAttemptId.class);
-    appAttemptId.setApplicationId(appId);
-    appAttemptId.setAttemptId(1);
-    ContainerId cId = Records.newRecord(ContainerId.class);
-    cId.setAppAttemptId(appAttemptId);
-    cId.setAppId(appId);
-    cId.setId(id);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(appId, 1); // attempt id 1, as the replaced setter-based code used
+    ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
     when(c.getUser()).thenReturn("user0");
     when(c.getContainerID()).thenReturn(cId);
     Credentials creds = new Credentials();
     creds.addToken(new Text("tok" + id), getToken(id));
     when(c.getCredentials()).thenReturn(creds);
+    when(c.toString()).thenReturn(cId.toString()); // the mock prints as its container id
     return c;
   }