You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2013/04/10 22:17:54 UTC

svn commit: r1466658 [1/2] - in /hadoop/common/branches/HDFS-2802/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java...

Author: szetszwo
Date: Wed Apr 10 20:17:39 2013
New Revision: 1466658

URL: http://svn.apache.org/r1466658
Log:
Merge r1464808 through r1466652 from trunk.

Added:
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java
      - copied unchanged from r1466652, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/ResourceLocalizationSpec.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java
      - copied unchanged from r1466652, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/impl/pb/ResourceLocalizationSpecPBImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java
      - copied unchanged from r1466652, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/NodeManagerBuilderUtils.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/InvalidResourceRequestException.java
      - copied unchanged from r1466652, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/InvalidResourceRequestException.java
Modified:
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DefaultResourceCalculator.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DominantResourceCalculator.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceCalculator.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
    hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/CHANGES.txt?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/CHANGES.txt Wed Apr 10 20:17:39 2013
@@ -25,6 +25,12 @@ Trunk - Unreleased 
 
     YARN-491. TestContainerLogsPage fails on Windows. (Chris Nauroth via hitesh)
 
+    YARN-557. Fix TestUnmanagedAMLauncher failure on Windows. (Chris Nauroth via
+    vinodkv)
+
+    YARN-524 TestYarnVersionInfo failing if generated properties doesn't
+    include an SVN URL. (stevel)
+
   BREAKDOWN OF HADOOP-8562 SUBTASKS
 
     YARN-158. Yarn creating package-info.java must not depend on sh.
@@ -55,9 +61,6 @@ Trunk - Unreleased 
     YARN-359. Fixing commands for container signalling in Windows. (Chris Nauroth
     via vinodkv)
     
-    YARN-524 TestYarnVersionInfo failing if generated properties doesn't
-    include an SVN URL(stevel)
-
 Release 2.0.5-beta - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -123,6 +126,12 @@ Release 2.0.5-beta - UNRELEASED
     YARN-458. YARN daemon addresses must be placed in many different configs. 
     (sandyr via tucu)
 
+    YARN-193. Scheduler.normalizeRequest does not account for allocation
+    requests that exceed maximumAllocation limits (Zhijie Shen via bikas)
+
+    YARN-479. NM retry behavior for connection to RM should be similar for
+    lost heartbeats (Jian He via bikas)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -195,6 +204,16 @@ Release 2.0.5-beta - UNRELEASED
     to implement closeable so that they can be stopped when needed via
     RPC.stopProxy(). (Siddharth Seth via vinodkv)
 
