You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/03 07:00:29 UTC

svn commit: r1463823 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-common/src/main/resources/ hadoop-yarn/hadoop-yarn-server/...

Author: vinodkv
Date: Wed Apr  3 05:00:28 2013
New Revision: 1463823

URL: http://svn.apache.org/r1463823
Log:
YARN-467. Modify public distributed cache to localize files such that no local directory hits unix file count limits and thus prevent job failures. Contributed by Omkar Vinit Joshi.

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1463823&r1=1463822&r2=1463823&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Apr  3 05:00:28 2013
@@ -178,7 +178,11 @@ Release 2.0.5-beta - UNRELEASED
 
     YARN-382. SchedulerUtils improve way normalizeRequest sets the resource
     capabilities. (Zhijie Shen via bikas)
- 
+
+    YARN-467. Modify public distributed cache to localize files such that no
+    local directory hits unix file count limits and thus prevent job failures.
+    (Omkar Vinit Joshi via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml?rev=1463823&r1=1463822&r2=1463823&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml Wed Apr  3 05:00:28 2013
@@ -256,4 +256,18 @@
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
 
+  <!-- Null pointer exception needs to be ignored here as this is never going to occur. -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourcesTrackerImpl" />
+    <Method name="decrementFileCountForLocalCacheDirectory" />
+    <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE" />
+  </Match>
+
+  <!-- Null pointer exception needs to be ignored here as this is never going to occur. -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourcesTrackerImpl" />
+    <Method name="getPathForLocalization" />
+    <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE" />
+  </Match>
+
 </FindBugsFilter>

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1463823&r1=1463822&r2=1463823&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Wed Apr  3 05:00:28 2013
@@ -340,7 +340,15 @@ public class YarnConfiguration extends C
   /**List of directories to store localized files in.*/
   public static final String NM_LOCAL_DIRS = NM_PREFIX + "local-dirs";
   public static final String DEFAULT_NM_LOCAL_DIRS = "/tmp/nm-local-dir";
-  
+
+  /**
+   * Maximum number of files (including sub-directories) allowed in each
+   * localized directory. Avoid tuning this too low.
+   */
+  public static final String NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY =
+    NM_PREFIX + "local-cache.max-files-per-directory";
+  public static final int DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY = 8192;
+
   /** Address where the localizer IPC is.*/
   public static final String NM_LOCALIZER_ADDRESS =
     NM_PREFIX + "localizer.address";

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1463823&r1=1463822&r2=1463823&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Wed Apr  3 05:00:28 2013
@@ -360,6 +360,25 @@
   </property>
 
   <property>
+    <description>It limits the maximum number of files which will be localized
+      in a single local directory. If the limit is reached then sub-directories
+      will be created and new files will be localized in them. If it is set to
+      a value less than or equal to 36 [which are sub-directories (0-9 and then
+      a-z)] then NodeManager will fail to start. For example, [for public
+      cache] if this is configured with a value of 40 ( 4 files +
+      36 sub-directories) and the local-dir is "/tmp/local-dir1" then it will
+      allow 4 files to be created directly inside "/tmp/local-dir1/filecache".
+      For files that are localized further it will create a sub-directory "0"
+      inside "/tmp/local-dir1/filecache" and will localize files inside it
+      until it becomes full. If a file is removed from a sub-directory that
+      is marked full, then that sub-directory will be used again to
+      localize files.
+   </description>
+    <name>yarn.nodemanager.local-cache.max-files-per-directory</name>
+    <value>8192</value>
+  </property>
+
+  <property>
     <description>Address where the localizer IPC is.</description>
     <name>yarn.nodemanager.localizer.address</name>
     <value>0.0.0.0:8040</value>

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java?rev=1463823&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java Wed Apr  3 05:00:28 2013
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * {@link LocalCacheDirectoryManager} is used for managing hierarchical
+ * directories for local cache. It restricts the number of files in
+ * a directory to
+ * {@link YarnConfiguration#NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY}, which
+ * includes 36 sub-directories (named from 0 to 9 and a to z). The root directory
+ * is represented by an empty string. It internally maintains a queue of vacant
+ * directories. As soon as the file count for a directory reaches its limit, new
+ * files will not be created in it until at least one file is deleted from it.
+ * New sub-directories are not created unless a
+ * {@link LocalCacheDirectoryManager#getRelativePathForLocalization()} request
+ * is made and the nonFullDirectories queue is empty.
+ * 
+ * Note : this structure only returns relative localization path but doesn't
+ * create one on disk.
+ */
+public class LocalCacheDirectoryManager {
+
+  private final int perDirectoryFileLimit;
+  // total 36 = a to z plus 0 to 9
+  public static final int DIRECTORIES_PER_LEVEL = 36;
+
+  private Queue<Directory> nonFullDirectories;
+  private HashMap<String, Directory> knownDirectories;
+  private int totalSubDirectories;
+
+  public LocalCacheDirectoryManager(Configuration conf) {
+    totalSubDirectories = 0;
+    Directory rootDir = new Directory(totalSubDirectories);
+    nonFullDirectories = new LinkedList<Directory>();
+    knownDirectories = new HashMap<String, Directory>();
+    knownDirectories.put("", rootDir);
+    nonFullDirectories.add(rootDir);
+    this.perDirectoryFileLimit =
+        conf.getInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY,
+          YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY) - 36;
+  }
+
+  /**
+   * This method will return relative path from the first available vacant
+   * directory.
+   * 
+   * @return {@link String} relative path for localization
+   */
+  public synchronized String getRelativePathForLocalization() {
+    if (nonFullDirectories.isEmpty()) {
+      totalSubDirectories++;
+      Directory newDir = new Directory(totalSubDirectories);
+      nonFullDirectories.add(newDir);
+      knownDirectories.put(newDir.getRelativePath(), newDir);
+    }
+    Directory subDir = nonFullDirectories.peek();
+    if (subDir.incrementAndGetCount() >= perDirectoryFileLimit) {
+      nonFullDirectories.remove();
+    }
+    return subDir.getRelativePath();
+  }
+
+  /**
+   * This method will reduce the file count for the directory represented by
+   * path. The root directory of this Local cache directory manager is
+   * represented by an empty string.
+   */
+  public synchronized void decrementFileCountForPath(String relPath) {
+    relPath = relPath == null ? "" : relPath.trim();
+    Directory subDir = knownDirectories.get(relPath);
+    int oldCount = subDir.getCount();
+    if (subDir.decrementAndGetCount() < perDirectoryFileLimit
+        && oldCount >= perDirectoryFileLimit) {
+      nonFullDirectories.add(subDir);
+    }
+  }
+
+  /*
+   * It tracks the relative path and file count of one directory; the limit
+   * LocalCacheDirectoryManager#perDirectoryFileLimit is enforced by the manager.
+   */
+  static class Directory {
+
+    private final String relativePath;
+    private int fileCount;
+
+    public Directory(int directoryNo) {
+      fileCount = 0;
+      if (directoryNo == 0) {
+        relativePath = "";
+      } else {
+        String tPath = Integer.toString(directoryNo - 1, DIRECTORIES_PER_LEVEL);
+        StringBuffer sb = new StringBuffer();
+        if (tPath.length() == 1) {
+          sb.append(tPath.charAt(0));
+        } else {
+          // this is done to make sure we also reuse 0th sub directory
+          sb.append(Integer.toString(
+            Integer.parseInt(tPath.substring(0, 1), DIRECTORIES_PER_LEVEL) - 1,
+            DIRECTORIES_PER_LEVEL));
+        }
+        for (int i = 1; i < tPath.length(); i++) {
+          sb.append(Path.SEPARATOR).append(tPath.charAt(i));
+        }
+        relativePath = sb.toString();
+      }
+    }
+
+    public int incrementAndGetCount() {
+      return ++fileCount;
+    }
+
+    public int decrementAndGetCount() {
+      return --fileCount;
+    }
+
+    public String getRelativePath() {
+      return relativePath;
+    }
+
+    public int getCount() {
+      return fileCount;
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1463823&r1=1463822&r2=1463823&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Wed Apr  3 05:00:28 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -35,6 +36,11 @@ interface LocalResourcesTracker
 
   boolean remove(LocalizedResource req, DeletionService delService);
 
+  Path getPathForLocalization(LocalResourceRequest req, Path localDirPath);
+
   String getUser();
 
+  // TODO: Remove this in favour of EventHandler.handle
+  void localizationCompleted(LocalResourceRequest req, boolean success);
+
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1463823&r1=1463822&r2=1463823&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Wed Apr  3 05:00:28 2013
@@ -26,12 +26,13 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+
 
 /**
  * A collection of {@link LocalizedResource}s all of same
@@ -49,17 +50,43 @@ class LocalResourcesTrackerImpl implemen
   private final String user;
   private final Dispatcher dispatcher;
   private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
+  private Configuration conf;
+  /*
+   * This flag controls whether this resource tracker uses hierarchical
+   * directories or not. In this revision it is set only for the PUBLIC
+   * resource tracker; PRIVATE and APPLICATION resource trackers pass
+   * false.
+   */
+  private final boolean useLocalCacheDirectoryManager;
+  private ConcurrentHashMap<Path, LocalCacheDirectoryManager> directoryManagers;
+  /*
+   * It keeps track of the hierarchical directory assigned to a resource
+   * while it is being downloaded. It is useful for reference counting
+   * in case resource localization fails.
+   */
+  private ConcurrentHashMap<LocalResourceRequest, Path>
+    inProgressLocalResourcesMap;
 
-  public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher) {
+  public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+      boolean useLocalCacheDirectoryManager, Configuration conf) {
     this(user, dispatcher,
-        new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>());
+      new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
+      useLocalCacheDirectoryManager, conf);
   }
 
   LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
-      ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc) {
+      ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
+      boolean useLocalCacheDirectoryManager, Configuration conf) {
     this.user = user;
     this.dispatcher = dispatcher;
     this.localrsrc = localrsrc;
+    this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
+    if ( this.useLocalCacheDirectoryManager) {
+      directoryManagers = new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
+      inProgressLocalResourcesMap =
+        new ConcurrentHashMap<LocalResourceRequest, Path>();
+    }
+    this.conf = conf;
   }
 
   @Override
@@ -73,6 +100,7 @@ class LocalResourcesTrackerImpl implemen
         LOG.info("Resource " + rsrc.getLocalPath()
             + " is missing, localizing it again");
         localrsrc.remove(req);
+        decrementFileCountForLocalCacheDirectory(req, rsrc);
         rsrc = null;
       }
       if (null == rsrc) {
@@ -90,7 +118,52 @@ class LocalResourcesTrackerImpl implemen
     rsrc.handle(event);
   }
 
-  /**
+  /*
+   * Update the file-count statistics for a local cache-directory.
+   * This will retrieve the localized path for the resource from
+   * 1) inProgressLocalResourcesMap if the resource was under localization
+   * and it failed.
+   * 2) LocalizedResource if the resource is already localized.
+   * From this path it will identify the local directory under which the
+   * resource was localized. Then the rest of the path will be used to
+   * decrement the file count for the LocalCacheDirectoryManager directory
+   * pointing to this relative path.
+   */
+  private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req,
+      LocalizedResource rsrc) {
+    if ( useLocalCacheDirectoryManager) {
+      Path rsrcPath = null;
+      if (inProgressLocalResourcesMap.containsKey(req)) {
+        // This happens when localization of a resource fails.
+        rsrcPath = inProgressLocalResourcesMap.remove(req);
+      } else if (rsrc != null && rsrc.getLocalPath() != null) {
+        rsrcPath = rsrc.getLocalPath().getParent().getParent();
+      }
+      if (rsrcPath != null) {
+        Path parentPath = new Path(rsrcPath.toUri().getRawPath());
+        while (!directoryManagers.containsKey(parentPath)) {
+          parentPath = parentPath.getParent();
+          if ( parentPath == null) {
+            return;
+          }
+        }
+        if ( parentPath != null) {
+          String parentDir = parentPath.toUri().getRawPath().toString();
+          LocalCacheDirectoryManager dir = directoryManagers.get(parentPath);
+          String rsrcDir = rsrcPath.toUri().getRawPath(); 
+          if (rsrcDir.equals(parentDir)) {
+            dir.decrementFileCountForPath("");
+          } else {
+            dir.decrementFileCountForPath(
+              rsrcDir.substring(
+              parentDir.length() + 1));
+          }
+        }
+      }
+    }
+  }
+
+/**
    * This module checks if the resource which was localized is already present
    * or not
    * 
@@ -100,7 +173,8 @@ class LocalResourcesTrackerImpl implemen
   public boolean isResourcePresent(LocalizedResource rsrc) {
     boolean ret = true;
     if (rsrc.getState() == ResourceState.LOCALIZED) {
-      File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString());
+      File file = new File(rsrc.getLocalPath().toUri().getRawPath().
+        toString());
       if (!file.exists()) {
         ret = false;
       }
@@ -133,11 +207,11 @@ class LocalResourcesTrackerImpl implemen
       if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
         delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
       }
+      decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
       return true;
     }
   }
 
-
   /**
    * Returns the path up to the random directory component.
    */
