You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2011/11/02 06:35:03 UTC
svn commit: r1196458 [12/19] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ assembly/
bin/ conf/ dev-support/ hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/
hadoop-mapreduce-client/hadoop-mapreduce-cl...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Wed Nov 2 05:34:31 2011
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.Reentr
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.StringUtils;
@@ -55,7 +56,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
@@ -77,6 +78,9 @@ public class ContainerImpl implements Co
private int exitCode = YarnConfiguration.INVALID_CONTAINER_EXIT_STATUS;
private final StringBuilder diagnostics;
+ /** The NM-wide configuration - not specific to this container */
+ private final Configuration daemonConf;
+
private static final Log LOG = LogFactory.getLog(Container.class);
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private final Map<LocalResourceRequest,String> pendingResources =
@@ -90,9 +94,11 @@ public class ContainerImpl implements Co
private final List<LocalResourceRequest> appRsrcs =
new ArrayList<LocalResourceRequest>();
- public ContainerImpl(Dispatcher dispatcher,
+ public ContainerImpl(Configuration conf,
+ Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
NodeManagerMetrics metrics) {
+ this.daemonConf = conf;
this.dispatcher = dispatcher;
this.launchContext = launchContext;
this.diagnostics = new StringBuilder();
@@ -152,6 +158,19 @@ public class ContainerImpl implements Co
ContainerState.LOCALIZATION_FAILED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
+ // container not launched so kill is a no-op
+ .addTransition(ContainerState.LOCALIZATION_FAILED,
+ ContainerState.LOCALIZATION_FAILED,
+ ContainerEventType.KILL_CONTAINER)
+ // container cleanup triggers a release of all resources,
+ // regardless of whether they were localized or not;
+ // LocalizedResource handles the release event in all states
+ .addTransition(ContainerState.LOCALIZATION_FAILED,
+ ContainerState.LOCALIZATION_FAILED,
+ ContainerEventType.RESOURCE_LOCALIZED)
+ .addTransition(ContainerState.LOCALIZATION_FAILED,
+ ContainerState.LOCALIZATION_FAILED,
+ ContainerEventType.RESOURCE_FAILED)
// From LOCALIZED State
.addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING,
@@ -162,8 +181,6 @@ public class ContainerImpl implements Co
.addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
- // TODO race: Can lead to a CONTAINER_LAUNCHED event at state KILLING,
- // and a container which will never be killed by the NM.
.addTransition(ContainerState.LOCALIZED, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
@@ -218,6 +235,9 @@ public class ContainerImpl implements Co
ContainerState.KILLING,
ContainerEventType.RESOURCE_LOCALIZED,
new LocalizedResourceDuringKillTransition())
+ .addTransition(ContainerState.KILLING,
+ ContainerState.KILLING,
+ ContainerEventType.RESOURCE_FAILED)
.addTransition(ContainerState.KILLING, ContainerState.KILLING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
@@ -233,6 +253,12 @@ public class ContainerImpl implements Co
ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
CONTAINER_DONE_TRANSITION)
+ // Handling a CONTAINER_LAUNCHED event during the killing stage is a no-op,
+ // as the cleanup-container event is always handled after the
+ // launch-container event in the container launcher
+ .addTransition(ContainerState.KILLING,
+ ContainerState.KILLING,
+ ContainerEventType.CONTAINER_LAUNCHED)
// From CONTAINER_CLEANEDUP_AFTER_KILL State.
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
@@ -399,7 +425,7 @@ public class ContainerImpl implements Co
// Remove the container from the resource-monitor
eventHandler.handle(new ContainerStopMonitoringEvent(containerID));
// Tell the logService too
- eventHandler.handle(new LogAggregatorContainerFinishedEvent(
+ eventHandler.handle(new LogHandlerContainerFinishedEvent(
containerID, exitCode));
}
@@ -431,6 +457,20 @@ public class ContainerImpl implements Co
}
+ /**
+ * State transition when a NEW container receives the INIT_CONTAINER
+ * message.
+ *
+ * If there are resources to localize, sends a
+ * ContainerLocalizationRequest (INIT_CONTAINER_RESOURCES)
+ * to the ResourceLocalizationManager and enters LOCALIZING state.
+ *
+ * If there are no resources to localize, sends LAUNCH_CONTAINER event
+ * and enters LOCALIZED state directly.
+ *
+ * If there are any invalid resources specified, enters LOCALIZATION_FAILED
+ * directly.
+ */
@SuppressWarnings("unchecked") // dispatcher not typed
static class RequestResourcesTransition implements
MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
@@ -513,6 +553,10 @@ public class ContainerImpl implements Co
}
}
+ /**
+ * Transition when one of the requested resources for this container
+ * has been successfully localized.
+ */
@SuppressWarnings("unchecked") // dispatcher not typed
static class LocalizedTransition implements
MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
@@ -540,22 +584,34 @@ public class ContainerImpl implements Co
}
}
+ /**
+ * Transition from LOCALIZED state to RUNNING state upon receiving
+ * a CONTAINER_LAUNCHED event
+ */
@SuppressWarnings("unchecked") // dispatcher not typed
static class LaunchTransition extends ContainerTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Inform the ContainersMonitor to start monitoring the container's
// resource usage.
- // TODO: Fix pmem limits below
- long vmemBytes =
+ long pmemBytes =
container.getLaunchContext().getResource().getMemory() * 1024 * 1024L;
+ float pmemRatio = container.daemonConf.getFloat(
+ YarnConfiguration.NM_VMEM_PMEM_RATIO,
+ YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+ long vmemBytes = (long) (pmemRatio * pmemBytes);
+
container.dispatcher.getEventHandler().handle(
new ContainerStartMonitoringEvent(container.getContainerID(),
- vmemBytes, -1));
+ vmemBytes, pmemBytes));
container.metrics.runningContainer();
}
}
+ /**
+ * Transition from RUNNING or KILLING state to EXITED_WITH_SUCCESS state
+ * upon receiving a CONTAINER_EXITED_WITH_SUCCESS event.
+ */
@SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithSuccessTransition extends ContainerTransition {
@@ -582,6 +638,10 @@ public class ContainerImpl implements Co
}
}
+ /**
+ * Transition to EXITED_WITH_FAILURE state upon receiving a
+ * CONTAINER_EXITED_WITH_FAILURE event.
+ */
@SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithFailureTransition extends ContainerTransition {
@@ -609,6 +669,9 @@ public class ContainerImpl implements Co
}
}
+ /**
+ * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST
+ */
static class KilledExternallyTransition extends ExitedWithFailureTransition {
KilledExternallyTransition() {
super(true);
@@ -621,6 +684,10 @@ public class ContainerImpl implements Co
}
}
+ /**
+ * Transition from LOCALIZING to LOCALIZATION_FAILED upon receiving
+ * RESOURCE_FAILED event.
+ */
static class ResourceFailedTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -638,7 +705,11 @@ public class ContainerImpl implements Co
container.metrics.endInitingContainer();
}
}
-
+
+ /**
+ * Transition from LOCALIZING to KILLING upon receiving
+ * KILL_CONTAINER event.
+ */
static class KillDuringLocalizationTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -652,6 +723,10 @@ public class ContainerImpl implements Co
}
}
+ /**
+ * Remain in KILLING state when receiving a RESOURCE_LOCALIZED event
+ * while in the process of killing.
+ */
static class LocalizedResourceDuringKillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -669,6 +744,11 @@ public class ContainerImpl implements Co
}
}
+ /**
+ * Transitions upon receiving KILL_CONTAINER:
+ * - LOCALIZED -> KILLING
+ * - RUNNING -> KILLING
+ */
@SuppressWarnings("unchecked") // dispatcher not typed
static class KillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@@ -683,6 +763,10 @@ public class ContainerImpl implements Co
}
}
+ /**
+ * Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
+ * upon receiving CONTAINER_KILLED_ON_REQUEST.
+ */
static class ContainerKilledTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -696,6 +780,13 @@ public class ContainerImpl implements Co
}
}
+ /**
+ * Handle the following transitions:
+ * - NEW -> DONE upon KILL_CONTAINER
+ * - {LOCALIZATION_FAILED, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE,
+ * KILLING, CONTAINER_CLEANEDUP_AFTER_KILL}
+ * -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
+ */
static class ContainerDoneTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -703,7 +794,10 @@ public class ContainerImpl implements Co
container.finished();
}
}
-
+
+ /**
+ * Update diagnostics, staying in the same state.
+ */
static class ContainerDiagnosticsUpdateTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Wed Nov 2 05:34:31 2011
@@ -27,11 +27,11 @@ import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,11 +45,14 @@ import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
@@ -57,21 +60,33 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
+import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class ContainerLaunch implements Callable<Integer> {
private static final Log LOG = LogFactory.getLog(ContainerLaunch.class);
- public static final String CONTAINER_SCRIPT = "task.sh";
+ public static final String CONTAINER_SCRIPT = "launch_container.sh";
public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
+ private static final String PID_FILE_NAME_FMT = "%s.pid";
+
private final Dispatcher dispatcher;
private final ContainerExecutor exec;
private final Application app;
private final Container container;
private final Configuration conf;
private final LocalDirAllocator logDirsSelector;
+
+ private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
+ private volatile AtomicBoolean completed = new AtomicBoolean(false);
+
+ private long sleepDelayBeforeSigKill = 250;
+ private long maxKillWaitTime = 2000;
+
+ private Path pidFilePath = null;
public ContainerLaunch(Configuration configuration, Dispatcher dispatcher,
ContainerExecutor exec, Application app, Container container) {
@@ -81,6 +96,12 @@ public class ContainerLaunch implements
this.container = container;
this.dispatcher = dispatcher;
this.logDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
+ this.sleepDelayBeforeSigKill =
+ conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
+ YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
+ this.maxKillWaitTime =
+ conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
+ YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS);
}
@Override
@@ -88,7 +109,8 @@ public class ContainerLaunch implements
public Integer call() {
final ContainerLaunchContext launchContext = container.getLaunchContext();
final Map<Path,String> localResources = container.getLocalizedResources();
- String containerIdStr = ConverterUtils.toString(container.getContainerID());
+ ContainerId containerID = container.getContainerID();
+ String containerIdStr = ConverterUtils.toString(containerID);
final String user = launchContext.getUser();
final List<String> command = launchContext.getCommands();
int ret = -1;
@@ -97,11 +119,11 @@ public class ContainerLaunch implements
// /////////////////////////// Variable expansion
// Before the container script gets written out.
List<String> newCmds = new ArrayList<String>(command.size());
- String appIdStr = app.toString();
+ String appIdStr = app.getAppId().toString();
Path containerLogDir =
- this.logDirsSelector.getLocalPathForWrite(appIdStr + Path.SEPARATOR
- + containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, this.conf,
- false);
+ this.logDirsSelector.getLocalPathForWrite(ContainerLaunch
+ .getRelativeContainerLogDir(appIdStr, containerIdStr),
+ LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
for (String str : command) {
// TODO: Should we instead work via symlinks without this grammar?
newCmds.add(str.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
@@ -124,19 +146,18 @@ public class ContainerLaunch implements
FileContext lfs = FileContext.getLocalFSFileContext();
LocalDirAllocator lDirAllocator =
new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); // TODO
+
Path nmPrivateContainerScriptPath =
lDirAllocator.getLocalPathForWrite(
- ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
- + appIdStr + Path.SEPARATOR + containerIdStr
- + Path.SEPARATOR + CONTAINER_SCRIPT, this.conf);
+ getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ + CONTAINER_SCRIPT, this.conf);
Path nmPrivateTokensPath =
lDirAllocator.getLocalPathForWrite(
- ResourceLocalizationService.NM_PRIVATE_DIR
- + Path.SEPARATOR
- + containerIdStr
+ getContainerPrivateDir(appIdStr, containerIdStr)
+ Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
containerIdStr), this.conf);
+
DataOutputStream containerScriptOutStream = null;
DataOutputStream tokensOutStream = null;
@@ -147,6 +168,17 @@ public class ContainerLaunch implements
+ ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
+ Path.SEPARATOR + containerIdStr,
LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
+
+ String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT,
+ containerIdStr);
+
+ // pid file should be in nm private dir so that it is not
+ // accessible by users
+ pidFilePath = lDirAllocator.getLocalPathForWrite(
+ ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
+ + pidFileSuffix,
+ this.conf);
+
try {
// /////////// Write out the container-script in the nmPrivate space.
String[] localDirs =
@@ -191,21 +223,36 @@ public class ContainerLaunch implements
// LaunchContainer is a blocking call. We are here almost means the
// container is launched, so send out the event.
dispatcher.getEventHandler().handle(new ContainerEvent(
- container.getContainerID(),
+ containerID,
ContainerEventType.CONTAINER_LAUNCHED));
- ret =
- exec.launchContainer(container, nmPrivateContainerScriptPath,
- nmPrivateTokensPath, user, appIdStr, containerWorkDir);
+ // Check if the container is signalled to be killed.
+ if (!shouldLaunchContainer.compareAndSet(false, true)) {
+ LOG.info("Container " + containerIdStr + " not launched as "
+ + "cleanup already called");
+ ret = ExitCode.TERMINATED.getExitCode();
+ }
+ else {
+ exec.activateContainer(containerID, pidFilePath);
+ ret =
+ exec.launchContainer(container, nmPrivateContainerScriptPath,
+ nmPrivateTokensPath, user, appIdStr, containerWorkDir);
+ }
} catch (Throwable e) {
LOG.warn("Failed to launch container", e);
dispatcher.getEventHandler().handle(new ContainerExitEvent(
launchContext.getContainerId(),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret));
return ret;
+ } finally {
+ completed.set(true);
+ exec.deactivateContainer(containerID);
}
- if (ret == ExitCode.KILLED.getExitCode()) {
+ LOG.debug("Container " + containerIdStr + " completed with exit code "
+ + ret);
+ if (ret == ExitCode.FORCE_KILLED.getExitCode()
+ || ret == ExitCode.TERMINATED.getExitCode()) {
// If the process was killed, Send container_cleanedup_after_kill and
// just break out of this method.
dispatcher.getEventHandler().handle(
@@ -228,6 +275,129 @@ public class ContainerLaunch implements
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
return 0;
}
+
+ /**
+ * Clean up the container.
+ * Cancels the launch if it has not started yet, or signals the
+ * executor not to execute the process if it has not already done so.
+ * Also sends a SIGTERM followed by a delayed SIGKILL to the process if the
+ * process id is available and the configured delay before SIGKILL is positive.
+ * @throws IOException
+ */
+ public void cleanupContainer() throws IOException {
+ ContainerId containerId = container.getContainerID();
+ String containerIdStr = ConverterUtils.toString(containerId);
+ LOG.info("Cleaning up container " + containerIdStr);
+
+ // launch flag will be set to true if process already launched
+ boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
+ if (!alreadyLaunched) {
+ LOG.info("Container " + containerIdStr + " not launched."
+ + " No cleanup needed to be done");
+ return;
+ }
+
+ LOG.debug("Marking container " + containerIdStr + " as inactive");
+ // this should ensure that if the container process has not launched
+ // by this time, it will never be launched
+ exec.deactivateContainer(containerId);
+
+ LOG.debug("Getting pid for container " + containerIdStr + " to kill"
+ + " from pid file "
+ + (pidFilePath != null ? pidFilePath.toString() : "null"));
+
+ // however the container process may have already started
+ try {
+
+ // get process id from the pid file if one was set up;
+ // NOTE(review): no fallback to the shell is implemented here
+ String processId = null;
+ if (pidFilePath != null) {
+ processId = getContainerPid(pidFilePath);
+ }
+
+ // kill process
+ if (processId != null) {
+ String user = container.getLaunchContext().getUser();
+ LOG.debug("Sending signal to pid " + processId
+ + " as user " + user
+ + " for container " + containerIdStr);
+ if (sleepDelayBeforeSigKill > 0) {
+ boolean result = exec.signalContainer(user,
+ processId, Signal.TERM);
+ LOG.debug("Sent signal to pid " + processId
+ + " as user " + user
+ + " for container " + containerIdStr
+ + ", result=" + (result? "success" : "failed"));
+ new DelayedProcessKiller(user,
+ processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start();
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Got error when trying to cleanup container " + containerIdStr
+ + ", error=" + e.getMessage());
+ } finally {
+ // cleanup pid file if present
+ if (pidFilePath != null) {
+ FileContext lfs = FileContext.getLocalFSFileContext();
+ lfs.delete(pidFilePath, false);
+ }
+ }
+ }
+
+ /**
+ * Loop through for a time-bounded interval waiting to
+ * read the process id from a file generated by a running process.
+ * @param pidFilePath File from which to read the process id
+ * @return Process ID, or null if none was read before completion or timeout
+ * @throws Exception
+ */
+ private String getContainerPid(Path pidFilePath) throws Exception {
+ String containerIdStr =
+ ConverterUtils.toString(container.getContainerID());
+ String processId = null;
+ LOG.debug("Accessing pid for container " + containerIdStr
+ + " from pid file " + pidFilePath);
+ int sleepCounter = 0;
+ final int sleepInterval = 100;
+
+ // loop waiting for the pid file to show up until either the completed
+ // flag is set, which means call() has finished (normally or with an
+ // error), or our timer expires, in which case we admit defeat
+ while (!completed.get()) {
+ processId = ProcessIdFileReader.getProcessId(pidFilePath);
+ if (processId != null) {
+ LOG.debug("Got pid " + processId + " for container "
+ + containerIdStr);
+ break;
+ }
+ else if ((sleepCounter*sleepInterval) > maxKillWaitTime) {
+ LOG.info("Could not get pid for " + containerIdStr
+ + ". Waited for " + maxKillWaitTime + " ms.");
+ break;
+ }
+ else {
+ ++sleepCounter;
+ Thread.sleep(sleepInterval);
+ }
+ }
+ return processId;
+ }
+
+ public static String getRelativeContainerLogDir(String appIdStr,
+ String containerIdStr) {
+ return appIdStr + Path.SEPARATOR + containerIdStr;
+ }
+
+ private String getContainerPrivateDir(String appIdStr, String containerIdStr) {
+ return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr
+ + Path.SEPARATOR;
+ }
+
+ private String getAppPrivateDir(String appIdStr) {
+ return ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
+ + appIdStr;
+ }
private static class ShellScriptBuilder {
@@ -260,7 +430,7 @@ public class ContainerLaunch implements
if (dst.toUri().getPath().indexOf('/') != -1) {
line("mkdir -p ", dst.getParent().toString());
}
- line("ln -sf ", src.toUri().getPath(), " ", dst.toString());
+ line("ln -sf \"", src.toUri().getPath(), "\" \"", dst.toString(), "\"");
return this;
}
@@ -279,7 +449,7 @@ public class ContainerLaunch implements
public String toString() {
return sb.toString();
}
-
+
}
private static void putEnvIfNotNull(
@@ -301,7 +471,7 @@ public class ContainerLaunch implements
/**
* Non-modifiable environment variables
*/
-
+
putEnvIfNotNull(environment, Environment.USER.name(), container.getUser());
putEnvIfNotNull(environment,
@@ -335,14 +505,23 @@ public class ContainerLaunch implements
* Modifiable environment variables
*/
- putEnvIfAbsent(environment, Environment.JAVA_HOME.name());
- putEnvIfAbsent(environment, Environment.HADOOP_COMMON_HOME.name());
- putEnvIfAbsent(environment, Environment.HADOOP_HDFS_HOME.name());
- putEnvIfAbsent(environment, Environment.YARN_HOME.name());
+ // allow containers to override these variables
+ String[] whitelist = conf.get(YarnConfiguration.NM_ENV_WHITELIST, YarnConfiguration.DEFAULT_NM_ENV_WHITELIST).split(",");
+
+ for(String whitelistEnvVariable : whitelist) {
+ putEnvIfAbsent(environment, whitelistEnvVariable.trim());
+ }
+ // variables here will be forced in, even if the container has specified them.
+ Apps.setEnvFromInputString(
+ environment,
+ conf.get(
+ YarnConfiguration.NM_ADMIN_USER_ENV,
+ YarnConfiguration.DEFAULT_NM_ADMIN_USER_ENV)
+ );
}
-
- private static void writeLaunchEnv(OutputStream out,
+
+ static void writeLaunchEnv(OutputStream out,
Map<String,String> environment, Map<Path,String> resources,
List<String> command)
throws IOException {
@@ -357,9 +536,9 @@ public class ContainerLaunch implements
sb.symlink(link.getKey(), link.getValue());
}
}
+
ArrayList<String> cmd = new ArrayList<String>(2 * command.size() + 5);
- cmd.add(ContainerExecutor.isSetsidAvailable ? "exec setsid " : "exec ");
- cmd.add("/bin/bash ");
+ cmd.add("exec /bin/bash ");
cmd.add("-c ");
cmd.add("\"");
for (String cs : command) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Wed Nov 2 05:34:31 2011
@@ -26,21 +26,25 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.service.AbstractService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* The launcher for the containers. This service should be started only after
* the {@link ResourceLocalizationService} is started as it depends on creation
@@ -50,22 +54,28 @@ import org.apache.hadoop.yarn.service.Ab
public class ContainersLauncher extends AbstractService
implements EventHandler<ContainersLauncherEvent> {
+ private static final Log LOG = LogFactory.getLog(ContainersLauncher.class);
+
private final Context context;
private final ContainerExecutor exec;
private final Dispatcher dispatcher;
private final ExecutorService containerLauncher =
- Executors.newCachedThreadPool();
+ Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder()
+ .setNameFormat("ContainersLauncher #%d")
+ .build());
private final Map<ContainerId,RunningContainer> running =
Collections.synchronizedMap(new HashMap<ContainerId,RunningContainer>());
private static final class RunningContainer {
- public RunningContainer(String string, Future<Integer> submit) {
- this.user = string;
+ public RunningContainer(Future<Integer> submit,
+ ContainerLaunch launcher) {
this.runningcontainer = submit;
+ this.launcher = launcher;
}
- String user;
Future<Integer> runningcontainer;
+ ContainerLaunch launcher;
}
@@ -99,7 +109,6 @@ public class ContainersLauncher extends
// TODO: ContainersLauncher launches containers one by one!!
Container container = event.getContainer();
ContainerId containerId = container.getContainerID();
- String userName = container.getUser();
switch (event.getType()) {
case LAUNCH_CONTAINER:
Application app =
@@ -109,33 +118,26 @@ public class ContainersLauncher extends
new ContainerLaunch(getConfig(), dispatcher, exec, app,
event.getContainer());
running.put(containerId,
- new RunningContainer(userName,
- containerLauncher.submit(launch)));
+ new RunningContainer(containerLauncher.submit(launch),
+ launch));
break;
case CLEANUP_CONTAINER:
RunningContainer rContainerDatum = running.remove(containerId);
Future<Integer> rContainer = rContainerDatum.runningcontainer;
- if (rContainer != null) {
-
- if (rContainer.isDone()) {
- // The future is already done by this time.
- break;
- }
-
- // Cancel the future so that it won't be launched if it isn't already.
+ if (rContainer != null
+ && !rContainer.isDone()) {
+ // Cancel the future so that it won't be launched
+ // if it isn't already.
rContainer.cancel(false);
-
- // Kill the container
- String processId = exec.getProcessId(containerId);
- if (processId != null) {
- try {
- exec.signalContainer(rContainerDatum.user,
- processId, Signal.KILL);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
+ }
+
+ // Clean up the container whether it is running/killed/completed, so that
+ // no sub-processes are left alive.
+ try {
+ rContainerDatum.launcher.cleanupContainer();
+ } catch (IOException e) {
+ LOG.warn("Got exception while cleaning container " + containerId
+ + ". Ignoring.");
}
break;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Wed Nov 2 05:34:31 2011
@@ -65,6 +65,9 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ContainerLocalizer {
@@ -178,7 +181,8 @@ public class ContainerLocalizer {
}
ExecutorService createDownloadThreadPool() {
- return Executors.newSingleThreadExecutor();
+ return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("ContainerLocalizer Downloader").build());
}
Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc,
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java Wed Nov 2 05:34:31 2011
@@ -33,6 +33,7 @@ public class LocalResourceRequest
private final Path loc;
private final long timestamp;
private final LocalResourceType type;
+ private final LocalResourceVisibility visibility;
/**
* Wrap API resource to match against cache of localized resources.
@@ -43,13 +44,16 @@ public class LocalResourceRequest
throws URISyntaxException {
this(ConverterUtils.getPathFromYarnURL(resource.getResource()),
resource.getTimestamp(),
- resource.getType());
+ resource.getType(),
+ resource.getVisibility());
}
- LocalResourceRequest(Path loc, long timestamp, LocalResourceType type) {
+ LocalResourceRequest(Path loc, long timestamp, LocalResourceType type,
+ LocalResourceVisibility visibility) {
this.loc = loc;
this.timestamp = timestamp;
this.type = type;
+ this.visibility = visibility;
}
@Override
@@ -114,7 +118,7 @@ public class LocalResourceRequest
@Override
public LocalResourceVisibility getVisibility() {
- throw new UnsupportedOperationException();
+ return visibility;
}
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Wed Nov 2 05:34:31 2011
@@ -20,9 +20,12 @@ package org.apache.hadoop.yarn.server.no
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -37,6 +40,9 @@ import org.apache.hadoop.yarn.server.nod
class LocalResourcesTrackerImpl implements LocalResourcesTracker {
static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
+ private static final String RANDOM_DIR_REGEX = "-?\\d+";
+ private static final Pattern RANDOM_DIR_PATTERN = Pattern
+ .compile(RANDOM_DIR_REGEX);
private final String user;
private final Dispatcher dispatcher;
@@ -83,28 +89,44 @@ class LocalResourcesTrackerImpl implemen
@Override
public boolean remove(LocalizedResource rem, DeletionService delService) {
- // current synchronization guaranteed by crude RLS event for cleanup
+ // current synchronization guaranteed by crude RLS event for cleanup
LocalizedResource rsrc = localrsrc.get(rem.getRequest());
if (null == rsrc) {
- LOG.error("Attempt to remove absent resource: " + rem.getRequest() +
- " from " + getUser());
+ LOG.error("Attempt to remove absent resource: " + rem.getRequest()
+ + " from " + getUser());
return true;
}
if (rsrc.getRefCount() > 0
- || ResourceState.DOWNLOADING.equals(rsrc.getState())
- || rsrc != rem) {
+ || ResourceState.DOWNLOADING.equals(rsrc.getState()) || rsrc != rem) {
// internal error
- LOG.error("Attempt to remove resource: " + rsrc + " with non-zero refcount");
+ LOG.error("Attempt to remove resource: " + rsrc
+ + " with non-zero refcount");
assert false;
return false;
}
- localrsrc.remove(rem.getRequest());
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
- delService.delete(getUser(), rsrc.getLocalPath());
+ delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
}
return true;
}
+
+ /**
+ * Returns the path up to the random directory component.
+ */
+ private Path getPathToDelete(Path localPath) {
+ Path delPath = localPath.getParent();
+ String name = delPath.getName();
+ Matcher matcher = RANDOM_DIR_PATTERN.matcher(name);
+ if (matcher.matches()) {
+ return delPath;
+ } else {
+ LOG.warn("Random directory component did not match. " +
+ "Deleting localized path only");
+ return localPath;
+ }
+ }
+
@Override
public String getUser() {
return user;
@@ -114,5 +136,4 @@ class LocalResourcesTrackerImpl implemen
public Iterator<LocalizedResource> iterator() {
return localrsrc.values().iterator();
}
-
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Wed Nov 2 05:34:31 2011
@@ -205,6 +205,11 @@ public class LocalizedResource implement
// typedef
}
+ /**
+ * Transition from INIT to DOWNLOADING.
+ * Sends a {@link LocalizerResourceRequestEvent} to the
+ * {@link ResourceLocalizationService}.
+ */
@SuppressWarnings("unchecked") // dispatcher not typed
private static class FetchResourceTransition extends ResourceTransition {
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed Nov 2 05:34:31 2011
@@ -38,6 +38,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileUtil;
@@ -61,16 +62,16 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.apache.avro.ipc.Server;
+import org.apache.hadoop.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -104,11 +105,14 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ResourceLocalizationService extends CompositeService
implements EventHandler<LocalizationEvent>, LocalizationProtocol {
@@ -133,8 +137,18 @@ public class ResourceLocalizationService
private final ScheduledExecutorService cacheCleanup;
private final LocalResourcesTracker publicRsrc;
+
+ /**
+ * Map of LocalResourceTrackers keyed by username, for private
+ * resources.
+ */
private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc =
new ConcurrentHashMap<String,LocalResourcesTracker>();
+
+ /**
+ * Map of LocalResourceTrackers keyed by appid, for application
+ * resources.
+ */
private final ConcurrentMap<String,LocalResourcesTracker> appRsrc =
new ConcurrentHashMap<String,LocalResourcesTracker>();
@@ -146,7 +160,10 @@ public class ResourceLocalizationService
this.delService = delService;
this.localDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
- this.cacheCleanup = new ScheduledThreadPoolExecutor(1);
+ this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
+ new ThreadFactoryBuilder()
+ .setNameFormat("ResourceLocalizationService Cache Cleanup")
+ .build());
}
FileContext getLocalFileContext(Configuration conf) {
@@ -200,7 +217,9 @@ public class ResourceLocalizationService
cacheCleanupPeriod =
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
localizationServerAddress = NetUtils.createSocketAddr(
- conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
+ conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS),
+ YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT,
+ YarnConfiguration.NM_LOCALIZER_ADDRESS);
localizerTracker = createLocalizerTracker(conf);
addService(localizerTracker);
dispatcher.register(LocalizerEventType.class, localizerTracker);
@@ -217,8 +236,15 @@ public class ResourceLocalizationService
cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
server = createServer();
- LOG.info("Localizer started on port " + server.getPort());
server.start();
+ String host = getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS)
+ .split(":")[0];
+ getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, host + ":"
+ + server.getPort());
+ localizationServerAddress = NetUtils.createSocketAddr(
+ getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
+ LOG.info("Localizer started on port " + server.getPort());
super.start();
}
@@ -234,155 +260,197 @@ public class ResourceLocalizationService
secretManager = new LocalizerTokenSecretManager();
}
- return rpc.getServer(LocalizationProtocol.class, this,
+ Server server = rpc.getServer(LocalizationProtocol.class, this,
localizationServerAddress, conf, secretManager,
conf.getInt(YarnConfiguration.NM_LOCALIZER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_NM_LOCALIZER_CLIENT_THREAD_COUNT));
-
+
+ // Enable service authorization?
+ if (conf.getBoolean(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+ false)) {
+ server.refreshServiceAcl(conf, new NMPolicyProvider());
+ }
+
+ return server;
}
@Override
public void stop() {
if (server != null) {
- server.close();
+ server.stop();
}
cacheCleanup.shutdown();
super.stop();
}
@Override
- @SuppressWarnings("unchecked") // dispatcher not typed
public void handle(LocalizationEvent event) {
- String userName;
- String appIDStr;
- Container c;
- Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs;
- LocalResourcesTracker tracker;
// TODO: create log dir as $logdir/$user/$appId
switch (event.getType()) {
case INIT_APPLICATION_RESOURCES:
- Application app =
- ((ApplicationLocalizationEvent)event).getApplication();
- // 0) Create application tracking structs
- userName = app.getUser();
- privateRsrc.putIfAbsent(userName,
- new LocalResourcesTrackerImpl(userName, dispatcher));
- if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
- new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
- LOG.warn("Initializing application " + app + " already present");
- assert false; // TODO: FIXME assert doesn't help
- // ^ The condition is benign. Tests should fail and it
- // should appear in logs, but it's an internal error
- // that should have no effect on applications
- }
- // 1) Signal container init
- dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
- app.getAppId()));
+ handleInitApplicationResources(
+ ((ApplicationLocalizationEvent)event).getApplication());
break;
case INIT_CONTAINER_RESOURCES:
- ContainerLocalizationRequestEvent rsrcReqs =
- (ContainerLocalizationRequestEvent) event;
- c = rsrcReqs.getContainer();
- LocalizerContext ctxt = new LocalizerContext(
- c.getUser(), c.getContainerID(), c.getCredentials());
- rsrcs = rsrcReqs.getRequestedResources();
- for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
- rsrcs.entrySet()) {
- tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
- c.getContainerID().getApplicationAttemptId().getApplicationId());
- for (LocalResourceRequest req : e.getValue()) {
- tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
- }
- }
+ handleInitContainerResources((ContainerLocalizationRequestEvent) event);
break;
case CACHE_CLEANUP:
- ResourceRetentionSet retain =
- new ResourceRetentionSet(delService, cacheTargetSize);
- retain.addResources(publicRsrc);
- LOG.debug("Resource cleanup (public) " + retain);
- for (LocalResourcesTracker t : privateRsrc.values()) {
- retain.addResources(t);
- LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
- }
+ handleCacheCleanup(event);
break;
case CLEANUP_CONTAINER_RESOURCES:
- ContainerLocalizationCleanupEvent rsrcCleanup =
- (ContainerLocalizationCleanupEvent) event;
- c = rsrcCleanup.getContainer();
- rsrcs = rsrcCleanup.getResources();
- for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
- rsrcs.entrySet()) {
- tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
- c.getContainerID().getApplicationAttemptId().getApplicationId());
- for (LocalResourceRequest req : e.getValue()) {
- tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
- }
- }
-
- // Delete the container directories
- userName = c.getUser();
- String containerIDStr = c.toString();
- appIDStr =
- ConverterUtils.toString(
- c.getContainerID().getApplicationAttemptId().getApplicationId());
- for (Path localDir : localDirs) {
-
- // Delete the user-owned container-dir
- Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
- Path userdir = new Path(usersdir, userName);
- Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
- Path appDir = new Path(allAppsdir, appIDStr);
- Path containerDir = new Path(appDir, containerIDStr);
- delService.delete(userName, containerDir, new Path[] {});
-
- // Delete the nmPrivate container-dir
- Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
- Path appSysDir = new Path(sysDir, appIDStr);
- Path containerSysDir = new Path(appSysDir, containerIDStr);
- delService.delete(null, containerSysDir, new Path[] {});
- }
-
- dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
- ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
+ handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event);
break;
case DESTROY_APPLICATION_RESOURCES:
-
- Application application =
- ((ApplicationLocalizationEvent) event).getApplication();
- LocalResourcesTracker appLocalRsrcsTracker =
- appRsrc.remove(ConverterUtils.toString(application.getAppId()));
- if (null == appLocalRsrcsTracker) {
- LOG.warn("Removing uninitialized application " + application);
- }
- // TODO: What to do with appLocalRsrcsTracker?
-
- // Delete the application directories
- userName = application.getUser();
- appIDStr = application.toString();
- for (Path localDir : localDirs) {
-
- // Delete the user-owned app-dir
- Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
- Path userdir = new Path(usersdir, userName);
- Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
- Path appDir = new Path(allAppsdir, appIDStr);
- delService.delete(userName, appDir, new Path[] {});
-
- // Delete the nmPrivate app-dir
- Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
- Path appSysDir = new Path(sysDir, appIDStr);
- delService.delete(null, appSysDir, new Path[] {});
- }
-
- // TODO: decrement reference counts of all resources associated with this
- // app
-
- dispatcher.getEventHandler().handle(new ApplicationEvent(
- application.getAppId(),
- ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
+ handleDestroyApplicationResources(
+ ((ApplicationLocalizationEvent)event).getApplication());
break;
+ default:
+ throw new YarnException("Unknown localization event: " + event);
}
}
+
+ /**
+ * Handle event received the first time any container is scheduled
+ * by a given application.
+ */
+ @SuppressWarnings("unchecked")
+ private void handleInitApplicationResources(Application app) {
+ // 0) Create application tracking structs
+ String userName = app.getUser();
+ privateRsrc.putIfAbsent(userName,
+ new LocalResourcesTrackerImpl(userName, dispatcher));
+ if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
+ new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
+ LOG.warn("Initializing application " + app + " already present");
+ assert false; // TODO: FIXME assert doesn't help
+ // ^ The condition is benign. Tests should fail and it
+ // should appear in logs, but it's an internal error
+ // that should have no effect on applications
+ }
+ // 1) Signal container init
+ //
+ // This is handled by the ApplicationImpl state machine and allows
+ // containers to proceed with launching.
+ dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
+ app.getAppId()));
+ }
+
+ /**
+ * For each of the requested resources for a container, determines the
+ * appropriate {@link LocalResourcesTracker} and forwards a
+ * {@link LocalResourceRequest} to that tracker.
+ */
+ private void handleInitContainerResources(
+ ContainerLocalizationRequestEvent rsrcReqs) {
+ Container c = rsrcReqs.getContainer();
+ LocalizerContext ctxt = new LocalizerContext(
+ c.getUser(), c.getContainerID(), c.getCredentials());
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+ rsrcReqs.getRequestedResources();
+ for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
+ rsrcs.entrySet()) {
+ LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
+ c.getContainerID().getApplicationAttemptId().getApplicationId());
+ for (LocalResourceRequest req : e.getValue()) {
+ tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
+ }
+ }
+ }
+
+ private void handleCacheCleanup(LocalizationEvent event) {
+ ResourceRetentionSet retain =
+ new ResourceRetentionSet(delService, cacheTargetSize);
+ retain.addResources(publicRsrc);
+ LOG.debug("Resource cleanup (public) " + retain);
+ for (LocalResourcesTracker t : privateRsrc.values()) {
+ retain.addResources(t);
+ LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
+ }
+ //TODO Check if appRsrcs should also be added to the retention set.
+ }
+
+
+ @SuppressWarnings("unchecked")
+ private void handleCleanupContainerResources(
+ ContainerLocalizationCleanupEvent rsrcCleanup) {
+ Container c = rsrcCleanup.getContainer();
+ Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+ rsrcCleanup.getResources();
+ for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
+ rsrcs.entrySet()) {
+ LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(),
+ c.getContainerID().getApplicationAttemptId().getApplicationId());
+ for (LocalResourceRequest req : e.getValue()) {
+ tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
+ }
+ }
+
+ // Delete the container directories
+ String userName = c.getUser();
+ String containerIDStr = c.toString();
+ String appIDStr = ConverterUtils.toString(
+ c.getContainerID().getApplicationAttemptId().getApplicationId());
+ for (Path localDir : localDirs) {
+
+ // Delete the user-owned container-dir
+ Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
+ Path userdir = new Path(usersdir, userName);
+ Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
+ Path appDir = new Path(allAppsdir, appIDStr);
+ Path containerDir = new Path(appDir, containerIDStr);
+ delService.delete(userName, containerDir, new Path[] {});
+
+ // Delete the nmPrivate container-dir
+
+ Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
+ Path appSysDir = new Path(sysDir, appIDStr);
+ Path containerSysDir = new Path(appSysDir, containerIDStr);
+ delService.delete(null, containerSysDir, new Path[] {});
+ }
+
+ dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
+ ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
+ }
+
+
+ @SuppressWarnings({"unchecked"})
+ private void handleDestroyApplicationResources(Application application) {
+ String userName;
+ String appIDStr;
+ LocalResourcesTracker appLocalRsrcsTracker =
+ appRsrc.remove(ConverterUtils.toString(application.getAppId()));
+ if (null == appLocalRsrcsTracker) {
+ LOG.warn("Removing uninitialized application " + application);
+ }
+ // TODO: What to do with appLocalRsrcsTracker?
+
+ // Delete the application directories
+ userName = application.getUser();
+ appIDStr = application.toString();
+ for (Path localDir : localDirs) {
+
+ // Delete the user-owned app-dir
+ Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
+ Path userdir = new Path(usersdir, userName);
+ Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
+ Path appDir = new Path(allAppsdir, appIDStr);
+ delService.delete(userName, appDir, new Path[] {});
+
+ // Delete the nmPrivate app-dir
+ Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
+ Path appSysDir = new Path(sysDir, appIDStr);
+ delService.delete(null, appSysDir, new Path[] {});
+ }
+
+ // TODO: decrement reference counts of all resources associated with this
+ // app
+
+ dispatcher.getEventHandler().handle(new ApplicationEvent(
+ application.getAppId(),
+ ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
+ }
+
LocalResourcesTracker getLocalResourcesTracker(
LocalResourceVisibility visibility, String user, ApplicationId appId) {
@@ -493,6 +561,17 @@ public class ResourceLocalizationService
}
+ private static ExecutorService createLocalizerExecutor(Configuration conf) {
+ int nThreads = conf.getInt(
+ YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT);
+ ThreadFactory tf = new ThreadFactoryBuilder()
+ .setNameFormat("PublicLocalizer #%d")
+ .build();
+ return Executors.newFixedThreadPool(nThreads, tf);
+ }
+
+
class PublicLocalizer extends Thread {
static final String PUBCACHE_CTXT = "public.cache.dirs";
@@ -508,16 +587,16 @@ public class ResourceLocalizationService
PublicLocalizer(Configuration conf) {
this(conf, getLocalFileContext(conf),
- Executors.newFixedThreadPool(conf.getInt(
- YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT)),
+ createLocalizerExecutor(conf),
new HashMap<Future<Path>,LocalizerResourceRequestEvent>(),
new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
}
-
+
PublicLocalizer(Configuration conf, FileContext lfs,
ExecutorService threadPool,
Map<Future<Path>,LocalizerResourceRequestEvent> pending,
Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts) {
+ super("Public Localizer");
this.lfs = lfs;
this.conf = conf;
this.pending = pending;
@@ -634,6 +713,7 @@ public class ResourceLocalizationService
RecordFactoryProvider.getRecordFactory(getConfig());
LocalizerRunner(LocalizerContext context, String localizerId) {
+ super("LocalizerRunner for " + localizerId);
this.context = context;
this.localizerId = localizerId;
this.pending = new ArrayList<LocalizerResourceRequestEvent>();
@@ -762,36 +842,19 @@ public class ResourceLocalizationService
@Override
@SuppressWarnings("unchecked") // dispatcher not typed
public void run() {
+ Path nmPrivateCTokensPath = null;
try {
// Use LocalDirAllocator to get nmPrivateDir
- Path nmPrivateCTokensPath =
+ nmPrivateCTokensPath =
localDirsSelector.getLocalPathForWrite(
NM_PRIVATE_DIR
+ Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
localizerId), getConfig());
+
// 0) init queue, etc.
// 1) write credentials to private dir
- DataOutputStream tokenOut = null;
- try {
- Credentials credentials = context.getCredentials();
- FileContext lfs = getLocalFileContext(getConfig());
- tokenOut =
- lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));
- LOG.info("Writing credentials to the nmPrivate file "
- + nmPrivateCTokensPath.toString() + ". Credentials list: ");
- if (LOG.isDebugEnabled()) {
- for (Token<? extends TokenIdentifier> tk : credentials
- .getAllTokens()) {
- LOG.debug(tk.getService() + " : " + tk.encodeToUrlString());
- }
- }
- credentials.writeTokenStorageToStream(tokenOut);
- } finally {
- if (tokenOut != null) {
- tokenOut.close();
- }
- }
+ writeCredentials(nmPrivateCTokensPath);
// 2) exec initApplication and wait
exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
context.getUser(),
@@ -811,6 +874,31 @@ public class ResourceLocalizationService
for (LocalizerResourceRequestEvent event : scheduled.values()) {
event.getResource().unlock();
}
+ delService.delete(null, nmPrivateCTokensPath, new Path[] {});
+ }
+ }
+
+ private void writeCredentials(Path nmPrivateCTokensPath)
+ throws IOException {
+ DataOutputStream tokenOut = null;
+ try {
+ Credentials credentials = context.getCredentials();
+ FileContext lfs = getLocalFileContext(getConfig());
+ tokenOut =
+ lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));
+ LOG.info("Writing credentials to the nmPrivate file "
+ + nmPrivateCTokensPath.toString() + ". Credentials list: ");
+ if (LOG.isDebugEnabled()) {
+ for (Token<? extends TokenIdentifier> tk : credentials
+ .getAllTokens()) {
+ LOG.debug(tk.getService() + " : " + tk.encodeToUrlString());
+ }
+ }
+ credentials.writeTokenStorageToStream(tokenOut);
+ } finally {
+ if (tokenOut != null) {
+ tokenOut.close();
+ }
}
}
@@ -821,6 +909,7 @@ public class ResourceLocalizationService
private final Dispatcher dispatcher;
public CacheCleanup(Dispatcher dispatcher) {
+ super("CacheCleanup");
this.dispatcher = dispatcher;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java Wed Nov 2 05:34:31 2011
@@ -22,8 +22,15 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+/**
+ * Event that requests that the {@link ResourceLocalizationService} localize
+ * a set of resources for the given container. This is generated by
+ * {@link ContainerImpl} during container initialization.
+ */
public class ContainerLocalizationRequestEvent extends
ContainerLocalizationEvent {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java Wed Nov 2 05:34:31 2011
@@ -19,7 +19,11 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+/**
+ * Events handled by {@link ResourceLocalizationService}
+ */
public class LocalizationEvent extends AbstractEvent<LocalizationEventType> {
public LocalizationEvent(LocalizationEventType event) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEvent.java Wed Nov 2 05:34:31 2011
@@ -18,7 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+/**
+ * Events delivered to the {@link ResourceLocalizationService}
+ */
public class LocalizerEvent extends AbstractEvent<LocalizerEventType> {
private final String localizerId;
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java Wed Nov 2 05:34:31 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
public enum LocalizerEventType {
+ /** See {@link LocalizerResourceRequestEvent} */
REQUEST_RESOURCE_LOCALIZATION,
ABORT_LOCALIZATION
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerResourceRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerResourceRequestEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerResourceRequestEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerResourceRequestEvent.java Wed Nov 2 05:34:31 2011
@@ -20,8 +20,13 @@ package org.apache.hadoop.yarn.server.no
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizerContext;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.util.ConverterUtils;
+/**
+ * Event indicating that the {@link ResourceLocalizationService}
+ * should fetch this resource.
+ */
public class LocalizerResourceRequestEvent extends LocalizerEvent {
private final LocalizerContext context;
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java Wed Nov 2 05:34:31 2011
@@ -17,8 +17,17 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
+
+/**
+ * Events delivered to {@link LocalizedResource}. Each of these
+ * events is a subclass of {@link ResourceEvent}.
+ */
public enum ResourceEventType {
+ /** See {@link ResourceRequestEvent} */
REQUEST,
+ /** See {@link ResourceLocalizedEvent} */
LOCALIZED,
+ /** See {@link ResourceReleaseEvent} */
RELEASE
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSelector.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSelector.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSelector.java Wed Nov 2 05:34:31 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.no
import java.util.Collection;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -28,23 +30,23 @@ import org.apache.hadoop.security.token.
public class LocalizerTokenSelector implements
TokenSelector<LocalizerTokenIdentifier> {
+ private static final Log LOG = LogFactory
+ .getLog(LocalizerTokenSelector.class);
+
+ @SuppressWarnings("unchecked")
@Override
public Token<LocalizerTokenIdentifier> selectToken(Text service,
Collection<Token<? extends TokenIdentifier>> tokens) {
- System.err.print("=========== Using localizerTokenSelector");
-// if (service == null) {
-// return null;
-// }
+
+ LOG.debug("Using localizerTokenSelector.");
+
for (Token<? extends TokenIdentifier> token : tokens) {
- System.err.print("============ token of kind " + token.getKind() + " is found");
- if (LocalizerTokenIdentifier.KIND.equals(token.getKind())
- //&& service.equals(token.getService())
- ) {
+ LOG.debug("Token of kind " + token.getKind() + " is found");
+ if (LocalizerTokenIdentifier.KIND.equals(token.getKind())) {
return (Token<LocalizerTokenIdentifier>) token;
}
}
- System.err.print("returning null ========== ");
+ LOG.debug("Returning null.");
return null;
}
-
}