+    YARN-99. Modify private distributed cache to localize files such that no
+    local directory hits unix file count limits and thus prevent job failures.
+    (Omkar Vinit Joshi via vinodkv)
+
+    YARN-112. Fixed a race condition during localization that fails containers.
+    (Omkar Vinit Joshi via vinodkv)
+
+    YARN-534. Change RM restart recovery to also account for AM max-attempts 
+    configuration after the restart. (Jian He via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -509,6 +528,8 @@ Release 0.23.7 - UNRELEASED
     YARN-200. yarn log does not output all needed information, and is in a
     binary format (Ravi Prakash via jlowe)
 
+    YARN-525. make CS node-locality-delay refreshable (Thomas Graves via jlowe)
+
   OPTIMIZATIONS
 
     YARN-357. App submission should not be synchronized (daryn)

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationConstants.java Wed Apr 10 20:17:39 2013
@@ -103,7 +103,7 @@ public interface ApplicationConstants {
      * $USER
      * Final, non-modifiable.
      */
-    USER("USER"),
+    USER(Shell.WINDOWS ? "USERNAME": "USER"),
     
     /**
      * $LOGNAME

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Wed Apr 10 20:17:39 2013
@@ -683,7 +683,7 @@ public class ApplicationMaster {
       ctx.setResource(container.getResource());
 
       String jobUserName = System.getenv(ApplicationConstants.Environment.USER
-          .name());
+          .key());
       ctx.setUser(jobUserName);
       LOG.info("Setting user in ContainerLaunchContext to: " + jobUserName);
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java Wed Apr 10 20:17:39 2013
@@ -30,6 +30,7 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.junit.AfterClass;
@@ -50,7 +51,7 @@ public class TestUnmanagedAMLauncher {
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
     if (yarnCluster == null) {
       yarnCluster = new MiniYARNCluster(
-          TestUnmanagedAMLauncher.class.getName(), 1, 1, 1);
+          TestUnmanagedAMLauncher.class.getSimpleName(), 1, 1, 1);
       yarnCluster.init(conf);
       yarnCluster.start();
       URL url = Thread.currentThread().getContextClassLoader()
@@ -93,7 +94,7 @@ public class TestUnmanagedAMLauncher {
     return envClassPath;
   }
 
-  @Test(timeout=10000)
+  @Test(timeout=30000)
   public void testDSShell() throws Exception {
     String classpath = getTestRuntimeClasspath();
     String javaHome = System.getenv("JAVA_HOME");
@@ -112,7 +113,8 @@ public class TestUnmanagedAMLauncher {
         javaHome
             + "/bin/java -Xmx512m "
             + "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster "
-            + "--container_memory 128 --num_containers 1 --priority 0 --shell_command ls" };
+            + "--container_memory 128 --num_containers 1 --priority 0 "
+            + "--shell_command " + (Shell.WINDOWS ? "dir" : "ls") };
 
     LOG.info("Initializing Launcher");
     UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration(

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Wed Apr 10 20:17:39 2013
@@ -122,9 +122,9 @@ public class YarnConfiguration extends C
   public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_MB =
     YARN_PREFIX + "scheduler.maximum-allocation-mb";
   public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB = 8192;
-  public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_CORES =
+  public static final String RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES =
       YARN_PREFIX + "scheduler.maximum-allocation-vcores";
-  public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_CORES = 32;
+  public static final int DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES = 4;
 
   /** Number of threads to handle scheduler interface.*/
   public static final String RM_SCHEDULER_CLIENT_THREAD_COUNT =

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java Wed Apr 10 20:17:39 2013
@@ -23,7 +23,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
-import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.regex.Pattern;
 
@@ -36,13 +35,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 
 /**
  * Download a single URL to the local disk.
@@ -51,8 +49,7 @@ import org.apache.hadoop.yarn.util.Conve
 public class FSDownload implements Callable<Path> {
 
   private static final Log LOG = LogFactory.getLog(FSDownload.class);
-  
-  private Random rand;
+
   private FileContext files;
   private final UserGroupInformation userUgi;
   private Configuration conf;
@@ -71,13 +68,12 @@ public class FSDownload implements Calla
 
 
   public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
-      Path destDirPath, LocalResource resource, Random rand) {
+      Path destDirPath, LocalResource resource) {
     this.conf = conf;
     this.destDirPath = destDirPath;
     this.files = files;
     this.userUgi = ugi;
     this.resource = resource;
-    this.rand = rand;
   }
 
   LocalResource getResource() {
@@ -270,11 +266,6 @@ public class FSDownload implements Calla
     } catch (URISyntaxException e) {
       throw new IOException("Invalid resource", e);
     }
-    Path tmp;
-    do {
-      tmp = new Path(destDirPath, String.valueOf(rand.nextLong()));
-    } while (files.util().exists(tmp));
-    destDirPath = tmp;
     createDir(destDirPath, cachePerms);
     final Path dst_work = new Path(destDirPath + "_tmp");
     createDir(dst_work, cachePerms);
@@ -305,8 +296,6 @@ public class FSDownload implements Calla
         files.delete(dst_work, true);
       } catch (FileNotFoundException ignore) {
       }
-      // clear ref to internal var
-      rand = null;
       conf = null;
       resource = null;
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java Wed Apr 10 20:17:39 2013
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
 
@@ -66,6 +67,8 @@ import org.junit.Test;
 public class TestFSDownload {
 
   private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
+  private static AtomicLong uniqueNumberGenerator =
+    new AtomicLong(System.currentTimeMillis());
   
   @AfterClass
   public static void deleteTestDir() throws IOException {
@@ -267,9 +270,11 @@ public class TestFSDownload {
     rsrcVis.put(rsrc, vis);
     Path destPath = dirs.getLocalPathForWrite(
         basedir.toString(), size, conf);
+    destPath = new Path (destPath,
+      Long.toString(uniqueNumberGenerator.incrementAndGet()));
     FSDownload fsd =
       new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
-          destPath, rsrc, new Random(sharedSeed));
+          destPath, rsrc);
     pending.put(rsrc, exec.submit(fsd));
 
     try {
@@ -320,9 +325,11 @@ public class TestFSDownload {
       rsrcVis.put(rsrc, vis);
       Path destPath = dirs.getLocalPathForWrite(
           basedir.toString(), sizes[i], conf);
+      destPath = new Path (destPath,
+          Long.toString(uniqueNumberGenerator.incrementAndGet()));
       FSDownload fsd =
           new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
-              destPath, rsrc, new Random(sharedSeed));
+              destPath, rsrc);
       pending.put(rsrc, exec.submit(fsd));
     }
 
@@ -380,9 +387,10 @@ public class TestFSDownload {
     Path p = new Path(basedir, "" + 1);
     LocalResource rsrc = createTarFile(files, p, size, rand, vis);
     Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+    destPath = new Path (destPath,
+        Long.toString(uniqueNumberGenerator.incrementAndGet()));
     FSDownload fsd = new FSDownload(files,
-        UserGroupInformation.getCurrentUser(), conf, destPath, rsrc,
-        new Random(sharedSeed));
+        UserGroupInformation.getCurrentUser(), conf, destPath, rsrc);
     pending.put(rsrc, exec.submit(fsd));
     
     try {
@@ -437,9 +445,10 @@ public class TestFSDownload {
     LocalResource rsrcjar = createJarFile(files, p, size, rand, vis);
     rsrcjar.setType(LocalResourceType.PATTERN);
     Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+    destPathjar = new Path (destPathjar,
+        Long.toString(uniqueNumberGenerator.incrementAndGet()));
     FSDownload fsdjar = new FSDownload(files,
-        UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar,
-        new Random(sharedSeed));
+        UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar);
     pending.put(rsrcjar, exec.submit(fsdjar));
 
     try {
@@ -493,9 +502,10 @@ public class TestFSDownload {
     Path p = new Path(basedir, "" + 1);
     LocalResource rsrczip = createZipFile(files, p, size, rand, vis);
     Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+    destPathjar = new Path (destPathjar,
+        Long.toString(uniqueNumberGenerator.incrementAndGet()));
     FSDownload fsdzip = new FSDownload(files,
-        UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip,
-        new Random(sharedSeed));
+        UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip);
     pending.put(rsrczip, exec.submit(fsdzip));
 
     try {
@@ -586,9 +596,11 @@ public class TestFSDownload {
       rsrcVis.put(rsrc, vis);
       Path destPath = dirs.getLocalPathForWrite(
           basedir.toString(), conf);
+      destPath = new Path (destPath,
+          Long.toString(uniqueNumberGenerator.incrementAndGet()));
       FSDownload fsd =
           new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
-              destPath, rsrc, new Random(sharedSeed));
+              destPath, rsrc);
       pending.put(rsrc, exec.submit(fsd));
     }
     
@@ -614,4 +626,38 @@ public class TestFSDownload {
     
 
   }
-}
+
+  @Test(timeout = 1000)
+  public void testUniqueDestinationPath() throws Exception {
+    Configuration conf = new Configuration();
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    final Path basedir = files.makeQualified(new Path("target",
+        TestFSDownload.class.getSimpleName()));
+    files.mkdir(basedir, null, true);
+    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+    ExecutorService singleThreadedExec = Executors.newSingleThreadExecutor();
+
+    LocalDirAllocator dirs =
+        new LocalDirAllocator(TestFSDownload.class.getName());
+    Path destPath = dirs.getLocalPathForWrite(basedir.toString(), conf);
+    destPath =
+        new Path(destPath, Long.toString(uniqueNumberGenerator
+            .incrementAndGet()));
+    try {
+      Path p = new Path(basedir, "dir" + 0 + ".jar");
+      LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
+      LocalResource rsrc = createJar(files, p, vis);
+      FSDownload fsd =
+          new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
+              destPath, rsrc);
+      Future<Path> rPath = singleThreadedExec.submit(fsd);
+      // Now FSDownload will not create a random directory to localize the
+      // resource. Therefore the final localizedPath for the resource should be
+      // destination directory (passed as an argument) + file name.
+      Assert.assertEquals(destPath, rPath.get().getParent());
+    } finally {
+      singleThreadedExec.shutdown();
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Apr 10 20:17:39 2013
@@ -87,10 +87,9 @@ public class NodeStatusUpdaterImpl exten
 
   private final NodeHealthCheckerService healthChecker;
   private final NodeManagerMetrics metrics;
-
-  private boolean previousHeartBeatSucceeded;
-  private List<ContainerStatus> previousContainersStatuses =
-      new ArrayList<ContainerStatus>();
+  private long rmConnectWaitMS;
+  private long rmConnectionRetryIntervalMS;
+  private boolean waitForEver;
 
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -99,7 +98,6 @@ public class NodeStatusUpdaterImpl exten
     this.context = context;
     this.dispatcher = dispatcher;
     this.metrics = metrics;
-    this.previousHeartBeatSucceeded = true;
   }
 
   @Override
@@ -137,8 +135,8 @@ public class NodeStatusUpdaterImpl exten
             YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
     
     LOG.info("Initialized nodemanager for " + nodeId + ":" +
-    		" physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
-    		" physical-cores=" + cpuCores + " virtual-cores=" + virtualCores);
+        " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
+        " physical-cores=" + cpuCores + " virtual-cores=" + virtualCores);
     
     super.init(conf);
   }
@@ -192,12 +190,12 @@ public class NodeStatusUpdaterImpl exten
 
   private void registerWithRM() throws YarnRemoteException {
     Configuration conf = getConfig();
-    long rmConnectWaitMS =
+    rmConnectWaitMS =
         conf.getInt(
             YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
             YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
         * 1000;
-    long rmConnectionRetryIntervalMS =
+    rmConnectionRetryIntervalMS =
         conf.getLong(
             YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
             YarnConfiguration
@@ -210,7 +208,7 @@ public class NodeStatusUpdaterImpl exten
           " should not be negative.");
     }
 
-    boolean waitForEver = (rmConnectWaitMS == -1000);
+    waitForEver = (rmConnectWaitMS == -1000);
 
     if(! waitForEver) {
       if(rmConnectWaitMS < 0) {
@@ -319,14 +317,8 @@ public class NodeStatusUpdaterImpl exten
     NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
     nodeStatus.setNodeId(this.nodeId);
 
-    List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
-    if(previousHeartBeatSucceeded) {
-      previousContainersStatuses.clear();
-    } else {
-      containersStatuses.addAll(previousContainersStatuses);
-    }
-
     int numActiveContainers = 0;
+    List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
     for (Iterator<Entry<ContainerId, Container>> i =
         this.context.getContainers().entrySet().iterator(); i.hasNext();) {
       Entry<ContainerId, Container> e = i.next();
@@ -341,7 +333,6 @@ public class NodeStatusUpdaterImpl exten
       LOG.info("Sending out status for container: " + containerStatus);
 
       if (containerStatus.getState() == ContainerState.COMPLETE) {
-        previousContainersStatuses.add(containerStatus);
         // Remove
         i.remove();
 
@@ -404,6 +395,9 @@ public class NodeStatusUpdaterImpl exten
         while (!isStopped) {
           // Send heartbeat
           try {
+            NodeHeartbeatResponse response = null;
+            int rmRetryCount = 0;
+            long waitStartTime = System.currentTimeMillis();
             NodeStatus nodeStatus = getNodeStatus();
             nodeStatus.setResponseId(lastHeartBeatID);
             
@@ -414,9 +408,31 @@ public class NodeStatusUpdaterImpl exten
               request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context
                 .getContainerTokenSecretManager().getCurrentKey());
             }
-            NodeHeartbeatResponse response =
-              resourceTracker.nodeHeartbeat(request);
-            previousHeartBeatSucceeded = true;
+            while (!isStopped) {
+              try {
+                rmRetryCount++;
+                response = resourceTracker.nodeHeartbeat(request);
+                break;
+              } catch (Throwable e) {
+                LOG.warn("Trying to heartbeat to ResourceManager, "
+                    + "current no. of failed attempts is " + rmRetryCount);
+                if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
+                    || waitForEver) {
+                  try {
+                    LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
+                        + " seconds before next heartbeat to RM");
+                    Thread.sleep(rmConnectionRetryIntervalMS);
+                  } catch(InterruptedException ex) {
+                    //done nothing
+                  }
+                } else {
+                  String errorMessage = "Failed to heartbeat to RM, " +
+                      "no. of failed attempts is "+rmRetryCount;
+                  LOG.error(errorMessage,e);
+                  throw new YarnException(errorMessage,e);
+                }
+              }
+            }
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
             // See if the master-key has rolled over
@@ -432,7 +448,7 @@ public class NodeStatusUpdaterImpl exten
             if (response.getNodeAction() == NodeAction.SHUTDOWN) {
               LOG
                   .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
-                  		" hence shutting down.");
+                      " hence shutting down.");
               dispatcher.getEventHandler().handle(
                   new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
               break;
@@ -461,8 +477,12 @@ public class NodeStatusUpdaterImpl exten
               dispatcher.getEventHandler().handle(
                   new CMgrCompletedAppsEvent(appsToCleanup));
             }
+          } catch (YarnException e) {
+            //catch and throw the exception if tried MAX wait time to connect RM
+            dispatcher.getEventHandler().handle(
+                new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
+            throw e;
           } catch (Throwable e) {
-            previousHeartBeatSucceeded = false;
             // TODO Better error handling. Thread can die with the rest of the
             // NM still running.
             LOG.error("Caught exception in status-updater", e);

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerHeartbeatResponse.java Wed Apr 10 20:17:39 2013
@@ -18,18 +18,13 @@
 package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords;
 
 import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.server.nodemanager.api.*;
 
 public interface LocalizerHeartbeatResponse {
-  public LocalizerAction getLocalizerAction();
-  public List<LocalResource> getAllResources();
-  public LocalResource getLocalResource(int i);
 
+  public LocalizerAction getLocalizerAction();
   public void setLocalizerAction(LocalizerAction action);
 
-  public void addAllResources(List<LocalResource> resources);
-  public void addResource(LocalResource resource);
-  public void removeResource(int index);
-  public void clearResources();
-}
+  public List<ResourceLocalizationSpec> getResourceSpecs();
+  public void setResourceSpecs(List<ResourceLocalizationSpec> rsrcs);
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java Wed Apr 10 20:17:39 2013
@@ -21,13 +21,14 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.hadoop.yarn.api.records.LocalResource;
+
 import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerActionProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.ResourceLocalizationSpecProto;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
+import org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.ResourceLocalizationSpecPBImpl;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 
@@ -40,13 +41,14 @@ public class LocalizerHeartbeatResponseP
   LocalizerHeartbeatResponseProto.Builder builder = null;
   boolean viaProto = false;
 
-  private List<LocalResource> resources;
+  private List<ResourceLocalizationSpec> resourceSpecs;
 
   public LocalizerHeartbeatResponsePBImpl() {
     builder = LocalizerHeartbeatResponseProto.newBuilder();
   }
 
-  public LocalizerHeartbeatResponsePBImpl(LocalizerHeartbeatResponseProto proto) {
+  public LocalizerHeartbeatResponsePBImpl(
+      LocalizerHeartbeatResponseProto proto) {
     this.proto = proto;
     viaProto = true;
   }
@@ -59,7 +61,7 @@ public class LocalizerHeartbeatResponseP
   }
 
   private void mergeLocalToBuilder() {
-    if (resources != null) {
+    if (resourceSpecs != null) {
       addResourcesToProto();
     }
   }
@@ -79,6 +81,7 @@ public class LocalizerHeartbeatResponseP
     viaProto = false;
   }
 
+  @Override
   public LocalizerAction getLocalizerAction() {
     LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
     if (!p.hasAction()) {
@@ -87,14 +90,10 @@ public class LocalizerHeartbeatResponseP
     return convertFromProtoFormat(p.getAction());
   }
 
-  public List<LocalResource> getAllResources() {
-    initResources();
-    return this.resources;
-  }
-
-  public LocalResource getLocalResource(int i) {
+  @Override
+  public List<ResourceLocalizationSpec> getResourceSpecs() {
     initResources();
-    return this.resources.get(i);
+    return this.resourceSpecs;
   }
 
   public void setLocalizerAction(LocalizerAction action) {
@@ -106,31 +105,39 @@ public class LocalizerHeartbeatResponseP
     builder.setAction(convertToProtoFormat(action));
   }
 
+  public void setResourceSpecs(List<ResourceLocalizationSpec> rsrcs) {
+    maybeInitBuilder();
+    if (rsrcs == null) {
+      builder.clearResources();
+      return;
+    }
+    this.resourceSpecs = rsrcs;
+  }
+
   private void initResources() {
-    if (this.resources != null) {
+    if (this.resourceSpecs != null) {
       return;
     }
     LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
-    List<LocalResourceProto> list = p.getResourcesList();
-    this.resources = new ArrayList<LocalResource>();
-
-    for (LocalResourceProto c : list) {
-      this.resources.add(convertFromProtoFormat(c));
+    List<ResourceLocalizationSpecProto> list = p.getResourcesList();
+    this.resourceSpecs = new ArrayList<ResourceLocalizationSpec>();
+    for (ResourceLocalizationSpecProto c : list) {
+      this.resourceSpecs.add(convertFromProtoFormat(c));
     }
   }
 
   private void addResourcesToProto() {
     maybeInitBuilder();
     builder.clearResources();
-    if (this.resources == null) 
+    if (this.resourceSpecs == null) 
       return;
-    Iterable<LocalResourceProto> iterable =
-        new Iterable<LocalResourceProto>() {
+    Iterable<ResourceLocalizationSpecProto> iterable =
+        new Iterable<ResourceLocalizationSpecProto>() {
       @Override
-      public Iterator<LocalResourceProto> iterator() {
-        return new Iterator<LocalResourceProto>() {
+      public Iterator<ResourceLocalizationSpecProto> iterator() {
+        return new Iterator<ResourceLocalizationSpecProto>() {
 
-          Iterator<LocalResource> iter = resources.iterator();
+          Iterator<ResourceLocalizationSpec> iter = resourceSpecs.iterator();
 
           @Override
           public boolean hasNext() {
@@ -138,8 +145,10 @@ public class LocalizerHeartbeatResponseP
           }
 
           @Override
-          public LocalResourceProto next() {
-            return convertToProtoFormat(iter.next());
+          public ResourceLocalizationSpecProto next() {
+            ResourceLocalizationSpec resource = iter.next();
+            
+            return ((ResourceLocalizationSpecPBImpl)resource).getProto();
           }
 
           @Override
@@ -154,34 +163,10 @@ public class LocalizerHeartbeatResponseP
     builder.addAllResources(iterable);
   }
 
-  public void addAllResources(List<LocalResource> resources) {
-    if (resources == null)
-      return;
-    initResources();
-    this.resources.addAll(resources);
-  }
 
-  public void addResource(LocalResource resource) {
-    initResources();
-    this.resources.add(resource);
-  }
-
-  public void removeResource(int index) {
-    initResources();
-    this.resources.remove(index);
-  }
-
-  public void clearResources() {
-    initResources();
-    this.resources.clear();
-  }
-
-  private LocalResource convertFromProtoFormat(LocalResourceProto p) {
-    return new LocalResourcePBImpl(p);
-  }
-
-  private LocalResourceProto convertToProtoFormat(LocalResource s) {
-    return ((LocalResourcePBImpl)s).getProto();
+  private ResourceLocalizationSpec convertFromProtoFormat(
+      ResourceLocalizationSpecProto p) {
+    return new ResourceLocalizationSpecPBImpl(p);
   }
 
   private LocalizerActionProto convertToProtoFormat(LocalizerAction a) {
@@ -191,5 +176,4 @@ public class LocalizerHeartbeatResponseP
   private LocalizerAction convertFromProtoFormat(LocalizerActionProto a) {
     return LocalizerAction.valueOf(a.name());
   }
-
-}
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Wed Apr 10 20:17:39 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
@@ -89,8 +91,6 @@ public class ContainerLocalizer {
   private final String localizerId;
   private final FileContext lfs;
   private final Configuration conf;
-  private final LocalDirAllocator appDirs;
-  private final LocalDirAllocator userDirs;
   private final RecordFactory recordFactory;
   private final Map<LocalResource,Future<Path>> pendingResources;
   private final String appCacheDirContextName;
@@ -112,8 +112,6 @@ public class ContainerLocalizer {
     this.recordFactory = recordFactory;
     this.conf = new Configuration();
     this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId);
-    this.appDirs = new LocalDirAllocator(appCacheDirContextName);
-    this.userDirs = new LocalDirAllocator(String.format(USERCACHE_CTXT_FMT, user));
     this.pendingResources = new HashMap<LocalResource,Future<Path>>();
   }
 
@@ -197,10 +195,10 @@ public class ContainerLocalizer {
     return new ExecutorCompletionService<Path>(exec);
   }
 
-  Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc,
+  Callable<Path> download(Path path, LocalResource rsrc,
       UserGroupInformation ugi) throws IOException {
-    Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf);
-    return new FSDownload(lfs, ugi, conf, destPath, rsrc, new Random());
+    DiskChecker.checkDir(new File(path.toUri().getRawPath()));
+    return new FSDownload(lfs, ugi, conf, path, rsrc);
   }
 
   static long getEstimatedSize(LocalResource rsrc) {
@@ -238,25 +236,12 @@ public class ContainerLocalizer {
         LocalizerHeartbeatResponse response = nodemanager.heartbeat(status);
         switch (response.getLocalizerAction()) {
         case LIVE:
-          List<LocalResource> newResources = response.getAllResources();
-          for (LocalResource r : newResources) {
-            if (!pendingResources.containsKey(r)) {
-              final LocalDirAllocator lda;
-              switch (r.getVisibility()) {
-              default:
-                LOG.warn("Unknown visibility: " + r.getVisibility()
-                        + ", Using userDirs");
-                //Falling back to userDirs for unknown visibility.
-              case PUBLIC:
-              case PRIVATE:
-                lda = userDirs;
-                break;
-              case APPLICATION:
-                lda = appDirs;
-                break;
-              }
-              // TODO: Synchronization??
-              pendingResources.put(r, cs.submit(download(lda, r, ugi)));
+          List<ResourceLocalizationSpec> newRsrcs = response.getResourceSpecs();
+          for (ResourceLocalizationSpec newRsrc : newRsrcs) {
+            if (!pendingResources.containsKey(newRsrc.getResource())) {
+              pendingResources.put(newRsrc.getResource(), cs.submit(download(
+                new Path(newRsrc.getDestinationDirectory().getFile()),
+                newRsrc.getResource(), ugi)));
             }
           }
           break;

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java Wed Apr 10 20:17:39 2013
@@ -22,8 +22,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Queue;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Wed Apr 10 20:17:39 2013
@@ -43,4 +43,5 @@ interface LocalResourcesTracker
   // TODO: Remove this in favour of EventHandler.handle
   void localizationCompleted(LocalResourceRequest req, boolean success);
 
+  long nextUniqueNumber();
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Wed Apr 10 20:17:39 2013
@@ -21,6 +21,7 @@ import java.io.File;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -66,6 +67,12 @@ class LocalResourcesTrackerImpl implemen
    */
   private ConcurrentHashMap<LocalResourceRequest, Path>
     inProgressLocalResourcesMap;
+  /*
+   * starting with 10 to accommodate 0-9 directories created as a part of
+   * LocalCacheDirectoryManager. So there will be one unique number generator
+   * per APPLICATION, USER and PUBLIC cache.
+   */
+  private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
 
   public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
       boolean useLocalCacheDirectoryManager, Configuration conf) {
@@ -283,4 +290,9 @@ class LocalResourcesTrackerImpl implemen
       }
     }
   }
