You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2011/11/02 06:35:03 UTC

svn commit: r1196458 [11/19] - in /hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ assembly/ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-cl...

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml Wed Nov  2 05:34:31 2011
@@ -16,16 +16,18 @@
   <parent>
     <artifactId>hadoop-yarn-server</artifactId>
     <groupId>org.apache.hadoop</groupId>
-    <version>${yarn.version}</version>
+    <version>0.24.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+  <version>0.24.0-SNAPSHOT</version>
   <name>hadoop-yarn-server-nodemanager</name>
 
   <properties>
-    <install.file>${project.artifact.file}</install.file>
+    <!-- Basedir needed for generating FindBugs warnings using parent pom -->
     <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
+    <container-executor.conf.dir>/etc/hadoop</container-executor.conf.dir>
   </properties>
 
   <dependencies>
@@ -37,7 +39,7 @@
 
   <profiles>
     <profile>
-      <id>cbuild</id>
+      <id>native</id>
       <build>
         <plugins>
           <plugin>
@@ -46,61 +48,48 @@
             <version>1.0-beta-1</version>
             <executions>
               <execution>
-                <id>autoreconf</id>
-                <phase>package</phase>
-                <configuration>
-                  <arguments>
-                    <argument>-i</argument>
-                  </arguments>
-                  <workDir>src/main/c/container-executor</workDir>
-                </configuration>
+                <id>compile</id>
+                <phase>compile</phase>
                 <goals>
                   <goal>autoreconf</goal>
-                </goals>
-              </execution>
-              <execution>
-                <id>make</id>
-                <phase>package</phase>
-                <configuration>
-                  <workDir>src/main/c/container-executor</workDir>
-                  <configureEnvironment>
-                    <property>
-                      <name>CFLAGS</name>
-                      <value>-DHADOOP_CONF_DIR=${container-executor.conf.dir}</value>
-                    </property>
-                  </configureEnvironment>
-                  <sources>
-                    <source>
-                      <directory>src/main/c/container-executor</directory>
-                    </source>
-                  </sources>
-                  <workDir>src/main/c/container-executor</workDir>
-                  <destDir>target</destDir>
-                  <prefix>${project.build.outputDirectory}</prefix>
-                </configuration>
-                <goals>
-                  <!-- always clean, to ensure conf dir regenerated -->
-                  <goal>make-clean</goal>
                   <goal>configure</goal>
+                  <goal>make-install</goal>
                 </goals>
               </execution>
               <execution>
-                <id>install</id>
-                <phase>package</phase>
-                <configuration>
-                  <destDir>/</destDir>
-                  <workDir>src/main/c/container-executor</workDir>
-                </configuration>
+                <id>test</id>
+                <phase>test</phase>
                 <goals>
-                  <goal>make-install</goal>
+                  <goal>test</goal>
                 </goals>
               </execution>
             </executions>
+            <configuration>
+              <!-- autoreconf settings -->
+              <workDir>${project.build.directory}/native/container-executor</workDir>
+              <arguments>
+                <argument>-i</argument>
+              </arguments>
+
+              <!-- configure settings -->
+              <configureEnvironment>
+                <property>
+                  <name>CFLAGS</name>
+                  <value>-DHADOOP_CONF_DIR=${container-executor.conf.dir}</value>
+                </property>
+              </configureEnvironment>
+              <configureWorkDir>${project.build.directory}/native/container-executor</configureWorkDir>
+              <prefix>/usr/local</prefix>
+
+              <!-- configure & make settings -->
+              <destDir>${project.build.directory}/native/target</destDir>
+
+            </configuration>
           </plugin>
         </plugins>
       </build>
       <activation>
-        <activeByDefault>true</activeByDefault>
+        <activeByDefault>false</activeByDefault>
       </activation>
     </profile>
     <profile>
@@ -145,8 +134,12 @@
         <configuration>
           <systemPropertyVariables>
             <property>
-              <name>container-executor-path</name>
-              <value></value>
+              <name>container-executor.path</name>
+              <value>${container-executor.path}</value>
+            </property>
+            <property>
+              <name>application.submitter</name>
+              <value>${application.submitter}</value>
             </property>
           </systemPropertyVariables>
           <excludes>
@@ -170,6 +163,21 @@
               <goal>run</goal>
             </goals>
           </execution>