@@ -163,4 +237,50 @@ class LocalResourcesTrackerImpl implemen
   public Iterator<LocalizedResource> iterator() {
     return localrsrc.values().iterator();
   }
-}
+
+  /**
+   * @return {@link Path} absolute path for localization which includes local
+   *         directory path and the relative hierarchical path (if use local
+   *         cache directory manager is enabled)
+   * 
+   * @param req {@link LocalResourceRequest} Resource localization request to
+   *        localize the resource.
+   * @param localDirPath {@link Path} local directory path
+   */
+  @Override
+  public Path
+      getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
+    if (useLocalCacheDirectoryManager && localDirPath != null) {
+
+      if (!directoryManagers.containsKey(localDirPath)) {
+        directoryManagers.putIfAbsent(localDirPath,
+          new LocalCacheDirectoryManager(conf));
+      }
+      LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
+
+      Path rPath = localDirPath;
+      String hierarchicalPath = dir.getRelativePathForLocalization();
+      // For most of the scenarios we will get root path only which
+      // is an empty string
+      if (!hierarchicalPath.isEmpty()) {
+        rPath = new Path(localDirPath, hierarchicalPath);
+      }
+      inProgressLocalResourcesMap.put(req, rPath);
+      return rPath;
+    } else {
+      return localDirPath;
+    }
+  }
+
+  @Override
+  public void localizationCompleted(LocalResourceRequest req,
+      boolean success) {
+    if (useLocalCacheDirectoryManager) {
+      if (!success) {
+        decrementFileCountForLocalCacheDirectory(req, null);
+      } else {
+        inProgressLocalResourcesMap.remove(req);
+      }
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1463823&r1=1463822&r2=1463823&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed Apr  3 05:00:28 2013
@@ -64,6 +64,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -130,7 +131,7 @@ public class ResourceLocalizationService
   private RecordFactory recordFactory;
   private final ScheduledExecutorService cacheCleanup;
 
-  private final LocalResourcesTracker publicRsrc;
+  private LocalResourcesTracker publicRsrc;
 
   private LocalDirsHandlerService dirsHandler;
 
@@ -158,7 +159,6 @@ public class ResourceLocalizationService
     this.delService = delService;
     this.dirsHandler = dirsHandler;
 
-    this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
     this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
         new ThreadFactoryBuilder()
           .setNameFormat("ResourceLocalizationService Cache Cleanup")
@@ -173,8 +173,26 @@ public class ResourceLocalizationService
     }
   }
 