+
+  @Override
+  public long nextUniqueNumber() {
+    return uniqueNumberGenerator.incrementAndGet();
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed Apr 10 20:17:39 2013
@@ -34,7 +34,6 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -80,10 +79,12 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
@@ -105,6 +106,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
+import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -326,7 +328,7 @@ public class ResourceLocalizationService
     // 0) Create application tracking structs
     String userName = app.getUser();
     privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
-      dispatcher, false, super.getConfig()));
+      dispatcher, true, super.getConfig()));
     if (null != appRsrc.putIfAbsent(
       ConverterUtils.toString(app.getAppId()),
       new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
@@ -476,6 +478,21 @@ public class ResourceLocalizationService
     }
   }
 
+  private String getUserFileCachePath(String user) {
+    String path =
+        "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
+            + user + Path.SEPARATOR + ContainerLocalizer.FILECACHE;
+    return path;
+  }
+
+  private String getUserAppCachePath(String user, String appId) {
+    String path =
+        "." + Path.SEPARATOR + ContainerLocalizer.USERCACHE + Path.SEPARATOR
+            + user + Path.SEPARATOR + ContainerLocalizer.APPCACHE
+            + Path.SEPARATOR + appId;
+    return path;
+  }
+
   /**
    * Sub-component handling the spawning of {@link ContainerLocalizer}s
    */
