Posted to yarn-commits@hadoop.apache.org by to...@apache.org on 2013/04/01 18:47:34 UTC

svn commit: r1463203 [3/8] - in /hadoop/common/branches/HDFS-347/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org...

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java Mon Apr  1 16:47:16 2013
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Shell;
 
 /**
  * Plugin to calculate resource information on the system.
@@ -31,6 +32,18 @@ import org.apache.hadoop.util.Reflection
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public abstract class ResourceCalculatorPlugin extends Configured {
+  
+  protected String processPid = null;
+
+  /**
+   * Set the pid of the process for which <code>getProcResourceValues</code>
+   * will be invoked.
+   *
+   * @param pid pid of the process
+   */
+  public void setProcessPid(String pid) {
+    processPid = pid;
+  }
 
   /**
    * Obtain the total size of the virtual memory present in the system.
@@ -109,10 +122,12 @@ public abstract class ResourceCalculator
 
     // No class given, try a os specific class
     try {
-      String osName = System.getProperty("os.name");
-      if (osName.startsWith("Linux")) {
+      if (Shell.LINUX) {
         return new LinuxResourceCalculatorPlugin();
       }
+      if (Shell.WINDOWS) {
+        return new WindowsResourceCalculatorPlugin();
+      }
     } catch (SecurityException se) {
       // Failed to get Operating System name.
       return null;

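Note on the hunk above: platform detection now uses the Shell.LINUX and
Shell.WINDOWS constants instead of parsing os.name by hand. A minimal
caller-side sketch, assuming the getResourceCalculatorPlugin(Class,
Configuration) factory this hunk modifies (the null class argument and conf
are illustrative):

    Configuration conf = new Configuration();
    // null lets the factory probe the platform: LinuxResourceCalculatorPlugin
    // on Linux, WindowsResourceCalculatorPlugin on Windows, null otherwise.
    ResourceCalculatorPlugin plugin =
        ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf);
    if (plugin != null) {
      long vmemBytes = plugin.getVirtualMemorySize();
    }
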
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java Mon Apr  1 16:47:16 2013
@@ -145,14 +145,11 @@ public abstract class ResourceCalculator
     }
 
     // No class given, try a os specific class
-    try {
-      String osName = System.getProperty("os.name");
-      if (osName.startsWith("Linux")) {
-        return new ProcfsBasedProcessTree(pid);
-      }
-    } catch (SecurityException se) {
-      // Failed to get Operating System name.
-      return null;
+    if (ProcfsBasedProcessTree.isAvailable()) {
+      return new ProcfsBasedProcessTree(pid);
+    }
+    if (WindowsBasedProcessTree.isAvailable()) {
+      return new WindowsBasedProcessTree(pid);
     }
 
     // Not supported on this system.

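Note on the hunk above: rather than assuming Linux from os.name, each process
tree implementation is now probed through its static isAvailable() check, so
the Windows implementation is picked up transparently. A hedged usage sketch,
assuming the getResourceCalculatorProcessTree(String, Class, Configuration)
factory this hunk modifies (the pid is illustrative):

    String pid = "4711"; // illustrative
    ResourceCalculatorProcessTree tree =
        ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
            pid, null, new Configuration());
    if (tree == null) {
      // neither procfs nor the Windows helper is available on this platform
    }
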
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/InfoBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/InfoBlock.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/InfoBlock.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/InfoBlock.java Mon Apr  1 16:47:16 2013
@@ -20,7 +20,11 @@ package org.apache.hadoop.yarn.webapp.vi
 
 import org.apache.hadoop.yarn.webapp.ResponseInfo;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TD;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
+
 
 import com.google.inject.Inject;
 
@@ -47,7 +51,19 @@ public class InfoBlock extends HtmlBlock
       String value = String.valueOf(item.value);
       if (item.url == null) {
         if (!item.isRaw) {
-          tr.td(value);
+          TD<TR<TABLE<DIV<Hamlet>>>> td = tr.td();
+          if (value.lastIndexOf('\n') > 0) {
+            String[] lines = value.split("\n");
+            DIV<TD<TR<TABLE<DIV<Hamlet>>>>> singleLineDiv;
+            for (String line : lines) {
+              singleLineDiv = td.div();
+              singleLineDiv._r(line);
+              singleLineDiv._();
+            }
+          } else {
+            td._r(value);
+          }
+          td._();
         } else {
           tr.td()._r(value)._();
         }

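Note on the hunk above: InfoBlock now splits values containing '\n' into one
<div> per line instead of dumping the raw string into a single cell. A rough
sketch of the effect from a block that feeds it (the key and value are made
up for illustration):

    // In an HtmlBlock subclass rendering a ResponseInfo entry:
    info("Application details")._("Diagnostics:", "Container killed\nExit code: 137");
    // InfoBlock renders the value as
    //   <td><div>Container killed</div><div>Exit code: 137</div></td>
    // so each line lands on its own visual row inside the table cell.
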
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/JQueryUI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/JQueryUI.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/JQueryUI.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/JQueryUI.java Mon Apr  1 16:47:16 2013
@@ -107,12 +107,21 @@ public class JQueryUI extends HtmlBlock 
 
   protected void initDataTables(List<String> list) {
     String defaultInit = "{bJQueryUI: true, sPaginationType: 'full_numbers'}";
+    String stateSaveInit = "bStateSave : true, " +
+          "\"fnStateSave\": function (oSettings, oData) { " +
+              "sessionStorage.setItem( oSettings.sTableId, JSON.stringify(oData) ); }, " +
+          "\"fnStateLoad\": function (oSettings) { " +
+              "return JSON.parse( sessionStorage.getItem(oSettings.sTableId) );}, ";
+
     for (String id : split($(DATATABLES_ID))) {
       if (Html.isValidId(id)) {
         String init = $(initID(DATATABLES, id));
         if (init.isEmpty()) {
           init = defaultInit;
         }
+        // insert stateSaveInit just after the opening '{' of the init object
+        int pos = init.indexOf('{') + 1;
+        init = new StringBuffer(init).insert(pos, stateSaveInit).toString();
         list.add(join(id,"DataTable =  $('#", id, "').dataTable(", init,
                       ").fnSetFilteringDelay(188);"));
         String postInit = $(postInitID(DATATABLES, id));
@@ -126,9 +135,12 @@ public class JQueryUI extends HtmlBlock 
       String init = $(initSelector(DATATABLES));
       if (init.isEmpty()) {
         init = defaultInit;
-      }
+      }
+      int pos = init.indexOf('{') + 1;
+      init = new StringBuffer(init).insert(pos, stateSaveInit).toString();
       list.add(join("  $('", escapeJavaScript(selector), "').dataTable(", init,
-               ").fnSetFilteringDelay(288);"));
+               ").fnSetFilteringDelay(288);"));
+
     }
   }
 

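Note on the hunk above: stateSaveInit is spliced into every table's DataTables
init object immediately after its opening '{', so each table persists and
restores its state from sessionStorage under its table id. A minimal sketch of
the splice itself (string values copied from the hunk):

    String defaultInit = "{bJQueryUI: true, sPaginationType: 'full_numbers'}";
    int pos = defaultInit.indexOf('{') + 1;
    String merged =
        new StringBuffer(defaultInit).insert(pos, stateSaveInit).toString();
    // merged => "{bStateSave : true, ... bJQueryUI: true, sPaginationType: 'full_numbers'}"
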
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Mon Apr  1 16:47:16 2013
@@ -135,8 +135,12 @@
   </property>
 
   <property>
-    <description>The maximum number of application master retries.</description>
-    <name>yarn.resourcemanager.am.max-retries</name>
+    <description>The maximum number of application attempts. It's a global
+    setting for all application masters. Each application master can specify
+    its individual maximum number of application attempts via the API, but the
+    individual number cannot be more than the global upper bound. If it is,
+    the resourcemanager will override it.</description>
+    <name>yarn.resourcemanager.am.max-attempts</name>
     <value>1</value>
   </property>
 
@@ -448,6 +452,20 @@
   </property>
 
   <property>
+    <description>Whether physical memory limits will be enforced for
+    containers.</description>
+    <name>yarn.nodemanager.pmem-check-enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <description>Whether virtual memory limits will be enforced for
+    containers.</description>
+    <name>yarn.nodemanager.vmem-check-enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
     <description>Ratio between virtual memory to physical memory when
     setting memory limits for containers. Container allocations are
     expressed in terms of physical memory, and virtual memory usage
@@ -597,6 +615,20 @@
     <value>2000</value>
   </property>
 
+  <property>
+    <description>Max time, in seconds, to wait to establish a connection to the RM when the NM starts.
+    The NM will shut down if it cannot connect to the RM within the specified max time period.
+    If the value is set to -1, then the NM will retry forever.</description>
+    <name>yarn.nodemanager.resourcemanager.connect.wait.secs</name>
+    <value>900</value>
+  </property>
+
+  <property>
+    <description>Time interval, in seconds, between each NM attempt to connect to RM.</description>
+    <name>yarn.nodemanager.resourcemanager.connect.retry_interval.secs</name>
+    <value>30</value>
+  </property>
+
   <!--Map Reduce configuration-->
   <property>
     <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>

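Note on the hunks above: yarn.resourcemanager.am.max-retries is renamed to
yarn.resourcemanager.am.max-attempts, and four nodemanager keys are added. A
hedged sketch of overriding them programmatically instead of via yarn-site.xml
(the values are examples only, not recommendations):

    Configuration conf = new YarnConfiguration();
    // Global cap on AM attempts; per-application requests above it are overridden.
    conf.setInt("yarn.resourcemanager.am.max-attempts", 2);
    // Enforce physical but not virtual memory limits on containers.
    conf.setBoolean("yarn.nodemanager.pmem-check-enabled", true);
    conf.setBoolean("yarn.nodemanager.vmem-check-enabled", false);
    // NM gives up on the RM after 15 minutes; -1 would retry forever.
    conf.setLong("yarn.nodemanager.resourcemanager.connect.wait.secs", 900);
    conf.setLong("yarn.nodemanager.resourcemanager.connect.retry_interval.secs", 30);
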
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java Mon Apr  1 16:47:16 2013
@@ -23,9 +23,9 @@ import junit.framework.Assert;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
-import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.impl.pb.AMResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
 import org.junit.Test;
 
 public class TestRecordFactory {
@@ -35,15 +35,17 @@ public class TestRecordFactory {
     RecordFactory pbRecordFactory = RecordFactoryPBImpl.get();
     
     try {
-      AMResponse response = pbRecordFactory.newRecordInstance(AMResponse.class);
-      Assert.assertEquals(AMResponsePBImpl.class, response.getClass());
+      AllocateResponse response =
+          pbRecordFactory.newRecordInstance(AllocateResponse.class);
+      Assert.assertEquals(AllocateResponsePBImpl.class, response.getClass());
     } catch (YarnException e) {
       e.printStackTrace();
      Assert.fail("Failed to create record");
     }
     
     try {
-      AllocateRequest response = pbRecordFactory.newRecordInstance(AllocateRequest.class);
+      AllocateRequest response =
+          pbRecordFactory.newRecordInstance(AllocateRequest.class);
       Assert.assertEquals(AllocateRequestPBImpl.class, response.getClass());
     } catch (YarnException e) {
       e.printStackTrace();

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java Mon Apr  1 16:47:16 2013
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertEqu
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -47,10 +48,12 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -113,7 +116,127 @@ public class TestFSDownload {
     return ret;
   }
   
-  @Test
+  static LocalResource createTarFile(FileContext files, Path p, int len,
+      Random r, LocalResourceVisibility vis) throws IOException,
+      URISyntaxException {
+
+    FSDataOutputStream outFile = null;
+    try {
+      byte[] bytes = new byte[len];
+      Path tarPath = new Path(p.toString());
+      outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE));
+      r.nextBytes(bytes);
+      outFile.write(bytes);
+    } finally {
+      if (outFile != null)
+        outFile.close();
+    }
+    StringBuffer tarCommand = new StringBuffer();
+    URI u = new URI(p.getParent().toString());
+    tarCommand.append("cd '");
+    tarCommand.append(FileUtil.makeShellPath(u.getPath().toString()));
+    tarCommand.append("' ; ");
+    tarCommand.append("tar -czf " + p.getName() + ".tar " + p.getName());
+    String[] shellCmd = { "bash", "-c", tarCommand.toString() };
+    ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
+    shexec.execute();
+    int exitcode = shexec.getExitCode();
+    if (exitcode != 0) {
+      throw new IOException("Error tarring file " + p
+          + ". Tar process exited with exit code " + exitcode);
+    }
+    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+    ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
+        + ".tar")));
+    ret.setSize(len);
+    ret.setType(LocalResourceType.ARCHIVE);
+    ret.setVisibility(vis);
+    ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".tar"))
+        .getModificationTime());
+    return ret;
+  }
+  
+  static LocalResource createJarFile(FileContext files, Path p, int len,
+      Random r, LocalResourceVisibility vis) throws IOException,
+      URISyntaxException {
+
+    FSDataOutputStream outFile = null;
+    try {
+      byte[] bytes = new byte[len];
+      Path tarPath = new Path(p.toString());
+      outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE));
+      r.nextBytes(bytes);
+      outFile.write(bytes);
+    } finally {
+      if (outFile != null)
+        outFile.close();
+    }
+    StringBuffer jarCommand = new StringBuffer();
+    URI u = new URI(p.getParent().toString());
+    jarCommand.append("cd '");
+    jarCommand.append(FileUtil.makeShellPath(u.getPath().toString()));
+    jarCommand.append("' ; ");
+    jarCommand.append("jar cf " + p.getName() + ".jar " + p.getName());
+    String[] shellCmd = { "bash", "-c", jarCommand.toString() };
+    ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
+    shexec.execute();
+    int exitcode = shexec.getExitCode();
+    if (exitcode != 0) {
+      throw new IOException("Error creating jar file " + p
+          + ". Jar process exited with exit code " + exitcode);
+    }
+    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+    ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
+        + ".jar")));
+    ret.setSize(len);
+    ret.setType(LocalResourceType.ARCHIVE);
+    ret.setVisibility(vis);
+    ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".jar"))
+        .getModificationTime());
+    return ret;
+  }
+  
+  static LocalResource createZipFile(FileContext files, Path p, int len,
+      Random r, LocalResourceVisibility vis) throws IOException,
+      URISyntaxException {
+
+    FSDataOutputStream outFile = null;
+    try {
+      byte[] bytes = new byte[len];
+      Path tarPath = new Path(p.toString());
+      outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE));
+      r.nextBytes(bytes);
+      outFile.write(bytes);
+    } finally {
+      if (outFile != null)
+        outFile.close();
+    }
+    StringBuffer zipCommand = new StringBuffer();
+    URI u = new URI(p.getParent().toString());
+    zipCommand.append("cd '");
+    zipCommand.append(FileUtil.makeShellPath(u.getPath().toString()));
+    zipCommand.append("' ; ");
+    zipCommand.append("gzip " + p.getName());
+    String[] shellCmd = { "bash", "-c", zipCommand.toString() };
+    ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
+    shexec.execute();
+    int exitcode = shexec.getExitCode();
+    if (exitcode != 0) {
+      throw new IOException("Error gzipping file " + p
+          + ". gzip process exited with exit code " + exitcode);
+    }
+    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+    ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
+        + ".gz")));
+    ret.setSize(len);
+    ret.setType(LocalResourceType.ARCHIVE);
+    ret.setVisibility(vis);
+    ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".gz"))
+        .getModificationTime());
+    return ret;
+  }
+  
+  @Test (timeout=10000)
   public void testDownloadBadPublic() throws IOException, URISyntaxException,
       InterruptedException {
     Configuration conf = new Configuration();
@@ -161,7 +284,7 @@ public class TestFSDownload {
     }
   }
   
-  @Test
+  @Test (timeout=10000)
   public void testDownload() throws IOException, URISyntaxException,
       InterruptedException {
     Configuration conf = new Configuration();
@@ -229,6 +352,175 @@ public class TestFSDownload {
     }
   }
   
+  @SuppressWarnings("deprecation")
+  @Test (timeout=10000) 
+  public void testDownloadArchive() throws IOException, URISyntaxException,
+      InterruptedException {
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    final Path basedir = files.makeQualified(new Path("target",
+        TestFSDownload.class.getSimpleName()));
+    files.mkdir(basedir, null, true);
+    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+    Random rand = new Random();
+    long sharedSeed = rand.nextLong();
+    rand.setSeed(sharedSeed);
+    System.out.println("SEED: " + sharedSeed);
+
+    Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    LocalDirAllocator dirs = new LocalDirAllocator(
+        TestFSDownload.class.getName());
+
+    int size = rand.nextInt(512) + 512;
+    LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
+
+    Path p = new Path(basedir, "" + 1);
+    LocalResource rsrc = createTarFile(files, p, size, rand, vis);
+    Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+    FSDownload fsd = new FSDownload(files,
+        UserGroupInformation.getCurrentUser(), conf, destPath, rsrc,
+        new Random(sharedSeed));
+    pending.put(rsrc, exec.submit(fsd));
+    
+    try {
+      FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
+          basedir);
+      for (FileStatus filestatus : filesstatus) {
+        if (filestatus.isDir()) {
+          FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
+              filestatus.getPath());
+          for (FileStatus childfile : childFiles) {
+            if (childfile.getPath().getName().equalsIgnoreCase("1.tar.tmp")) {
+              Assert.fail("Tmp File should not have been there "
+                  + childfile.getPath());
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException("Failed exec", e);
+    } finally {
+      exec.shutdown();
+    }
+  }
+  
+  @SuppressWarnings("deprecation")
+  @Test (timeout=10000) 
+  public void testDownloadPatternJar() throws IOException, URISyntaxException,
+      InterruptedException {
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    final Path basedir = files.makeQualified(new Path("target",
+        TestFSDownload.class.getSimpleName()));
+    files.mkdir(basedir, null, true);
+    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+    Random rand = new Random();
+    long sharedSeed = rand.nextLong();
+    rand.setSeed(sharedSeed);
+    System.out.println("SEED: " + sharedSeed);
+
+    Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    LocalDirAllocator dirs = new LocalDirAllocator(
+        TestFSDownload.class.getName());
+
+    int size = rand.nextInt(512) + 512;
+    LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
+
+    Path p = new Path(basedir, "" + 1);
+    LocalResource rsrcjar = createJarFile(files, p, size, rand, vis);
+    rsrcjar.setType(LocalResourceType.PATTERN);
+    Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+    FSDownload fsdjar = new FSDownload(files,
+        UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar,
+        new Random(sharedSeed));
+    pending.put(rsrcjar, exec.submit(fsdjar));
+
+    try {
+      FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
+          basedir);
+      for (FileStatus filestatus : filesstatus) {
+        if (filestatus.isDir()) {
+          FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
+              filestatus.getPath());
+          for (FileStatus childfile : childFiles) {
+            if (childfile.getPath().getName().equalsIgnoreCase("1.jar.tmp")) {
+              Assert.fail("Tmp File should not have been there "
+                  + childfile.getPath());
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException("Failed exec", e);
+    } finally {
+      exec.shutdown();
+    }
+  }
+  
+  @SuppressWarnings("deprecation")
+  @Test (timeout=10000) 
+  public void testDownloadArchiveZip() throws IOException, URISyntaxException,
+      InterruptedException {
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    final Path basedir = files.makeQualified(new Path("target",
+        TestFSDownload.class.getSimpleName()));
+    files.mkdir(basedir, null, true);
+    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+
+    Random rand = new Random();
+    long sharedSeed = rand.nextLong();
+    rand.setSeed(sharedSeed);
+    System.out.println("SEED: " + sharedSeed);
+
+    Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    LocalDirAllocator dirs = new LocalDirAllocator(
+        TestFSDownload.class.getName());
+
+    int size = rand.nextInt(512) + 512;
+    LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
+
+    Path p = new Path(basedir, "" + 1);
+    LocalResource rsrczip = createZipFile(files, p, size, rand, vis);
+    Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
+    FSDownload fsdzip = new FSDownload(files,
+        UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip,
+        new Random(sharedSeed));
+    pending.put(rsrczip, exec.submit(fsdzip));
+
+    try {
+      FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
+          basedir);
+      for (FileStatus filestatus : filesstatus) {
+        if (filestatus.isDir()) {
+          FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
+              filestatus.getPath());
+          for (FileStatus childfile : childFiles) {
+            if (childfile.getPath().getName().equalsIgnoreCase("1.gz.tmp")) {
+              Assert.fail("Tmp File should not have been there "
+                  + childfile.getPath());
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException("Failed exec", e);
+    } finally {
+      exec.shutdown();
+    }
+  }
+  
   private void verifyPermsRecursively(FileSystem fs,
       FileContext files, Path p,
       LocalResourceVisibility vis) throws IOException {
@@ -261,7 +553,7 @@ public class TestFSDownload {
     }      
   }
   
-  @Test
+  @Test (timeout=10000)
   public void testDirDownload() throws IOException, InterruptedException {
     Configuration conf = new Configuration();
     FileContext files = FileContext.getLocalFSFileContext(conf);

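Note on the hunks above: createTarFile, createJarFile and createZipFile share
one pattern: write random bytes to a file, shell out to package it, then
describe the result as an ARCHIVE LocalResource. A trimmed sketch of that
shared core; the helper and its consolidation are hypothetical, not part of
this commit:

    // Hypothetical helper; 'cmd' would be "tar -czf 1.tar 1", "jar cf 1.jar 1",
    // or "gzip 1", mirroring the three methods above.
    static void runInDir(String dir, String cmd) throws IOException {
      String[] shellCmd =
          { "bash", "-c", "cd '" + FileUtil.makeShellPath(dir) + "' ; " + cmd };
      ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
      shexec.execute();
      if (shexec.getExitCode() != 0) {
        throw new IOException(cmd + " exited with code " + shexec.getExitCode());
      }
    }
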
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java Mon Apr  1 16:47:16 2013
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.util;
 
+import static org.junit.Assert.fail;
+
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -36,6 +38,7 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
@@ -104,17 +107,21 @@ public class TestProcfsBasedProcessTree 
         new Path(TEST_ROOT_DIR.getAbsolutePath()), true);
   }
 
-  @Test
+  @Test (timeout = 30000)
   public void testProcessTree() throws Exception {
 
+    if (!Shell.LINUX) {
+      System.out
+          .println("ProcfsBasedProcessTree is not available on this system. Not testing");
+      return;
+    }
     try {
-      if (!ProcfsBasedProcessTree.isAvailable()) {
-        System.out
-            .println("ProcfsBasedProcessTree is not available on this system. Not testing");
-        return;
-      }
+      Assert.assertTrue(ProcfsBasedProcessTree.isAvailable());
     } catch (Exception e) {
       LOG.info(StringUtils.stringifyException(e));
+      Assert.fail("ProcfsBasedProcessTree should be available on Linux");
       return;
     }
     // create shell script
@@ -183,11 +190,20 @@ public class TestProcfsBasedProcessTree 
     // destroy the process and all its subprocesses
     destroyProcessTree(pid);
 
-    if (isSetsidAvailable()) { // whole processtree should be gone
-      Assert.assertFalse("Proceesses in process group live",
-          isAnyProcessInTreeAlive(p));
-    } else {// process should be gone
-      Assert.assertFalse("ProcessTree must have been gone", isAlive(pid));
+    boolean isAlive = true;
+    for (int tries = 100; tries > 0; tries--) {
+      if (isSetsidAvailable()) { // whole process tree
+        isAlive = isAnyProcessInTreeAlive(p);
+      } else { // single process
+        isAlive = isAlive(pid);
+      }
+      if (!isAlive) {
+        break;
+      }
+      Thread.sleep(100);
+    }
+    if (isAlive) {
+      fail("ProcessTree shouldn't be alive");
     }
 
     LOG.info("Process-tree dump follows: \n" + processTreeDump);
@@ -328,7 +344,7 @@ public class TestProcfsBasedProcessTree 
    * @throws IOException if there was a problem setting up the
    *                      fake procfs directories or files.
    */
-  @Test
+  @Test (timeout = 30000)
   public void testCpuAndMemoryForProcessTree() throws IOException {
 
     // test processes
@@ -402,7 +418,7 @@ public class TestProcfsBasedProcessTree 
    * @throws IOException if there was a problem setting up the
    *                      fake procfs directories or files.
    */
-  @Test
+  @Test (timeout = 30000)
   public void testMemForOlderProcesses() throws IOException {
     // initial list of processes
     String[] pids = { "100", "200", "300", "400" };
@@ -509,7 +525,7 @@ public class TestProcfsBasedProcessTree 
    * @throws IOException if there was a problem setting up the
    *                      fake procfs directories or files.
    */
-  @Test
+  @Test (timeout = 30000)
   public void testDestroyProcessTree() throws IOException {
     // test process
     String pid = "100";
@@ -535,7 +551,7 @@ public class TestProcfsBasedProcessTree 
    *
    * @throws IOException
    */
-  @Test
+  @Test (timeout = 30000)
   public void testProcessTreeDump()
       throws IOException {
 

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java Mon Apr  1 16:47:16 2013
@@ -18,9 +18,13 @@
 
 package org.apache.hadoop.yarn.util;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -30,9 +34,12 @@ import org.junit.Test;
 
 public class TestRackResolver {
 
+  private static Log LOG = LogFactory.getLog(TestRackResolver.class);
+
   public static final class MyResolver implements DNSToSwitchMapping {
 
     int numHost1 = 0;
+    public static String resolvedHost1 = "host1";
 
     @Override
     public List<String> resolve(List<String> hostList) {
@@ -43,7 +50,10 @@ public class TestRackResolver {
       if (hostList.isEmpty()) {
         return returnList;
       }
-      if (hostList.get(0).equals("host1")) {
+      LOG.info("Received resolve request for "
+          + hostList.get(0));
+      if (hostList.get(0).equals("host1")
+          || hostList.get(0).equals(resolvedHost1)) {
         numHost1++;
         returnList.add("/rack1");
       }
@@ -53,6 +63,10 @@ public class TestRackResolver {
       return returnList;
     }
 
+    @Override
+    public void reloadCachedMappings() {
+      // nothing to do here, since RawScriptBasedMapping has no cache.
+    }
   }
 
   @Test
@@ -62,6 +76,12 @@ public class TestRackResolver {
       CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
       MyResolver.class, DNSToSwitchMapping.class);
     RackResolver.init(conf);
+    try {
+      InetAddress iaddr = InetAddress.getByName("host1");
+      MyResolver.resolvedHost1 = iaddr.getHostAddress();
+    } catch (UnknownHostException e) {
+      // Ignore if not found
+    }
     Node node = RackResolver.resolve("host1");
     Assert.assertEquals("/rack1", node.getNetworkLocation());
     node = RackResolver.resolve("host1");

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java Mon Apr  1 16:47:16 2013
@@ -18,10 +18,28 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 
 public interface NodeHeartbeatResponse {
-  public abstract HeartbeatResponse getHeartbeatResponse();
+  int getResponseId();
+  NodeAction getNodeAction();
+
+  List<ContainerId> getContainersToCleanup();
+
+  List<ApplicationId> getApplicationsToCleanup();
+
+  void setResponseId(int responseId);
+  void setNodeAction(NodeAction action);
+
+  MasterKey getMasterKey();
+  void setMasterKey(MasterKey secretKey);
+
+  void addAllContainersToCleanup(List<ContainerId> containers);
   
-  public abstract void setHeartbeatResponse(HeartbeatResponse heartbeatResponse);
+  void addAllApplicationsToCleanup(List<ApplicationId> applications);
 }

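Note on the hunk above: the HeartbeatResponse wrapper is gone and its fields
now live directly on NodeHeartbeatResponse. A rough sketch of populating the
flattened record via the record factory (the variable names are illustrative):

    NodeHeartbeatResponse response =
        RecordFactoryPBImpl.get().newRecordInstance(NodeHeartbeatResponse.class);
    response.setResponseId(lastResponseId + 1);             // illustrative counter
    response.setNodeAction(NodeAction.NORMAL);
    response.addAllContainersToCleanup(containersToClean);  // illustrative lists
    response.addAllApplicationsToCleanup(finishedApps);
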
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java Mon Apr  1 16:47:16 2013
@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
-import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 
 public interface RegisterNodeManagerResponse {
-  public abstract RegistrationResponse getRegistrationResponse();
-  
-  public abstract void setRegistrationResponse(RegistrationResponse registrationResponse);
+  MasterKey getMasterKey();
+
+  void setMasterKey(MasterKey secretKey);
+
+  NodeAction getNodeAction();
+
+  void setNodeAction(NodeAction nodeAction);
 
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java Mon Apr  1 16:47:16 2013
@@ -18,14 +18,25 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProto;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.HeartbeatResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
 
     
@@ -34,8 +45,9 @@ public class NodeHeartbeatResponsePBImpl
   NodeHeartbeatResponseProto.Builder builder = null;
   boolean viaProto = false;
   
-  private HeartbeatResponse heartbeatResponse = null;
-  
+  private List<ContainerId> containersToCleanup = null;
+  private List<ApplicationId> applicationsToCleanup = null;
+  private MasterKey masterKey = null;
   
   public NodeHeartbeatResponsePBImpl() {
     builder = NodeHeartbeatResponseProto.newBuilder();
@@ -54,8 +66,14 @@ public class NodeHeartbeatResponsePBImpl
   }
 
   private void mergeLocalToBuilder() {
-    if (this.heartbeatResponse != null) {
-      builder.setHeartbeatResponse(convertToProtoFormat(this.heartbeatResponse));
+    if (this.containersToCleanup != null) {
+      addContainersToCleanupToProto();
+    }
+    if (this.applicationsToCleanup != null) {
+      addApplicationsToCleanupToProto();
+    }
+    if (this.masterKey != null) {
+      builder.setMasterKey(convertToProtoFormat(this.masterKey));
     }
   }
 
@@ -76,34 +94,213 @@ public class NodeHeartbeatResponsePBImpl
     
   
   @Override
-  public HeartbeatResponse getHeartbeatResponse() {
+  public int getResponseId() {
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.getResponseId());
+  }
+
+  @Override
+  public void setResponseId(int responseId) {
+    maybeInitBuilder();
+    builder.setResponseId((responseId));
+  }
+
+  @Override
+  public MasterKey getMasterKey() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
-    if (this.heartbeatResponse != null) {
-      return this.heartbeatResponse;
+    if (this.masterKey != null) {
+      return this.masterKey;
     }
-    if (!p.hasHeartbeatResponse()) {
+    if (!p.hasMasterKey()) {
       return null;
     }
-    this.heartbeatResponse = convertFromProtoFormat(p.getHeartbeatResponse());
-    return this.heartbeatResponse;
+    this.masterKey = convertFromProtoFormat(p.getMasterKey());
+    return this.masterKey;
+  }
+
+  @Override
+  public void setMasterKey(MasterKey masterKey) {
+    maybeInitBuilder();
+    if (masterKey == null)
+      builder.clearMasterKey();
+    this.masterKey = masterKey;
+  }
+
+  @Override
+  public NodeAction getNodeAction() {
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasNodeAction()) {
+      return null;
+    }
+    return (convertFromProtoFormat(p.getNodeAction()));
+  }
+
+  @Override
+  public void setNodeAction(NodeAction nodeAction) {
+    maybeInitBuilder();
+    if (nodeAction == null) {
+      builder.clearNodeAction();
+      return;
+    }
+    builder.setNodeAction(convertToProtoFormat(nodeAction));
+  }
+
+  @Override
+  public List<ContainerId> getContainersToCleanup() {
+    initContainersToCleanup();
+    return this.containersToCleanup;
+  }
+
+  private void initContainersToCleanup() {
+    if (this.containersToCleanup != null) {
+      return;
+    }
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<ContainerIdProto> list = p.getContainersToCleanupList();
+    this.containersToCleanup = new ArrayList<ContainerId>();
+
+    for (ContainerIdProto c : list) {
+      this.containersToCleanup.add(convertFromProtoFormat(c));
+    }
   }
 
   @Override
-  public void setHeartbeatResponse(HeartbeatResponse heartbeatResponse) {
+  public void addAllContainersToCleanup(
+      final List<ContainerId> containersToCleanup) {
+    if (containersToCleanup == null)
+      return;
+    initContainersToCleanup();
+    this.containersToCleanup.addAll(containersToCleanup);
+  }
+
+  private void addContainersToCleanupToProto() {
     maybeInitBuilder();
-    if (heartbeatResponse == null) 
-      builder.clearHeartbeatResponse();
-    this.heartbeatResponse = heartbeatResponse;
+    builder.clearContainersToCleanup();
+    if (containersToCleanup == null)
+      return;
+    Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
+
+      @Override
+      public Iterator<ContainerIdProto> iterator() {
+        return new Iterator<ContainerIdProto>() {
+
+          Iterator<ContainerId> iter = containersToCleanup.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public ContainerIdProto next() {
+            return convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+    builder.addAllContainersToCleanup(iterable);
+  }
+
+  @Override
+  public List<ApplicationId> getApplicationsToCleanup() {
+    initApplicationsToCleanup();
+    return this.applicationsToCleanup;
   }
 
-  private HeartbeatResponsePBImpl convertFromProtoFormat(HeartbeatResponseProto p) {
-    return new HeartbeatResponsePBImpl(p);
+  private void initApplicationsToCleanup() {
+    if (this.applicationsToCleanup != null) {
+      return;
+    }
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<ApplicationIdProto> list = p.getApplicationsToCleanupList();
+    this.applicationsToCleanup = new ArrayList<ApplicationId>();
+
+    for (ApplicationIdProto c : list) {
+      this.applicationsToCleanup.add(convertFromProtoFormat(c));
+    }
   }
 
-  private HeartbeatResponseProto convertToProtoFormat(HeartbeatResponse t) {
-    return ((HeartbeatResponsePBImpl)t).getProto();
+  @Override
+  public void addAllApplicationsToCleanup(
+      final List<ApplicationId> applicationsToCleanup) {
+    if (applicationsToCleanup == null)
+      return;
+    initApplicationsToCleanup();
+    this.applicationsToCleanup.addAll(applicationsToCleanup);
   }
 
+  private void addApplicationsToCleanupToProto() {
+    maybeInitBuilder();
+    builder.clearApplicationsToCleanup();
+    if (applicationsToCleanup == null)
+      return;
+    Iterable<ApplicationIdProto> iterable = new Iterable<ApplicationIdProto>() {
+
+      @Override
+      public Iterator<ApplicationIdProto> iterator() {
+        return new Iterator<ApplicationIdProto>() {
+
+          Iterator<ApplicationId> iter = applicationsToCleanup.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+
+          @Override
+          public ApplicationIdProto next() {
+            return convertToProtoFormat(iter.next());
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+    builder.addAllApplicationsToCleanup(iterable);
+  }
 
+  private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+    return new ContainerIdPBImpl(p);
+  }
+
+  private ContainerIdProto convertToProtoFormat(ContainerId t) {
+    return ((ContainerIdPBImpl) t).getProto();
+  }
+
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+
+  private NodeAction convertFromProtoFormat(NodeActionProto p) {
+    return NodeAction.valueOf(p.name());
+  }
+
+  private NodeActionProto convertToProtoFormat(NodeAction t) {
+    return NodeActionProto.valueOf(t.name());
+  }
+
+  private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
+    return new MasterKeyPBImpl(p);
+  }
+
+  private MasterKeyProto convertToProtoFormat(MasterKey t) {
+    return ((MasterKeyPBImpl) t).getProto();
+  }
+}
 
-}  

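Note on the hunk above: the PBImpl keeps the usual YARN record life cycle:
Java-side fields shadow the protobuf builder and are merged back lazily. A
compressed sketch of that flow, assuming the getProto() plumbing not shown in
this hunk:

    NodeHeartbeatResponsePBImpl impl = new NodeHeartbeatResponsePBImpl();
    impl.setResponseId(7);               // written straight to the builder
    impl.addAllContainersToCleanup(ids); // cached in the local List first
    NodeHeartbeatResponseProto proto = impl.getProto();
    // getProto() runs mergeLocalToBuilder(), which streams the cached list
    // into the builder through the lazy Iterable<ContainerIdProto> adapter.
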
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java Mon Apr  1 16:47:16 2013
@@ -20,12 +20,14 @@ package org.apache.hadoop.yarn.server.ap
 
 
 import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
-import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.RegistrationResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
 
     
@@ -34,7 +36,7 @@ public class RegisterNodeManagerResponse
   RegisterNodeManagerResponseProto.Builder builder = null;
   boolean viaProto = false;
   
-  private RegistrationResponse registartionResponse = null;
+  private MasterKey masterKey = null;
   
   private boolean rebuild = false;
   
@@ -56,9 +58,8 @@ public class RegisterNodeManagerResponse
   }
 
   private void mergeLocalToBuilder() {
-    if (this.registartionResponse != null) {
-      builder.setRegistrationResponse(convertToProtoFormat(this.registartionResponse));
-      this.registartionResponse = null;
+    if (this.masterKey != null) {
+      builder.setMasterKey(convertToProtoFormat(this.masterKey));
     }
   }
 
@@ -77,39 +78,60 @@ public class RegisterNodeManagerResponse
     }
     viaProto = false;
   }
-    
-  
+
   @Override
-  public RegistrationResponse getRegistrationResponse() {
+  public MasterKey getMasterKey() {
     RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
-    if (this.registartionResponse != null) {
-      return this.registartionResponse;
+    if (this.masterKey != null) {
+      return this.masterKey;
     }
-    if (!p.hasRegistrationResponse()) {
+    if (!p.hasMasterKey()) {
       return null;
     }
-    this.registartionResponse = convertFromProtoFormat(p.getRegistrationResponse());
-    rebuild = true;
-    return this.registartionResponse;
+    this.masterKey = convertFromProtoFormat(p.getMasterKey());
+    return this.masterKey;
   }
 
   @Override
-  public void setRegistrationResponse(RegistrationResponse registrationResponse) {
+  public void setMasterKey(MasterKey masterKey) {
     maybeInitBuilder();
-    if (registrationResponse == null) 
-      builder.clearRegistrationResponse();
-    this.registartionResponse = registrationResponse;
-    rebuild = true;
+    if (masterKey == null)
+      builder.clearMasterKey();
+    this.masterKey = masterKey;
   }
 
-  private RegistrationResponsePBImpl convertFromProtoFormat(RegistrationResponseProto p) {
-    return new RegistrationResponsePBImpl(p);
+  @Override
+  public NodeAction getNodeAction() {
+    RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if(!p.hasNodeAction()) {
+      return null;
+    }
+    return convertFromProtoFormat(p.getNodeAction());
   }
 
-  private RegistrationResponseProto convertToProtoFormat(RegistrationResponse t) {
-    return ((RegistrationResponsePBImpl)t).getProto();
+  @Override
+  public void setNodeAction(NodeAction nodeAction) {
+    maybeInitBuilder();
+    if (nodeAction == null) {
+      builder.clearNodeAction();
+      return;
+    }
+    builder.setNodeAction(convertToProtoFormat(nodeAction));
   }
 
+  private NodeAction convertFromProtoFormat(NodeActionProto p) {
+    return NodeAction.valueOf(p.name());
+  }
 
+  private NodeActionProto convertToProtoFormat(NodeAction t) {
+    return NodeActionProto.valueOf(t.name());
+  }
 
+  private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
+    return new MasterKeyPBImpl(p);
+  }
+
+  private MasterKeyProto convertToProtoFormat(MasterKey t) {
+    return ((MasterKeyPBImpl)t).getProto();
+  }
 }  

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto Mon Apr  1 16:47:16 2013
@@ -42,16 +42,3 @@ message MasterKeyProto {
   optional bytes bytes = 2;
 }
 
-message RegistrationResponseProto {
-  optional MasterKeyProto master_key = 1;
-  optional NodeActionProto nodeAction = 2;
-}
-
-message HeartbeatResponseProto {
-  optional int32 response_id = 1;
-  optional MasterKeyProto master_key = 2;
-  optional NodeActionProto nodeAction = 3;
-  repeated ContainerIdProto containers_to_cleanup = 4;
-  repeated ApplicationIdProto applications_to_cleanup = 5;
-}
-

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto Mon Apr  1 16:47:16 2013
@@ -29,8 +29,10 @@ message RegisterNodeManagerRequestProto 
   optional int32 http_port = 3;
   optional ResourceProto resource = 4;
 }
+
 message RegisterNodeManagerResponseProto {
-  optional RegistrationResponseProto registration_response = 1;
+  optional MasterKeyProto master_key = 1;
+  optional NodeActionProto nodeAction = 2;
 }
 
 message NodeHeartbeatRequestProto {
@@ -38,6 +40,11 @@ message NodeHeartbeatRequestProto {
   optional MasterKeyProto last_known_master_key = 2;
 }
 
+
 message NodeHeartbeatResponseProto {
-  optional HeartbeatResponseProto heartbeat_response = 1;
+  optional int32 response_id = 1;
+  optional MasterKeyProto master_key = 2;
+  optional NodeActionProto nodeAction = 3;
+  repeated ContainerIdProto containers_to_cleanup = 4;
+  repeated ApplicationIdProto applications_to_cleanup = 5;
 }

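With the nested RegistrationResponseProto and HeartbeatResponseProto gone, callers set the response fields directly on RegisterNodeManagerResponseProto and NodeHeartbeatResponseProto. A hedged sketch of building the flattened heartbeat response with the generated builder; the java package and outer-class names are assumptions based on the usual YARN proto conventions:

    import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
    import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;

    // Sketch: the heartbeat fields now live on the response message itself,
    // so no intermediate HeartbeatResponseProto is constructed.
    public class HeartbeatResponseSketch {
      public static void main(String[] args) {
        NodeHeartbeatResponseProto response = NodeHeartbeatResponseProto.newBuilder()
            .setResponseId(1)                          // optional int32 response_id = 1
            .setNodeAction(NodeActionProto.SHUTDOWN)   // optional NodeActionProto nodeAction = 3
            .build();
        System.out.println(response.hasNodeAction()); // true
      }
    }
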
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java Mon Apr  1 16:47:16 2013
@@ -25,8 +25,6 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.HeartbeatResponsePBImpl;
 import org.junit.Test;
 
 public class TestRecordFactory {
@@ -34,15 +32,6 @@ public class TestRecordFactory {
   @Test
   public void testPbRecordFactory() {
     RecordFactory pbRecordFactory = RecordFactoryPBImpl.get();
-    
-    try {
-      HeartbeatResponse response = pbRecordFactory.newRecordInstance(HeartbeatResponse.class);
-      Assert.assertEquals(HeartbeatResponsePBImpl.class, response.getClass());
-    } catch (YarnException e) {
-      e.printStackTrace();
-      Assert.fail("Failed to crete record");
-    }
-    
     try {
       NodeHeartbeatRequest request = pbRecordFactory.newRecordInstance(NodeHeartbeatRequest.class);
       Assert.assertEquals(NodeHeartbeatRequestPBImpl.class, request.getClass());

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Mon Apr  1 16:47:16 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.no
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -35,8 +37,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
+import org.apache.hadoop.util.Shell;
 
 public abstract class ContainerExecutor implements Configurable {
 
@@ -182,6 +186,54 @@ public abstract class ContainerExecutor 
       readLock.unlock();
     }
   }
+  
+  /**
+   * Return a command to execute the given command in the OS shell.
+   * On Windows, the passed-in groupId is used to name the process
+   * group (job object) into which the command is launched; on
+   * non-Windows platforms, groupId is ignored.
+   */
+  protected static String[] getRunCommand(String command, String groupId,
+                                          Configuration conf) {
+    boolean containerSchedPriorityIsSet = false;
+    int containerSchedPriorityAdjustment =
+        YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY;
+
+    if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) != null) {
+      containerSchedPriorityIsSet = true;
+      containerSchedPriorityAdjustment = conf.getInt(
+          YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
+          YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
+    }
+  
+    if (Shell.WINDOWS) {
+      return new String[] { Shell.WINUTILS, "task", "create", groupId,
+          "cmd /c " + command };
+    } else {
+      List<String> retCommand = new ArrayList<String>();
+      if (containerSchedPriorityIsSet) {
+        retCommand.addAll(Arrays.asList("nice", "-n",
+            Integer.toString(containerSchedPriorityAdjustment)));
+      }
+      retCommand.addAll(Arrays.asList("bash", "-c", command));
+      return retCommand.toArray(new String[retCommand.size()]);
+    }
+
+  }
+
+  /** Return a command for determining if the process with the specified pid is alive. */
+  protected static String[] getCheckProcessIsAliveCommand(String pid) {
+    return Shell.WINDOWS ?
+      new String[] { Shell.WINUTILS, "task", "isAlive", pid } :
+      new String[] { "kill", "-0", pid };
+  }
+
+  /** Return a command to send a signal to the given pid. */
+  protected static String[] getSignalKillCommand(int code, String pid) {
+    return Shell.WINDOWS ? new String[] { Shell.WINUTILS, "task", "kill", pid } :
+      new String[] { "kill", "-" + code, pid };
+  }
 
   /**
    * Is the container still active?
@@ -253,6 +305,9 @@ public abstract class ContainerExecutor 
 
   public static final boolean isSetsidAvailable = isSetsidSupported();
   private static boolean isSetsidSupported() {
+    if (Shell.WINDOWS) {
+      return true;
+    }
     ShellCommandExecutor shexec = null;
     boolean setsidSupported = true;
     try {

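getRunCommand is the single place where the OS branch for launching a container command is taken: Windows routes through winutils so the new process lands in a named job object, while Unix optionally prefixes nice(1) before bash. A standalone sketch of that selection logic (the real method reads the priority adjustment from YarnConfiguration and resolves winutils via Shell.WINUTILS):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Standalone sketch of the getRunCommand branch above; "winutils" here
    // stands in for the Shell.WINUTILS path resolution.
    public class RunCommandSketch {
      static final boolean WINDOWS =
          System.getProperty("os.name").startsWith("Windows");

      static String[] runCommand(String command, String groupId, Integer nice) {
        if (WINDOWS) {
          // winutils creates a job object named groupId for the new process
          return new String[] { "winutils", "task", "create", groupId,
              "cmd /c " + command };
        }
        List<String> cmd = new ArrayList<String>();
        if (nice != null) {
          cmd.addAll(Arrays.asList("nice", "-n", Integer.toString(nice)));
        }
        cmd.addAll(Arrays.asList("bash", "-c", command));
        return cmd.toArray(new String[cmd.size()]);
      }

      public static void main(String[] args) {
        // e.g. [nice, -n, 5, bash, -c, launch.sh] on Linux
        System.out.println(Arrays.toString(runCommand("launch.sh", "container_01", 5)));
      }
    }
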
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Mon Apr  1 16:47:16 2013
@@ -37,6 +37,8 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,10 +55,9 @@ public class DefaultContainerExecutor ex
   private static final Log LOG = LogFactory
       .getLog(DefaultContainerExecutor.class);
 
-  private final FileContext lfs;
+  private static final int WIN_MAX_PATH = 260;
 
-  private static final String WRAPPER_LAUNCH_SCRIPT = 
-      "default_container_executor.sh";
+  private final FileContext lfs;
 
   public DefaultContainerExecutor() {
     try {
@@ -145,15 +146,24 @@ public class DefaultContainerExecutor ex
     lfs.util().copy(nmPrivateTokensPath, tokenDst);
 
     // Create new local launch wrapper script
-    Path wrapperScriptDst = new Path(containerWorkDir, WRAPPER_LAUNCH_SCRIPT);
-    DataOutputStream wrapperScriptOutStream =
-        lfs.create(wrapperScriptDst,
-            EnumSet.of(CREATE, OVERWRITE));
+    LocalWrapperScriptBuilder sb = Shell.WINDOWS ?
+      new WindowsLocalWrapperScriptBuilder(containerIdStr, containerWorkDir) :
+      new UnixLocalWrapperScriptBuilder(containerWorkDir);
+
+    // Fail fast if attempting to launch the wrapper script would fail due to
+    // the Windows path length limitation.
+    if (Shell.WINDOWS &&
+        sb.getWrapperScriptPath().toString().length() > WIN_MAX_PATH) {
+      throw new IOException(String.format(
+        "Cannot launch container using script at path %s, because it exceeds " +
+        "the maximum supported path length of %d characters.  Consider " +
+        "configuring shorter directories in %s.", sb.getWrapperScriptPath(),
+        WIN_MAX_PATH, YarnConfiguration.NM_LOCAL_DIRS));
+    }
 
     Path pidFile = getPidFilePath(containerId);
     if (pidFile != null) {
-      writeLocalWrapperScript(wrapperScriptOutStream, launchDst.toUri()
-          .getPath().toString(), pidFile.toString());
+      sb.writeLocalWrapperScript(launchDst, pidFile);
     } else {
       LOG.info("Container " + containerIdStr
           + " was marked as inactive. Returning terminated error");
@@ -166,12 +176,13 @@ public class DefaultContainerExecutor ex
     try {
       lfs.setPermission(launchDst,
           ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
-      lfs.setPermission(wrapperScriptDst,
+      lfs.setPermission(sb.getWrapperScriptPath(),
           ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
 
       // Setup command to run
-      String[] command = {"bash",
-          wrapperScriptDst.toUri().getPath().toString()};
+      String[] command = getRunCommand(sb.getWrapperScriptPath().toString(),
+        containerIdStr, this.getConf());
+
       LOG.info("launchContainer: " + Arrays.toString(command));
       shExec = new ShellCommandExecutor(
           command,
@@ -202,28 +213,85 @@ public class DefaultContainerExecutor ex
     return 0;
   }
 
-  private void writeLocalWrapperScript(DataOutputStream out,
-      String launchScriptDst, String pidFilePath) throws IOException {
-    // We need to do a move as writing to a file is not atomic
-    // Process reading a file being written to may get garbled data
-    // hence write pid to tmp file first followed by a mv
-    StringBuilder sb = new StringBuilder("#!/bin/bash\n\n");
-    sb.append("echo $$ > " + pidFilePath + ".tmp\n");
-    sb.append("/bin/mv -f " + pidFilePath + ".tmp " + pidFilePath + "\n");
-    sb.append(ContainerExecutor.isSetsidAvailable? "exec setsid" : "exec");
-    sb.append(" /bin/bash ");
-    sb.append("\"");
-    sb.append(launchScriptDst);
-    sb.append("\"\n");
-    PrintStream pout = null;
-    try {
-      pout = new PrintStream(out);
-      pout.append(sb);
-    } finally {
-      if (out != null) {
-        out.close();
+  private abstract class LocalWrapperScriptBuilder {
+
+    private final Path wrapperScriptPath;
+
+    public Path getWrapperScriptPath() {
+      return wrapperScriptPath;
+    }
+
+    public void writeLocalWrapperScript(Path launchDst, Path pidFile) throws IOException {
+      DataOutputStream out = null;
+      PrintStream pout = null;
+
+      try {
+        out = lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE));
+        pout = new PrintStream(out);
+        writeLocalWrapperScript(launchDst, pidFile, pout);
+      } finally {
+        IOUtils.cleanup(LOG, pout, out);
       }
     }
+
+    protected abstract void writeLocalWrapperScript(Path launchDst, Path pidFile,
+        PrintStream pout);
+
+    protected LocalWrapperScriptBuilder(Path wrapperScriptPath) {
+      this.wrapperScriptPath = wrapperScriptPath;
+    }
+  }
+
+  private final class UnixLocalWrapperScriptBuilder
+      extends LocalWrapperScriptBuilder {
+
+    public UnixLocalWrapperScriptBuilder(Path containerWorkDir) {
+      super(new Path(containerWorkDir, "default_container_executor.sh"));
+    }
+
+    @Override
+    public void writeLocalWrapperScript(Path launchDst, Path pidFile,
+        PrintStream pout) {
+
+      // We need to do a move because writing to a file is not atomic: a
+      // process reading a file while it is being written may see garbled
+      // data. Hence write the pid to a tmp file first, then mv it into place.
+      pout.println("#!/bin/bash");
+      pout.println();
+      pout.println("echo $$ > " + pidFile.toString() + ".tmp");
+      pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
+      String exec = ContainerExecutor.isSetsidAvailable? "exec setsid" : "exec";
+      pout.println(exec + " /bin/bash -c \"" +
+        launchDst.toUri().getPath().toString() + "\"");
+    }
+  }
+
+  private final class WindowsLocalWrapperScriptBuilder
+      extends LocalWrapperScriptBuilder {
+
+    private final String containerIdStr;
+
+    public WindowsLocalWrapperScriptBuilder(String containerIdStr,
+        Path containerWorkDir) {
+
+      super(new Path(containerWorkDir, "default_container_executor.cmd"));
+      this.containerIdStr = containerIdStr;
+    }
+
+    @Override
+    public void writeLocalWrapperScript(Path launchDst, Path pidFile,
+        PrintStream pout) {
+
+      // On Windows, the pid is the container ID, so that it can also serve as
+      // the name of the job object created by winutils for task management.
+      // Write to temp file followed by atomic move.
+      String normalizedPidFile = new File(pidFile.toString()).getPath();
+      pout.println("@echo " + containerIdStr + " > " + normalizedPidFile +
+        ".tmp");
+      pout.println("@move /Y " + normalizedPidFile + ".tmp " +
+        normalizedPidFile);
+      pout.println("@call " + launchDst.toString());
+    }
   }
 
   @Override
@@ -234,17 +302,13 @@ public class DefaultContainerExecutor ex
         : pid;
     LOG.debug("Sending signal " + signal.getValue() + " to pid " + sigpid
         + " as user " + user);
-    try {
-      sendSignal(sigpid, Signal.NULL);
-    } catch (ExitCodeException e) {
+    if (!containerIsAlive(sigpid)) {
       return false;
     }
     try {
-      sendSignal(sigpid, signal);
+      killContainer(sigpid, signal);
     } catch (IOException e) {
-      try {
-        sendSignal(sigpid, Signal.NULL);
-      } catch (IOException ignore) {
+      if (!containerIsAlive(sigpid)) {
         return false;
       }
       throw e;
@@ -253,17 +317,33 @@ public class DefaultContainerExecutor ex
   }
 
   /**
+   * Returns true if the process with the specified pid is alive.
+   * 
+   * @param pid String pid
+   * @return boolean true if the process is alive
+   */
+  private boolean containerIsAlive(String pid) throws IOException {
+    try {
+      new ShellCommandExecutor(getCheckProcessIsAliveCommand(pid)).execute();
+      // successful execution means process is alive
+      return true;
+    }
+    catch (ExitCodeException e) {
+      // failure (non-zero exit code) means process is not alive
+      return false;
+    }
+  }
+
+  /**
    * Send a specified signal to the specified pid
    *
    * @param pid the pid of the process [group] to signal.
    * @param signal signal to send
    * (for logging).
    */
-  protected void sendSignal(String pid, Signal signal) throws IOException {
-    ShellCommandExecutor shexec = null;
-    String[] arg = { "kill", "-" + signal.getValue(), pid };
-    shexec = new ShellCommandExecutor(arg);
-    shexec.execute();
+  private void killContainer(String pid, Signal signal) throws IOException {
+    new ShellCommandExecutor(getSignalKillCommand(signal.getValue(), pid))
+      .execute();
   }
 
   @Override

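The rewritten signal path above replaces the old kill(2)-only helpers with the platform-neutral commands from ContainerExecutor, and containerIsAlive derives liveness from the probe's exit status. A minimal sketch of the same pattern using ProcessBuilder instead of Shell.ShellCommandExecutor (having winutils on the PATH is an assumption):

    import java.io.IOException;

    // Sketch: run the platform "is alive" probe and treat a non-zero exit
    // status as "process gone", mirroring containerIsAlive above.
    public class IsAliveSketch {
      static boolean isAlive(String pid) throws IOException, InterruptedException {
        String[] cmd = System.getProperty("os.name").startsWith("Windows")
            ? new String[] { "winutils", "task", "isAlive", pid }  // assumed on PATH
            : new String[] { "kill", "-0", pid };                  // signal 0 = probe only
        Process p = new ProcessBuilder(cmd).start();
        return p.waitFor() == 0;  // exit 0 means the signal was deliverable
      }

      public static void main(String[] args) throws Exception {
        System.out.println(isAlive(args.length > 0 ? args[0] : "1"));
      }
    }
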
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Mon Apr  1 16:47:16 2013
@@ -50,6 +50,8 @@ public class LinuxContainerExecutor exte
 
   private String containerExecutorExe;
   private LCEResourcesHandler resourcesHandler;
+  private boolean containerSchedPriorityIsSet = false;
+  private int containerSchedPriorityAdjustment = 0;
   
   
   @Override
@@ -61,6 +63,13 @@ public class LinuxContainerExecutor exte
             conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
               DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
     resourcesHandler.setConf(conf);
+
+    if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) != null) {
+      containerSchedPriorityIsSet = true;
+      containerSchedPriorityAdjustment = conf.getInt(
+          YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
+          YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
+    }
   }
 
   /**
@@ -114,6 +123,13 @@ public class LinuxContainerExecutor exte
       : conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, defaultPath);
   }
 
+  protected void addSchedPriorityCommand(List<String> command) {
+    if (containerSchedPriorityIsSet) {
+      command.addAll(Arrays.asList("nice", "-n",
+          Integer.toString(containerSchedPriorityAdjustment)));
+    }
+  }
+
   @Override 
   public void init() throws IOException {        
     // Send command to executor which will just start up, 
@@ -145,14 +161,15 @@ public class LinuxContainerExecutor exte
       List<String> localDirs, List<String> logDirs)
       throws IOException, InterruptedException {
 
-    List<String> command = new ArrayList<String>(
-      Arrays.asList(containerExecutorExe, 
-                    user, 
-                    Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
-                    appId,
-                    nmPrivateContainerTokensPath.toUri().getPath().toString(),
-                    StringUtils.join(",", localDirs),
-                    StringUtils.join(",", logDirs)));
+    List<String> command = new ArrayList<String>();
+    addSchedPriorityCommand(command);
+    command.addAll(Arrays.asList(containerExecutorExe,
+                   user, 
+                   Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
+                   appId,
+                   nmPrivateContainerTokensPath.toUri().getPath().toString(),
+                   StringUtils.join(",", localDirs),
+                   StringUtils.join(",", logDirs)));
 
     File jvm =                                  // use same jvm as parent
       new File(new File(System.getProperty("java.home"), "bin"), "java");
@@ -212,7 +229,9 @@ public class LinuxContainerExecutor exte
     try {
       Path pidFilePath = getPidFilePath(containerId);
       if (pidFilePath != null) {
-        List<String> command = new ArrayList<String>(Arrays.asList(
+        List<String> command = new ArrayList<String>();
+        addSchedPriorityCommand(command);
+        command.addAll(Arrays.asList(
             containerExecutorExe, user, Integer
                 .toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
             containerIdStr, containerWorkDir.toString(),

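addSchedPriorityCommand gives every invocation of the setuid container-executor binary an optional nice(1) prefix, for both container localization and launch. A sketch of the resulting argv when a priority adjustment is configured; the binary path, user, and command code below are hypothetical:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Sketch of the command assembled by LinuxContainerExecutor when
    // NM_CONTAINER_EXECUTOR_SCHED_PRIORITY is set; all argument values here
    // are made up for illustration.
    public class LceCommandSketch {
      public static void main(String[] args) {
        int adjustment = 10;  // assumed configured priority adjustment
        List<String> command = new ArrayList<String>(
            Arrays.asList("nice", "-n", Integer.toString(adjustment)));
        command.addAll(Arrays.asList(
            "/usr/lib/hadoop-yarn/bin/container-executor",  // hypothetical path
            "alice",                                        // hypothetical user
            "1",                                            // hypothetical command code
            "application_1364851234567_0001"));
        System.out.println(command);
      }
    }
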
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java Mon Apr  1 16:47:16 2013
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.no
 
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Timer;
@@ -305,7 +304,7 @@ public class LocalDirsHandlerService ext
     ArrayList<String> validPaths = new ArrayList<String>();
     for (int i = 0; i < paths.length; ++i) {
       try {
-        URI uriPath = new URI(paths[i]);
+        URI uriPath = (new Path(paths[i])).toUri();
         if (uriPath.getScheme() == null
             || uriPath.getScheme().equals(FILE_SCHEME)) {
           validPaths.add(uriPath.getPath());
@@ -316,7 +315,7 @@ public class LocalDirsHandlerService ext
               + " is not a valid path. Path should be with " + FILE_SCHEME
               + " scheme or without scheme");
         }
-      } catch (URISyntaxException e) {
+      } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage());
         throw new YarnException(paths[i]
             + " is not a valid path. Path should be with " + FILE_SCHEME

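Switching from new URI(paths[i]) to new Path(paths[i]).toUri() moves URI parsing into Hadoop's Path, which normalizes local directory strings first and reports malformed input as the unchecked IllegalArgumentException that the new catch clause handles. A sketch of the resulting validation pattern (sample paths are hypothetical):

    import java.net.URI;
    import org.apache.hadoop.fs.Path;

    // Sketch: Path does the parsing; only a null scheme or the "file"
    // scheme is accepted, matching the loop above.
    public class LocalDirValidationSketch {
      public static void main(String[] args) {
        for (String p : new String[] { "/tmp/nm-local-dir",
                                       "file:///tmp/nm-local-dir" }) {
          try {
            URI uri = new Path(p).toUri();
            boolean valid = uri.getScheme() == null
                || "file".equals(uri.getScheme());
            System.out.println(p + " -> valid=" + valid);
          } catch (IllegalArgumentException e) {
            System.out.println(p + " -> rejected: " + e.getMessage());
          }
        }
      }
    }
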
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Mon Apr  1 16:47:16 2013
@@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.service.Co
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.util.Records;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class NodeManager extends CompositeService 
     implements EventHandler<NodeManagerEvent> {
 
@@ -113,6 +115,10 @@ public class NodeManager extends Composi
     return new WebServer(nmContext, resourceView, aclsManager, dirsHandler);
   }
 
+  protected DeletionService createDeletionService(ContainerExecutor exec) {
+    return new DeletionService(exec);
+  }
+
   protected void doSecureLogin() throws IOException {
     SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB,
         YarnConfiguration.NM_PRINCIPAL);
@@ -143,7 +149,7 @@ public class NodeManager extends Composi
     } catch (IOException e) {
       throw new YarnException("Failed to initialize container executor", e);
     }    
-    DeletionService del = new DeletionService(exec);
+    DeletionService del = createDeletionService(exec);
     addService(del);
 
     // NodeManager level dispatcher
@@ -350,6 +356,11 @@ public class NodeManager extends Composi
   ContainerManagerImpl getContainerManager() {
     return containerManager;
   }
+  
+  @VisibleForTesting
+  Context getNMContext() {
+    return this.context;
+  }
 
   public static void main(String[] args) {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());

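The new createDeletionService factory method (together with the @VisibleForTesting accessor for the context) is a test seam: a test subclass can return a stub service instead of one that really deletes files. A hypothetical sketch of such an override; the delete(...) signature shown is an assumption about this branch's DeletionService:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
    import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
    import org.apache.hadoop.yarn.server.nodemanager.NodeManager;

    // Hypothetical test subclass: swap in a DeletionService that records
    // requests instead of deleting; the overridden signature is assumed.
    public class StubDeletionNodeManager extends NodeManager {
      @Override
      protected DeletionService createDeletionService(ContainerExecutor exec) {
        return new DeletionService(exec) {
          @Override
          public void delete(String user, Path subDir, Path... baseDirs) {
            System.out.println("delete requested: " + subDir + " as " + user);
          }
        };
      }
    }
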
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Apr  1 16:47:16 2013
@@ -50,12 +50,12 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
-import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -151,7 +151,6 @@ public class NodeStatusUpdaterImpl exten
         YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS,
         YarnConfiguration.DEFAULT_NM_WEBAPP_PORT);
     try {
-      //      this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
       this.httpPort = httpBindAddress.getPort();
       // Registration has to be in start so that ContainerManager can get the
       // perNM tokens needed to authenticate ContainerTokens.
@@ -189,23 +188,91 @@ public class NodeStatusUpdaterImpl exten
   }
 
   private void registerWithRM() throws YarnRemoteException {
-    this.resourceTracker = getRMClient();
-    LOG.info("Connecting to ResourceManager at " + this.rmAddress);
-    
-    RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+    Configuration conf = getConfig();
+    long rmConnectWaitMS =
+        conf.getInt(
+            YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+            YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
+        * 1000;
+    long rmConnectionRetryIntervalMS =
+        conf.getLong(
+            YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+            YarnConfiguration
+                .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS)
+        * 1000;
+
+    if (rmConnectionRetryIntervalMS < 0) {
+      throw new YarnException("Invalid Configuration. " +
+          YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS +
+          " should not be negative.");
+    }
+
+    // A configured wait of -1 second (hence -1000 ms) means wait forever.
+    boolean waitForEver = (rmConnectWaitMS == -1000);
+
+    if (!waitForEver) {
+      if (rmConnectWaitMS < 0) {
+        throw new YarnException("Invalid Configuration. " +
+            YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS +
+            " can be -1, but cannot be any other negative number.");
+      }
+
+      // If the total wait is shorter than one retry interval, only try to
+      // connect once.
+      if (rmConnectWaitMS < rmConnectionRetryIntervalMS) {
+        LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS
+            + " is smaller than "
+            + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
+            + ". Only trying to connect once.");
+        rmConnectWaitMS = 0;
+      }
+    }
+
+    int rmRetryCount = 0;
+    long waitStartTime = System.currentTimeMillis();
+
+    RegisterNodeManagerRequest request =
+        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
     request.setHttpPort(this.httpPort);
     request.setResource(this.totalResource);
     request.setNodeId(this.nodeId);
-    RegistrationResponse regResponse =
-        this.resourceTracker.registerNodeManager(request).getRegistrationResponse();
+    RegisterNodeManagerResponse regNMResponse;
+
+    while (true) {
+      try {
+        rmRetryCount++;
+        LOG.info("Connecting to ResourceManager at " + this.rmAddress
+            + ". Current number of attempts: " + rmRetryCount);
+        this.resourceTracker = getRMClient();
+        regNMResponse =
+            this.resourceTracker.registerNodeManager(request);
+        break;
+      } catch (Throwable e) {
+        LOG.warn("Failed to connect to ResourceManager, "
+            + "current number of failed attempts: " + rmRetryCount);
+        if (System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
+            || waitForEver) {
+          try {
+            LOG.info("Sleeping for " + rmConnectionRetryIntervalMS / 1000
+                + " seconds before the next connection retry to the RM");
+            Thread.sleep(rmConnectionRetryIntervalMS);
+          } catch (InterruptedException ex) {
+            // do nothing; retry on the next loop iteration
+          }
+        } else {
+          String errorMessage = "Failed to connect to the ResourceManager, "
+              + "number of failed attempts: " + rmRetryCount;
+          LOG.error(errorMessage, e);
+          throw new YarnException(errorMessage, e);
+        }
+      }
+    }
     // if the Resourcemanager instructs NM to shutdown.
-    if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) {
+    if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
       throw new YarnException(
           "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
     }
 
     if (UserGroupInformation.isSecurityEnabled()) {
-      MasterKey masterKey = regResponse.getMasterKey();
+      MasterKey masterKey = regNMResponse.getMasterKey();
       // do this now so that its set before we start heartbeating to RM
       LOG.info("Security enabled - updating secret keys now");
       // It is expected that status updater is started by this point and
@@ -340,8 +407,8 @@ public class NodeStatusUpdaterImpl exten
               request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context
                 .getContainerTokenSecretManager().getCurrentKey());
             }
-            HeartbeatResponse response =
-              resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
+            NodeHeartbeatResponse response =
+              resourceTracker.nodeHeartbeat(request);
 
             // See if the master-key has rolled over
             if (isSecurityEnabled()) {
@@ -371,14 +438,14 @@ public class NodeStatusUpdaterImpl exten
 
             lastHeartBeatID = response.getResponseId();
             List<ContainerId> containersToCleanup = response
-                .getContainersToCleanupList();
+                .getContainersToCleanup();
             if (containersToCleanup.size() != 0) {
               dispatcher.getEventHandler().handle(
                   new CMgrCompletedContainersEvent(containersToCleanup, 
                       CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
             }
             List<ApplicationId> appsToCleanup =
-                response.getApplicationsToCleanupList();
+                response.getApplicationsToCleanup();
             //Only start tracking for keepAlive on FINISH_APP
             trackAppsForKeepAlive(appsToCleanup);
             if (appsToCleanup.size() != 0) {