+  private void validateConf(Configuration conf) {
+    int perDirFileLimit =
+        conf.getInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY,
+          YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY);
+    if (perDirFileLimit <= 36) {
+      LOG.error(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+          + " parameter is configured with very low value.");
+      throw new YarnException(
+        YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+            + " parameter is configured with a value less than 37.");
+    } else {
+      LOG.info("per directory file limit = " + perDirFileLimit);
+    }
+  }
+
   @Override
   public void init(Configuration conf) {
+    this.validateConf(conf);
+    this.publicRsrc =
+        new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
 
     try {
@@ -212,6 +230,7 @@ public class ResourceLocalizationService
         YarnConfiguration.NM_LOCALIZER_ADDRESS,
         YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
         YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
+
     localizerTracker = createLocalizerTracker(conf);
     addService(localizerTracker);
     dispatcher.register(LocalizerEventType.class, localizerTracker);
@@ -306,15 +325,17 @@ public class ResourceLocalizationService
   private void handleInitApplicationResources(Application app) {
     // 0) Create application tracking structs
     String userName = app.getUser();
-    privateRsrc.putIfAbsent(userName,
-        new LocalResourcesTrackerImpl(userName, dispatcher));
-    if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
-        new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
+    privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
+      dispatcher, false, super.getConfig()));
+    if (null != appRsrc.putIfAbsent(
+      ConverterUtils.toString(app.getAppId()),
+      new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
+        .getConfig()))) {
       LOG.warn("Initializing application " + app + " already present");
       assert false; // TODO: FIXME assert doesn't help
                     // ^ The condition is benign. Tests should fail and it
-                    //   should appear in logs, but it's an internal error
-                    //   that should have no effect on applications
+                    // should appear in logs, but it's an internal error
+                    // that should have no effect on applications
     }
     // 1) Signal container init
     //
