You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [6/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java Tue Aug 19 23:49:39 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -66,4 +67,10 @@ public interface Context {
LocalDirsHandlerService getLocalDirsHandler();
ApplicationACLsManager getApplicationACLsManager();
+
+ /**
+ * Get the NodeManager state store (the recovery store) held by this context.
+ * @return the NodeManager state store
+ */
+ NMStateStoreService getNMStateStore();
+
+ /**
+ * Query whether this node has been flagged as decommissioned.
+ * @return true if the node has been flagged as decommissioned
+ */
+ boolean getDecommissioned();
+
+ /**
+ * Set the decommissioned flag for this node.
+ * @param isDecommissioned the new value of the flag
+ */
+ void setDecommissioned(boolean isDecommissioned);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import com.google.common.base.Optional;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
@@ -212,10 +213,21 @@ public class DefaultContainerExecutor ex
&& exitCode != ExitCode.TERMINATED.getExitCode()) {
LOG.warn("Exception from container-launch with container ID: "
+ containerId + " and exit code: " + exitCode , e);
- logOutput(shExec.getOutput());
- String diagnostics = "Exception from container-launch: "
- + e + "\n"
- + StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
+
+ StringBuilder builder = new StringBuilder();
+ builder.append("Exception from container-launch.\n");
+ builder.append("Container id: " + containerId + "\n");
+ builder.append("Exit code: " + exitCode + "\n");
+ if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) {
+ builder.append("Exception message: " + e.getMessage() + "\n");
+ }
+ builder.append("Stack trace: "
+ + StringUtils.stringifyException(e) + "\n");
+ if (!shExec.getOutput().isEmpty()) {
+ builder.append("Shell output: " + shExec.getOutput() + "\n");
+ }
+ String diagnostics = builder.toString();
+ logOutput(diagnostics);
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
diagnostics));
} else {
@@ -261,25 +273,57 @@ public class DefaultContainerExecutor ex
private final class UnixLocalWrapperScriptBuilder
extends LocalWrapperScriptBuilder {
+ // Intermediate script that records the container pid and then execs the
+ // container launch script; the wrapper script runs it by path.
+ private final Path sessionScriptPath;
public UnixLocalWrapperScriptBuilder(Path containerWorkDir) {
super(containerWorkDir);
+ this.sessionScriptPath = new Path(containerWorkDir,
+ Shell.appendScriptExtension("default_container_executor_session"));
+ }
+
+ /**
+ * Write the session script first, then delegate to the superclass to
+ * write the wrapper script (presumably via the three-argument overload).
+ */
+ @Override
+ public void writeLocalWrapperScript(Path launchDst, Path pidFile)
+ throws IOException {
+ writeSessionScript(launchDst, pidFile);
+ super.writeLocalWrapperScript(launchDst, pidFile);
}
@Override
public void writeLocalWrapperScript(Path launchDst, Path pidFile,
PrintStream pout) {
-
- // We need to do a move as writing to a file is not atomic
- // Process reading a file being written to may get garbled data
- // hence write pid to tmp file first followed by a mv
+ // Run the session script and publish its exit code. The code is written
+ // to a temp file and then moved into place so a reader never sees a
+ // partially written value.
+ String exitCodeFile = ContainerLaunch.getExitCodeFile(
+ pidFile.toString());
+ String tmpFile = exitCodeFile + ".tmp";
pout.println("#!/bin/bash");
- pout.println();
- pout.println("echo $$ > " + pidFile.toString() + ".tmp");
- pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
- String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
- pout.println(exec + " /bin/bash \"" +
- launchDst.toUri().getPath().toString() + "\"");
+ pout.println("/bin/bash \"" + sessionScriptPath.toString() + "\"");
+ pout.println("rc=$?");
+ pout.println("echo $rc > \"" + tmpFile + "\"");
+ pout.println("/bin/mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\"");
+ pout.println("exit $rc");
+ }
+
+ /**
+ * Write the session script. It writes its own pid ($$) to pidFile and then
+ * execs the launch script, under setsid when that is available.
+ * @param launchDst the container launch script
+ * @param pidFile the file that receives the session script's pid
+ * @throws IOException if the script cannot be written or made executable
+ */
+ private void writeSessionScript(Path launchDst, Path pidFile)
+ throws IOException {
+ DataOutputStream out = null;
+ PrintStream pout = null;
+ try {
+ out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE));
+ pout = new PrintStream(out);
+ // We need to do a move as writing to a file is not atomic
+ // Process reading a file being written to may get garbled data
+ // hence write pid to tmp file first followed by a mv
+ pout.println("#!/bin/bash");
+ pout.println();
+ // Quote the pid file paths, as the wrapper script does for its own
+ // paths, so a directory containing spaces does not break the script.
+ pout.println("echo $$ > \"" + pidFile.toString() + ".tmp\"");
+ pout.println("/bin/mv -f \"" + pidFile.toString() + ".tmp\" \""
+ + pidFile + "\"");
+ String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
+ pout.println(exec + " /bin/bash \"" +
+ launchDst.toUri().getPath().toString() + "\"");
+ } finally {
+ IOUtils.cleanup(LOG, pout, out);
+ }
+ lfs.setPermission(sessionScriptPath,
+ ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
}
}
@@ -298,6 +342,7 @@ public class DefaultContainerExecutor ex
@Override
public void writeLocalWrapperScript(Path launchDst, Path pidFile,
PrintStream pout) {
+ // TODO: exit code script for Windows
// On Windows, the pid is the container ID, so that it can also serve as
// the name of the job object created by winutils for task management.
@@ -330,6 +375,12 @@ public class DefaultContainerExecutor ex
return true;
}
+ /**
+ * Check whether the container process with the given pid is alive.
+ * The {@code user} argument is not used by this executor.
+ * @param user the container's user (ignored here)
+ * @param pid the pid of the container process
+ * @return the result of {@code containerIsAlive(pid)}
+ * @throws IOException propagated from the liveness check
+ */
+ @Override
+ public boolean isContainerProcessAlive(String user, String pid)
+ throws IOException {
+ final boolean alive = containerIsAlive(pid);
+ return alive;
+ }
+
/**
* Returns true if the process with the specified pid is alive.
*
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Tue Aug 19 23:49:39 2014
@@ -21,10 +21,13 @@ package org.apache.hadoop.yarn.server.no
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -40,6 +43,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -50,6 +57,8 @@ public class DeletionService extends Abs
private final ContainerExecutor exec;
private ScheduledThreadPoolExecutor sched;
private static final FileContext lfs = getLfs();
+ private final NMStateStoreService stateStore;
+ private AtomicInteger nextTaskId = new AtomicInteger(0);
static final FileContext getLfs() {
try {
@@ -60,14 +69,18 @@ public class DeletionService extends Abs
}
+ /**
+ * Create a deletion service backed by an {@link NMNullStateStoreService}
+ * (presumably a no-op store, so tasks are not persisted).
+ * @param exec the container executor held by this service
+ */
public DeletionService(ContainerExecutor exec) {
+ this(exec, new NMNullStateStoreService());
+ }
+
+ /**
+ * Create a deletion service that records its tasks in the given state
+ * store. When the store can recover, the tasks are reloaded at init time.
+ * @param exec the container executor held by this service
+ * @param stateStore store used to persist and recover deletion tasks
+ */
+ public DeletionService(ContainerExecutor exec,
+ NMStateStoreService stateStore) {
super(DeletionService.class.getName());
this.exec = exec;
this.debugDelay = 0;
+ this.stateStore = stateStore;
}
/**
- *
- /**
* Delete the path(s) as this user.
* @param user The user to delete as, or the JVM user if null
* @param subDir the sub directory name
@@ -76,19 +89,20 @@ public class DeletionService extends Abs
public void delete(String user, Path subDir, Path... baseDirs) {
// TODO if parent owned by NM, rename within parent inline
+ // A debugDelay of -1 disables deletion entirely.
if (debugDelay != -1) {
- if (baseDirs == null || baseDirs.length == 0) {
- sched.schedule(new FileDeletionTask(this, user, subDir, null),
- debugDelay, TimeUnit.SECONDS);
- } else {
- sched.schedule(
- new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)),
- debugDelay, TimeUnit.SECONDS);
+ // No base dirs (null or empty varargs) is passed on as a null list.
+ List<Path> baseDirList = null;
+ if (baseDirs != null && baseDirs.length != 0) {
+ baseDirList = Arrays.asList(baseDirs);
}
+ FileDeletionTask task =
+ new FileDeletionTask(this, user, subDir, baseDirList);
+ // Record the task before scheduling it so it can be recovered later.
+ recordDeletionTaskInStateStore(task);
+ sched.schedule(task, debugDelay, TimeUnit.SECONDS);
}
}
+ /**
+ * Record a prepared deletion task in the state store and schedule it to
+ * run after debugDelay seconds. Does nothing when debugDelay is -1.
+ * @param fileDeletionTask the task to schedule
+ */
public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
if (debugDelay != -1) {
+ recordDeletionTaskInStateStore(fileDeletionTask);
sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
}
}
@@ -109,6 +123,9 @@ public class DeletionService extends Abs
}
sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
sched.setKeepAliveTime(60L, SECONDS);
+ if (stateStore.canRecover()) {
+ recover(stateStore.loadDeletionServiceState());
+ }
super.serviceInit(conf);
}
@@ -139,6 +156,8 @@ public class DeletionService extends Abs
}
public static class FileDeletionTask implements Runnable {
+ public static final int INVALID_TASK_ID = -1;
+ private int taskId;
private final String user;
private final Path subDir;
private final List<Path> baseDirs;
@@ -152,6 +171,12 @@ public class DeletionService extends Abs
private FileDeletionTask(DeletionService delService, String user,
Path subDir, List<Path> baseDirs) {
+ // New tasks start with INVALID_TASK_ID; a real ID is assigned if and when
+ // the task is recorded in the state store.
+ this(INVALID_TASK_ID, delService, user, subDir, baseDirs);
+ }
+
+ private FileDeletionTask(int taskId, DeletionService delService,
+ String user, Path subDir, List<Path> baseDirs) {
+ this.taskId = taskId;
this.delService = delService;
this.user = user;
this.subDir = subDir;
@@ -198,6 +223,12 @@ public class DeletionService extends Abs
return this.success;
}
+ /**
+ * Take a snapshot of the tasks that depend on this one.
+ * @return a new array containing the current successor tasks
+ */
+ public synchronized FileDeletionTask[] getSuccessorTasks() {
+ return successorTaskSet.toArray(
+ new FileDeletionTask[successorTaskSet.size()]);
+ }
+
@Override
public void run() {
if (LOG.isDebugEnabled()) {
@@ -286,6 +317,12 @@ public class DeletionService extends Abs
* dependent tasks of it has failed marking its success = false.
*/
private synchronized void fileDeletionTaskFinished() {
+ try {
+ delService.stateStore.removeDeletionTask(taskId);
+ } catch (IOException e) {
+ LOG.error("Unable to remove deletion task " + taskId
+ + " from state store", e);
+ }
Iterator<FileDeletionTask> successorTaskI =
this.successorTaskSet.iterator();
while (successorTaskI.hasNext()) {
@@ -318,4 +355,129 @@ public class DeletionService extends Abs
Path[] baseDirs) {
return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs));
}
+
+ /**
+ * Rebuild the deletion tasks loaded from the state store, restore their
+ * dependencies, and schedule the tasks that have no predecessor. Tasks that
+ * are some other task's successor are only attached as dependencies.
+ * @param state the deletion tasks loaded from the state store
+ * @throws IOException propagated from task parsing
+ */
+ private void recover(RecoveredDeletionServiceState state)
+ throws IOException {
+ List<DeletionServiceDeleteTaskProto> taskProtos = state.getTasks();
+ Map<Integer, DeletionTaskRecoveryInfo> idToInfoMap =
+ new HashMap<Integer, DeletionTaskRecoveryInfo>(taskProtos.size());
+ Set<Integer> successorTasks = new HashSet<Integer>();
+ for (DeletionServiceDeleteTaskProto proto : taskProtos) {
+ DeletionTaskRecoveryInfo info = parseTaskProto(proto);
+ idToInfoMap.put(info.task.taskId, info);
+ // keep new IDs from colliding with recovered ones
+ nextTaskId.set(Math.max(nextTaskId.get(), info.task.taskId));
+ successorTasks.addAll(info.successorTaskIds);
+ }
+
+ // Restore every dependency before scheduling anything. A task whose
+ // deletion time has already passed gets a negative delay and runs at once
+ // on the scheduler thread. Scheduling inside this loop could therefore let
+ // a task finish while its successors' other predecessors are still
+ // being attached.
+ for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) {
+ for (Integer successorId : info.successorTaskIds) {
+ DeletionTaskRecoveryInfo successor = idToInfoMap.get(successorId);
+ if (successor != null) {
+ info.task.addFileDeletionTaskDependency(successor.task);
+ } else {
+ LOG.error("Unable to locate dependency task for deletion task "
+ + info.task.taskId + " at " + info.task.getSubDir());
+ }
+ }
+ }
+
+ // Schedule only the tasks that have no predecessors; successors are
+ // expected to run via the dependencies restored above.
+ final long now = System.currentTimeMillis();
+ for (DeletionTaskRecoveryInfo info : idToInfoMap.values()) {
+ if (!successorTasks.contains(info.task.taskId)) {
+ long msecTilDeletion = info.deletionTimestamp - now;
+ sched.schedule(info.task, msecTilDeletion, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ private DeletionTaskRecoveryInfo parseTaskProto(
+ DeletionServiceDeleteTaskProto proto) throws IOException {
+ int taskId = proto.getId();
+ String user = proto.hasUser() ? proto.getUser() : null;
+ Path subdir = null;
+ List<Path> basePaths = null;
+ if (proto.hasSubdir()) {
+ subdir = new Path(proto.getSubdir());
+ }
+ List<String> basedirs = proto.getBasedirsList();
+ if (basedirs != null && basedirs.size() > 0) {
+ basePaths = new ArrayList<Path>(basedirs.size());
+ for (String basedir : basedirs) {
+ basePaths.add(new Path(basedir));
+ }
+ }
+
+ FileDeletionTask task = new FileDeletionTask(taskId, this, user,
+ subdir, basePaths);
+ return new DeletionTaskRecoveryInfo(task,
+ proto.getSuccessorIdsList(),
+ proto.getDeletionTime());
+ }
+
+ /**
+ * Allocate the next task ID, skipping the reserved
+ * {@link FileDeletionTask#INVALID_TASK_ID} value (reachable on wrap-around).
+ * @return a task ID different from INVALID_TASK_ID
+ */
+ private int generateTaskId() {
+ int taskId;
+ do {
+ taskId = nextTaskId.incrementAndGet();
+ } while (taskId == FileDeletionTask.INVALID_TASK_ID);
+ return taskId;
+ }
+
+ /**
+ * Persist a deletion task, and recursively its successors, to the state
+ * store. Returns without doing anything when the store cannot recover or
+ * the task already has an ID. Store failures are logged, not thrown.
+ * @param task the task to record; its taskId is assigned here
+ */
+ private void recordDeletionTaskInStateStore(FileDeletionTask task) {
+ if (!stateStore.canRecover()) {
+ // optimize the case where we aren't really recording
+ return;
+ }
+ if (task.taskId != FileDeletionTask.INVALID_TASK_ID) {
+ return; // task already recorded
+ }
+
+ // Assign the ID before recursing into successors, so a task reached again
+ // through another predecessor is seen as already recorded.
+ task.taskId = generateTaskId();
+
+ FileDeletionTask[] successors = task.getSuccessorTasks();
+
+ // store successors first to ensure task IDs have been generated for them
+ for (FileDeletionTask successor : successors) {
+ recordDeletionTaskInStateStore(successor);
+ }
+
+ DeletionServiceDeleteTaskProto.Builder builder =
+ DeletionServiceDeleteTaskProto.newBuilder();
+ builder.setId(task.taskId);
+ if (task.getUser() != null) {
+ builder.setUser(task.getUser());
+ }
+ if (task.getSubDir() != null) {
+ builder.setSubdir(task.getSubDir().toString());
+ }
+ // Expected run time in epoch milliseconds: now plus debugDelay seconds.
+ builder.setDeletionTime(System.currentTimeMillis() +
+ TimeUnit.MILLISECONDS.convert(debugDelay, TimeUnit.SECONDS));
+ if (task.getBaseDirs() != null) {
+ for (Path dir : task.getBaseDirs()) {
+ builder.addBasedirs(dir.toString());
+ }
+ }
+ for (FileDeletionTask successor : successors) {
+ builder.addSuccessorIds(successor.taskId);
+ }
+
+ try {
+ stateStore.storeDeletionTask(task.taskId, builder.build());
+ } catch (IOException e) {
+ LOG.error("Unable to store deletion task " + task.taskId + " for "
+ + task.getSubDir(), e);
+ }
+ }
+
+ /**
+ * A deletion task rebuilt from the state store, together with the data
+ * persisted alongside it: the IDs of its successor tasks and the time at
+ * which it should run. Immutable once constructed.
+ */
+ private static class DeletionTaskRecoveryInfo {
+ final FileDeletionTask task;
+ final List<Integer> successorTaskIds;
+ /** Wall-clock time, in milliseconds, at which the task should run. */
+ final long deletionTimestamp;
+
+ public DeletionTaskRecoveryInfo(FileDeletionTask task,
+ List<Integer> successorTaskIds, long deletionTimestamp) {
+ this.task = task;
+ this.successorTaskIds = successorTaskIds;
+ this.deletionTimestamp = deletionTimestamp;
+ }
+ }
}
\ No newline at end of file
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import com.google.common.base.Optional;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -296,9 +297,21 @@ public class LinuxContainerExecutor exte
&& exitCode != ExitCode.TERMINATED.getExitCode()) {
LOG.warn("Exception from container-launch with container ID: "
+ containerId + " and exit code: " + exitCode , e);
- logOutput(shExec.getOutput());
- String diagnostics = "Exception from container-launch: \n"
- + StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
+
+ StringBuilder builder = new StringBuilder();
+ builder.append("Exception from container-launch.\n");
+ builder.append("Container id: " + containerId + "\n");
+ builder.append("Exit code: " + exitCode + "\n");
+ if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) {
+ builder.append("Exception message: " + e.getMessage() + "\n");
+ }
+ builder.append("Stack trace: "
+ + StringUtils.stringifyException(e) + "\n");
+ if (!shExec.getOutput().isEmpty()) {
+ builder.append("Shell output: " + shExec.getOutput() + "\n");
+ }
+ String diagnostics = builder.toString();
+ logOutput(diagnostics);
container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
diagnostics));
} else {
@@ -390,6 +403,13 @@ public class LinuxContainerExecutor exte
}
}
+ /**
+ * Check whether the container process is alive by sending it the NULL
+ * signal as the container's user.
+ * @param user the user the container runs as
+ * @param pid the pid of the container process
+ * @return the result of signalling the process with {@code Signal.NULL}
+ * @throws IOException propagated from the signalling call
+ */
+ @Override
+ public boolean isContainerProcessAlive(String user, String pid)
+ throws IOException {
+ // A test signal only probes the process; it is sent as the user
+ final boolean alive = signalContainer(user, pid, Signal.NULL);
+ return alive;
+ }
+
public void mountCgroups(List<String> cgroupKVs, String hierarchy)
throws IOException {
List<String> command = new ArrayList<String>(
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Tue Aug 19 23:49:39 2014
@@ -53,6 +53,9 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
@@ -78,9 +81,11 @@ public class NodeManager extends Composi
private ContainerManagerImpl containerManager;
private NodeStatusUpdater nodeStatusUpdater;
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
+ private NMStateStoreService nmStore = null;
private AtomicBoolean isStopping = new AtomicBoolean(false);
-
+ private boolean rmWorkPreservingRestartEnabled;
+
public NodeManager() {
super(NodeManager.class.getName());
}
@@ -110,14 +115,15 @@ public class NodeManager extends Composi
}
+ /**
+ * Create the deletion service, backed by the NodeManager state store.
+ * NOTE(review): relies on nmStore already having been set (by
+ * initAndStartRecoveryStore in serviceInit) when this is called; confirm
+ * the call order.
+ * @param exec the container executor passed to the deletion service
+ * @return a new {@link DeletionService}
+ */
protected DeletionService createDeletionService(ContainerExecutor exec) {
- return new DeletionService(exec);
+ return new DeletionService(exec, nmStore);
}
+ /**
+ * Create the NodeManager context.
+ * @param containerTokenSecretManager the container token secret manager
+ * @param nmTokenSecretManager the NM token secret manager
+ * @param stateStore the state store exposed through the context
+ * @return a new {@link NMContext}
+ */
protected NMContext createNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
- NMTokenSecretManagerInNM nmTokenSecretManager) {
+ NMTokenSecretManagerInNM nmTokenSecretManager,
+ NMStateStoreService stateStore) {
return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
- dirsHandler, aclsManager);
+ dirsHandler, aclsManager, stateStore);
}
protected void doSecureLogin() throws IOException {
@@ -125,11 +131,8 @@ public class NodeManager extends Composi
YarnConfiguration.NM_PRINCIPAL);
}
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
-
- conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
-
+ private void initAndStartRecoveryStore(Configuration conf)
+ throws IOException {
boolean recoveryEnabled = conf.getBoolean(
YarnConfiguration.NM_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
@@ -142,13 +145,57 @@ public class NodeManager extends Composi
}
Path recoveryRoot = new Path(recoveryDirName);
recoveryFs.mkdirs(recoveryRoot, new FsPermission((short)0700));
+ nmStore = new NMLeveldbStateStoreService();
+ } else {
+ nmStore = new NMNullStateStoreService();
+ }
+ nmStore.init(conf);
+ nmStore.start();
+ }
+
+ /**
+ * Stop the state store. If this node was decommissioned and the store is
+ * recoverable, also delete the recovery directory. Tolerates a failed
+ * serviceInit in which the store or the context was never created.
+ * @throws IOException from the local file system operations
+ */
+ private void stopRecoveryStore() throws IOException {
+ if (nmStore == null) {
+ // the recovery store was never created; nothing to stop
+ return;
+ }
+ nmStore.stop();
+ // context is null if serviceInit failed before creating it
+ if (context != null && context.getDecommissioned()
+ && nmStore.canRecover()) {
+ LOG.info("Removing state store due to decommission");
+ Configuration conf = getConfig();
+ Path recoveryRoot = new Path(
+ conf.get(YarnConfiguration.NM_RECOVERY_DIR));
+ LOG.info("Removing state store at " + recoveryRoot
+ + " due to decommission");
+ FileSystem recoveryFs = FileSystem.getLocal(conf);
+ if (!recoveryFs.delete(recoveryRoot, true)) {
+ LOG.warn("Unable to delete " + recoveryRoot);
+ }
+ }
+ }
+
+ /**
+ * Ask both token secret managers to recover their state, but only when the
+ * state store supports recovery. Both managers were constructed with nmStore.
+ * @param nmTokenSecretManager the NM token secret manager to recover
+ * @param containerTokenSecretManager the container token manager to recover
+ * @throws IOException propagated from the managers' recover() calls
+ */
+ private void recoverTokens(NMTokenSecretManagerInNM nmTokenSecretManager,
+ NMContainerTokenSecretManager containerTokenSecretManager)
+ throws IOException {
+ if (nmStore.canRecover()) {
+ nmTokenSecretManager.recover();
+ containerTokenSecretManager.recover();
}
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+
+ conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
+ rmWorkPreservingRestartEnabled = conf.getBoolean(YarnConfiguration
+ .RM_WORK_PRESERVING_RECOVERY_ENABLED,
+ YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
+
+ initAndStartRecoveryStore(conf);
NMContainerTokenSecretManager containerTokenSecretManager =
- new NMContainerTokenSecretManager(conf);
+ new NMContainerTokenSecretManager(conf, nmStore);
NMTokenSecretManagerInNM nmTokenSecretManager =
- new NMTokenSecretManagerInNM();
+ new NMTokenSecretManagerInNM(nmStore);
+
+ recoverTokens(nmTokenSecretManager, containerTokenSecretManager);
this.aclsManager = new ApplicationACLsManager(conf);
@@ -171,7 +218,7 @@ public class NodeManager extends Composi
dirsHandler = nodeHealthChecker.getDiskHandler();
this.context = createNMContext(containerTokenSecretManager,
- nmTokenSecretManager);
+ nmTokenSecretManager, nmStore);
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
@@ -220,6 +267,7 @@ public class NodeManager extends Composi
return;
}
super.serviceStop();
+ stopRecoveryStore();
DefaultMetricsSystem.shutdown();
}
@@ -244,8 +292,12 @@ public class NodeManager extends Composi
try {
LOG.info("Notifying ContainerManager to block new container-requests");
containerManager.setBlockNewContainerRequests(true);
- LOG.info("Cleaning up running containers on resync");
- containerManager.cleanupContainersOnNMResync();
+ if (!rmWorkPreservingRestartEnabled) {
+ LOG.info("Cleaning up running containers on resync");
+ containerManager.cleanupContainersOnNMResync();
+ } else {
+ LOG.info("Preserving containers on resync");
+ }
((NodeStatusUpdaterImpl) nodeStatusUpdater)
.rebootNodeStatusUpdaterAndRegisterWithRM();
} catch (YarnRuntimeException e) {
@@ -272,10 +324,13 @@ public class NodeManager extends Composi
private WebServer webServer;
private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider
.getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
-
+ private final NMStateStoreService stateStore;
+ private boolean isDecommissioned = false;
+
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
- LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager) {
+ LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
+ NMStateStoreService stateStore) {
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
this.dirsHandler = dirsHandler;
@@ -283,6 +338,7 @@ public class NodeManager extends Composi
this.nodeHealthStatus.setIsNodeHealthy(true);
this.nodeHealthStatus.setHealthReport("Healthy");
this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
+ this.stateStore = stateStore;
}
/**
@@ -349,6 +405,21 @@ public class NodeManager extends Composi
public ApplicationACLsManager getApplicationACLsManager() {
return aclsManager;
}
+
+ /** @return the state store supplied when this context was created */
+ @Override
+ public NMStateStoreService getNMStateStore() {
+ return this.stateStore;
+ }
+
+ /** @return whether this node has been flagged as decommissioned */
+ @Override
+ public boolean getDecommissioned() {
+ return this.isDecommissioned;
+ }
+
+ /** @param isDecommissioned the decommissioned flag to store */
+ @Override
+ public void setDecommissioned(boolean isDecommissioned) {
+ this.isDecommissioned = isDecommissioned;
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Tue Aug 19 23:49:39 2014
@@ -23,11 +23,34 @@ import org.apache.hadoop.yarn.api.record
+/**
+ * NodeManager service that heartbeats to the ResourceManager and keeps a
+ * cache of recently completed containers.
+ */
public interface NodeStatusUpdater extends Service {
+ /**
+ * Schedule a heartbeat to the ResourceManager outside of the normal,
+ * periodic heartbeating process. This is typically called when the state
+ * of containers on the node has changed to notify the RM sooner.
+ */
void sendOutofBandHeartBeat();
+ /**
+ * Get the ResourceManager identifier received during registration
+ * @return the ResourceManager ID
+ */
long getRMIdentifier();
+ /**
+ * Query if a container has recently completed
+ * @param containerId the container ID
+ * @return true if the container has recently completed
+ */
public boolean isContainerRecentlyStopped(ContainerId containerId);
+ /**
+ * Add a container to the list of containers that have recently completed
+ * @param containerId the ID of the completed container
+ */
+ public void addCompletedContainer(ContainerId containerId);
+
+ /**
+ * Clear the list of recently completed containers
+ */
public void clearFinishedContainersFromCache();
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Tue Aug 19 23:49:39 2014
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -246,13 +247,12 @@ public class NodeStatusUpdaterImpl exten
@VisibleForTesting
protected void registerWithRM()
throws YarnException, IOException {
- List<ContainerStatus> containerStatuses = getContainerStatuses();
+ List<NMContainerStatus> containerReports = getNMContainerStatuses();
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
- nodeManagerVersionId, containerStatuses);
- if (containerStatuses != null) {
- LOG.info("Registering with RM using finished containers :"
- + containerStatuses);
+ nodeManagerVersionId, containerReports, getRunningApplications());
+ if (containerReports != null) {
+ LOG.info("Registering with RM using containers :" + containerReports);
}
RegisterNodeManagerResponse regNMResponse =
resourceTracker.registerNodeManager(request);
@@ -364,8 +364,7 @@ public class NodeStatusUpdaterImpl exten
// Adding to finished containers cache. Cache will keep it around at
// least for #durationToTrackStoppedContainers duration. In the
// subsequent call to stop container it will get removed from cache.
- updateStoppedContainersInCache(container.getContainerId());
- addCompletedContainer(container);
+ addCompletedContainer(container.getContainerId());
}
}
if (LOG.isDebugEnabled()) {
@@ -374,10 +373,42 @@ public class NodeStatusUpdaterImpl exten
}
return containerStatuses;
}
+
+ /**
+ * Snapshot the IDs of every application currently held in the NM context.
+ * The result is passed to the RM in the registration request.
+ *
+ * @return a new mutable list of application IDs
+ */
+ private List<ApplicationId> getRunningApplications() {
+ return new ArrayList<ApplicationId>(this.context.getApplications().keySet());
+ }
+
+ /**
+ * Collect an NMContainerStatus for every container known to the NM context.
+ * These reports are sent on NM registration and used by YARN only.
+ * Containers already in the COMPLETE state are also added to the
+ * recently-completed tracking via addCompletedContainer.
+ *
+ * @return a new list with one status per container
+ */
+ private List<NMContainerStatus> getNMContainerStatuses() {
+ List<NMContainerStatus> reports = new ArrayList<NMContainerStatus>();
+ for (Container tracked : this.context.getContainers().values()) {
+ NMContainerStatus report = tracked.getNMContainerStatus();
+ reports.add(report);
+ boolean completed =
+ report.getContainerState().equals(ContainerState.COMPLETE);
+ if (completed) {
+ // The finished-containers cache keeps this entry for at least
+ // #durationToTrackStoppedContainers; a subsequent stop-container
+ // call removes it from the cache.
+ addCompletedContainer(tracked.getContainerId());
+ }
+ }
+ LOG.info("Sending out " + reports.size()
+ + " NM container statuses: " + reports);
+ return reports;
+ }
- private void addCompletedContainer(Container container) {
+ @Override
+ public void addCompletedContainer(ContainerId containerId) {
synchronized (previousCompletedContainers) {
- previousCompletedContainers.add(container.getContainerId());
+ previousCompletedContainers.add(containerId);
+ }
+ synchronized (recentlyStoppedContainers) {
+ removeVeryOldStoppedContainersFromCache();
+ recentlyStoppedContainers.put(containerId,
+ System.currentTimeMillis() + durationToTrackStoppedContainers);
}
}
@@ -424,16 +455,6 @@ public class NodeStatusUpdaterImpl exten
}
}
- @Private
- @VisibleForTesting
- public void updateStoppedContainersInCache(ContainerId containerId) {
- synchronized (recentlyStoppedContainers) {
- removeVeryOldStoppedContainersFromCache();
- recentlyStoppedContainers.put(containerId,
- System.currentTimeMillis() + durationToTrackStoppedContainers);
- }
- }
-
@Override
public void clearFinishedContainersFromCache() {
synchronized (recentlyStoppedContainers) {
@@ -449,8 +470,14 @@ public class NodeStatusUpdaterImpl exten
Iterator<ContainerId> i =
recentlyStoppedContainers.keySet().iterator();
while (i.hasNext()) {
- if (recentlyStoppedContainers.get(i.next()) < currentTime) {
+ ContainerId cid = i.next();
+ if (recentlyStoppedContainers.get(cid) < currentTime) {
i.remove();
+ try {
+ context.getNMStateStore().removeContainer(cid);
+ } catch (IOException e) {
+ LOG.error("Unable to remove container " + cid + " in store", e);
+ }
} else {
break;
}
@@ -493,6 +520,7 @@ public class NodeStatusUpdaterImpl exten
+ " hence shutting down.");
LOG.warn("Message from ResourceManager: "
+ response.getDiagnosticsMessage());
+ context.setDecommissioned(true);
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
break;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java Tue Aug 19 23:49:39 2014
@@ -27,4 +27,6 @@ public interface ResourceView {
long getPmemAllocatedForContainers();
boolean isPmemCheckEnabled();
+
+ long getVCoresAllocatedForContainers();
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue Aug 19 23:49:39 2014
@@ -20,8 +20,10 @@ package org.apache.hadoop.yarn.server.no
import static org.apache.hadoop.service.Service.STATE.STARTED;
+import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -41,6 +43,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
@@ -62,13 +65,17 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SerializedException;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -79,6 +86,8 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
@@ -116,11 +125,16 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
public class ContainerManagerImpl extends CompositeService implements
ServiceStateChangeListener, ContainerManagementProtocol,
@@ -218,6 +232,108 @@ public class ContainerManagerImpl extend
SHUTDOWN_CLEANUP_SLOP_MS;
super.serviceInit(conf);
+ recover();
+ }
+
+ /**
+ * Restore NM state from the state store, if the store supports recovery.
+ * The steps run in this order: localized resources, then applications,
+ * then containers. Finally, a finish event is replayed for each
+ * application the store recorded as finished.
+ */
+ @SuppressWarnings("unchecked")
+ private void recover() throws IOException, URISyntaxException {
+ NMStateStoreService store = context.getNMStateStore();
+ if (!store.canRecover()) {
+ return;
+ }
+
+ rsrcLocalizationSrvc.recoverLocalizedResources(
+ store.loadLocalizationState());
+
+ RecoveredApplicationsState recoveredApps = store.loadApplicationsState();
+ for (ContainerManagerApplicationProto appProto :
+ recoveredApps.getApplications()) {
+ recoverApplication(appProto);
+ }
+
+ for (RecoveredContainerState containerState : store.loadContainersState()) {
+ recoverContainer(containerState);
+ }
+
+ String diagnostic = "Application marked finished during recovery";
+ for (ApplicationId finishedAppId : recoveredApps.getFinishedApplications()) {
+ dispatcher.getEventHandler().handle(
+ new ApplicationFinishEvent(finishedAppId, diagnostic));
+ }
+ }
+
+ /**
+ * Recreate one application from its persisted record.
+ * The steps are: decode the application ID, credentials and ACLs; register
+ * a new ApplicationImpl in the NM context; send it an init event.
+ *
+ * @param p the persisted application record
+ * @throws IOException if the stored credentials cannot be read
+ */
+ private void recoverApplication(ContainerManagerApplicationProto p)
+ throws IOException {
+ ApplicationId appId = new ApplicationIdPBImpl(p.getId());
+
+ Credentials appCredentials = new Credentials();
+ DataInputStream credentialsIn =
+ new DataInputStream(p.getCredentials().newInput());
+ appCredentials.readTokenStorageStream(credentialsIn);
+
+ List<ApplicationACLMapProto> aclEntries = p.getAclsList();
+ Map<ApplicationAccessType, String> appAcls =
+ new HashMap<ApplicationAccessType, String>(aclEntries.size());
+ for (ApplicationACLMapProto entry : aclEntries) {
+ ApplicationAccessType accessType =
+ ProtoUtils.convertFromProtoFormat(entry.getAccessType());
+ appAcls.put(accessType, entry.getAcl());
+ }
+
+ LOG.info("Recovering application " + appId);
+ ApplicationImpl recoveredApp = new ApplicationImpl(dispatcher, p.getUser(),
+ appId, appCredentials, context);
+ context.getApplications().put(appId, recoveredApp);
+ recoveredApp.handle(new ApplicationInitEvent(appId, appAcls));
+ }
+
+ /**
+ * Recreate a single container from its persisted start request.
+ * If the owning application was recovered, a ContainerImpl is rebuilt from
+ * the stored status, exit code, diagnostics and killed flag. It is then
+ * registered in the NM context and handed to its application.
+ * Otherwise the container is only reported to the node status updater as
+ * completed.
+ *
+ * @param rcs the recovered container state
+ * @throws IOException if the launch-context credentials cannot be parsed
+ */
+ @SuppressWarnings("unchecked")
+ private void recoverContainer(RecoveredContainerState rcs)
+ throws IOException {
+ StartContainerRequest startRequest = rcs.getStartRequest();
+ ContainerLaunchContext launchContext =
+ startRequest.getContainerLaunchContext();
+ ContainerTokenIdentifier tokenId =
+ BuilderUtils.newContainerTokenIdentifier(startRequest.getContainerToken());
+ ContainerId containerId = tokenId.getContainerID();
+ ApplicationId appId =
+ containerId.getApplicationAttemptId().getApplicationId();
+
+ LOG.info("Recovering " + containerId + " in state " + rcs.getStatus()
+ + " with exit code " + rcs.getExitCode());
+
+ if (!context.getApplications().containsKey(appId)) {
+ // No recovered application owns this container: warn unless the stored
+ // status is already COMPLETED, then only track it as completed.
+ if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
+ LOG.warn(containerId + " has no corresponding application!");
+ }
+ LOG.info("Adding " + containerId + " to recently stopped containers");
+ nodeStatusUpdater.addCompletedContainer(containerId);
+ return;
+ }
+
+ Credentials credentials = parseCredentials(launchContext);
+ Container container = new ContainerImpl(getConfig(), dispatcher,
+ context.getNMStateStore(), startRequest.getContainerLaunchContext(),
+ credentials, metrics, tokenId, rcs.getStatus(), rcs.getExitCode(),
+ rcs.getDiagnostics(), rcs.getKilled());
+ context.getContainers().put(containerId, container);
+ dispatcher.getEventHandler().handle(
+ new ApplicationContainerInitEvent(container));
+ }
+
+ /**
+ * Block until no container in the NM context is still in the NEW state.
+ * Polls every 100 ms and gives up after 100 sleeps. A warning listing the
+ * still-NEW containers is logged if the wait budget runs out.
+ *
+ * @throws InterruptedException if interrupted while sleeping
+ */
+ private void waitForRecoveredContainers() throws InterruptedException {
+ final int sleepMsec = 100;
+ final int maxWaitIterations = 100;
+ List<ContainerId> newContainers = new ArrayList<ContainerId>();
+ for (int waitIterations = 0; ; ++waitIterations) {
+ newContainers.clear();
+ for (Container container : context.getContainers().values()) {
+ // Fully qualified: the api.records ContainerState is also imported.
+ if (container.getContainerState() == org.apache.hadoop.yarn.server
+ .nodemanager.containermanager.container.ContainerState.NEW) {
+ newContainers.add(container.getContainerId());
+ }
+ }
+ if (newContainers.isEmpty()) {
+ return;
+ }
+ // This check runs after a fresh scan, so containers that left NEW
+ // during the final sleep are not reported as a timeout.
+ if (waitIterations >= maxWaitIterations) {
+ LOG.warn("Timeout waiting for recovered containers: " + newContainers);
+ return;
+ }
+ LOG.info("Waiting for containers: " + newContainers);
+ Thread.sleep(sleepMsec);
+ }
}
protected LogHandler createLogHandler(Configuration conf, Context context,
@@ -239,7 +355,7 @@ public class ContainerManagerImpl extend
protected ResourceLocalizationService createResourceLocalizationService(
ContainerExecutor exec, DeletionService deletionContext) {
return new ResourceLocalizationService(this.dispatcher, exec,
- deletionContext, dirsHandler);
+ deletionContext, dirsHandler, context.getNMStateStore());
}
protected ContainersLauncher createContainersLauncher(Context context,
@@ -253,6 +369,23 @@ public class ContainerManagerImpl extend
// Enqueue user dirs in deletion context
Configuration conf = getConfig();
+ final InetSocketAddress initialAddress = conf.getSocketAddr(
+ YarnConfiguration.NM_BIND_HOST,
+ YarnConfiguration.NM_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_PORT);
+ boolean usingEphemeralPort = (initialAddress.getPort() == 0);
+ if (context.getNMStateStore().canRecover() && usingEphemeralPort) {
+ throw new IllegalArgumentException("Cannot support recovery with an "
+ + "ephemeral server port. Check the setting of "
+ + YarnConfiguration.NM_ADDRESS);
+ }
+ // If recovering then delay opening the RPC service until the recovery
+ // of resources and containers have completed, otherwise requests from
+ // clients during recovery can interfere with the recovery process.
+ final boolean delayedRpcServerStart =
+ context.getNMStateStore().canRecover();
+
Configuration serverConf = new Configuration(conf);
// always enforce it to be token-based.
@@ -262,11 +395,6 @@ public class ContainerManagerImpl extend
YarnRPC rpc = YarnRPC.create(conf);
- InetSocketAddress initialAddress = conf.getSocketAddr(
- YarnConfiguration.NM_ADDRESS,
- YarnConfiguration.DEFAULT_NM_ADDRESS,
- YarnConfiguration.DEFAULT_NM_PORT);
-
server =
rpc.getServer(ContainerManagementProtocol.class, this, initialAddress,
serverConf, this.context.getNMTokenSecretManager(),
@@ -283,16 +411,61 @@ public class ContainerManagerImpl extend
LOG.info("Blocking new container-requests as container manager rpc" +
" server is still starting.");
this.setBlockNewContainerRequests(true);
- server.start();
- InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
- NodeId nodeId = NodeId.newInstance(
- connectAddress.getAddress().getCanonicalHostName(),
- connectAddress.getPort());
+
+ String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST);
+ String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS);
+ String hostOverride = null;
+ if (bindHost != null && !bindHost.isEmpty()
+ && nmAddress != null && !nmAddress.isEmpty()) {
+ //a bind-host case with an address, to support overriding the first
+ //hostname found when querying for our hostname with the specified
+ //address, combine the specified address with the actual port listened
+ //on by the server
+ hostOverride = nmAddress.split(":")[0];
+ }
+
+ // setup node ID
+ InetSocketAddress connectAddress;
+ if (delayedRpcServerStart) {
+ connectAddress = NetUtils.getConnectAddress(initialAddress);
+ } else {
+ server.start();
+ connectAddress = NetUtils.getConnectAddress(server);
+ }
+ NodeId nodeId = buildNodeId(connectAddress, hostOverride);
((NodeManager.NMContext)context).setNodeId(nodeId);
this.context.getNMTokenSecretManager().setNodeId(nodeId);
this.context.getContainerTokenSecretManager().setNodeId(nodeId);
- LOG.info("ContainerManager started at " + connectAddress);
+
+ // start remaining services
super.serviceStart();
+
+ if (delayedRpcServerStart) {
+ waitForRecoveredContainers();
+ server.start();
+
+ // check that the node ID is as previously advertised
+ connectAddress = NetUtils.getConnectAddress(server);
+ NodeId serverNode = buildNodeId(connectAddress, hostOverride);
+ if (!serverNode.equals(nodeId)) {
+ throw new IOException("Node mismatch after server started, expected '"
+ + nodeId + "' but found '" + serverNode + "'");
+ }
+ }
+
+ LOG.info("ContainerManager started at " + connectAddress);
+ LOG.info("ContainerManager bound to " + initialAddress);
+ }
+
+ /**
+ * Build the NodeId advertised for this NM from a connect address.
+ * When hostOverride is non-null, the override host is resolved via
+ * NetUtils.getConnectAddress and combined with connectAddress's port.
+ * The canonical host name of the resulting address is used.
+ */
+ private NodeId buildNodeId(InetSocketAddress connectAddress,
+ String hostOverride) {
+ InetSocketAddress advertised = connectAddress;
+ if (hostOverride != null) {
+ advertised = NetUtils.getConnectAddress(
+ new InetSocketAddress(hostOverride, connectAddress.getPort()));
+ }
+ String host = advertised.getAddress().getCanonicalHostName();
+ int port = advertised.getPort();
+ return NodeId.newInstance(host, port);
}
void refreshServiceAcls(Configuration configuration,
@@ -329,6 +502,12 @@ public class ContainerManagerImpl extend
}
LOG.info("Applications still running : " + applications.keySet());
+ if (this.context.getNMStateStore().canRecover()
+ && !this.context.getDecommissioned()) {
+ // do not cleanup apps as they can be recovered on restart
+ return;
+ }
+
List<ApplicationId> appIds =
new ArrayList<ApplicationId>(applications.keySet());
this.handle(
@@ -463,8 +642,8 @@ public class ContainerManagerImpl extend
boolean unauthorized = false;
StringBuilder messageBuilder =
new StringBuilder("Unauthorized request to start container. ");
- if (!nmTokenIdentifier.getApplicationAttemptId().equals(
- containerId.getApplicationAttemptId())) {
+ if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId().equals(
+ containerId.getApplicationAttemptId().getApplicationId())) {
unauthorized = true;
messageBuilder.append("\nNMToken for application attempt : ")
.append(nmTokenIdentifier.getApplicationAttemptId())
@@ -485,6 +664,8 @@ public class ContainerManagerImpl extend
messageBuilder.append("\nThis token is expired. current time is ")
.append(System.currentTimeMillis()).append(" found ")
.append(containerTokenIdentifier.getExpiryTimeStamp());
+ messageBuilder.append("\nNote: System times on machines may be out of sync.")
+ .append(" Check system time and time zones.");
}
if (unauthorized) {
String msg = messageBuilder.toString();
@@ -536,6 +717,41 @@ public class ContainerManagerImpl extend
succeededContainers, failedContainers);
}
+ /**
+ * Build the application record that startContainerInternal persists in the
+ * NM state store. recoverApplication decodes this record on recovery.
+ *
+ * @param appId the application ID (expected to be an ApplicationIdPBImpl)
+ * @param user the user the application runs as
+ * @param credentials the application credentials, or null for none
+ * @param appAcls the application ACLs, or null for none
+ * @return the protobuf record to store
+ */
+ private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
+ String user, Credentials credentials,
+ Map<ApplicationAccessType, String> appAcls) {
+
+ ContainerManagerApplicationProto.Builder builder =
+ ContainerManagerApplicationProto.newBuilder();
+ builder.setId(((ApplicationIdPBImpl) appId).getProto());
+ builder.setUser(user);
+
+ builder.clearCredentials();
+ if (credentials != null) {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ try {
+ credentials.writeTokenStorageToStream(dob);
+ // getData() exposes the whole backing array, which is only valid up
+ // to getLength(); persist just the bytes that were written.
+ builder.setCredentials(
+ ByteString.copyFrom(dob.getData(), 0, dob.getLength()));
+ } catch (IOException e) {
+ // should not occur
+ LOG.error("Cannot serialize credentials", e);
+ }
+ }
+
+ builder.clearAcls();
+ if (appAcls != null) {
+ for (Map.Entry<ApplicationAccessType, String> acl : appAcls.entrySet()) {
+ ApplicationACLMapProto p = ApplicationACLMapProto.newBuilder()
+ .setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey()))
+ .setAcl(acl.getValue())
+ .build();
+ builder.addAcls(p);
+ }
+ }
+
+ return builder.build();
+ }
+
@SuppressWarnings("unchecked")
private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
ContainerTokenIdentifier containerTokenIdentifier,
@@ -588,7 +804,8 @@ public class ContainerManagerImpl extend
Credentials credentials = parseCredentials(launchContext);
Container container =
- new ContainerImpl(getConfig(), this.dispatcher, launchContext,
+ new ContainerImpl(getConfig(), this.dispatcher,
+ context.getNMStateStore(), launchContext,
credentials, metrics, containerTokenIdentifier);
ApplicationId applicationID =
containerId.getApplicationAttemptId().getApplicationId();
@@ -609,12 +826,15 @@ public class ContainerManagerImpl extend
if (null == context.getApplications().putIfAbsent(applicationID,
application)) {
LOG.info("Creating a new application reference for app " + applicationID);
-
+ Map<ApplicationAccessType, String> appAcls =
+ container.getLaunchContext().getApplicationACLs();
+ context.getNMStateStore().storeApplication(applicationID,
+ buildAppProto(applicationID, user, credentials, appAcls));
dispatcher.getEventHandler().handle(
- new ApplicationInitEvent(applicationID, container.getLaunchContext()
- .getApplicationACLs()));
+ new ApplicationInitEvent(applicationID, appAcls));
}
+ this.context.getNMStateStore().storeContainer(containerId, request);
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
@@ -662,7 +882,7 @@ public class ContainerManagerImpl extend
}
private Credentials parseCredentials(ContainerLaunchContext launchContext)
- throws YarnException {
+ throws IOException {
Credentials credentials = new Credentials();
// //////////// Parse credentials
ByteBuffer tokens = launchContext.getTokens();
@@ -671,15 +891,11 @@ public class ContainerManagerImpl extend
DataInputByteBuffer buf = new DataInputByteBuffer();
tokens.rewind();
buf.reset(tokens);
- try {
- credentials.readTokenStorageStream(buf);
- if (LOG.isDebugEnabled()) {
- for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
- LOG.debug(tk.getService() + " = " + tk.toString());
- }
+ credentials.readTokenStorageStream(buf);
+ if (LOG.isDebugEnabled()) {
+ for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
+ LOG.debug(tk.getService() + " = " + tk.toString());
}
- } catch (IOException e) {
- throw RPCUtil.getRemoteException(e);
}
}
// //////////// End of parsing credentials
@@ -712,7 +928,7 @@ public class ContainerManagerImpl extend
@SuppressWarnings("unchecked")
private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
- ContainerId containerID) throws YarnException {
+ ContainerId containerID) throws YarnException, IOException {
String containerIDStr = containerID.toString();
Container container = this.context.getContainers().get(containerID);
LOG.info("Stopping container with container Id: " + containerIDStr);
@@ -725,9 +941,11 @@ public class ContainerManagerImpl extend
+ " is not handled by this NodeManager");
}
} else {
+ context.getNMStateStore().storeContainerKilled(containerID);
dispatcher.getEventHandler().handle(
new ContainerKillEvent(containerID,
- "Container killed by the ApplicationMaster."));
+ ContainerExitStatus.KILLED_BY_APPMASTER,
+ "Container killed by the ApplicationMaster."));
NMAuditLogger.logSuccess(container.getUser(),
AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
@@ -797,26 +1015,24 @@ public class ContainerManagerImpl extend
* belongs to the same application attempt (NMToken) which was used. (Note:-
* This will prevent user in knowing another application's containers).
*/
-
- if ((!identifier.getApplicationAttemptId().equals(
- containerId.getApplicationAttemptId()))
- || (container != null && !identifier.getApplicationAttemptId().equals(
- container.getContainerId().getApplicationAttemptId()))) {
+ ApplicationId nmTokenAppId =
+ identifier.getApplicationAttemptId().getApplicationId();
+ if ((!nmTokenAppId.equals(containerId.getApplicationAttemptId().getApplicationId()))
+ || (container != null && !nmTokenAppId.equals(container
+ .getContainerId().getApplicationAttemptId().getApplicationId()))) {
if (stopRequest) {
LOG.warn(identifier.getApplicationAttemptId()
+ " attempted to stop non-application container : "
- + container.getContainerId().toString());
+ + container.getContainerId());
NMAuditLogger.logFailure("UnknownUser", AuditConstants.STOP_CONTAINER,
"ContainerManagerImpl", "Trying to stop unknown container!",
- identifier.getApplicationAttemptId().getApplicationId(),
- container.getContainerId());
+ nmTokenAppId, container.getContainerId());
} else {
LOG.warn(identifier.getApplicationAttemptId()
+ " attempted to get status for non-application container : "
- + container.getContainerId().toString());
+ + container.getContainerId());
}
}
-
}
class ContainerEventDispatcher implements EventHandler<ContainerEvent> {
@@ -864,6 +1080,11 @@ public class ContainerManagerImpl extend
} else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
diagnostic = "Application killed by ResourceManager";
}
+ try {
+ this.context.getNMStateStore().storeFinishedApplication(appID);
+ } catch (IOException e) {
+ LOG.error("Unable to update application state in store", e);
+ }
this.dispatcher.getEventHandler().handle(
new ApplicationFinishEvent(appID,
diagnostic));
@@ -876,6 +1097,7 @@ public class ContainerManagerImpl extend
.getContainersToCleanup()) {
this.dispatcher.getEventHandler().handle(
new ContainerKillEvent(container,
+ ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
"Container Killed by ResourceManager"));
}
break;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@@ -30,6 +31,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
@@ -375,6 +377,7 @@ public class ApplicationImpl implements
for (ContainerId containerID : app.containers.keySet()) {
app.dispatcher.getEventHandler().handle(
new ContainerKillEvent(containerID,
+ ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
"Container killed on application-finish event: " + appEvent.getDiagnostic()));
}
return ApplicationState.FINISHING_CONTAINERS_WAIT;
@@ -426,6 +429,11 @@ public class ApplicationImpl implements
ApplicationId appId = event.getApplicationID();
app.context.getApplications().remove(appId);
app.aclsManager.removeApplication(appId);
+ try {
+ app.context.getNMStateStore().removeApplication(appId);
+ } catch (IOException e) {
+ LOG.error("Unable to remove application from state store", e);
+ }
}
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java Tue Aug 19 23:49:39 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
public interface Container extends EventHandler<ContainerEvent> {
@@ -39,7 +40,7 @@ public interface Container extends Event
ContainerTokenIdentifier getContainerTokenIdentifier();
String getUser();
-
+
ContainerState getContainerState();
ContainerLaunchContext getLaunchContext();
@@ -50,6 +51,8 @@ public interface Container extends Event
ContainerStatus cloneAndGetContainerStatus();
+ NMContainerStatus getNMContainerStatus();
+
String toString();
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -47,7 +48,7 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
@@ -62,6 +63,8 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -75,6 +78,7 @@ public class ContainerImpl implements Co
private final Lock readLock;
private final Lock writeLock;
private final Dispatcher dispatcher;
+ private final NMStateStoreService stateStore;
private final Credentials credentials;
private final NodeManagerMetrics metrics;
private final ContainerLaunchContext launchContext;
@@ -101,12 +105,19 @@ public class ContainerImpl implements Co
private final List<LocalResourceRequest> appRsrcs =
new ArrayList<LocalResourceRequest>();
+ // whether container has been recovered after a restart
+ private RecoveredContainerStatus recoveredStatus =
+ RecoveredContainerStatus.REQUESTED;
+ // whether container was marked as killed after recovery
+ private boolean recoveredAsKilled = false;
+
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
- ContainerLaunchContext launchContext, Credentials creds,
- NodeManagerMetrics metrics,
+ NMStateStoreService stateStore, ContainerLaunchContext launchContext,
+ Credentials creds, NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier) {
this.daemonConf = conf;
this.dispatcher = dispatcher;
+ this.stateStore = stateStore;
this.launchContext = launchContext;
this.containerTokenIdentifier = containerTokenIdentifier;
this.containerId = containerTokenIdentifier.getContainerID();
@@ -122,6 +133,21 @@ public class ContainerImpl implements Co
stateMachine = stateMachineFactory.make(this);
}
+ /**
+ * Constructor for a container being recovered after a restart. It delegates
+ * to the regular constructor and then seeds the recovered status, exit code,
+ * kill flag and diagnostics (presumably as loaded from the NMStateStoreService;
+ * confirm against the caller).
+ */
+ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
+ NMStateStoreService stateStore, ContainerLaunchContext launchContext,
+ Credentials creds, NodeManagerMetrics metrics,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ RecoveredContainerStatus recoveredStatus, int exitCode,
+ String diagnostics, boolean wasKilled) {
+ this(conf, dispatcher, stateStore, launchContext, creds, metrics,
+ containerTokenIdentifier);
+ this.diagnostics.append(diagnostics);
+ this.recoveredAsKilled = wasKilled;
+ this.exitCode = exitCode;
+ this.recoveredStatus = recoveredStatus;
+ }
+
private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
new ContainerDoneTransition();
@@ -135,8 +161,10 @@ public class ContainerImpl implements Co
new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
// From NEW State
.addTransition(ContainerState.NEW,
- EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED,
- ContainerState.LOCALIZATION_FAILED),
+ EnumSet.of(ContainerState.LOCALIZING,
+ ContainerState.LOCALIZED,
+ ContainerState.LOCALIZATION_FAILED,
+ ContainerState.DONE),
ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
.addTransition(ContainerState.NEW, ContainerState.NEW,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
@@ -281,7 +309,9 @@ public class ContainerImpl implements Co
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
- ContainerEventType.KILL_CONTAINER)
+ EnumSet.of(ContainerEventType.KILL_CONTAINER,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
// From DONE
.addTransition(ContainerState.DONE, ContainerState.DONE,
@@ -295,7 +325,9 @@ public class ContainerImpl implements Co
// we notify container of failed localization if localizer thread (for
// that container) fails for some reason
.addTransition(ContainerState.DONE, ContainerState.DONE,
- ContainerEventType.RESOURCE_FAILED)
+ EnumSet.of(ContainerEventType.RESOURCE_FAILED,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
// create the topology tables
.installTopology();
@@ -388,6 +420,19 @@ public class ContainerImpl implements Co
}
@Override
+ public NMContainerStatus getNMContainerStatus() {
+ // Snapshot the id, current state, resource, diagnostics, exit code and the
+ // token's priority/creation time while holding the read lock.
+ this.readLock.lock();
+ try {
+ ContainerState currentState = getCurrentState();
+ Resource allocated = getResource();
+ String diagnosticText = diagnostics.toString();
+ return NMContainerStatus.newInstance(this.containerId, currentState,
+ allocated, diagnosticText, exitCode,
+ containerTokenIdentifier.getPriority(),
+ containerTokenIdentifier.getCreationTime());
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
public ContainerId getContainerId() {
return this.containerId;
}
@@ -407,7 +452,7 @@ public class ContainerImpl implements Co
}
}
- @SuppressWarnings({"fallthrough", "unchecked"})
+ @SuppressWarnings("fallthrough")
private void finished() {
ApplicationId applicationId =
containerId.getApplicationAttemptId().getApplicationId();
@@ -445,7 +490,11 @@ public class ContainerImpl implements Co
}
metrics.releaseContainer(this.resource);
+ sendFinishedEvents();
+ }
+ @SuppressWarnings("unchecked")
+ private void sendFinishedEvents() {
// Inform the application
@SuppressWarnings("rawtypes")
EventHandler eventHandler = dispatcher.getEventHandler();
@@ -458,6 +507,45 @@ public class ContainerImpl implements Co
}
@SuppressWarnings("unchecked") // dispatcher not typed
+ private void sendLaunchEvent() {
+ // A container that was already launched before recovery is asked to be
+ // recovered; any other container gets a fresh launch request.
+ ContainersLauncherEventType launcherEvent =
+ (recoveredStatus == RecoveredContainerStatus.LAUNCHED)
+ ? ContainersLauncherEventType.RECOVER_CONTAINER
+ : ContainersLauncherEventType.LAUNCH_CONTAINER;
+ ContainersLauncherEvent launchRequest =
+ new ContainersLauncherEvent(this, launcherEvent);
+ dispatcher.getEventHandler().handle(launchRequest);
+ }
+
+ // Inform the ContainersMonitor to start monitoring the container's
+ // resource usage. The physical limit is the allocated memory in bytes. The
+ // virtual limit is that value scaled by the configured vmem/pmem ratio.
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ private void sendContainerMonitorStartEvent() {
+ // Use long arithmetic from the first multiplication. getMemory() is an
+ // int (MB), so "getMemory() * 1024" would be computed as int. That
+ // overflows for allocations of 2 TiB (2097152 MB) or more, before the
+ // later promotion to long.
+ long pmemBytes = getResource().getMemory() * 1024L * 1024L;
+ float pmemRatio = daemonConf.getFloat(
+ YarnConfiguration.NM_VMEM_PMEM_RATIO,
+ YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+ long vmemBytes = (long) (pmemRatio * pmemBytes);
+
+ dispatcher.getEventHandler().handle(
+ new ContainerStartMonitoringEvent(containerId,
+ vmemBytes, pmemBytes));
+ }
+
+ /**
+ * Appends every given fragment to this container's diagnostics, then writes
+ * the accumulated diagnostics to the state store. An IOException from the
+ * store is logged as a warning and not rethrown.
+ */
+ private void addDiagnostics(String... diags) {
+ for (int i = 0; i < diags.length; i++) {
+ this.diagnostics.append(diags[i]);
+ }
+ try {
+ stateStore.storeContainerDiagnostics(containerId, diagnostics);
+ } catch (IOException e) {
+ LOG.warn("Unable to update diagnostics in state store for "
+ + containerId, e);
+ }
+ }
+
+ @SuppressWarnings("unchecked") // dispatcher not typed
public void cleanup() {
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
new HashMap<LocalResourceVisibility,
@@ -505,6 +593,16 @@ public class ContainerImpl implements Co
@Override
public ContainerState transition(ContainerImpl container,
ContainerEvent event) {
+ if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
+ container.sendFinishedEvents();
+ return ContainerState.DONE;
+ } else if (container.recoveredAsKilled &&
+ container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
+ // container was killed but never launched
+ container.finished();
+ return ContainerState.DONE;
+ }
+
final ContainerLaunchContext ctxt = container.launchContext;
container.metrics.initingContainer();
@@ -580,9 +678,7 @@ public class ContainerImpl implements Co
new ContainerLocalizationRequestEvent(container, req));
return ContainerState.LOCALIZING;
} else {
- container.dispatcher.getEventHandler().handle(
- new ContainersLauncherEvent(container,
- ContainersLauncherEventType.LAUNCH_CONTAINER));
+ container.sendLaunchEvent();
container.metrics.endInitingContainer();
return ContainerState.LOCALIZED;
}
@@ -593,7 +689,6 @@ public class ContainerImpl implements Co
* Transition when one of the requested resources for this container
* has been successfully localized.
*/
- @SuppressWarnings("unchecked") // dispatcher not typed
static class LocalizedTransition implements
MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
@Override
@@ -613,9 +708,8 @@ public class ContainerImpl implements Co
if (!container.pendingResources.isEmpty()) {
return ContainerState.LOCALIZING;
}
- container.dispatcher.getEventHandler().handle(
- new ContainersLauncherEvent(container,
- ContainersLauncherEventType.LAUNCH_CONTAINER));
+
+ container.sendLaunchEvent();
container.metrics.endInitingContainer();
return ContainerState.LOCALIZED;
}
@@ -625,24 +719,22 @@ public class ContainerImpl implements Co
* Transition from LOCALIZED state to RUNNING state upon receiving
* a CONTAINER_LAUNCHED event
*/
- @SuppressWarnings("unchecked") // dispatcher not typed
static class LaunchTransition extends ContainerTransition {
+ @SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
- // Inform the ContainersMonitor to start monitoring the container's
- // resource usage.
- long pmemBytes =
- container.getResource().getMemory() * 1024 * 1024L;
- float pmemRatio = container.daemonConf.getFloat(
- YarnConfiguration.NM_VMEM_PMEM_RATIO,
- YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
- long vmemBytes = (long) (pmemRatio * pmemBytes);
-
- container.dispatcher.getEventHandler().handle(
- new ContainerStartMonitoringEvent(container.containerId,
- vmemBytes, pmemBytes));
+ // Start resource monitoring, then record the container as running/launched.
+ container.sendContainerMonitorStartEvent();
container.metrics.runningContainer();
container.wasLaunched = true;
+
+ // A container recovered with the "killed" flag still goes through the
+ // normal launch bookkeeping above. A diagnostic is then recorded and a
+ // CLEANUP_CONTAINER request is sent to the ContainersLauncher immediately.
+ if (container.recoveredAsKilled) {
+ LOG.info("Killing " + container.containerId
+ + " due to recovered as killed");
+ container.addDiagnostics("Container recovered as killed.\n");
+ container.dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(container,
+ ContainersLauncherEventType.CLEANUP_CONTAINER));
+ }
}
}
@@ -694,8 +786,7 @@ public class ContainerImpl implements Co
ContainerExitEvent exitEvent = (ContainerExitEvent) event;
container.exitCode = exitEvent.getExitCode();
if (exitEvent.getDiagnosticInfo() != null) {
- container.diagnostics.append(exitEvent.getDiagnosticInfo())
- .append('\n');
+ container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
}
// TODO: Add containerWorkDir to the deletion service.
@@ -722,7 +813,7 @@ public class ContainerImpl implements Co
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
super.transition(container, event);
- container.diagnostics.append("Killed by external signal\n");
+ // addDiagnostics() appends the message and also writes the updated
+ // diagnostics to the state store.
+ container.addDiagnostics("Killed by external signal\n");
}
}
@@ -737,9 +828,7 @@ public class ContainerImpl implements Co
ContainerResourceFailedEvent rsrcFailedEvent =
(ContainerResourceFailedEvent) event;
- container.diagnostics.append(rsrcFailedEvent.getDiagnosticMessage()
- + "\n");
-
+ container.addDiagnostics(rsrcFailedEvent.getDiagnosticMessage(), "\n");
// Inform the localizer to decrement reference counts and cleanup
// resources.
@@ -761,9 +850,9 @@ public class ContainerImpl implements Co
container.cleanup();
container.metrics.endInitingContainer();
ContainerKillEvent killEvent = (ContainerKillEvent) event;
- container.exitCode = ExitCode.TERMINATED.getExitCode();
- container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
- container.diagnostics.append("Container is killed before being launched.\n");
+ container.exitCode = killEvent.getContainerExitStatus();
+ container.addDiagnostics(killEvent.getDiagnostic(), "\n");
+ container.addDiagnostics("Container is killed before being launched.\n");
}
}
@@ -804,7 +893,8 @@ public class ContainerImpl implements Co
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER));
ContainerKillEvent killEvent = (ContainerKillEvent) event;
- container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
+ container.addDiagnostics(killEvent.getDiagnostic(), "\n");
+ container.exitCode = killEvent.getContainerExitStatus();
}
}
@@ -817,10 +907,12 @@ public class ContainerImpl implements Co
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerExitEvent exitEvent = (ContainerExitEvent) event;
- container.exitCode = exitEvent.getExitCode();
+ if (container.hasDefaultExitCode()) {
+ container.exitCode = exitEvent.getExitCode();
+ }
+
if (exitEvent.getDiagnosticInfo() != null) {
- container.diagnostics.append(exitEvent.getDiagnosticInfo())
- .append('\n');
+ container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
}
// The process/process-grp is killed. Decrement reference counts and
@@ -859,9 +951,9 @@ public class ContainerImpl implements Co
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerKillEvent killEvent = (ContainerKillEvent) event;
- container.exitCode = ExitCode.TERMINATED.getExitCode();
- container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
- container.diagnostics.append("Container is killed before being launched.\n");
+ // Use the exit status supplied by whoever requested the kill.
+ container.exitCode = killEvent.getContainerExitStatus();
+ // Pass all fragments in one call. Each addDiagnostics() call writes to the
+ // state store, so one call produces one write instead of two. The
+ // resulting diagnostics text is identical.
+ container.addDiagnostics(killEvent.getDiagnostic(), "\n",
+ "Container is killed before being launched.\n");
super.transition(container, event);
}
}
@@ -875,8 +967,14 @@ public class ContainerImpl implements Co
public void transition(ContainerImpl container, ContainerEvent event) {
ContainerDiagnosticsUpdateEvent updateEvent =
(ContainerDiagnosticsUpdateEvent) event;
- container.diagnostics.append(updateEvent.getDiagnosticsUpdate())
- .append("\n");
+ // addDiagnostics() appends the update and already persists the
+ // accumulated diagnostics via stateStore.storeContainerDiagnostics(),
+ // logging any IOException. A second explicit store here would only
+ // repeat the same write, so none is done.
+ container.addDiagnostics(updateEvent.getDiagnosticsUpdate(), "\n");
}
}
@@ -916,4 +1014,8 @@ public class ContainerImpl implements Co
this.readLock.unlock();
}
}
+
+ /**
+ * @return true while exitCode still equals ContainerExitStatus.INVALID
+ * (presumably its initial "not yet set" value; confirm at the field
+ * declaration)
+ */
+ private boolean hasDefaultExitCode() {
+ return this.exitCode == ContainerExitStatus.INVALID;
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java Tue Aug 19 23:49:39 2014
@@ -23,13 +23,21 @@ import org.apache.hadoop.yarn.api.record
+/**
+ * KILL_CONTAINER event for a single container. It carries the exit status
+ * that the container should be assigned and a diagnostic message.
+ */
public class ContainerKillEvent extends ContainerEvent {
private final String diagnostic;
+ private final int exitStatus;
- public ContainerKillEvent(ContainerId cID, String diagnostic) {
+ /**
+ * @param cID the container to kill
+ * @param exitStatus exit status to assign to the killed container
+ * @param diagnostic diagnostic message associated with the kill
+ */
+ public ContainerKillEvent(ContainerId cID,
+ int exitStatus, String diagnostic) {
super(cID, ContainerEventType.KILL_CONTAINER);
+ this.exitStatus = exitStatus;
this.diagnostic = diagnostic;
}
public String getDiagnostic() {
return this.diagnostic;
}
+
+ /** @return the exit status supplied when this kill event was created */
+ public int getContainerExitStatus() {
+ return this.exitStatus;
+ }
+
}