@@ -648,8 +665,11 @@ public class ResourceLocalizationService
               DiskChecker.checkDir(
                 new File(publicDirDestPath.toUri().getPath()));
             }
+            publicDirDestPath =
+                new Path(publicDirDestPath, Long.toString(publicRsrc
+                  .nextUniqueNumber()));
             pending.put(queue.submit(new FSDownload(
-                lfs, null, conf, publicDirDestPath, resource, new Random())),
+                lfs, null, conf, publicDirDestPath, resource)),
                 request);
             attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
           } catch (IOException e) {
@@ -803,7 +823,20 @@ public class ResourceLocalizationService
         LocalResource next = findNextResource();
         if (next != null) {
           response.setLocalizerAction(LocalizerAction.LIVE);
-          response.addResource(next);
+          try {
+            ArrayList<ResourceLocalizationSpec> rsrcs =
+                new ArrayList<ResourceLocalizationSpec>();
+            ResourceLocalizationSpec rsrc =
+                NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+                  getPathForLocalization(next));
+            rsrcs.add(rsrc);
+            response.setResourceSpecs(rsrcs);
+          } catch (IOException e) {
+            LOG.error("local path for PRIVATE localization could not be found."
+                + "Disks might have failed.", e);
+          } catch (URISyntaxException e) {
+            // TODO fail? Already translated several times...
+          }
         } else if (pending.isEmpty()) {
           // TODO: Synchronization
           response.setLocalizerAction(LocalizerAction.DIE);
@@ -812,7 +845,8 @@ public class ResourceLocalizationService
         }
         return response;
       }