@@ -620,6 +641,13 @@ public class ResourceLocalizationService
             Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
                 "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
                 ContainerLocalizer.getEstimatedSize(resource), true);
+            Path hierarchicalPath =
+              publicRsrc.getPathForLocalization(key, publicDirDestPath);
+            if (!hierarchicalPath.equals(publicDirDestPath)) {
+              publicDirDestPath = hierarchicalPath;
+              DiskChecker.checkDir(
+                new File(publicDirDestPath.toUri().getPath()));
+            }
             pending.put(queue.submit(new FSDownload(
                 lfs, null, conf, publicDirDestPath, resource, new Random())),
                 request);
@@ -654,19 +682,21 @@ public class ResourceLocalizationService
               assoc.getResource().handle(
                   new ResourceLocalizedEvent(key,
                     local, FileUtil.getDU(new File(local.toUri()))));
+              publicRsrc.localizationCompleted(key, true);
               synchronized (attempts) {
                 attempts.remove(key);
               }
             } catch (ExecutionException e) {
               LOG.info("Failed to download rsrc " + assoc.getResource(),
                   e.getCause());
+              LocalResourceRequest req = assoc.getResource().getRequest();
               dispatcher.getEventHandler().handle(
                   new ContainerResourceFailedEvent(
                     assoc.getContext().getContainerId(),
-                    assoc.getResource().getRequest(), e.getCause()));
+                    req, e.getCause()));
+              publicRsrc.localizationCompleted(req, false);
               List<LocalizerResourceRequestEvent> reqs;
               synchronized (attempts) {
-                LocalResourceRequest req = assoc.getResource().getRequest();
                 reqs = attempts.get(req);
                 if (null == reqs) {
                   LOG.error("Missing pending list for " + req);
@@ -1003,4 +1033,4 @@ public class ResourceLocalizationService
     del.delete(null, dirPath, new Path[] {});
   }
 
-}
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java?rev=1463823&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java Wed Apr  3 05:00:28 2013
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Test;
+
+public class TestLocalCacheDirectoryManager {
+
+  @Test(timeout = 10000)
+  public void testHierarchicalSubDirectoryCreation() {
+    // A configured limit of 37 leaves room for only 1 file per directory.
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
+
+    LocalCacheDirectoryManager hDir = new LocalCacheDirectoryManager(conf);
+    // Test root directory path = ""
+    Assert.assertTrue(hDir.getRelativePathForLocalization().isEmpty());
+
+    // Testing path generation from "0" to "0/0/z/z"
+    for (int i = 1; i <= 37 * 36 * 36; i++) {
+      StringBuffer sb = new StringBuffer();
+      String num = Integer.toString(i - 1, 36);
+      if (num.length() == 1) {
+        sb.append(num.charAt(0));
+      } else {
+        sb.append(Integer.toString(
+          Integer.parseInt(num.substring(0, 1), 36) - 1, 36));
+      }
+      for (int j = 1; j < num.length(); j++) {
+        sb.append(Path.SEPARATOR).append(num.charAt(j));
+      }
+      Assert.assertEquals(sb.toString(), hDir.getRelativePathForLocalization());
+    }
+
+    String testPath1 = "4";
+    String testPath2 = "2";
+    /*
+     * Making sure directories "4" and "2" become non-full so that they are
+     * reused for future getRelativePathForLocalization() calls in the order
+     * they are freed.
+     */
+    hDir.decrementFileCountForPath(testPath1);
+    hDir.decrementFileCountForPath(testPath2);
+    // After the call below, directory "4" should become full again.
+    Assert.assertEquals(testPath1, hDir.getRelativePathForLocalization());
+    Assert.assertEquals(testPath2, hDir.getRelativePathForLocalization());
+  }
+
+  @Test(timeout = 10000)
+  public void testMinimumPerDirectoryFileLimit() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1");
+    Exception e = null;
+    ResourceLocalizationService service =
+        new ResourceLocalizationService(null, null, null, null);
+    try {
+      service.init(conf);
+    } catch (Exception e1) {
+      e = e1;
+    }
+    Assert.assertNotNull(e);
+    Assert.assertEquals(YarnException.class, e.getClass());
+    Assert.assertEquals(e.getMessage(),
+      YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY
+          + " parameter is configured with a value less than 37.");
+
+  }
+
+  @Test(timeout = 1000)
+  public void testDirectoryStateChangeFromFullToNonFull() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "40");
+    LocalCacheDirectoryManager dir = new LocalCacheDirectoryManager(conf);
+
+    // checking for first four paths
+    String rootPath = "";
+    String firstSubDir = "0";
+    for (int i = 0; i < 4; i++) {
+      Assert.assertEquals(rootPath, dir.getRelativePathForLocalization());
+    }
+    // Releasing two files from the root directory.
+    dir.decrementFileCountForPath(rootPath);
+    dir.decrementFileCountForPath(rootPath);
+    // Space for two files should be available in root directory.
+    Assert.assertEquals(rootPath, dir.getRelativePathForLocalization());
+    Assert.assertEquals(rootPath, dir.getRelativePathForLocalization());
+    // As no space is now available in the root directory, the next path
+    // should come from the first sub-directory.
+    Assert.assertEquals(firstSubDir, dir.getRelativePathForLocalization());
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1463823&r1=1463822&r2=1463823&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Wed Apr  3 05:00:28 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -50,17 +51,17 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
-import org.mortbay.log.Log;
 
 public class TestLocalResourcesTrackerImpl {
 
-  @Test
+  @Test(timeout=10000)
   @SuppressWarnings("unchecked")
   public void test() {
     String user = "testuser";
     DrainDispatcher dispatcher = null;
     try {
-      dispatcher = createDispatcher(new Configuration());
+      Configuration conf = new Configuration();
+      dispatcher = createDispatcher(conf);
       EventHandler<LocalizerEvent> localizerEventHandler =
           mock(EventHandler.class);
       EventHandler<LocalizerEvent> containerEventHandler =
@@ -86,7 +87,8 @@ public class TestLocalResourcesTrackerIm
       localrsrc.put(req1, lr1);
       localrsrc.put(req2, lr2);
       LocalResourcesTracker tracker =
-          new LocalResourcesTrackerImpl(user, dispatcher, localrsrc);
+          new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, false,
+            conf);
 
       ResourceEvent req11Event =
           new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
@@ -152,13 +154,14 @@ public class TestLocalResourcesTrackerIm
     }
   }
 