+          <execution>
+            <id>compile</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <target>
+                <mkdir dir="${project.build.directory}/native"/>
+                <copy toDir="${project.build.directory}/native">
+                  <fileset dir="${basedir}/src/main/native"/>
+                </copy>
+              </target>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Wed Nov  2 05:34:31 2011
@@ -19,12 +19,13 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
-
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +36,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
 
 public abstract class ContainerExecutor implements Configurable {
 
@@ -43,8 +45,12 @@ public abstract class ContainerExecutor 
     FsPermission.createImmutable((short) 0700);
 
   private Configuration conf;
-  protected ConcurrentMap<ContainerId, ShellCommandExecutor> launchCommandObjs =
-      new ConcurrentHashMap<ContainerId, ShellCommandExecutor>();
+  private ConcurrentMap<ContainerId, Path> pidFiles =
+      new ConcurrentHashMap<ContainerId, Path>();
+
+  private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final ReadLock readLock = lock.readLock();
+  private final WriteLock writeLock = lock.writeLock();
 
   @Override
   public void setConf(Configuration conf) {
@@ -102,7 +108,8 @@ public abstract class ContainerExecutor 
       throws IOException, InterruptedException;
 
   public enum ExitCode {
-    KILLED(137);
+    FORCE_KILLED(137),
+    TERMINATED(143);
     private final int code;
 
     private ExitCode(int exitCode) {
@@ -150,6 +157,66 @@ public abstract class ContainerExecutor 
   }
 
   /**
+   * Get the pidFile of the container.
+   * @param containerId
+   * @return the path of the pid-file for the given containerId.
+   */
+  protected Path getPidFilePath(ContainerId containerId) {
+    try {
+      readLock.lock();
+      return (this.pidFiles.get(containerId));
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Is the container still active?
+   * @param containerId
+   * @return true if the container is active else false.
+   */
+  protected boolean isContainerActive(ContainerId containerId) {
+    try {
+      readLock.lock();
+      return (this.pidFiles.containsKey(containerId));
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Mark the container as active
+   * 
+   * @param containerId
+   *          the ContainerId
+   * @param pidFilePath
+   *          Path where the executor should write the pid of the launched
+   *          process
+   */
+  public void activateContainer(ContainerId containerId, Path pidFilePath) {
+    try {
+      writeLock.lock();
+      this.pidFiles.put(containerId, pidFilePath);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Mark the container as inactive.
+   * Done iff the container is still active. Else treat it as
+   * a no-op
+   */
+  public void deactivateContainer(ContainerId containerId) {
+    try {
+      writeLock.lock();
+      this.pidFiles.remove(containerId);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
    * Get the process-identifier for the container
    * 
    * @param containerID
@@ -158,28 +225,15 @@ public abstract class ContainerExecutor 
    */
   public String getProcessId(ContainerId containerID) {
     String pid = null;
-    ShellCommandExecutor shExec = launchCommandObjs.get(containerID);
-    if (shExec == null) {
+    Path pidFile = pidFiles.get(containerID);
+    if (pidFile == null) {
       // This container isn't even launched yet.
       return pid;
     }
-    Process proc = shExec.getProcess();
-    if (proc == null) {
-      // This happens if the command is not yet started
-      return pid;
-    }
     try {
-      Field pidField = proc.getClass().getDeclaredField("pid");
-      pidField.setAccessible(true);
-      pid = ((Integer) pidField.get(proc)).toString();
-    } catch (SecurityException e) {
-      // SecurityManager not expected with yarn. Ignore.
-    } catch (NoSuchFieldException e) {
-      // Yarn only on UNIX for now. Ignore.
-    } catch (IllegalArgumentException e) {
-      ;
-    } catch (IllegalAccessException e) {
-      ;
+      pid = ProcessIdFileReader.getProcessId(pidFile);
+    } catch (IOException e) {
+      LOG.error("Got exception reading pid from pid-file " + pidFile, e);
     }
     return pid;
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java Wed Nov  2 05:34:31 2011
@@ -20,5 +20,5 @@ package org.apache.hadoop.yarn.server.no
 
 public enum ContainerManagerEventType {
   FINISH_APPS,
-  FINISH_CONTAINERS
+  FINISH_CONTAINERS,
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Wed Nov  2 05:34:31 2011
@@ -18,10 +18,16 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -48,6 +54,9 @@ public class DefaultContainerExecutor ex
 
   private final FileContext lfs;
 
+  private static final String WRAPPER_LAUNCH_SCRIPT = 
+      "default_container_executor.sh";
+
   public DefaultContainerExecutor() {
     try {
       this.lfs = FileContext.getLocalFSFileContext();
@@ -80,8 +89,9 @@ public class DefaultContainerExecutor ex
     String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
     Path tokenDst = new Path(appStorageDir, tokenFn);
     lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
+    LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
     lfs.setWorkingDirectory(appStorageDir);
-
+    LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
     // TODO: DO it over RPC for maintaining similarity?
     localizer.runLocalization(nmAddr);
   }
@@ -100,8 +110,9 @@ public class DefaultContainerExecutor ex
         ConverterUtils.toString(
             container.getContainerID().getApplicationAttemptId().
                 getApplicationId());
-    String[] sLocalDirs =
-        getConf().getStrings(YarnConfiguration.NM_LOCAL_DIRS, YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
+    String[] sLocalDirs = getConf().getStrings(
+        YarnConfiguration.NM_LOCAL_DIRS,
+        YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
     for (String sLocalDir : sLocalDirs) {
       Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
       Path userdir = new Path(usersdir, userName);
@@ -124,21 +135,47 @@ public class DefaultContainerExecutor ex
       new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);
     lfs.util().copy(nmPrivateTokensPath, tokenDst);
 
+    // Create new local launch wrapper script
+    Path wrapperScriptDst = new Path(containerWorkDir, WRAPPER_LAUNCH_SCRIPT);
+    DataOutputStream wrapperScriptOutStream =
+        lfs.create(wrapperScriptDst,
+            EnumSet.of(CREATE, OVERWRITE));
+
+    Path pidFile = getPidFilePath(containerId);
+    if (pidFile != null) {
+      writeLocalWrapperScript(wrapperScriptOutStream, launchDst.toUri()
+          .getPath().toString(), pidFile.toString());
+    } else {
+      LOG.info("Container " + containerIdStr
+          + " was marked as inactive. Returning terminated error");
+      return ExitCode.TERMINATED.getExitCode();
+    }
+
     // create log dir under app
     // fork script
     ShellCommandExecutor shExec = null;
     try {
       lfs.setPermission(launchDst,
           ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
-      String[] command = 
-          new String[] { "bash", "-c", launchDst.toUri().getPath().toString() };
+      lfs.setPermission(wrapperScriptDst,
+          ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
+
+      // Setup command to run
+      String[] command = {"bash", "-c",
+          wrapperScriptDst.toUri().getPath().toString()};
       LOG.info("launchContainer: " + Arrays.toString(command));
       shExec = new ShellCommandExecutor(
           command,
-          new File(containerWorkDir.toUri().getPath()), 
+          new File(containerWorkDir.toUri().getPath()),
           container.getLaunchContext().getEnvironment());      // sanitized env
-      launchCommandObjs.put(containerId, shExec);
-      shExec.execute();
+      if (isContainerActive(containerId)) {
+        shExec.execute();
+      }
+      else {
+        LOG.info("Container " + containerIdStr +
+            " was marked as inactive. Returning terminated error");
+        return ExitCode.TERMINATED.getExitCode();
+      }
     } catch (IOException e) {
       if (null == shExec) {
         return -1;
@@ -151,17 +188,44 @@ public class DefaultContainerExecutor ex
           message));
       return exitCode;
     } finally {
-      launchCommandObjs.remove(containerId);
+      ; //
     }
     return 0;
   }
 
+  private void writeLocalWrapperScript(DataOutputStream out,
+      String launchScriptDst, String pidFilePath) throws IOException {
+    // We need to do a move as writing to a file is not atomic
+    // Process reading a file being written to may get garbled data
+    // hence write pid to tmp file first followed by a mv
+    StringBuilder sb = new StringBuilder("#!/bin/bash\n\n");
+    sb.append("echo $$ > " + pidFilePath + ".tmp\n");
+    sb.append("/bin/mv -f " + pidFilePath + ".tmp " + pidFilePath + "\n");
+    sb.append(ContainerExecutor.isSetsidAvailable? "exec setsid" : "exec");
+    sb.append(" /bin/bash ");
+    sb.append("-c ");
+    sb.append("\"");
+    sb.append(launchScriptDst);
+    sb.append("\"\n");
+    PrintStream pout = null;
+    try {
+      pout = new PrintStream(out);
+      pout.append(sb);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
   @Override
   public boolean signalContainer(String user, String pid, Signal signal)
       throws IOException {
     final String sigpid = ContainerExecutor.isSetsidAvailable
         ? "-" + pid
         : pid;
+    LOG.debug("Sending signal " + signal.getValue() + " to pid " + sigpid
+        + " as user " + user);
     try {
       sendSignal(sigpid, Signal.NULL);
     } catch (ExitCodeException e) {
@@ -189,8 +253,8 @@ public class DefaultContainerExecutor ex
    */
   protected void sendSignal(String pid, Signal signal) throws IOException {
     ShellCommandExecutor shexec = null;
-      String[] arg = { "kill", "-" + signal.getValue(), pid };
-      shexec = new ShellCommandExecutor(arg);
+    String[] arg = { "kill", "-" + signal.getValue(), pid };
+    shexec = new ShellCommandExecutor(arg);
     shexec.execute();
   }
 
@@ -199,13 +263,18 @@ public class DefaultContainerExecutor ex
       throws IOException, InterruptedException {
     if (baseDirs == null || baseDirs.length == 0) {
       LOG.info("Deleting absolute path : " + subDir);
-      lfs.delete(subDir, true);
+      if (!lfs.delete(subDir, true)) {
+        //Maybe retry
+        LOG.warn("delete returned false for path: [" + subDir + "]");
+      }
       return;
     }
     for (Path baseDir : baseDirs) {
       Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
       LOG.info("Deleting path : " + del);
-      lfs.delete(del, true);
+      if (!lfs.delete(del, true)) {
+        LOG.warn("delete returned false for path: [" + del + "]");
+      }
     }
   }
 
@@ -335,12 +404,6 @@ public class DefaultContainerExecutor ex
     FsPermission appperms = new FsPermission(APPDIR_PERM);
     for (Path localDir : localDirs) {
       Path fullAppDir = getApplicationDir(localDir, user, appId);
-      if (lfs.util().exists(fullAppDir)) {
-        // this will happen on a partial execution of localizeJob. Sometimes
-        // copying job.xml to the local disk succeeds but copying job.jar might
-        // throw out an exception. We should clean up and then try again.
-        lfs.delete(fullAppDir, true);
-      }
       // create $local.dir/usercache/$user/appcache/$appId
       try {
         lfs.mkdir(fullAppDir, appperms, true);

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Wed Nov  2 05:34:31 2011
@@ -19,8 +19,8 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import static java.util.concurrent.TimeUnit.*;
 
@@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.service.Ab
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 public class DeletionService extends AbstractService {
   static final Log LOG = LogFactory.getLog(DeletionService.class);
   private int debugDelay;
@@ -71,12 +73,17 @@ public class DeletionService extends Abs
 
   @Override
   public void init(Configuration conf) {
+    ThreadFactory tf = new ThreadFactoryBuilder()
+      .setNameFormat("DeletionService #%d")
+      .build();
     if (conf != null) {
       sched = new ScheduledThreadPoolExecutor(
-          conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT));
+          conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT),
+          tf);
       debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
     } else {
-      sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT);
+      sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT,
+          tf);
     }
     sched.setKeepAliveTime(60L, SECONDS);
     super.init(conf);
@@ -125,6 +132,7 @@ public class DeletionService extends Abs
         }
       } else {
         try {
+          LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]");
           exec.deleteAsUser(user, subDir, baseDirs);
         } catch (IOException e) {
           LOG.warn("Failed to delete as user " + user, e);

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Wed Nov  2 05:34:31 2011
@@ -56,11 +56,10 @@ public class LinuxContainerExecutor exte
    * List of commands that the setuid script will execute.
    */
   enum Commands {
-    INITIALIZE_JOB(0),
+    INITIALIZE_CONTAINER(0),
     LAUNCH_CONTAINER(1),
     SIGNAL_CONTAINER(2),
-    DELETE_AS_USER(3),
-    DELETE_LOG_AS_USER(4);
+    DELETE_AS_USER(3);
 
     private int value;
     Commands(int value) {
@@ -78,8 +77,9 @@ public class LinuxContainerExecutor exte
   enum ResultCode {
     OK(0),
     INVALID_USER_NAME(2),
-    INVALID_TASK_PID(9),
-    INVALID_TASKCONTROLLER_PERMISSIONS(22),
+    UNABLE_TO_EXECUTE_CONTAINER_SCRIPT(7),
+    INVALID_CONTAINER_PID(9),
+    INVALID_CONTAINER_EXEC_PERMISSIONS(22),
     INVALID_CONFIG_FILE(24);
 
     private final int value;
@@ -107,7 +107,7 @@ public class LinuxContainerExecutor exte
     List<String> command = new ArrayList<String>(
       Arrays.asList(containerExecutorExe, 
                     user, 
-                    Integer.toString(Commands.INITIALIZE_JOB.getValue()),
+                    Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
                     appId,
                     nmPrivateContainerTokensPath.toUri().getPath().toString()));
     File jvm =                                  // use same jvm as parent
@@ -115,6 +115,10 @@ public class LinuxContainerExecutor exte
     command.add(jvm.toString());
     command.add("-classpath");
     command.add(System.getProperty("java.class.path"));
+    String javaLibPath = System.getProperty("java.library.path");
+    if (javaLibPath != null) {
+      command.add("-Djava.library.path=" + javaLibPath);
+    }
     command.add(ContainerLocalizer.class.getName());
     command.add(user);
     command.add(appId);
@@ -151,41 +155,49 @@ public class LinuxContainerExecutor exte
 
     ContainerId containerId = container.getContainerID();
     String containerIdStr = ConverterUtils.toString(containerId);
-    List<String> command = new ArrayList<String>(
-      Arrays.asList(containerExecutorExe, 
-                    user, 
-                    Integer.toString(Commands.LAUNCH_CONTAINER.getValue()),
-                    appId,
-                    containerIdStr,
-                    containerWorkDir.toString(),
-                    nmPrivateCotainerScriptPath.toUri().getPath().toString(),
-                    nmPrivateTokensPath.toUri().getPath().toString()));
-    String[] commandArray = command.toArray(new String[command.size()]);
-    ShellCommandExecutor shExec = 
-        new ShellCommandExecutor(
-            commandArray,
-            null,                                              // NM's cwd
-            container.getLaunchContext().getEnvironment());    // sanitized env
-    launchCommandObjs.put(containerId, shExec);
-    // DEBUG
-    LOG.info("launchContainer: " + Arrays.toString(commandArray));
-    String output = shExec.getOutput();
+
+    ShellCommandExecutor shExec = null;
+
     try {
-      shExec.execute();
-      if (LOG.isDebugEnabled()) {
-        logOutput(output);
+      Path pidFilePath = getPidFilePath(containerId);
+      if (pidFilePath != null) {
+        List<String> command = new ArrayList<String>(Arrays.asList(
+            containerExecutorExe, user, Integer
+                .toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
+            containerIdStr, containerWorkDir.toString(),
+            nmPrivateCotainerScriptPath.toUri().getPath().toString(),
+            nmPrivateTokensPath.toUri().getPath().toString(), pidFilePath
+                .toString()));
+        String[] commandArray = command.toArray(new String[command.size()]);
+        shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
+            container.getLaunchContext().getEnvironment()); // sanitized env
+        // DEBUG
+        LOG.info("launchContainer: " + Arrays.toString(commandArray));
+        shExec.execute();
+        if (LOG.isDebugEnabled()) {
+          logOutput(shExec.getOutput());
+        }
+      } else {
+        LOG.info("Container was marked as inactive. Returning terminated error");
+        return ExitCode.TERMINATED.getExitCode();
       }
     } catch (ExitCodeException e) {
+
+      if (null == shExec) {
+        return -1;
+      }
+
       int exitCode = shExec.getExitCode();
       LOG.warn("Exit code from container is : " + exitCode);
       // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
       // terminated/killed forcefully. In all other cases, log the
       // container-executor's output
-      if (exitCode != 143 && exitCode != 137) {
+      if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
+          && exitCode != ExitCode.TERMINATED.getExitCode()) {
         LOG.warn("Exception from container-launch : ", e);
-        logOutput(output);
+        logOutput(shExec.getOutput());
         String diagnostics = "Exception from container-launch: \n"
-            + StringUtils.stringifyException(e) + "\n" + output;
+            + StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
         container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
             diagnostics));
       } else {
@@ -194,11 +206,11 @@ public class LinuxContainerExecutor exte
       }
       return exitCode;
     } finally {
-      launchCommandObjs.remove(containerId);
+      ; //
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:");
-      logOutput(output);
+      logOutput(shExec.getOutput());
     }
     return 0;
   }
@@ -221,7 +233,7 @@ public class LinuxContainerExecutor exte
       shExec.execute();
     } catch (ExitCodeException e) {
       int ret_code = shExec.getExitCode();
-      if (ret_code == ResultCode.INVALID_TASK_PID.getValue()) {
+      if (ret_code == ResultCode.INVALID_CONTAINER_PID.getValue()) {
         return false;
       }
       logOutput(shExec.getOutput());

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Wed Nov  2 05:34:31 2011
@@ -46,15 +46,19 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
 import org.apache.hadoop.yarn.util.Records;
 
-public class NodeManager extends CompositeService {
+public class NodeManager extends CompositeService implements
+    ServiceStateChangeListener {
   private static final Log LOG = LogFactory.getLog(NodeManager.class);
   protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
   protected ContainerTokenSecretManager containerTokenSecretManager;
+  private ApplicationACLsManager aclsManager;
 
   public NodeManager() {
     super(NodeManager.class.getName());
@@ -74,14 +78,14 @@ public class NodeManager extends Composi
   protected ContainerManagerImpl createContainerManager(Context context,
       ContainerExecutor exec, DeletionService del,
       NodeStatusUpdater nodeStatusUpdater, ContainerTokenSecretManager 
-      containerTokenSecretManager) {
+      containerTokenSecretManager, ApplicationACLsManager aclsManager) {
     return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
-                                    metrics, containerTokenSecretManager);
+        metrics, containerTokenSecretManager, aclsManager);
   }
 
   protected WebServer createWebServer(Context nmContext,
-      ResourceView resourceView) {
-    return new WebServer(nmContext, resourceView);
+      ResourceView resourceView, ApplicationACLsManager aclsManager) {
+    return new WebServer(nmContext, resourceView, aclsManager);
   }
 
   protected void doSecureLogin() throws IOException {
@@ -101,6 +105,8 @@ public class NodeManager extends Composi
       this.containerTokenSecretManager = new ContainerTokenSecretManager();
     }
 
+    this.aclsManager = new ApplicationACLsManager(conf);
+
     ContainerExecutor exec = ReflectionUtils.newInstance(
         conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
           DefaultContainerExecutor.class, ContainerExecutor.class), conf);
@@ -119,17 +125,19 @@ public class NodeManager extends Composi
     NodeStatusUpdater nodeStatusUpdater =
         createNodeStatusUpdater(context, dispatcher, healthChecker, 
         this.containerTokenSecretManager);
+    
+    nodeStatusUpdater.register(this);
 
     NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
     addService(nodeResourceMonitor);
 
     ContainerManagerImpl containerManager =
         createContainerManager(context, exec, del, nodeStatusUpdater,
-        this.containerTokenSecretManager);
+        this.containerTokenSecretManager, this.aclsManager);
     addService(containerManager);
 
-    Service webServer =
-        createWebServer(context, containerManager.getContainersMonitor());
+    Service webServer = createWebServer(context, containerManager
+        .getContainersMonitor(), this.aclsManager);
     addService(webServer);
 
     dispatcher.register(ContainerManagerEventType.class, containerManager);
@@ -202,6 +210,16 @@ public class NodeManager extends Composi
     }
   }
 
+  
+  @Override
+  public void stateChanged(Service service) {
+    // Shut down the NodeManager when the NodeStatusUpdater is stopped.
+    if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
+        && STATE.STOPPED.equals(service.getServiceState())) {
+      stop();
+    }
+  }
+  
   public static void main(String[] args) {
     StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
     try {
@@ -216,5 +234,4 @@ public class NodeManager extends Composi
       System.exit(-1);
     }
   }
-
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Nov  2 05:34:31 2011
@@ -30,8 +30,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.NodeHealthCheckerService;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -45,11 +45,11 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -100,9 +100,9 @@ public class NodeStatusUpdaterImpl exten
     this.heartBeatInterval =
         conf.getLong(YarnConfiguration.NM_TO_RM_HEARTBEAT_INTERVAL_MS,
             YarnConfiguration.DEFAULT_NM_TO_RM_HEARTBEAT_INTERVAL_MS);
-    int memory = conf.getInt(YarnConfiguration.NM_VMEM_GB, YarnConfiguration.DEFAULT_NM_VMEM_GB);
+    int memoryMb = conf.getInt(YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
     this.totalResource = recordFactory.newRecordInstance(Resource.class);
-    this.totalResource.setMemory(memory * 1024);
+    this.totalResource.setMemory(memoryMb);
     metrics.addResource(totalResource);
     super.init(conf);
   }
@@ -117,7 +117,9 @@ public class NodeStatusUpdaterImpl exten
       getConfig().get(YarnConfiguration.NM_WEBAPP_ADDRESS,
           YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS);
     InetSocketAddress httpBindAddress =
-      NetUtils.createSocketAddr(httpBindAddressStr);
+      NetUtils.createSocketAddr(httpBindAddressStr,
+        YarnConfiguration.DEFAULT_NM_WEBAPP_PORT,
+        YarnConfiguration.NM_WEBAPP_ADDRESS);
     try {
       //      this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
       this.httpPort = httpBindAddress.getPort();
@@ -141,7 +143,9 @@ public class NodeStatusUpdaterImpl exten
   protected ResourceTracker getRMClient() {
     Configuration conf = getConfig();
     YarnRPC rpc = YarnRPC.create(conf);
-    InetSocketAddress rmAddress = NetUtils.createSocketAddr(this.rmAddress);
+    InetSocketAddress rmAddress = NetUtils.createSocketAddr(this.rmAddress,
+      YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT,
+      YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS);
     return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress,
         conf);
   }
@@ -156,6 +160,12 @@ public class NodeStatusUpdaterImpl exten
     request.setNodeId(this.nodeId);
     RegistrationResponse regResponse =
         this.resourceTracker.registerNodeManager(request).getRegistrationResponse();
+    // If the ResourceManager instructs the NM to shut down.
+    if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) {
+      throw new YarnException(
+          "Received SHUTDOWN signal from ResourceManager, registration of NodeManager failed");
+    }
+    
     if (UserGroupInformation.isSecurityEnabled()) {
       this.secretKeyBytes = regResponse.getSecretKey().array();
     }
@@ -231,7 +241,7 @@ public class NodeStatusUpdaterImpl exten
 
   protected void startStatusUpdater() {
 
-    new Thread() {
+    new Thread("Node Status Updater") {
       @Override
       public void run() {
         int lastHeartBeatID = 0;
@@ -244,10 +254,25 @@ public class NodeStatusUpdaterImpl exten
             NodeStatus nodeStatus = getNodeStatus();
             nodeStatus.setResponseId(lastHeartBeatID);
             
-            NodeHeartbeatRequest request = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
+            NodeHeartbeatRequest request = recordFactory
+                .newRecordInstance(NodeHeartbeatRequest.class);
             request.setNodeStatus(nodeStatus);            
             HeartbeatResponse response =
               resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
+            if (response.getNodeAction() == NodeAction.SHUTDOWN) {
+              LOG.info("Received SHUTDOWN signal from ResourceManager as part of heartbeat,"
+                  + " hence shutting down.");
+              NodeStatusUpdaterImpl.this.stop();
+              break;
+            }
+            if (response.getNodeAction() == NodeAction.REBOOT) {
+              LOG.info("Node is out of sync with ResourceManager,"
+                  + " hence shutting down.");
+              NodeStatusUpdaterImpl.this.stop();
+              break;
+            }
+
             lastHeartBeatID = response.getResponseId();
             List<ContainerId> containersToCleanup = response
                 .getContainersToCleanupList();
@@ -262,8 +287,9 @@ public class NodeStatusUpdaterImpl exten
                   new CMgrCompletedAppsEvent(appsToCleanup));
             }
           } catch (Throwable e) {
+            // TODO Better error handling. Thread can die with the rest of the
+            // NM still running.
             LOG.error("Caught exception in status-updater", e);
-            break;
           }
         }
       }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java Wed Nov  2 05:34:31 2011
@@ -42,8 +42,8 @@ public class AuxServices extends Abstrac
 
   private static final Log LOG = LogFactory.getLog(AuxServices.class);
 
-  public final Map<String,AuxiliaryService> serviceMap;
-  public final Map<String,ByteBuffer> serviceMeta;
+  protected final Map<String,AuxiliaryService> serviceMap;
+  protected final Map<String,ByteBuffer> serviceMeta;
 
   public AuxServices() {
     super(AuxServices.class.getName());
@@ -157,20 +157,24 @@ public class AuxServices extends Abstrac
 
   @Override
   public void handle(AuxServicesEvent event) {
-    LOG.info("Got event " + event.getType() + " for service "
-        + event.getServiceID());
-    AuxiliaryService service = serviceMap.get(event.getServiceID());
-    if (null == service) {
-      // TODO kill all containers waiting on Application
-      return;
-    }
+    LOG.info("Got event " + event.getType() + " for appId "
+        + event.getApplicationID());
     switch (event.getType()) {
     case APPLICATION_INIT:
+      LOG.info("Got APPLICATION_INIT for service " + event.getServiceID());
+      AuxiliaryService service = serviceMap.get(event.getServiceID());
+      if (null == service) {
+        LOG.info("service is null");
+        // TODO kill all containers waiting on Application
+        return;
+      }
       service.initApp(event.getUser(), event.getApplicationID(),
           event.getServiceData());
       break;
     case APPLICATION_STOP:
-      service.stopApp(event.getApplicationID());
+      for (AuxiliaryService serv : serviceMap.values()) {
+        serv.stopApp(event.getApplicationID());
+      }
       break;
     default:
       throw new RuntimeException("Unknown type: " + event.getType());

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Wed Nov  2 05:34:31 2011
@@ -27,17 +27,19 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import org.apache.avro.ipc.Server;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
@@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -58,7 +61,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
@@ -69,6 +72,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
@@ -83,11 +87,15 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
@@ -102,9 +110,10 @@ public class ContainerManagerImpl extend
   final Context context;
   private final ContainersMonitor containersMonitor;
   private Server server;
+  private InetAddress resolvedAddress = null;
   private final ResourceLocalizationService rsrcLocalizationSrvc;
   private final ContainersLauncher containersLauncher;
-  private final AuxServices auxiluaryServices;
+  private final AuxServices auxiliaryServices;
   private final NodeManagerMetrics metrics;
 
   private final NodeStatusUpdater nodeStatusUpdater;
@@ -113,13 +122,14 @@ public class ContainerManagerImpl extend
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   
   protected final AsyncDispatcher dispatcher;
+  private final ApplicationACLsManager aclsManager;
 
   private final DeletionService deletionService;
 
   public ContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
       NodeManagerMetrics metrics, ContainerTokenSecretManager 
-      containerTokenSecretManager) {
+      containerTokenSecretManager, ApplicationACLsManager aclsManager) {
     super(ContainerManagerImpl.class.getName());
     this.context = context;
     dispatcher = new AsyncDispatcher();
@@ -135,35 +145,55 @@ public class ContainerManagerImpl extend
 
     this.nodeStatusUpdater = nodeStatusUpdater;
     this.containerTokenSecretManager = containerTokenSecretManager;
+    this.aclsManager = aclsManager;
 
     // Start configurable services
-    auxiluaryServices = new AuxServices();
-    auxiluaryServices.register(this);
-    addService(auxiluaryServices);
+    auxiliaryServices = new AuxServices();
+    auxiliaryServices.register(this);
+    addService(auxiliaryServices);
 
     this.containersMonitor =
         new ContainersMonitorImpl(exec, dispatcher, this.context);
     addService(this.containersMonitor);
 
-    LogAggregationService logAggregationService =
-        createLogAggregationService(this.context, this.deletionService);
-    addService(logAggregationService);
 
     dispatcher.register(ContainerEventType.class,
         new ContainerEventDispatcher());
     dispatcher.register(ApplicationEventType.class,
         new ApplicationEventDispatcher());
     dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
-    dispatcher.register(AuxServicesEventType.class, auxiluaryServices);
+    dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
     dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
     dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
-    dispatcher.register(LogAggregatorEventType.class, logAggregationService);
+    
     addService(dispatcher);
   }
 
-  protected LogAggregationService createLogAggregationService(Context context,
+  @Override
+  public void init(Configuration conf) {
+    LogHandler logHandler =
+      createLogHandler(conf, this.context, this.deletionService);
+    addIfService(logHandler);
+    dispatcher.register(LogHandlerEventType.class, logHandler);
+    
+    super.init(conf);
+  }
+
+  private void addIfService(Object object) {
+    if (object instanceof Service) {
+      addService((Service) object);
+    }
+  }
+
+  protected LogHandler createLogHandler(Configuration conf, Context context,
       DeletionService deletionService) {
-    return new LogAggregationService(context, deletionService);
+    if (conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
+        YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
+      return new LogAggregationService(this.dispatcher, context,
+          deletionService);
+    } else {
+      return new NonAggregatingLogHandler(this.dispatcher, deletionService);
+    }
   }
 
   public ContainersMonitor getContainersMonitor() {
@@ -190,48 +220,139 @@ public class ContainerManagerImpl extend
     YarnRPC rpc = YarnRPC.create(conf);
 
     InetSocketAddress initialAddress = NetUtils.createSocketAddr(conf.get(
-        YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS));
+        YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS),
+        YarnConfiguration.DEFAULT_NM_PORT,
+        YarnConfiguration.NM_ADDRESS);
 
     server =
         rpc.getServer(ContainerManager.class, this, initialAddress, conf,
             this.containerTokenSecretManager,
             conf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, 
                 YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));
+    
+    // Enable service authorization?
+    if (conf.getBoolean(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
+        false)) {
+      refreshServiceAcls(conf, new NMPolicyProvider());
+    }
+    
     server.start();
-    InetAddress hostNameResolved = null;
     try {
-      hostNameResolved = InetAddress.getLocalHost();
+      resolvedAddress = InetAddress.getLocalHost();
     } catch (UnknownHostException e) {
       throw new YarnException(e);
     }
-    this.context.getNodeId().setHost(hostNameResolved.getCanonicalHostName());
+    this.context.getNodeId().setHost(resolvedAddress.getCanonicalHostName());
     this.context.getNodeId().setPort(server.getPort());
     LOG.info("ContainerManager started at "
         + this.context.getNodeId().toString());
     super.start();
   }
 
+  void refreshServiceAcls(Configuration configuration, 
+      PolicyProvider policyProvider) {
+    this.server.refreshServiceAcl(configuration, policyProvider);
+  }
+
   @Override
   public void stop() {
-    if (auxiluaryServices.getServiceState() == STARTED) {
-      auxiluaryServices.unregister(this);
+    if (auxiliaryServices.getServiceState() == STARTED) {
+      auxiliaryServices.unregister(this);
     }
     if (server != null) {
-      server.close();
+      server.stop();
     }
     super.stop();
   }
 
   /**
+   * Authorize the request.
+   * 
+   * @param containerID
+   *          of the container
+   * @param launchContext
+   *          passed if verifying the startContainer, null otherwise.
+   * @throws YarnRemoteException
+   */
+  private void authorizeRequest(ContainerId containerID,
+      ContainerLaunchContext launchContext) throws YarnRemoteException {
+
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+
+    String containerIDStr = containerID.toString();
+
+    UserGroupInformation remoteUgi;
+    try {
+      remoteUgi = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      String msg = "Cannot obtain the user-name for containerId: "
+          + containerIDStr + ". Got exception: "
+          + StringUtils.stringifyException(e);
+      LOG.warn(msg);
+      throw RPCUtil.getRemoteException(msg);
+    }
+
+    boolean unauthorized = false;
+    StringBuilder messageBuilder = new StringBuilder(
+        "Unauthorized request to start container. ");
+
+    if (!remoteUgi.getUserName().equals(containerIDStr)) {
+      unauthorized = true;
+      messageBuilder.append("\nExpected containerId: "
+          + remoteUgi.getUserName() + " Found: " + containerIDStr);
+    }
+
+    if (launchContext != null) {
+
+      // Verify other things for startContainer() request.
+
+      if (LOG.isDebugEnabled()) {
+      LOG.debug("Number of TokenIdentifiers in the UGI from RPC: "
+          + remoteUgi.getTokenIdentifiers().size());
+      }
+      // We must and should get only one TokenIdentifier from the RPC.
+      ContainerTokenIdentifier tokenId = (ContainerTokenIdentifier) remoteUgi
+          .getTokenIdentifiers().iterator().next();
+      if (tokenId == null) {
+        unauthorized = true;
+        messageBuilder
+            .append("\nContainerTokenIdentifier cannot be null! Null found for "
+                + containerIDStr);
+      } else {
+
+        Resource resource = tokenId.getResource();
+        if (!resource.equals(launchContext.getResource())) {
+          unauthorized = true;
+          messageBuilder.append("\nExpected resource " + resource
+              + " but found " + launchContext.getResource());
+        }
+      }
+    }
+
+    if (unauthorized) {
+      String msg = messageBuilder.toString();
+      LOG.error(msg);
+      throw RPCUtil.getRemoteException(msg);
+    }
+  }
+
+  /**
    * Start a container on this NodeManager.
    */
+  @SuppressWarnings("unchecked")
   @Override
   public StartContainerResponse startContainer(StartContainerRequest request)
       throws YarnRemoteException {
     ContainerLaunchContext launchContext = request.getContainerLaunchContext();
 
+    ContainerId containerID = launchContext.getContainerId();
+    authorizeRequest(containerID, launchContext);
+
     LOG.info(" container is " + request);
-  
+
     // //////////// Parse credentials
     ByteBuffer tokens = launchContext.getContainerTokens();
     Credentials credentials = new Credentials();
@@ -253,9 +374,8 @@ public class ContainerManagerImpl extend
     }
     // //////////// End of parsing credentials
 
-    Container container =
-        new ContainerImpl(this.dispatcher, launchContext, credentials, metrics);
-    ContainerId containerID = launchContext.getContainerId();
+    Container container = new ContainerImpl(getConfig(), this.dispatcher,
+        launchContext, credentials, metrics);
     ApplicationId applicationID = 
         containerID.getApplicationAttemptId().getApplicationId();
     if (context.getContainers().putIfAbsent(containerID, container) != null) {
@@ -268,16 +388,21 @@ public class ContainerManagerImpl extend
     }
 
     // Create the application
-    Application application = new ApplicationImpl(dispatcher,
-        launchContext.getUser(), applicationID, credentials);
+    Application application =
+        new ApplicationImpl(dispatcher, this.aclsManager,
+            launchContext.getUser(), applicationID, credentials, context);
     if (null ==
         context.getApplications().putIfAbsent(applicationID, application)) {
       LOG.info("Creating a new application reference for app "
           + applicationID);
+      dispatcher.getEventHandler().handle(
+          new ApplicationInitEvent(applicationID, container
+              .getLaunchContext().getApplicationACLs()));
     }
 
     // TODO: Validate the request
-    dispatcher.getEventHandler().handle(new ApplicationInitEvent(container));
+    dispatcher.getEventHandler().handle(
+        new ApplicationContainerInitEvent(container));
 
     NMAuditLogger.logSuccess(launchContext.getUser(), 
         AuditConstants.START_CONTAINER, "ContainerManageImpl", 
@@ -285,44 +410,44 @@ public class ContainerManagerImpl extend
 
     StartContainerResponse response =
         recordFactory.newRecordInstance(StartContainerResponse.class);
-    response.addAllServiceResponse(auxiluaryServices.getMeta());
+    response.addAllServiceResponse(auxiliaryServices.getMeta());
+    // TODO launchedContainer misplaced -> doesn't necessarily mean a container
+    // launch. A finished Application will not launch containers.
     metrics.launchedContainer();
     metrics.allocateContainer(launchContext.getResource());
     return response;
   }
 
+  /**
+   * Stop the container running on this NodeManager.
+   */
   @Override
+  @SuppressWarnings("unchecked")
   public StopContainerResponse stopContainer(StopContainerRequest request)
       throws YarnRemoteException {
 
+    ContainerId containerID = request.getContainerId();
+    // TODO: Only the container's owner can kill containers today.
+    authorizeRequest(containerID, null);
+
     StopContainerResponse response =
         recordFactory.newRecordInstance(StopContainerResponse.class);
 
-    ContainerId containerID = request.getContainerId();
     Container container = this.context.getContainers().get(containerID);
     if (container == null) {
       LOG.warn("Trying to stop unknown container " + containerID);
-      String userName;
-      try {
-        userName = UserGroupInformation.getCurrentUser().getUserName();
-      } catch (IOException e) {
-        LOG.error("Error finding userName", e);
-        return response;
-      }
-      NMAuditLogger.logFailure(userName,
+      NMAuditLogger.logFailure("UnknownUser",
           AuditConstants.STOP_CONTAINER, "ContainerManagerImpl",
           "Trying to stop unknown container!",
           containerID.getApplicationAttemptId().getApplicationId(), 
           containerID);
       return response; // Return immediately.
     }
+
     dispatcher.getEventHandler().handle(
         new ContainerKillEvent(containerID,
             "Container killed by the ApplicationMaster."));
-
-    // user logged here not ideal since just getting user from container but
-    // request doesn't have anything and should be coming from user of AM so 
-    // should be the same or should be rejected by auth before here. 
+ 
     NMAuditLogger.logSuccess(container.getUser(), 
         AuditConstants.STOP_CONTAINER, "ContainerManageImpl", 
         containerID.getApplicationAttemptId().getApplicationId(), 
@@ -336,20 +461,26 @@ public class ContainerManagerImpl extend
   }
 
   @Override
-  public GetContainerStatusResponse getContainerStatus(GetContainerStatusRequest request) throws YarnRemoteException {
+  public GetContainerStatusResponse getContainerStatus(
+      GetContainerStatusRequest request) throws YarnRemoteException {
+
     ContainerId containerID = request.getContainerId();
+    // TODO: Only the container's owner can get containers' status today.
+    authorizeRequest(containerID, null);
+
     LOG.info("Getting container-status for " + containerID);
     Container container = this.context.getContainers().get(containerID);
     if (container != null) {
       ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
       LOG.info("Returning " + containerStatus);
-      GetContainerStatusResponse response = recordFactory.newRecordInstance(GetContainerStatusResponse.class);
+      GetContainerStatusResponse response = recordFactory
+          .newRecordInstance(GetContainerStatusResponse.class);
       response.setStatus(containerStatus);
       return response;
-    } else {
-      throw RPCUtil.getRemoteException("Container " + containerID
-          + " is not handled by this NodeManager");
     }
+
+    throw RPCUtil.getRemoteException("Container " + containerID
+        + " is not handled by this NodeManager");
   }
 
   class ContainerEventDispatcher implements EventHandler<ContainerEvent> {
@@ -371,19 +502,19 @@ public class ContainerManagerImpl extend
 
     @Override
     public void handle(ApplicationEvent event) {
-      Application app = 
-      ContainerManagerImpl.this.context.getApplications().get(
-          event.getApplicationID());
+      Application app =
+          ContainerManagerImpl.this.context.getApplications().get(
+              event.getApplicationID());
       if (app != null) {
         app.handle(event);
       } else {
-        LOG.warn("Event " + event + " sent to absent application " +
-            event.getApplicationID());
+        LOG.warn("Event " + event + " sent to absent application "
+            + event.getApplicationID());
       }
     }
-    
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void handle(ContainerManagerEvent event) {
     switch (event.getType()) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java Wed Nov  2 05:34:31 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.no
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.yarn.event.Event;
 
 public class ApplicationEvent extends AbstractEvent<ApplicationEventType> {
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java Wed Nov  2 05:34:31 2011
@@ -22,6 +22,7 @@ public enum ApplicationEventType {
 
   // Source: ContainerManager
   INIT_APPLICATION,
+  INIT_CONTAINER,
   FINISH_APPLICATION,
 
   // Source: ResourceLocalizationService
@@ -31,6 +32,6 @@ public enum ApplicationEventType {
   // Source: Container
   APPLICATION_CONTAINER_FINISHED,
 
-  // Source: LogAggregationService.
-  APPLICATION_FINISHED,
+  // Source: Log Handler
+  APPLICATION_LOG_HANDLING_FINISHED
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Wed Nov  2 05:34:31 2011
@@ -21,46 +21,69 @@ package org.apache.hadoop.yarn.server.no
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 
+/**
+ * The state machine for the representation of an Application
+ * within the NodeManager.
+ */
 public class ApplicationImpl implements Application {
 
   final Dispatcher dispatcher;
   final String user;
   final ApplicationId appId;
   final Credentials credentials;
+  Map<ApplicationAccessType, String> applicationACLs;
+  final ApplicationACLsManager aclsManager;
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+  private final Context context;
 
   private static final Log LOG = LogFactory.getLog(Application.class);
 
   Map<ContainerId, Container> containers =
       new HashMap<ContainerId, Container>();
 
-  public ApplicationImpl(Dispatcher dispatcher, String user,
-      ApplicationId appId, Credentials credentials) {
+  public ApplicationImpl(Dispatcher dispatcher,
+      ApplicationACLsManager aclsManager, String user, ApplicationId appId,
+      Credentials credentials, Context context) {
     this.dispatcher = dispatcher;
     this.user = user.toString();
     this.appId = appId;
     this.credentials = credentials;
+    this.aclsManager = aclsManager;
+    this.context = context;
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
     stateMachine = stateMachineFactory.make(this);
   }
 
@@ -75,15 +98,23 @@ public class ApplicationImpl implements 
   }
 
   @Override
-  public synchronized ApplicationState getApplicationState() {
-    // TODO: Synchro should be at statemachine level.
-    // This is only for tests?
-    return this.stateMachine.getCurrentState();
+  public ApplicationState getApplicationState() {
+    this.readLock.lock();
+    try {
+      return this.stateMachine.getCurrentState();
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   @Override
   public Map<ContainerId, Container> getContainers() {
-    return this.containers;
+    this.readLock.lock();
+    try {
+      return this.containers;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
@@ -97,11 +128,14 @@ public class ApplicationImpl implements 
            // Transitions from NEW state
            .addTransition(ApplicationState.NEW, ApplicationState.INITING,
                ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
+           .addTransition(ApplicationState.NEW, ApplicationState.NEW,
+               ApplicationEventType.INIT_CONTAINER,
+               new InitContainerTransition())
 
            // Transitions from INITING state
            .addTransition(ApplicationState.INITING, ApplicationState.INITING,
-               ApplicationEventType.INIT_APPLICATION,
-               new AppIsInitingTransition())
+               ApplicationEventType.INIT_CONTAINER,
+               new InitContainerTransition())
            .addTransition(ApplicationState.INITING,
                EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
                    ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
@@ -114,8 +148,8 @@ public class ApplicationImpl implements 
            // Transitions from RUNNING state
            .addTransition(ApplicationState.RUNNING,
                ApplicationState.RUNNING,
-               ApplicationEventType.INIT_APPLICATION,
-               new DuplicateAppInitTransition())
+               ApplicationEventType.INIT_CONTAINER,
+               new InitContainerTransition())
            .addTransition(ApplicationState.RUNNING,
                ApplicationState.RUNNING,
                ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
@@ -143,7 +177,13 @@ public class ApplicationImpl implements 
                ApplicationState.FINISHED,
                ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
                new AppCompletelyDoneTransition())
-
+           
+           // Transitions from FINISHED state
+           .addTransition(ApplicationState.FINISHED,
+               ApplicationState.FINISHED,
+               ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
+               new AppLogsAggregatedTransition())
+               
            // create the topology tables
            .installTopology();
 
@@ -151,14 +191,18 @@ public class ApplicationImpl implements 
 
   /**
    * Notify services of new application.
+   * 
+   * In particular, this requests that the {@link ResourceLocalizationService}
+   * localize the application-scoped resources.
    */
+  @SuppressWarnings("unchecked")
   static class AppInitTransition implements
       SingleArcTransition<ApplicationImpl, ApplicationEvent> {
     @Override
     public void transition(ApplicationImpl app, ApplicationEvent event) {
-      ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
-      Container container = initEvent.getContainer();
-      app.containers.put(container.getContainerID(), container);
+      ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
+      app.applicationACLs = initEvent.getApplicationACLs();
+      app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
       app.dispatcher.getEventHandler().handle(
           new ApplicationLocalizationEvent(
               LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
@@ -166,20 +210,40 @@ public class ApplicationImpl implements 
   }
 
   /**
-   * Absorb initialization events while the application initializes.
+   * Handles INIT_CONTAINER events which request that we launch a new
+   * container. When we're still in the INITING state, we simply
+   * queue these up. When we're in the RUNNING state, we pass along
+   * a ContainerInitEvent to the appropriate ContainerImpl.
    */
-  static class AppIsInitingTransition implements
+  @SuppressWarnings("unchecked")
+  static class InitContainerTransition implements
       SingleArcTransition<ApplicationImpl, ApplicationEvent> {
     @Override
     public void transition(ApplicationImpl app, ApplicationEvent event) {
-      ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
+      ApplicationContainerInitEvent initEvent =
+        (ApplicationContainerInitEvent) event;
       Container container = initEvent.getContainer();
       app.containers.put(container.getContainerID(), container);
       LOG.info("Adding " + container.getContainerID()
           + " to application " + app.toString());
+      
+      switch (app.getApplicationState()) {
+      case RUNNING:
+        app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
+            container.getContainerID()));
+        break;
+      case INITING:
+      case NEW:
+        // these get queued up and sent out in AppInitDoneTransition
+        break;
+      default:
+        assert false : "Invalid state for InitContainerTransition: " +
+            app.getApplicationState();
+      }
     }
   }
 
+  @SuppressWarnings("unchecked")
   static class AppInitDoneTransition implements
       SingleArcTransition<ApplicationImpl, ApplicationEvent> {
     @Override
@@ -187,9 +251,9 @@ public class ApplicationImpl implements 
 
       // Inform the logAggregator
       app.dispatcher.getEventHandler().handle(
-            new LogAggregatorAppStartedEvent(app.appId, app.user,
-                app.credentials,
-                ContainerLogsRetentionPolicy.ALL_CONTAINERS)); // TODO: Fix
+          new LogHandlerAppStartedEvent(app.appId, app.user,
+              app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
+              app.applicationACLs)); 
 
       // Start all the containers waiting for ApplicationInit
       for (Container container : app.containers.values()) {
@@ -199,19 +263,6 @@ public class ApplicationImpl implements 
     }
   }
 
-  static class DuplicateAppInitTransition implements
-      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
-    @Override
-    public void transition(ApplicationImpl app, ApplicationEvent event) {
-      ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
-      Container container = initEvent.getContainer();
-      app.containers.put(container.getContainerID(), container);
-      LOG.info("Adding " + container.getContainerID()
-          + " to application " + app.toString());
-      app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
-            container.getContainerID()));
-    }
-  }
   
   static final class ContainerDoneTransition implements
       SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@@ -229,15 +280,21 @@ public class ApplicationImpl implements 
     }
   }
 
+  @SuppressWarnings("unchecked")
   void handleAppFinishWithContainersCleanedup() {
     // Delete Application level resources
     this.dispatcher.getEventHandler().handle(
         new ApplicationLocalizationEvent(
             LocalizationEventType.DESTROY_APPLICATION_RESOURCES, this));
 
+    // tell any auxiliary services that the app is done 
+    this.dispatcher.getEventHandler().handle(
+        new AuxServicesEvent(AuxServicesEventType.APPLICATION_STOP, appId));
+
     // TODO: Trigger the LogsManager
   }
 
+  @SuppressWarnings("unchecked")
   static class AppFinishTriggeredTransition
       implements
       MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {
@@ -286,38 +343,57 @@ public class ApplicationImpl implements 
 
   }
 
+  @SuppressWarnings("unchecked")
   static class AppCompletelyDoneTransition implements
       SingleArcTransition<ApplicationImpl, ApplicationEvent> {
     @Override
     public void transition(ApplicationImpl app, ApplicationEvent event) {
+
       // Inform the logService
       app.dispatcher.getEventHandler().handle(
-          new LogAggregatorAppFinishedEvent(app.appId));
+          new LogHandlerAppFinishedEvent(app.appId));
+
+    }
+  }
+
+  static class AppLogsAggregatedTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+      ApplicationId appId = event.getApplicationID();
+      app.context.getApplications().remove(appId);
+      app.aclsManager.removeApplication(appId);
     }
   }
 
   @Override
-  public synchronized void handle(ApplicationEvent event) {
+  public void handle(ApplicationEvent event) {
 
-    ApplicationId applicationID = event.getApplicationID();
-    LOG.info("Processing " + applicationID + " of type " + event.getType());
+    this.writeLock.lock();
 
-    ApplicationState oldState = stateMachine.getCurrentState();
-    ApplicationState newState = null;
     try {
-      // queue event requesting init of the same app
-      newState = stateMachine.doTransition(event.getType(), event);
-    } catch (InvalidStateTransitonException e) {
-      LOG.warn("Can't handle this event at current state", e);
-    }
-    if (oldState != newState) {
-      LOG.info("Application " + applicationID + " transitioned from "
-          + oldState + " to " + newState);
+      ApplicationId applicationID = event.getApplicationID();
+      LOG.info("Processing " + applicationID + " of type " + event.getType());
+
+      ApplicationState oldState = stateMachine.getCurrentState();
+      ApplicationState newState = null;
+      try {
+        // queue event requesting init of the same app
+        newState = stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.warn("Can't handle this event at current state", e);
+      }
+      if (oldState != newState) {
+        LOG.info("Application " + applicationID + " transitioned from "
+            + oldState + " to " + newState);
+      }
+    } finally {
+      this.writeLock.unlock();
     }
   }
 
   @Override
   public String toString() {
-    return ConverterUtils.toString(appId);
+    return appId.toString();
   }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java Wed Nov  2 05:34:31 2011
@@ -18,20 +18,22 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
 
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 
 public class ApplicationInitEvent extends ApplicationEvent {
 
-  private final Container container;
+  private final Map<ApplicationAccessType, String> applicationACLs;
 
-  public ApplicationInitEvent(Container container) {
-    super(container.getContainerID().getApplicationAttemptId().getApplicationId(),
-        ApplicationEventType.INIT_APPLICATION);
-    this.container = container;
+  public ApplicationInitEvent(ApplicationId appId,
+      Map<ApplicationAccessType, String> acls) {
+    super(appId, ApplicationEventType.INIT_APPLICATION);
+    this.applicationACLs = acls;
   }
 
-  public Container getContainer() {
-    return this.container;
+  public Map<ApplicationAccessType, String> getApplicationACLs() {
+    return this.applicationACLs;
   }
-
 }