-
+      ArrayList<ResourceLocalizationSpec> rsrcs =
+          new ArrayList<ResourceLocalizationSpec>();
       for (LocalResourceStatus stat : remoteResourceStatuses) {
         LocalResource rsrc = stat.getResource();
         LocalResourceRequest req = null;
@@ -835,6 +869,7 @@ public class ResourceLocalizationService
                   new ResourceLocalizedEvent(req,
                     ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
                     stat.getLocalSize()));
+              localizationCompleted(stat);
             } catch (URISyntaxException e) { }
             if (pending.isEmpty()) {
               // TODO: Synchronization
@@ -844,7 +879,17 @@ public class ResourceLocalizationService
             response.setLocalizerAction(LocalizerAction.LIVE);
             LocalResource next = findNextResource();
             if (next != null) {
-              response.addResource(next);
+              try {
+                ResourceLocalizationSpec resource =
+                    NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
+                      getPathForLocalization(next));
+                rsrcs.add(resource);
+              } catch (IOException e) {
+                LOG.error("local path for PRIVATE localization could not be " +
+                  "found. Disks might have failed.", e);
+              } catch (URISyntaxException e) {
+                  //TODO fail? Already translated several times...
+              }
             }
             break;
           case FETCH_PENDING:
@@ -854,6 +899,7 @@ public class ResourceLocalizationService
             LOG.info("DEBUG: FAILED " + req, stat.getException());
             assoc.getResource().unlock();
             response.setLocalizerAction(LocalizerAction.DIE);
