You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2011/11/02 06:35:03 UTC
svn commit: r1196458 [11/19] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ assembly/
bin/ conf/ dev-support/ hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/
hadoop-mapreduce-client/hadoop-mapreduce-cl...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml Wed Nov 2 05:34:31 2011
@@ -16,16 +16,18 @@
<parent>
<artifactId>hadoop-yarn-server</artifactId>
<groupId>org.apache.hadoop</groupId>
- <version>${yarn.version}</version>
+ <version>0.24.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
+ <version>0.24.0-SNAPSHOT</version>
<name>hadoop-yarn-server-nodemanager</name>
<properties>
- <install.file>${project.artifact.file}</install.file>
+ <!-- Basedir needed for generating FindBugs warnings using parent pom -->
<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
+ <container-executor.conf.dir>/etc/hadoop</container-executor.conf.dir>
</properties>
<dependencies>
@@ -37,7 +39,7 @@
<profiles>
<profile>
- <id>cbuild</id>
+ <id>native</id>
<build>
<plugins>
<plugin>
@@ -46,61 +48,48 @@
<version>1.0-beta-1</version>
<executions>
<execution>
- <id>autoreconf</id>
- <phase>package</phase>
- <configuration>
- <arguments>
- <argument>-i</argument>
- </arguments>
- <workDir>src/main/c/container-executor</workDir>
- </configuration>
+ <id>compile</id>
+ <phase>compile</phase>
<goals>
<goal>autoreconf</goal>
- </goals>
- </execution>
- <execution>
- <id>make</id>
- <phase>package</phase>
- <configuration>
- <workDir>src/main/c/container-executor</workDir>
- <configureEnvironment>
- <property>
- <name>CFLAGS</name>
- <value>-DHADOOP_CONF_DIR=${container-executor.conf.dir}</value>
- </property>
- </configureEnvironment>
- <sources>
- <source>
- <directory>src/main/c/container-executor</directory>
- </source>
- </sources>
- <workDir>src/main/c/container-executor</workDir>
- <destDir>target</destDir>
- <prefix>${project.build.outputDirectory}</prefix>
- </configuration>
- <goals>
- <!-- always clean, to ensure conf dir regenerated -->
- <goal>make-clean</goal>
<goal>configure</goal>
+ <goal>make-install</goal>
</goals>
</execution>
<execution>
- <id>install</id>
- <phase>package</phase>
- <configuration>
- <destDir>/</destDir>
- <workDir>src/main/c/container-executor</workDir>
- </configuration>
+ <id>test</id>
+ <phase>test</phase>
<goals>
- <goal>make-install</goal>
+ <goal>test</goal>
</goals>
</execution>
</executions>
+ <configuration>
+ <!-- autoreconf settings -->
+ <workDir>${project.build.directory}/native/container-executor</workDir>
+ <arguments>
+ <argument>-i</argument>
+ </arguments>
+
+ <!-- configure settings -->
+ <configureEnvironment>
+ <property>
+ <name>CFLAGS</name>
+ <value>-DHADOOP_CONF_DIR=${container-executor.conf.dir}</value>
+ </property>
+ </configureEnvironment>
+ <configureWorkDir>${project.build.directory}/native/container-executor</configureWorkDir>
+ <prefix>/usr/local</prefix>
+
+ <!-- configure & make settings -->
+ <destDir>${project.build.directory}/native/target</destDir>
+
+ </configuration>
</plugin>
</plugins>
</build>
<activation>
- <activeByDefault>true</activeByDefault>
+ <activeByDefault>false</activeByDefault>
</activation>
</profile>
<profile>
@@ -145,8 +134,12 @@
<configuration>
<systemPropertyVariables>
<property>
- <name>container-executor-path</name>
- <value></value>
+ <name>container-executor.path</name>
+ <value>${container-executor.path}</value>
+ </property>
+ <property>
+ <name>application.submitter</name>
+ <value>${application.submitter}</value>
</property>
</systemPropertyVariables>
<excludes>
@@ -170,6 +163,21 @@
<goal>run</goal>
</goals>
</execution>
+ <execution>
+ <id>compile</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <mkdir dir="${project.build.directory}/native"/>
+ <copy toDir="${project.build.directory}/native">
+ <fileset dir="${basedir}/src/main/native"/>
+ </copy>
+ </target>
+ </configuration>
+ </execution>
</executions>
</plugin>
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Wed Nov 2 05:34:31 2011
@@ -19,12 +19,13 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
-import java.lang.reflect.Field;
-
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +36,7 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
public abstract class ContainerExecutor implements Configurable {
@@ -43,8 +45,12 @@ public abstract class ContainerExecutor
FsPermission.createImmutable((short) 0700);
private Configuration conf;
- protected ConcurrentMap<ContainerId, ShellCommandExecutor> launchCommandObjs =
- new ConcurrentHashMap<ContainerId, ShellCommandExecutor>();
+ private ConcurrentMap<ContainerId, Path> pidFiles =
+ new ConcurrentHashMap<ContainerId, Path>();
+
+ private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReadLock readLock = lock.readLock();
+ private final WriteLock writeLock = lock.writeLock();
@Override
public void setConf(Configuration conf) {
@@ -102,7 +108,8 @@ public abstract class ContainerExecutor
throws IOException, InterruptedException;
public enum ExitCode {
- KILLED(137);
+ FORCE_KILLED(137),
+ TERMINATED(143);
private final int code;
private ExitCode(int exitCode) {
@@ -150,6 +157,66 @@ public abstract class ContainerExecutor
}
/**
+ * Get the pidFile of the container.
+ * @param containerId
+ * @return the path of the pid-file for the given containerId.
+ */
+ protected Path getPidFilePath(ContainerId containerId) {
+ try {
+ readLock.lock();
+ return (this.pidFiles.get(containerId));
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Is the container still active?
+ * @param containerId
+ * @return true if the container is active else false.
+ */
+ protected boolean isContainerActive(ContainerId containerId) {
+ try {
+ readLock.lock();
+ return (this.pidFiles.containsKey(containerId));
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Mark the container as active
+ *
+ * @param containerId
+ * the ContainerId
+ * @param pidFilePath
+ * Path where the executor should write the pid of the launched
+ * process
+ */
+ public void activateContainer(ContainerId containerId, Path pidFilePath) {
+ try {
+ writeLock.lock();
+ this.pidFiles.put(containerId, pidFilePath);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Mark the container as inactive.
+ * Done iff the container is still active. Else treat it as
+ * a no-op
+ */
+ public void deactivateContainer(ContainerId containerId) {
+ try {
+ writeLock.lock();
+ this.pidFiles.remove(containerId);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
* Get the process-identifier for the container
*
* @param containerID
@@ -158,28 +225,15 @@ public abstract class ContainerExecutor
*/
public String getProcessId(ContainerId containerID) {
String pid = null;
- ShellCommandExecutor shExec = launchCommandObjs.get(containerID);
- if (shExec == null) {
+ Path pidFile = pidFiles.get(containerID);
+ if (pidFile == null) {
// This container isn't even launched yet.
return pid;
}
- Process proc = shExec.getProcess();
- if (proc == null) {
- // This happens if the command is not yet started
- return pid;
- }
try {
- Field pidField = proc.getClass().getDeclaredField("pid");
- pidField.setAccessible(true);
- pid = ((Integer) pidField.get(proc)).toString();
- } catch (SecurityException e) {
- // SecurityManager not expected with yarn. Ignore.
- } catch (NoSuchFieldException e) {
- // Yarn only on UNIX for now. Ignore.
- } catch (IllegalArgumentException e) {
- ;
- } catch (IllegalAccessException e) {
- ;
+ pid = ProcessIdFileReader.getProcessId(pidFile);
+ } catch (IOException e) {
+ LOG.error("Got exception reading pid from pid-file " + pidFile, e);
}
return pid;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java Wed Nov 2 05:34:31 2011
@@ -20,5 +20,5 @@ package org.apache.hadoop.yarn.server.no
public enum ContainerManagerEventType {
FINISH_APPS,
- FINISH_CONTAINERS
+ FINISH_CONTAINERS,
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Wed Nov 2 05:34:31 2011
@@ -18,10 +18,16 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.EnumSet;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -48,6 +54,9 @@ public class DefaultContainerExecutor ex
private final FileContext lfs;
+ private static final String WRAPPER_LAUNCH_SCRIPT =
+ "default_container_executor.sh";
+
public DefaultContainerExecutor() {
try {
this.lfs = FileContext.getLocalFSFileContext();
@@ -80,8 +89,9 @@ public class DefaultContainerExecutor ex
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
Path tokenDst = new Path(appStorageDir, tokenFn);
lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
+ LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
lfs.setWorkingDirectory(appStorageDir);
-
+ LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
// TODO: DO it over RPC for maintaining similarity?
localizer.runLocalization(nmAddr);
}
@@ -100,8 +110,9 @@ public class DefaultContainerExecutor ex
ConverterUtils.toString(
container.getContainerID().getApplicationAttemptId().
getApplicationId());
- String[] sLocalDirs =
- getConf().getStrings(YarnConfiguration.NM_LOCAL_DIRS, YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
+ String[] sLocalDirs = getConf().getStrings(
+ YarnConfiguration.NM_LOCAL_DIRS,
+ YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
for (String sLocalDir : sLocalDirs) {
Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
@@ -124,21 +135,47 @@ public class DefaultContainerExecutor ex
new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);
lfs.util().copy(nmPrivateTokensPath, tokenDst);
+ // Create new local launch wrapper script
+ Path wrapperScriptDst = new Path(containerWorkDir, WRAPPER_LAUNCH_SCRIPT);
+ DataOutputStream wrapperScriptOutStream =
+ lfs.create(wrapperScriptDst,
+ EnumSet.of(CREATE, OVERWRITE));
+
+ Path pidFile = getPidFilePath(containerId);
+ if (pidFile != null) {
+ writeLocalWrapperScript(wrapperScriptOutStream, launchDst.toUri()
+ .getPath().toString(), pidFile.toString());
+ } else {
+ LOG.info("Container " + containerIdStr
+ + " was marked as inactive. Returning terminated error");
+ return ExitCode.TERMINATED.getExitCode();
+ }
+
// create log dir under app
// fork script
ShellCommandExecutor shExec = null;
try {
lfs.setPermission(launchDst,
ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
- String[] command =
- new String[] { "bash", "-c", launchDst.toUri().getPath().toString() };
+ lfs.setPermission(wrapperScriptDst,
+ ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
+
+ // Setup command to run
+ String[] command = {"bash", "-c",
+ wrapperScriptDst.toUri().getPath().toString()};
LOG.info("launchContainer: " + Arrays.toString(command));
shExec = new ShellCommandExecutor(
command,
- new File(containerWorkDir.toUri().getPath()),
+ new File(containerWorkDir.toUri().getPath()),
container.getLaunchContext().getEnvironment()); // sanitized env
- launchCommandObjs.put(containerId, shExec);
- shExec.execute();
+ if (isContainerActive(containerId)) {
+ shExec.execute();
+ }
+ else {
+ LOG.info("Container " + containerIdStr +
+ " was marked as inactive. Returning terminated error");
+ return ExitCode.TERMINATED.getExitCode();
+ }
} catch (IOException e) {
if (null == shExec) {
return -1;
@@ -151,17 +188,44 @@ public class DefaultContainerExecutor ex
message));
return exitCode;
} finally {
- launchCommandObjs.remove(containerId);
+ ; //
}
return 0;
}
+ private void writeLocalWrapperScript(DataOutputStream out,
+ String launchScriptDst, String pidFilePath) throws IOException {
+ // We need to do a move as writing to a file is not atomic
+ // Process reading a file being written to may get garbled data
+ // hence write pid to tmp file first followed by a mv
+ StringBuilder sb = new StringBuilder("#!/bin/bash\n\n");
+ sb.append("echo $$ > " + pidFilePath + ".tmp\n");
+ sb.append("/bin/mv -f " + pidFilePath + ".tmp " + pidFilePath + "\n");
+ sb.append(ContainerExecutor.isSetsidAvailable? "exec setsid" : "exec");
+ sb.append(" /bin/bash ");
+ sb.append("-c ");
+ sb.append("\"");
+ sb.append(launchScriptDst);
+ sb.append("\"\n");
+ PrintStream pout = null;
+ try {
+ pout = new PrintStream(out);
+ pout.append(sb);
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
@Override
public boolean signalContainer(String user, String pid, Signal signal)
throws IOException {
final String sigpid = ContainerExecutor.isSetsidAvailable
? "-" + pid
: pid;
+ LOG.debug("Sending signal " + signal.getValue() + " to pid " + sigpid
+ + " as user " + user);
try {
sendSignal(sigpid, Signal.NULL);
} catch (ExitCodeException e) {
@@ -189,8 +253,8 @@ public class DefaultContainerExecutor ex
*/
protected void sendSignal(String pid, Signal signal) throws IOException {
ShellCommandExecutor shexec = null;
- String[] arg = { "kill", "-" + signal.getValue(), pid };
- shexec = new ShellCommandExecutor(arg);
+ String[] arg = { "kill", "-" + signal.getValue(), pid };
+ shexec = new ShellCommandExecutor(arg);
shexec.execute();
}
@@ -199,13 +263,18 @@ public class DefaultContainerExecutor ex
throws IOException, InterruptedException {
if (baseDirs == null || baseDirs.length == 0) {
LOG.info("Deleting absolute path : " + subDir);
- lfs.delete(subDir, true);
+ if (!lfs.delete(subDir, true)) {
+ //Maybe retry
+ LOG.warn("delete returned false for path: [" + subDir + "]");
+ }
return;
}
for (Path baseDir : baseDirs) {
Path del = subDir == null ? baseDir : new Path(baseDir, subDir);
LOG.info("Deleting path : " + del);
- lfs.delete(del, true);
+ if (!lfs.delete(del, true)) {
+ LOG.warn("delete returned false for path: [" + del + "]");
+ }
}
}
@@ -335,12 +404,6 @@ public class DefaultContainerExecutor ex
FsPermission appperms = new FsPermission(APPDIR_PERM);
for (Path localDir : localDirs) {
Path fullAppDir = getApplicationDir(localDir, user, appId);
- if (lfs.util().exists(fullAppDir)) {
- // this will happen on a partial execution of localizeJob. Sometimes
- // copying job.xml to the local disk succeeds but copying job.jar might
- // throw out an exception. We should clean up and then try again.
- lfs.delete(fullAppDir, true);
- }
// create $local.dir/usercache/$user/appcache/$appId
try {
lfs.mkdir(fullAppDir, appperms, true);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Wed Nov 2 05:34:31 2011
@@ -19,8 +19,8 @@
package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*;
@@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.service.Ab
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
public class DeletionService extends AbstractService {
static final Log LOG = LogFactory.getLog(DeletionService.class);
private int debugDelay;
@@ -71,12 +73,17 @@ public class DeletionService extends Abs
@Override
public void init(Configuration conf) {
+ ThreadFactory tf = new ThreadFactoryBuilder()
+ .setNameFormat("DeletionService #%d")
+ .build();
if (conf != null) {
sched = new ScheduledThreadPoolExecutor(
- conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT));
+ conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT),
+ tf);
debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
} else {
- sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT);
+ sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT,
+ tf);
}
sched.setKeepAliveTime(60L, SECONDS);
super.init(conf);
@@ -125,6 +132,7 @@ public class DeletionService extends Abs
}
} else {
try {
+ LOG.debug("Deleting path: [" + subDir + "] as user: [" + user + "]");
exec.deleteAsUser(user, subDir, baseDirs);
} catch (IOException e) {
LOG.warn("Failed to delete as user " + user, e);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Wed Nov 2 05:34:31 2011
@@ -56,11 +56,10 @@ public class LinuxContainerExecutor exte
* List of commands that the setuid script will execute.
*/
enum Commands {
- INITIALIZE_JOB(0),
+ INITIALIZE_CONTAINER(0),
LAUNCH_CONTAINER(1),
SIGNAL_CONTAINER(2),
- DELETE_AS_USER(3),
- DELETE_LOG_AS_USER(4);
+ DELETE_AS_USER(3);
private int value;
Commands(int value) {
@@ -78,8 +77,9 @@ public class LinuxContainerExecutor exte
enum ResultCode {
OK(0),
INVALID_USER_NAME(2),
- INVALID_TASK_PID(9),
- INVALID_TASKCONTROLLER_PERMISSIONS(22),
+ UNABLE_TO_EXECUTE_CONTAINER_SCRIPT(7),
+ INVALID_CONTAINER_PID(9),
+ INVALID_CONTAINER_EXEC_PERMISSIONS(22),
INVALID_CONFIG_FILE(24);
private final int value;
@@ -107,7 +107,7 @@ public class LinuxContainerExecutor exte
List<String> command = new ArrayList<String>(
Arrays.asList(containerExecutorExe,
user,
- Integer.toString(Commands.INITIALIZE_JOB.getValue()),
+ Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
appId,
nmPrivateContainerTokensPath.toUri().getPath().toString()));
File jvm = // use same jvm as parent
@@ -115,6 +115,10 @@ public class LinuxContainerExecutor exte
command.add(jvm.toString());
command.add("-classpath");
command.add(System.getProperty("java.class.path"));
+ String javaLibPath = System.getProperty("java.library.path");
+ if (javaLibPath != null) {
+ command.add("-Djava.library.path=" + javaLibPath);
+ }
command.add(ContainerLocalizer.class.getName());
command.add(user);
command.add(appId);
@@ -151,41 +155,49 @@ public class LinuxContainerExecutor exte
ContainerId containerId = container.getContainerID();
String containerIdStr = ConverterUtils.toString(containerId);
- List<String> command = new ArrayList<String>(
- Arrays.asList(containerExecutorExe,
- user,
- Integer.toString(Commands.LAUNCH_CONTAINER.getValue()),
- appId,
- containerIdStr,
- containerWorkDir.toString(),
- nmPrivateCotainerScriptPath.toUri().getPath().toString(),
- nmPrivateTokensPath.toUri().getPath().toString()));
- String[] commandArray = command.toArray(new String[command.size()]);
- ShellCommandExecutor shExec =
- new ShellCommandExecutor(
- commandArray,
- null, // NM's cwd
- container.getLaunchContext().getEnvironment()); // sanitized env
- launchCommandObjs.put(containerId, shExec);
- // DEBUG
- LOG.info("launchContainer: " + Arrays.toString(commandArray));
- String output = shExec.getOutput();
+
+ ShellCommandExecutor shExec = null;
+
try {
- shExec.execute();
- if (LOG.isDebugEnabled()) {
- logOutput(output);
+ Path pidFilePath = getPidFilePath(containerId);
+ if (pidFilePath != null) {
+ List<String> command = new ArrayList<String>(Arrays.asList(
+ containerExecutorExe, user, Integer
+ .toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
+ containerIdStr, containerWorkDir.toString(),
+ nmPrivateCotainerScriptPath.toUri().getPath().toString(),
+ nmPrivateTokensPath.toUri().getPath().toString(), pidFilePath
+ .toString()));
+ String[] commandArray = command.toArray(new String[command.size()]);
+ shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
+ container.getLaunchContext().getEnvironment()); // sanitized env
+ // DEBUG
+ LOG.info("launchContainer: " + Arrays.toString(commandArray));
+ shExec.execute();
+ if (LOG.isDebugEnabled()) {
+ logOutput(shExec.getOutput());
+ }
+ } else {
+ LOG.info("Container was marked as inactive. Returning terminated error");
+ return ExitCode.TERMINATED.getExitCode();
}
} catch (ExitCodeException e) {
+
+ if (null == shExec) {
+ return -1;
+ }
+
int exitCode = shExec.getExitCode();
LOG.warn("Exit code from container is : " + exitCode);
// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
// terminated/killed forcefully. In all other cases, log the
// container-executor's output
- if (exitCode != 143 && exitCode != 137) {
+ if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
+ && exitCode != ExitCode.TERMINATED.getExitCode()) {
LOG.warn("Exception from container-launch : ", e);
- logOutput(output);
+ logOutput(shExec.getOutput());
String diagnostics = "Exception from container-launch: \n"
- + StringUtils.stringifyException(e) + "\n" + output;
+ + StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
diagnostics));
} else {
@@ -194,11 +206,11 @@ public class LinuxContainerExecutor exte
}
return exitCode;
} finally {
- launchCommandObjs.remove(containerId);
+ ; //
}
if (LOG.isDebugEnabled()) {
LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:");
- logOutput(output);
+ logOutput(shExec.getOutput());
}
return 0;
}
@@ -221,7 +233,7 @@ public class LinuxContainerExecutor exte
shExec.execute();
} catch (ExitCodeException e) {
int ret_code = shExec.getExitCode();
- if (ret_code == ResultCode.INVALID_TASK_PID.getValue()) {
+ if (ret_code == ResultCode.INVALID_CONTAINER_PID.getValue()) {
return false;
}
logOutput(shExec.getOutput());
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Wed Nov 2 05:34:31 2011
@@ -46,15 +46,19 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
import org.apache.hadoop.yarn.util.Records;
-public class NodeManager extends CompositeService {
+public class NodeManager extends CompositeService implements
+ ServiceStateChangeListener {
private static final Log LOG = LogFactory.getLog(NodeManager.class);
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
protected ContainerTokenSecretManager containerTokenSecretManager;
+ private ApplicationACLsManager aclsManager;
public NodeManager() {
super(NodeManager.class.getName());
@@ -74,14 +78,14 @@ public class NodeManager extends Composi
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater, ContainerTokenSecretManager
- containerTokenSecretManager) {
+ containerTokenSecretManager, ApplicationACLsManager aclsManager) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
- metrics, containerTokenSecretManager);
+ metrics, containerTokenSecretManager, aclsManager);
}
protected WebServer createWebServer(Context nmContext,
- ResourceView resourceView) {
- return new WebServer(nmContext, resourceView);
+ ResourceView resourceView, ApplicationACLsManager aclsManager) {
+ return new WebServer(nmContext, resourceView, aclsManager);
}
protected void doSecureLogin() throws IOException {
@@ -101,6 +105,8 @@ public class NodeManager extends Composi
this.containerTokenSecretManager = new ContainerTokenSecretManager();
}
+ this.aclsManager = new ApplicationACLsManager(conf);
+
ContainerExecutor exec = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class), conf);
@@ -119,17 +125,19 @@ public class NodeManager extends Composi
NodeStatusUpdater nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, healthChecker,
this.containerTokenSecretManager);
+
+ nodeStatusUpdater.register(this);
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor);
ContainerManagerImpl containerManager =
createContainerManager(context, exec, del, nodeStatusUpdater,
- this.containerTokenSecretManager);
+ this.containerTokenSecretManager, this.aclsManager);
addService(containerManager);
- Service webServer =
- createWebServer(context, containerManager.getContainersMonitor());
+ Service webServer = createWebServer(context, containerManager
+ .getContainersMonitor(), this.aclsManager);
addService(webServer);
dispatcher.register(ContainerManagerEventType.class, containerManager);
@@ -202,6 +210,16 @@ public class NodeManager extends Composi
}
}
+
+ @Override
+ public void stateChanged(Service service) {
+ // Shut down the NodeManager when the NodeStatusUpdater is stopped.
+ if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
+ && STATE.STOPPED.equals(service.getServiceState())) {
+ stop();
+ }
+ }
+
public static void main(String[] args) {
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
try {
@@ -216,5 +234,4 @@ public class NodeManager extends Composi
System.exit(-1);
}
}
-
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Nov 2 05:34:31 2011
@@ -30,8 +30,8 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -45,11 +45,11 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -100,9 +100,9 @@ public class NodeStatusUpdaterImpl exten
this.heartBeatInterval =
conf.getLong(YarnConfiguration.NM_TO_RM_HEARTBEAT_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_TO_RM_HEARTBEAT_INTERVAL_MS);
- int memory = conf.getInt(YarnConfiguration.NM_VMEM_GB, YarnConfiguration.DEFAULT_NM_VMEM_GB);
+ int memoryMb = conf.getInt(YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
this.totalResource = recordFactory.newRecordInstance(Resource.class);
- this.totalResource.setMemory(memory * 1024);
+ this.totalResource.setMemory(memoryMb);
metrics.addResource(totalResource);
super.init(conf);
}
@@ -117,7 +117,9 @@ public class NodeStatusUpdaterImpl exten
getConfig().get(YarnConfiguration.NM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS);
InetSocketAddress httpBindAddress =
- NetUtils.createSocketAddr(httpBindAddressStr);
+ NetUtils.createSocketAddr(httpBindAddressStr,
+ YarnConfiguration.DEFAULT_NM_WEBAPP_PORT,
+ YarnConfiguration.NM_WEBAPP_ADDRESS);
try {
// this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
this.httpPort = httpBindAddress.getPort();
@@ -141,7 +143,9 @@ public class NodeStatusUpdaterImpl exten
protected ResourceTracker getRMClient() {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
- InetSocketAddress rmAddress = NetUtils.createSocketAddr(this.rmAddress);
+ InetSocketAddress rmAddress = NetUtils.createSocketAddr(this.rmAddress,
+ YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT,
+ YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS);
return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress,
conf);
}
@@ -156,6 +160,12 @@ public class NodeStatusUpdaterImpl exten
request.setNodeId(this.nodeId);
RegistrationResponse regResponse =
this.resourceTracker.registerNodeManager(request).getRegistrationResponse();
+ // If the ResourceManager instructs the NM to shut down, fail registration.
+ if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) {
+ throw new YarnException(
+ "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
+ }
+
if (UserGroupInformation.isSecurityEnabled()) {
this.secretKeyBytes = regResponse.getSecretKey().array();
}
@@ -231,7 +241,7 @@ public class NodeStatusUpdaterImpl exten
protected void startStatusUpdater() {
- new Thread() {
+ new Thread("Node Status Updater") {
@Override
public void run() {
int lastHeartBeatID = 0;
@@ -244,10 +254,25 @@ public class NodeStatusUpdaterImpl exten
NodeStatus nodeStatus = getNodeStatus();
nodeStatus.setResponseId(lastHeartBeatID);
- NodeHeartbeatRequest request = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
+ NodeHeartbeatRequest request = recordFactory
+ .newRecordInstance(NodeHeartbeatRequest.class);
request.setNodeStatus(nodeStatus);
HeartbeatResponse response =
resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
+ if (response.getNodeAction() == NodeAction.SHUTDOWN) {
+ LOG
+ .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
+ " hence shutting down.");
+ NodeStatusUpdaterImpl.this.stop();
+ break;
+ }
+ if (response.getNodeAction() == NodeAction.REBOOT) {
+ LOG.info("Node is out of sync with ResourceManager,"
+ + " hence shutting down.");
+ NodeStatusUpdaterImpl.this.stop();
+ break;
+ }
+
lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanupList();
@@ -262,8 +287,9 @@ public class NodeStatusUpdaterImpl exten
new CMgrCompletedAppsEvent(appsToCleanup));
}
} catch (Throwable e) {
+ // TODO Better error handling. Thread can die with the rest of the
+ // NM still running.
LOG.error("Caught exception in status-updater", e);
- break;
}
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java Wed Nov 2 05:34:31 2011
@@ -42,8 +42,8 @@ public class AuxServices extends Abstrac
private static final Log LOG = LogFactory.getLog(AuxServices.class);
- public final Map<String,AuxiliaryService> serviceMap;
- public final Map<String,ByteBuffer> serviceMeta;
+ protected final Map<String,AuxiliaryService> serviceMap;
+ protected final Map<String,ByteBuffer> serviceMeta;
public AuxServices() {
super(AuxServices.class.getName());
@@ -157,20 +157,24 @@ public class AuxServices extends Abstrac
@Override
public void handle(AuxServicesEvent event) {
- LOG.info("Got event " + event.getType() + " for service "
- + event.getServiceID());
- AuxiliaryService service = serviceMap.get(event.getServiceID());
- if (null == service) {
- // TODO kill all containers waiting on Application
- return;
- }
+ LOG.info("Got event " + event.getType() + " for appId "
+ + event.getApplicationID());
switch (event.getType()) {
case APPLICATION_INIT:
+ LOG.info("Got APPLICATION_INIT for service " + event.getServiceID());
+ AuxiliaryService service = serviceMap.get(event.getServiceID());
+ if (null == service) {
+ LOG.info("service is null");
+ // TODO kill all containers waiting on Application
+ return;
+ }
service.initApp(event.getUser(), event.getApplicationID(),
event.getServiceData());
break;
case APPLICATION_STOP:
- service.stopApp(event.getApplicationID());
+ for (AuxiliaryService serv : serviceMap.values()) {
+ serv.stopApp(event.getApplicationID());
+ }
break;
default:
throw new RuntimeException("Unknown type: " + event.getType());
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Wed Nov 2 05:34:31 2011
@@ -27,17 +27,19 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Map;
-import org.apache.avro.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
@@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -58,7 +61,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
@@ -69,6 +72,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
@@ -83,11 +87,15 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
@@ -102,9 +110,10 @@ public class ContainerManagerImpl extend
final Context context;
private final ContainersMonitor containersMonitor;
private Server server;
+ private InetAddress resolvedAddress = null;
private final ResourceLocalizationService rsrcLocalizationSrvc;
private final ContainersLauncher containersLauncher;
- private final AuxServices auxiluaryServices;
+ private final AuxServices auxiliaryServices;
private final NodeManagerMetrics metrics;
private final NodeStatusUpdater nodeStatusUpdater;
@@ -113,13 +122,14 @@ public class ContainerManagerImpl extend
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
protected final AsyncDispatcher dispatcher;
+ private final ApplicationACLsManager aclsManager;
private final DeletionService deletionService;
public ContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics, ContainerTokenSecretManager
- containerTokenSecretManager) {
+ containerTokenSecretManager, ApplicationACLsManager aclsManager) {
super(ContainerManagerImpl.class.getName());
this.context = context;
dispatcher = new AsyncDispatcher();
@@ -135,35 +145,55 @@ public class ContainerManagerImpl extend
this.nodeStatusUpdater = nodeStatusUpdater;
this.containerTokenSecretManager = containerTokenSecretManager;
+ this.aclsManager = aclsManager;
// Start configurable services
- auxiluaryServices = new AuxServices();
- auxiluaryServices.register(this);
- addService(auxiluaryServices);
+ auxiliaryServices = new AuxServices();
+ auxiliaryServices.register(this);
+ addService(auxiliaryServices);
this.containersMonitor =
new ContainersMonitorImpl(exec, dispatcher, this.context);
addService(this.containersMonitor);
- LogAggregationService logAggregationService =
- createLogAggregationService(this.context, this.deletionService);
- addService(logAggregationService);
dispatcher.register(ContainerEventType.class,
new ContainerEventDispatcher());
dispatcher.register(ApplicationEventType.class,
new ApplicationEventDispatcher());
dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
- dispatcher.register(AuxServicesEventType.class, auxiluaryServices);
+ dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
- dispatcher.register(LogAggregatorEventType.class, logAggregationService);
+
addService(dispatcher);
}
- protected LogAggregationService createLogAggregationService(Context context,
+ @Override
+ public void init(Configuration conf) {
+ LogHandler logHandler =
+ createLogHandler(conf, this.context, this.deletionService);
+ addIfService(logHandler);
+ dispatcher.register(LogHandlerEventType.class, logHandler);
+
+ super.init(conf);
+ }
+
+ private void addIfService(Object object) {
+ if (object instanceof Service) {
+ addService((Service) object);
+ }
+ }
+
+ protected LogHandler createLogHandler(Configuration conf, Context context,
DeletionService deletionService) {
- return new LogAggregationService(context, deletionService);
+ if (conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
+ return new LogAggregationService(this.dispatcher, context,
+ deletionService);
+ } else {
+ return new NonAggregatingLogHandler(this.dispatcher, deletionService);
+ }
}
public ContainersMonitor getContainersMonitor() {
@@ -190,48 +220,139 @@ public class ContainerManagerImpl extend
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress initialAddress = NetUtils.createSocketAddr(conf.get(
- YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS));
+ YarnConfiguration.NM_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS),
+ YarnConfiguration.DEFAULT_NM_PORT,
+ YarnConfiguration.NM_ADDRESS);
server =
rpc.getServer(ContainerManager.class, this, initialAddress, conf,
this.containerTokenSecretManager,
conf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT,
YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));
+
+ // Enable service authorization?
+ if (conf.getBoolean(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+ false)) {
+ refreshServiceAcls(conf, new NMPolicyProvider());
+ }
+
server.start();
- InetAddress hostNameResolved = null;
try {
- hostNameResolved = InetAddress.getLocalHost();
+ resolvedAddress = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
throw new YarnException(e);
}
- this.context.getNodeId().setHost(hostNameResolved.getCanonicalHostName());
+ this.context.getNodeId().setHost(resolvedAddress.getCanonicalHostName());
this.context.getNodeId().setPort(server.getPort());
LOG.info("ContainerManager started at "
+ this.context.getNodeId().toString());
super.start();
}
+ void refreshServiceAcls(Configuration configuration,
+ PolicyProvider policyProvider) {
+ this.server.refreshServiceAcl(configuration, policyProvider);
+ }
+
@Override
public void stop() {
- if (auxiluaryServices.getServiceState() == STARTED) {
- auxiluaryServices.unregister(this);
+ if (auxiliaryServices.getServiceState() == STARTED) {
+ auxiliaryServices.unregister(this);
}
if (server != null) {
- server.close();
+ server.stop();
}
super.stop();
}
/**
+ * Authorize the request.
+ *
+ * @param containerID
+ * the container being authorized
+ * @param launchContext
+ * the launch context when verifying startContainer(), null otherwise.
+ * @throws YarnRemoteException
+ */
+ private void authorizeRequest(ContainerId containerID,
+ ContainerLaunchContext launchContext) throws YarnRemoteException {
+
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+
+ String containerIDStr = containerID.toString();
+
+ UserGroupInformation remoteUgi;
+ try {
+ remoteUgi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ String msg = "Cannot obtain the user-name for containerId: "
+ + containerIDStr + ". Got exception: "
+ + StringUtils.stringifyException(e);
+ LOG.warn(msg);
+ throw RPCUtil.getRemoteException(msg);
+ }
+
+ boolean unauthorized = false;
+ StringBuilder messageBuilder = new StringBuilder(
+ "Unauthorized request to start container. ");
+
+ if (!remoteUgi.getUserName().equals(containerIDStr)) {
+ unauthorized = true;
+ messageBuilder.append("\nExpected containerId: "
+ + remoteUgi.getUserName() + " Found: " + containerIDStr);
+ }
+
+ if (launchContext != null) {
+
+ // Verify other things for startContainer() request.
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Number of TokenIdentifiers in the UGI from RPC: "
+ + remoteUgi.getTokenIdentifiers().size());
+ }
+ // We expect exactly one TokenIdentifier from the RPC layer.
+ ContainerTokenIdentifier tokenId = (ContainerTokenIdentifier) remoteUgi
+ .getTokenIdentifiers().iterator().next();
+ if (tokenId == null) {
+ unauthorized = true;
+ messageBuilder
+ .append("\nContainerTokenIdentifier cannot be null! Null found for "
+ + containerIDStr);
+ } else {
+
+ Resource resource = tokenId.getResource();
+ if (!resource.equals(launchContext.getResource())) {
+ unauthorized = true;
+ messageBuilder.append("\nExpected resource " + resource
+ + " but found " + launchContext.getResource());
+ }
+ }
+ }
+
+ if (unauthorized) {
+ String msg = messageBuilder.toString();
+ LOG.error(msg);
+ throw RPCUtil.getRemoteException(msg);
+ }
+ }
+
+ /**
* Start a container on this NodeManager.
*/
+ @SuppressWarnings("unchecked")
@Override
public StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException {
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
+ ContainerId containerID = launchContext.getContainerId();
+ authorizeRequest(containerID, launchContext);
+
LOG.info(" container is " + request);
-
+
// //////////// Parse credentials
ByteBuffer tokens = launchContext.getContainerTokens();
Credentials credentials = new Credentials();
@@ -253,9 +374,8 @@ public class ContainerManagerImpl extend
}
// //////////// End of parsing credentials
- Container container =
- new ContainerImpl(this.dispatcher, launchContext, credentials, metrics);
- ContainerId containerID = launchContext.getContainerId();
+ Container container = new ContainerImpl(getConfig(), this.dispatcher,
+ launchContext, credentials, metrics);
ApplicationId applicationID =
containerID.getApplicationAttemptId().getApplicationId();
if (context.getContainers().putIfAbsent(containerID, container) != null) {
@@ -268,16 +388,21 @@ public class ContainerManagerImpl extend
}
// Create the application
- Application application = new ApplicationImpl(dispatcher,
- launchContext.getUser(), applicationID, credentials);
+ Application application =
+ new ApplicationImpl(dispatcher, this.aclsManager,
+ launchContext.getUser(), applicationID, credentials, context);
if (null ==
context.getApplications().putIfAbsent(applicationID, application)) {
LOG.info("Creating a new application reference for app "
+ applicationID);
+ dispatcher.getEventHandler().handle(
+ new ApplicationInitEvent(applicationID, container
+ .getLaunchContext().getApplicationACLs()));
}
// TODO: Validate the request
- dispatcher.getEventHandler().handle(new ApplicationInitEvent(container));
+ dispatcher.getEventHandler().handle(
+ new ApplicationContainerInitEvent(container));
NMAuditLogger.logSuccess(launchContext.getUser(),
AuditConstants.START_CONTAINER, "ContainerManageImpl",
@@ -285,44 +410,44 @@ public class ContainerManagerImpl extend
StartContainerResponse response =
recordFactory.newRecordInstance(StartContainerResponse.class);
- response.addAllServiceResponse(auxiluaryServices.getMeta());
+ response.addAllServiceResponse(auxiliaryServices.getMeta());
+ // TODO launchedContainer misplaced -> doesn't necessarily mean a container
+ // launch. A finished Application will not launch containers.
metrics.launchedContainer();
metrics.allocateContainer(launchContext.getResource());
return response;
}
+ /**
+ * Stop the container running on this NodeManager.
+ */
@Override
+ @SuppressWarnings("unchecked")
public StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException {
+ ContainerId containerID = request.getContainerId();
+ // TODO: Only the container's owner can kill containers today.
+ authorizeRequest(containerID, null);
+
StopContainerResponse response =
recordFactory.newRecordInstance(StopContainerResponse.class);
- ContainerId containerID = request.getContainerId();
Container container = this.context.getContainers().get(containerID);
if (container == null) {
LOG.warn("Trying to stop unknown container " + containerID);
- String userName;
- try {
- userName = UserGroupInformation.getCurrentUser().getUserName();
- } catch (IOException e) {
- LOG.error("Error finding userName", e);
- return response;
- }
- NMAuditLogger.logFailure(userName,
+ NMAuditLogger.logFailure("UnknownUser",
AuditConstants.STOP_CONTAINER, "ContainerManagerImpl",
"Trying to stop unknown container!",
containerID.getApplicationAttemptId().getApplicationId(),
containerID);
return response; // Return immediately.
}
+
dispatcher.getEventHandler().handle(
new ContainerKillEvent(containerID,
"Container killed by the ApplicationMaster."));
-
- // user logged here not ideal since just getting user from container but
- // request doesn't have anything and should be coming from user of AM so
- // should be the same or should be rejected by auth before here.
+
NMAuditLogger.logSuccess(container.getUser(),
AuditConstants.STOP_CONTAINER, "ContainerManageImpl",
containerID.getApplicationAttemptId().getApplicationId(),
@@ -336,20 +461,26 @@ public class ContainerManagerImpl extend
}
@Override
- public GetContainerStatusResponse getContainerStatus(GetContainerStatusRequest request) throws YarnRemoteException {
+ public GetContainerStatusResponse getContainerStatus(
+ GetContainerStatusRequest request) throws YarnRemoteException {
+
ContainerId containerID = request.getContainerId();
+ // TODO: Only the container's owner can get containers' status today.
+ authorizeRequest(containerID, null);
+
LOG.info("Getting container-status for " + containerID);
Container container = this.context.getContainers().get(containerID);
if (container != null) {
ContainerStatus containerStatus = container.cloneAndGetContainerStatus();
LOG.info("Returning " + containerStatus);
- GetContainerStatusResponse response = recordFactory.newRecordInstance(GetContainerStatusResponse.class);
+ GetContainerStatusResponse response = recordFactory
+ .newRecordInstance(GetContainerStatusResponse.class);
response.setStatus(containerStatus);
return response;
- } else {
- throw RPCUtil.getRemoteException("Container " + containerID
- + " is not handled by this NodeManager");
}
+
+ throw RPCUtil.getRemoteException("Container " + containerID
+ + " is not handled by this NodeManager");
}
class ContainerEventDispatcher implements EventHandler<ContainerEvent> {
@@ -371,19 +502,19 @@ public class ContainerManagerImpl extend
@Override
public void handle(ApplicationEvent event) {
- Application app =
- ContainerManagerImpl.this.context.getApplications().get(
- event.getApplicationID());
+ Application app =
+ ContainerManagerImpl.this.context.getApplications().get(
+ event.getApplicationID());
if (app != null) {
app.handle(event);
} else {
- LOG.warn("Event " + event + " sent to absent application " +
- event.getApplicationID());
+ LOG.warn("Event " + event + " sent to absent application "
+ + event.getApplicationID());
}
}
-
}
+ @SuppressWarnings("unchecked")
@Override
public void handle(ContainerManagerEvent event) {
switch (event.getType()) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java Wed Nov 2 05:34:31 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.no
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.yarn.event.Event;
public class ApplicationEvent extends AbstractEvent<ApplicationEventType> {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java Wed Nov 2 05:34:31 2011
@@ -22,6 +22,7 @@ public enum ApplicationEventType {
// Source: ContainerManager
INIT_APPLICATION,
+ INIT_CONTAINER,
FINISH_APPLICATION,
// Source: ResourceLocalizationService
@@ -31,6 +32,6 @@ public enum ApplicationEventType {
// Source: Container
APPLICATION_CONTAINER_FINISHED,
- // Source: LogAggregationService.
- APPLICATION_FINISHED,
+ // Source: Log Handler
+ APPLICATION_LOG_HANDLING_FINISHED
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Wed Nov 2 05:34:31 2011
@@ -21,46 +21,69 @@ package org.apache.hadoop.yarn.server.no
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.util.ConverterUtils;
+/**
+ * The state machine for the representation of an Application
+ * within the NodeManager.
+ */
public class ApplicationImpl implements Application {
final Dispatcher dispatcher;
final String user;
final ApplicationId appId;
final Credentials credentials;
+ Map<ApplicationAccessType, String> applicationACLs;
+ final ApplicationACLsManager aclsManager;
+ private final ReadLock readLock;
+ private final WriteLock writeLock;
+ private final Context context;
private static final Log LOG = LogFactory.getLog(Application.class);
Map<ContainerId, Container> containers =
new HashMap<ContainerId, Container>();
- public ApplicationImpl(Dispatcher dispatcher, String user,
- ApplicationId appId, Credentials credentials) {
+ public ApplicationImpl(Dispatcher dispatcher,
+ ApplicationACLsManager aclsManager, String user, ApplicationId appId,
+ Credentials credentials, Context context) {
this.dispatcher = dispatcher;
this.user = user.toString();
this.appId = appId;
this.credentials = credentials;
+ this.aclsManager = aclsManager;
+ this.context = context;
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ readLock = lock.readLock();
+ writeLock = lock.writeLock();
stateMachine = stateMachineFactory.make(this);
}
@@ -75,15 +98,23 @@ public class ApplicationImpl implements
}
@Override
- public synchronized ApplicationState getApplicationState() {
- // TODO: Synchro should be at statemachine level.
- // This is only for tests?
- return this.stateMachine.getCurrentState();
+ public ApplicationState getApplicationState() {
+ this.readLock.lock();
+ try {
+ return this.stateMachine.getCurrentState();
+ } finally {
+ this.readLock.unlock();
+ }
}
@Override
public Map<ContainerId, Container> getContainers() {
- return this.containers;
+ this.readLock.lock();
+ try {
+ return this.containers;
+ } finally {
+ this.readLock.unlock();
+ }
}
private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
@@ -97,11 +128,14 @@ public class ApplicationImpl implements
// Transitions from NEW state
.addTransition(ApplicationState.NEW, ApplicationState.INITING,
ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
+ .addTransition(ApplicationState.NEW, ApplicationState.NEW,
+ ApplicationEventType.INIT_CONTAINER,
+ new InitContainerTransition())
// Transitions from INITING state
.addTransition(ApplicationState.INITING, ApplicationState.INITING,
- ApplicationEventType.INIT_APPLICATION,
- new AppIsInitingTransition())
+ ApplicationEventType.INIT_CONTAINER,
+ new InitContainerTransition())
.addTransition(ApplicationState.INITING,
EnumSet.of(ApplicationState.FINISHING_CONTAINERS_WAIT,
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
@@ -114,8 +148,8 @@ public class ApplicationImpl implements
// Transitions from RUNNING state
.addTransition(ApplicationState.RUNNING,
ApplicationState.RUNNING,
- ApplicationEventType.INIT_APPLICATION,
- new DuplicateAppInitTransition())
+ ApplicationEventType.INIT_CONTAINER,
+ new InitContainerTransition())
.addTransition(ApplicationState.RUNNING,
ApplicationState.RUNNING,
ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
@@ -143,7 +177,13 @@ public class ApplicationImpl implements
ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
new AppCompletelyDoneTransition())
-
+
+ // Transitions from FINISHED state
+ .addTransition(ApplicationState.FINISHED,
+ ApplicationState.FINISHED,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
+ new AppLogsAggregatedTransition())
+
// create the topology tables
.installTopology();
@@ -151,14 +191,18 @@ public class ApplicationImpl implements
/**
* Notify services of new application.
+ *
+ * In particular, this requests that the {@link ResourceLocalizationService}
+ * localize the application-scoped resources.
*/
+ @SuppressWarnings("unchecked")
static class AppInitTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
- ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
- Container container = initEvent.getContainer();
- app.containers.put(container.getContainerID(), container);
+ ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
+ app.applicationACLs = initEvent.getApplicationACLs();
+ app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
app.dispatcher.getEventHandler().handle(
new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
@@ -166,20 +210,40 @@ public class ApplicationImpl implements
}
/**
- * Absorb initialization events while the application initializes.
+ * Handles INIT_CONTAINER events which request that we launch a new
+ container. When we're still in the INITING state, we simply
+ queue these up. When we're in the RUNNING state, we pass along
+ a ContainerInitEvent to the appropriate ContainerImpl.
*/
- static class AppIsInitingTransition implements
+ @SuppressWarnings("unchecked")
+ static class InitContainerTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
- ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
+ ApplicationContainerInitEvent initEvent =
+ (ApplicationContainerInitEvent) event;
Container container = initEvent.getContainer();
app.containers.put(container.getContainerID(), container);
LOG.info("Adding " + container.getContainerID()
+ " to application " + app.toString());
+
+ switch (app.getApplicationState()) {
+ case RUNNING:
+ app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
+ container.getContainerID()));
+ break;
+ case INITING:
+ case NEW:
+ // these get queued up and sent out in AppInitDoneTransition
+ break;
+ default:
+ assert false : "Invalid state for InitContainerTransition: " +
+ app.getApplicationState();
+ }
}
}
+ @SuppressWarnings("unchecked")
static class AppInitDoneTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
@@ -187,9 +251,9 @@ public class ApplicationImpl implements
// Inform the logAggregator
app.dispatcher.getEventHandler().handle(
- new LogAggregatorAppStartedEvent(app.appId, app.user,
- app.credentials,
- ContainerLogsRetentionPolicy.ALL_CONTAINERS)); // TODO: Fix
+ new LogHandlerAppStartedEvent(app.appId, app.user,
+ app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
+ app.applicationACLs));
// Start all the containers waiting for ApplicationInit
for (Container container : app.containers.values()) {
@@ -199,19 +263,6 @@ public class ApplicationImpl implements
}
}
- static class DuplicateAppInitTransition implements
- SingleArcTransition<ApplicationImpl, ApplicationEvent> {
- @Override
- public void transition(ApplicationImpl app, ApplicationEvent event) {
- ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
- Container container = initEvent.getContainer();
- app.containers.put(container.getContainerID(), container);
- LOG.info("Adding " + container.getContainerID()
- + " to application " + app.toString());
- app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
- container.getContainerID()));
- }
- }
static final class ContainerDoneTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@@ -229,15 +280,21 @@ public class ApplicationImpl implements
}
}
+ @SuppressWarnings("unchecked")
void handleAppFinishWithContainersCleanedup() {
// Delete Application level resources
this.dispatcher.getEventHandler().handle(
new ApplicationLocalizationEvent(
LocalizationEventType.DESTROY_APPLICATION_RESOURCES, this));
+ // tell any auxiliary services that the app is done
+ this.dispatcher.getEventHandler().handle(
+ new AuxServicesEvent(AuxServicesEventType.APPLICATION_STOP, appId));
+
// TODO: Trigger the LogsManager
}
+ @SuppressWarnings("unchecked")
static class AppFinishTriggeredTransition
implements
MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {
@@ -286,38 +343,57 @@ public class ApplicationImpl implements
}
+ @SuppressWarnings("unchecked")
static class AppCompletelyDoneTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
+
// Inform the logService
app.dispatcher.getEventHandler().handle(
- new LogAggregatorAppFinishedEvent(app.appId));
+ new LogHandlerAppFinishedEvent(app.appId));
+
+ }
+ }
+
+ static class AppLogsAggregatedTransition implements
+ SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+ @Override
+ public void transition(ApplicationImpl app, ApplicationEvent event) {
+ ApplicationId appId = event.getApplicationID();
+ app.context.getApplications().remove(appId);
+ app.aclsManager.removeApplication(appId);
}
}
@Override
- public synchronized void handle(ApplicationEvent event) {
+ public void handle(ApplicationEvent event) {
- ApplicationId applicationID = event.getApplicationID();
- LOG.info("Processing " + applicationID + " of type " + event.getType());
+ this.writeLock.lock();
- ApplicationState oldState = stateMachine.getCurrentState();
- ApplicationState newState = null;
try {
- // queue event requesting init of the same app
- newState = stateMachine.doTransition(event.getType(), event);
- } catch (InvalidStateTransitonException e) {
- LOG.warn("Can't handle this event at current state", e);
- }
- if (oldState != newState) {
- LOG.info("Application " + applicationID + " transitioned from "
- + oldState + " to " + newState);
+ ApplicationId applicationID = event.getApplicationID();
+ LOG.info("Processing " + applicationID + " of type " + event.getType());
+
+ ApplicationState oldState = stateMachine.getCurrentState();
+ ApplicationState newState = null;
+ try {
+ // queue event requesting init of the same app
+ newState = stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitonException e) {
+ LOG.warn("Can't handle this event at current state", e);
+ }
+ if (oldState != newState) {
+ LOG.info("Application " + applicationID + " transitioned from "
+ + oldState + " to " + newState);
+ }
+ } finally {
+ this.writeLock.unlock();
}
}
@Override
public String toString() {
- return ConverterUtils.toString(appId);
+ return appId.toString();
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java Wed Nov 2 05:34:31 2011
@@ -18,20 +18,22 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
public class ApplicationInitEvent extends ApplicationEvent {
- private final Container container;
+ private final Map<ApplicationAccessType, String> applicationACLs;
- public ApplicationInitEvent(Container container) {
- super(container.getContainerID().getApplicationAttemptId().getApplicationId(),
- ApplicationEventType.INIT_APPLICATION);
- this.container = container;
+ public ApplicationInitEvent(ApplicationId appId,
+ Map<ApplicationAccessType, String> acls) {
+ super(appId, ApplicationEventType.INIT_APPLICATION);
+ this.applicationACLs = acls;
}
- public Container getContainer() {
- return this.container;
+ public Map<ApplicationAccessType, String> getApplicationACLs() {
+ return this.applicationACLs;
}
-
}