-  @Test
+  @Test(timeout=10000)
   @SuppressWarnings("unchecked")
   public void testConsistency() {
     String user = "testuser";
     DrainDispatcher dispatcher = null;
     try {
-      dispatcher = createDispatcher(new Configuration());
+      Configuration conf = new Configuration();
+      dispatcher = createDispatcher(conf);
       EventHandler<LocalizerEvent> localizerEventHandler = mock(EventHandler.class);
       EventHandler<LocalizerEvent> containerEventHandler = mock(EventHandler.class);
       dispatcher.register(LocalizerEventType.class, localizerEventHandler);
@@ -172,7 +175,7 @@ public class TestLocalResourcesTrackerIm
       ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
       localrsrc.put(req1, lr1);
       LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-          dispatcher, localrsrc);
+          dispatcher, localrsrc, false, conf);
 
       ResourceEvent req11Event = new ResourceRequestEvent(req1,
           LocalResourceVisibility.PUBLIC, lc1);
@@ -221,6 +224,113 @@ public class TestLocalResourcesTrackerIm
     }
   }
 
+  @Test(timeout = 100000)
+  @SuppressWarnings("unchecked")
+  public void testHierarchicalLocalCacheDirectories() {
+    String user = "testuser";
+    DrainDispatcher dispatcher = null;
+    try {
+      Configuration conf = new Configuration();
+      // A configured limit of 37 leaves room for only 1 file per directory.
+      conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
+      dispatcher = createDispatcher(conf);
+
+      EventHandler<LocalizerEvent> localizerEventHandler =
+          mock(EventHandler.class);
+      EventHandler<LocalizerEvent> containerEventHandler =
+          mock(EventHandler.class);
+      dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+      dispatcher.register(ContainerEventType.class, containerEventHandler);
+
+      DeletionService mockDelService = mock(DeletionService.class);
+
+      ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
+          new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+      LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+          dispatcher, localrsrc, true, conf);
+
+      // This is a random path. NO File creation will take place at this place.
+      Path localDir = new Path("/tmp");
+
+      // Container 1 needs lr1 resource
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.PUBLIC);
+      LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+
+      // Container 1 requests lr1 to be localized
+      ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
+          LocalResourceVisibility.PUBLIC, lc1);
+      tracker.handle(reqEvent1);
+
+      // Simulate the process of localization of lr1
+      Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+      // Simulate lr1 getting localized
+      ResourceLocalizedEvent rle =
+          new ResourceLocalizedEvent(lr1,
+              new Path(hierarchicalPath1.toUri().toString() +
+                  Path.SEPARATOR + "file1"), 120);
+      tracker.handle(rle);
+      // Localization successful.
+      tracker.localizationCompleted(lr1, true);
+
+      LocalResourceRequest lr2 = createLocalResourceRequest(user, 3, 3,
+          LocalResourceVisibility.PUBLIC);
+      Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
+      // localization failed.
+      tracker.localizationCompleted(lr2, false);
+
+      /*
+       * The paths returned for the two localizations should be different
+       * because we are limiting each sub-directory to one file.
+       */
+      Assert.assertNotSame(hierarchicalPath1, hierarchicalPath2);
+
+      LocalResourceRequest lr3 = createLocalResourceRequest(user, 2, 2,
+          LocalResourceVisibility.PUBLIC);
+      ResourceEvent reqEvent3 = new ResourceRequestEvent(lr3,
+          LocalResourceVisibility.PUBLIC, lc1);
+      tracker.handle(reqEvent3);
+      Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir);
+      tracker.localizationCompleted(lr3, true);
+
+      // Verifying that path created is inside the subdirectory
+      Assert.assertEquals(hierarchicalPath3.toUri().toString(),
+          hierarchicalPath1.toUri().toString() + Path.SEPARATOR + "0");
+
+      // Container 1 releases resource lr1
+      ResourceEvent relEvent1 = new ResourceReleaseEvent(lr1, cId1);
+      tracker.handle(relEvent1);
+
+      // Validate the file counts now
+      int resources = 0;
+      Iterator<LocalizedResource> iter = tracker.iterator();
+      while (iter.hasNext()) {
+        iter.next();
+        resources++;
+      }
+      // There should be only two resources lr1 and lr3 now.
+      Assert.assertEquals(2, resources);
+
+      // Now simulate cache cleanup - removes unused resources.
+      iter = tracker.iterator();
+      while (iter.hasNext()) {
+        LocalizedResource rsrc = iter.next();
+        if (rsrc.getRefCount() == 0) {
+          Assert.assertTrue(tracker.remove(rsrc, mockDelService));
+          resources--;
+        }
+      }
+      // lr1 is not used by anyone and will be removed, only lr3 will hang
+      // around
+      Assert.assertEquals(1, resources);
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
   private boolean createdummylocalizefile(Path path) {
     boolean ret = false;
     File file = new File(path.toUri().getRawPath().toString());

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java?rev=1463823&r1=1463822&r2=1463823&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java Wed Apr  3 05:00:28 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.no
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -76,10 +77,11 @@ public class TestResourceRetention {
 
   LocalResourcesTracker createMockTracker(String user, final long rsrcSize,
       long nRsrcs, long timestamp, long tsstep) {
+    Configuration conf = new Configuration();
     ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
       new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
     LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
-          trackerResources));
+          trackerResources, false, conf));
     for (int i = 0; i < nRsrcs; ++i) {
       final LocalResourceRequest req = new LocalResourceRequest(
           new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,