+            localizationCompleted(stat);
             // TODO: Why is this event going directly to the container. Why not
             // the resource itself? What happens to the resource? Is it removed?
             dispatcher.getEventHandler().handle(
@@ -869,9 +915,53 @@ public class ResourceLocalizationService
             break;
         }
       }
+      response.setResourceSpecs(rsrcs);
       return response;
     }
 
+    private void localizationCompleted(LocalResourceStatus stat) {
+      try {
+        LocalResource rsrc = stat.getResource();
+        LocalResourceRequest key = new LocalResourceRequest(rsrc);
+        String user = context.getUser();
+        ApplicationId appId =
+            context.getContainerId().getApplicationAttemptId()
+              .getApplicationId();
+        LocalResourceVisibility vis = rsrc.getVisibility();
+        LocalResourcesTracker tracker =
+            getLocalResourcesTracker(vis, user, appId);
+        if (stat.getStatus() == ResourceStatusType.FETCH_SUCCESS) {
+          tracker.localizationCompleted(key, true);
+        } else {
+          tracker.localizationCompleted(key, false);
+        }
+      } catch (URISyntaxException e) {
+        LOG.error("Invalid resource URL specified", e);
+      }
+    }
+
+    private Path getPathForLocalization(LocalResource rsrc) throws IOException,
+        URISyntaxException {
+      String user = context.getUser();
+      ApplicationId appId =
+          context.getContainerId().getApplicationAttemptId().getApplicationId();
+      LocalResourceVisibility vis = rsrc.getVisibility();
+      LocalResourcesTracker tracker =
+          getLocalResourcesTracker(vis, user, appId);
+      String cacheDirectory = null;
+      if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
+        cacheDirectory = getUserFileCachePath(user);
+      } else {// APPLICATION ONLY
+        cacheDirectory = getUserAppCachePath(user, appId.toString());
+      }
+      Path dirPath =
+          dirsHandler.getLocalPathForWrite(cacheDirectory,
+            ContainerLocalizer.getEstimatedSize(rsrc), false);
+      dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
+        dirPath);
+      return new Path (dirPath, Long.toString(tracker.nextUniqueNumber()));
+    }
+
     @Override
     @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {
@@ -1033,4 +1123,4 @@ public class ResourceLocalizationService
     del.delete(null, dirPath, new Path[] {});
   }
 
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_service_protos.proto Wed Apr 10 20:17:39 2013
@@ -47,7 +47,12 @@ enum LocalizerActionProto {
   DIE = 2;
 }
 
+message ResourceLocalizationSpecProto {
+  optional LocalResourceProto resource = 1;
+  optional URLProto destination_directory = 2;
+}
+
 message LocalizerHeartbeatResponseProto {
   optional LocalizerActionProto action = 1;
-  repeated LocalResourceProto resources = 2;
+  repeated ResourceLocalizationSpecProto resources = 2;
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Wed Apr 10 20:17:39 2013
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -51,9 +52,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -167,6 +170,10 @@ public class TestNodeStatusUpdater {
         throws YarnRemoteException {
       NodeStatus nodeStatus = request.getNodeStatus();
       LOG.info("Got heartbeat number " + heartBeatID);
+      NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class);
+      Dispatcher mockDispatcher = mock(Dispatcher.class);
+      EventHandler mockEventHandler = mock(EventHandler.class);
+      when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
       nodeStatus.setResponseId(heartBeatID++);
       Map<ApplicationId, List<ContainerStatus>> appToContainers =
           getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
@@ -183,7 +190,8 @@ public class TestNodeStatusUpdater {
         launchContext.setContainerId(firstContainerID);
         launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
         launchContext.getResource().setMemory(2);
-        Container container = new ContainerImpl(conf , null, launchContext, null, null);
+        Container container = new ContainerImpl(conf , mockDispatcher,
+            launchContext, null, mockMetrics);
         this.context.getContainers().put(firstContainerID, container);
       } else if (heartBeatID == 2) {
         // Checks on the RM end
@@ -207,7 +215,8 @@ public class TestNodeStatusUpdater {
         launchContext.setContainerId(secondContainerID);
         launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
         launchContext.getResource().setMemory(3);
-        Container container = new ContainerImpl(conf, null, launchContext, null, null);
+        Container container = new ContainerImpl(conf, mockDispatcher,
+            launchContext, null, mockMetrics);
         this.context.getContainers().put(secondContainerID, container);
       } else if (heartBeatID == 3) {
         // Checks on the RM end
@@ -229,13 +238,14 @@ public class TestNodeStatusUpdater {
   }
 
   private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
-    public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
+    public ResourceTracker resourceTracker;
     private Context context;
 
     public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
         NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
       super(context, dispatcher, healthChecker, metrics);
       this.context = context;
+      resourceTracker = new MyResourceTracker(this.context);
     }
 
     @Override
@@ -312,6 +322,21 @@ public class TestNodeStatusUpdater {
     }
   }
 
+  private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl {
+    private ResourceTracker resourceTracker;
+
+    public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
+        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+      super(context, dispatcher, healthChecker, metrics);
+      resourceTracker = new MyResourceTracker5();
+    }
+
+    @Override
+    protected ResourceTracker getRMClient() {
+      return resourceTracker;
+    }
+  }
+
   private class MyNodeManager extends NodeManager {
     
     private MyNodeStatusUpdater3 nodeStatusUpdater;
@@ -328,6 +353,32 @@ public class TestNodeStatusUpdater {
     }
   }
   
+  private class MyNodeManager2 extends NodeManager {
+    public boolean isStopped = false;
+    private NodeStatusUpdater nodeStatusUpdater;
+    private CyclicBarrier syncBarrier;
+    public MyNodeManager2 (CyclicBarrier syncBarrier) {
+      this.syncBarrier = syncBarrier;
+    }
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      nodeStatusUpdater =
+          new MyNodeStatusUpdater5(context, dispatcher, healthChecker,
+                                     metrics);
+      return nodeStatusUpdater;
+    }
+
+    @Override
+    public void stop() {
+      super.stop();
+      isStopped = true;
+      try {
+        syncBarrier.await();
+      } catch (Exception e) {
+      }
+    }
+  }
   // 
   private class MyResourceTracker2 implements ResourceTracker {
     public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
@@ -505,6 +556,26 @@ public class TestNodeStatusUpdater {
     }
   }
 
+  private class MyResourceTracker5 implements ResourceTracker {
+    public NodeAction registerNodeAction = NodeAction.NORMAL;
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnRemoteException {
+      
+      RegisterNodeManagerResponse response = recordFactory
+          .newRecordInstance(RegisterNodeManagerResponse.class);
+      response.setNodeAction(registerNodeAction );
+      return response;
+    }
+    
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnRemoteException {
+      heartBeatID++;
+      throw RPCUtil.getRemoteException("NodeHeartbeat exception");
+    }
+  }
+
   @Before
   public void clearError() {
     nmStartError = null;
@@ -883,6 +954,30 @@ public class TestNodeStatusUpdater {
     nm.stop();
   }
 
+  @Test(timeout = 20000)
+  public void testNodeStatusUpdaterRetryAndNMShutdown() 
+      throws InterruptedException {
+    final long connectionWaitSecs = 1;
+    final long connectionRetryIntervalSecs = 1;
+    YarnConfiguration conf = createNMConfig();
+    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+        connectionWaitSecs);
+    conf.setLong(YarnConfiguration
+        .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+        connectionRetryIntervalSecs);
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    nm = new MyNodeManager2(syncBarrier);
+    nm.init(conf);
+    nm.start();
+    try {
+      syncBarrier.await();
+    } catch (Exception e) {
+    }
+    Assert.assertTrue(((MyNodeManager2) nm).isStopped);
+    Assert.assertTrue("calculate heartBeatCount based on" +
+        " connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2);
+  }
+
   private class MyNMContext extends NMContext {
     ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBRecordImpl.java Wed Apr 10 20:17:39 2013
@@ -17,6 +17,13 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -31,15 +38,14 @@ import org.apache.hadoop.yarn.ipc.RPCUti
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 public class TestPBRecordImpl {
 
@@ -54,9 +60,8 @@ public class TestPBRecordImpl {
   static LocalResource createResource() {
     LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
     assertTrue(ret instanceof LocalResourcePBImpl);
-    ret.setResource(
-        ConverterUtils.getYarnUrlFromPath(
-          new Path("hdfs://y.ak:8020/foo/bar")));
+    ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(
+      "hdfs://y.ak:8020/foo/bar")));
     ret.setSize(4344L);
     ret.setTimestamp(3141592653589793L);
     ret.setVisibility(LocalResourceVisibility.PUBLIC);
@@ -90,16 +95,27 @@ public class TestPBRecordImpl {
     return ret;
   }
 
-  static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse() {
+  static LocalizerHeartbeatResponse createLocalizerHeartbeatResponse() 
+      throws URISyntaxException {
     LocalizerHeartbeatResponse ret =
       recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
     assertTrue(ret instanceof LocalizerHeartbeatResponsePBImpl);
     ret.setLocalizerAction(LocalizerAction.LIVE);
-    ret.addResource(createResource());
+    LocalResource rsrc = createResource();
+    ArrayList<ResourceLocalizationSpec> rsrcs =
+      new ArrayList<ResourceLocalizationSpec>();
+    ResourceLocalizationSpec resource =
+      recordFactory.newRecordInstance(ResourceLocalizationSpec.class);
+    resource.setResource(rsrc);
+    resource.setDestinationDirectory(ConverterUtils
+      .getYarnUrlFromPath(new Path("/tmp" + System.currentTimeMillis())));
+    rsrcs.add(resource);
+    ret.setResourceSpecs(rsrcs);
+    System.out.println(resource);
     return ret;
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testLocalResourceStatusSerDe() throws Exception {
     LocalResourceStatus rsrcS = createLocalResourceStatus();
     assertTrue(rsrcS instanceof LocalResourceStatusPBImpl);
@@ -119,7 +135,7 @@ public class TestPBRecordImpl {
     assertEquals(createResource(), rsrcD.getResource());
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testLocalizerStatusSerDe() throws Exception {
     LocalizerStatus rsrcS = createLocalizerStatus();
     assertTrue(rsrcS instanceof LocalizerStatusPBImpl);
@@ -141,7 +157,7 @@ public class TestPBRecordImpl {
     assertEquals(createLocalResourceStatus(), rsrcD.getResourceStatus(0));
   }
 
-  @Test
+  @Test(timeout=10000)
   public void testLocalizerHeartbeatResponseSerDe() throws Exception {
     LocalizerHeartbeatResponse rsrcS = createLocalizerHeartbeatResponse();
     assertTrue(rsrcS instanceof LocalizerHeartbeatResponsePBImpl);
@@ -158,8 +174,8 @@ public class TestPBRecordImpl {
       new LocalizerHeartbeatResponsePBImpl(rsrcPbD);
 
     assertEquals(rsrcS, rsrcD);
-    assertEquals(createResource(), rsrcS.getLocalResource(0));
-    assertEquals(createResource(), rsrcD.getLocalResource(0));
+    assertEquals(createResource(), rsrcS.getResourceSpecs().get(0).getResource());
+    assertEquals(createResource(), rsrcD.getResourceSpecs().get(0).getResource());
   }
 
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java?rev=1466658&r1=1466657&r2=1466658&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/MockLocalizerHeartbeatResponse.java Wed Apr 10 20:17:39 2013
@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.no
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 
@@ -28,28 +28,30 @@ public class MockLocalizerHeartbeatRespo
     implements LocalizerHeartbeatResponse {
 
   LocalizerAction action;
-  List<LocalResource> rsrc;
+  List<ResourceLocalizationSpec> resourceSpecs;
 
   MockLocalizerHeartbeatResponse() {
-    rsrc = new ArrayList<LocalResource>();
+    resourceSpecs = new ArrayList<ResourceLocalizationSpec>();
   }
 
   MockLocalizerHeartbeatResponse(
-      LocalizerAction action, List<LocalResource> rsrc) {
+      LocalizerAction action, List<ResourceLocalizationSpec> resources) {
     this.action = action;
-    this.rsrc = rsrc;
+    this.resourceSpecs = resources;
   }
 
   public LocalizerAction getLocalizerAction() { return action; }
-  public List<LocalResource> getAllResources() { return rsrc; }
-  public LocalResource getLocalResource(int i) { return rsrc.get(i); }
   public void setLocalizerAction(LocalizerAction action) {
     this.action = action;
   }
-  public void addAllResources(List<LocalResource> resources) {
-    rsrc.addAll(resources);
+
+  @Override
+  public List<ResourceLocalizationSpec> getResourceSpecs() {
+    return resourceSpecs;
+}
+
+  @Override
+  public void setResourceSpecs(List<ResourceLocalizationSpec> resourceSpecs) {
+    this.resourceSpecs = resourceSpecs;
   }
-  public void addResource(LocalResource resource) { rsrc.add(resource); }
-  public void removeResource(int index) { rsrc.remove(index); }
-  public void clearResources() { rsrc.clear(); }
 }