You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [7/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Tue Aug 19 23:49:39 2014
@@ -87,22 +87,23 @@ public class ContainerLaunch implements
public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
private static final String PID_FILE_NAME_FMT = "%s.pid";
+ private static final String EXIT_CODE_FILE_SUFFIX = ".exitcode";
- private final Dispatcher dispatcher;
- private final ContainerExecutor exec;
+ protected final Dispatcher dispatcher;
+ protected final ContainerExecutor exec;
private final Application app;
- private final Container container;
+ protected final Container container;
private final Configuration conf;
private final Context context;
private final ContainerManagerImpl containerManager;
- private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
- private volatile AtomicBoolean completed = new AtomicBoolean(false);
+ protected AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
+ protected AtomicBoolean completed = new AtomicBoolean(false);
private long sleepDelayBeforeSigKill = 250;
private long maxKillWaitTime = 2000;
- private Path pidFilePath = null;
+ protected Path pidFilePath = null;
private final LocalDirsHandlerService dirsHandler;
@@ -223,14 +224,11 @@ public class ContainerLaunch implements
+ Path.SEPARATOR + containerIdStr,
LocalDirAllocator.SIZE_UNKNOWN, false);
- String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT,
- containerIdStr);
+ String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
// pid file should be in nm private dir so that it is not
// accessible by users
- pidFilePath = dirsHandler.getLocalPathForWrite(
- ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
- + pidFileSuffix);
+ pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
@@ -288,6 +286,7 @@ public class ContainerLaunch implements
dispatcher.getEventHandler().handle(new ContainerEvent(
containerID,
ContainerEventType.CONTAINER_LAUNCHED));
+ context.getNMStateStore().storeContainerLaunched(containerID);
// Check if the container is signalled to be killed.
if (!shouldLaunchContainer.compareAndSet(false, true)) {
@@ -310,6 +309,11 @@ public class ContainerLaunch implements
} finally {
completed.set(true);
exec.deactivateContainer(containerID);
+ try {
+ context.getNMStateStore().storeContainerCompleted(containerID, ret);
+ } catch (IOException e) {
+ LOG.error("Unable to set exit code for container " + containerID);
+ }
}
if (LOG.isDebugEnabled()) {
@@ -342,6 +346,11 @@ public class ContainerLaunch implements
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
return 0;
}
+
+ /**
+ * Builds the pid file location for a container, relative to a local dir.
+ * The result is the container's private dir (as returned by
+ * getContainerPrivateDir) joined with the pid file name produced from
+ * PID_FILE_NAME_FMT ("%s.pid"), i.e.
+ * {@code <container-private-dir>/<containerIdStr>.pid}.
+ * The caller passes this to dirsHandler.getLocalPathForWrite to pick the
+ * actual local directory.
+ *
+ * @param appIdStr the application id as a string
+ * @param containerIdStr the container id as a string
+ * @return the pid file subpath (relative, not yet resolved to a local dir)
+ */
+ protected String getPidFileSubpath(String appIdStr, String containerIdStr) {
+ return getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ + String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr);
+ }
/**
* Cleanup the container.
@@ -357,6 +366,13 @@ public class ContainerLaunch implements
String containerIdStr = ConverterUtils.toString(containerId);
LOG.info("Cleaning up container " + containerIdStr);
+ try {
+ context.getNMStateStore().storeContainerKilled(containerId);
+ } catch (IOException e) {
+ LOG.error("Unable to mark container " + containerId
+ + " killed in store", e);
+ }
+
// launch flag will be set to true if process already launched
boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
if (!alreadyLaunched) {
@@ -421,6 +437,7 @@ public class ContainerLaunch implements
if (pidFilePath != null) {
FileContext lfs = FileContext.getLocalFSFileContext();
lfs.delete(pidFilePath, false);
+ lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
}
}
}
@@ -479,15 +496,24 @@ public class ContainerLaunch implements
+ appIdStr;
}
- private static abstract class ShellScriptBuilder {
+ /**
+ * Package-private accessor for the NodeManager {@link Context} held by this
+ * launch (the {@code context} field).
+ *
+ * @return the NM context
+ */
+ Context getContext() {
+ return context;
+ }
+
+ @VisibleForTesting
+ static abstract class ShellScriptBuilder {
+ /**
+ * Factory for the platform-appropriate script builder.
+ *
+ * @return a WindowsShellScriptBuilder when {@code Shell.WINDOWS} is true,
+ * otherwise a UnixShellScriptBuilder
+ */
+ public static ShellScriptBuilder create() {
+ return Shell.WINDOWS ? new WindowsShellScriptBuilder() :
+ new UnixShellScriptBuilder();
+ }
private static final String LINE_SEPARATOR =
System.getProperty("line.separator");
private final StringBuilder sb = new StringBuilder();
- public abstract void command(List<String> command);
+ public abstract void command(List<String> command) throws IOException;
- public abstract void env(String key, String value);
+ public abstract void env(String key, String value) throws IOException;
public final void symlink(Path src, Path dst) throws IOException {
if (!src.isAbsolute()) {
@@ -520,11 +546,19 @@ public class ContainerLaunch implements
protected abstract void link(Path src, Path dst) throws IOException;
- protected abstract void mkdir(Path path);
+ protected abstract void mkdir(Path path) throws IOException;
}
private static final class UnixShellScriptBuilder extends ShellScriptBuilder {
+ /**
+ * Appends bash lines that capture the exit status ($?) of the previously
+ * emitted script line and, if it is non-zero, exit the launch script with
+ * that same status. Emitted after command, link and mkdir lines so a
+ * failing step aborts the script instead of being ignored.
+ */
+ private void errorCheck() {
+ line("hadoop_shell_errorcode=$?");
+ line("if [ $hadoop_shell_errorcode -ne 0 ]");
+ line("then");
+ line(" exit $hadoop_shell_errorcode");
+ line("fi");
+ }
+
public UnixShellScriptBuilder(){
line("#!/bin/bash");
line();
@@ -533,6 +567,7 @@ public class ContainerLaunch implements
@Override
public void command(List<String> command) {
line("exec /bin/bash -c \"", StringUtils.join(" ", command), "\"");
+ errorCheck();
}
@Override
@@ -543,31 +578,43 @@ public class ContainerLaunch implements
@Override
protected void link(Path src, Path dst) throws IOException {
line("ln -sf \"", src.toUri().getPath(), "\" \"", dst.toString(), "\"");
+ errorCheck();
}
@Override
protected void mkdir(Path path) {
line("mkdir -p ", path.toString());
+ errorCheck();
}
}
private static final class WindowsShellScriptBuilder
extends ShellScriptBuilder {
+ /**
+ * Appends a batch line that exits the script (exit /b) with the current
+ * %errorlevel% when the previously emitted line failed (non-zero level).
+ */
+ private void errorCheck() {
+ line("@if %errorlevel% neq 0 exit /b %errorlevel%");
+ }
+
+ /**
+ * Appends the given pieces as one script line after validating them with
+ * Shell.checkWindowsCommandLineLength.
+ * NOTE(review): presumably that check throws IOException when the combined
+ * length exceeds the Windows command-line limit; confirm in Shell.
+ *
+ * @param commands the pieces that together form one script line
+ * @throws IOException if the length check fails
+ */
+ private void lineWithLenCheck(String... commands) throws IOException {
+ Shell.checkWindowsCommandLineLength(commands);
+ line(commands);
+ }
+
public WindowsShellScriptBuilder() {
line("@setlocal");
line();
}
@Override
- public void command(List<String> command) {
- line("@call ", StringUtils.join(" ", command));
+ public void command(List<String> command) throws IOException {
+ lineWithLenCheck("@call ", StringUtils.join(" ", command));
+ errorCheck();
}
@Override
- public void env(String key, String value) {
- line("@set ", key, "=", value,
- "\nif %errorlevel% neq 0 exit /b %errorlevel%");
+ public void env(String key, String value) throws IOException {
+ lineWithLenCheck("@set ", key, "=", value);
+ errorCheck();
}
@Override
@@ -578,16 +625,20 @@ public class ContainerLaunch implements
// If not on Java7+ on Windows, then copy file instead of symlinking.
// See also FileUtil#symLink for full explanation.
if (!Shell.isJava7OrAbove() && srcFile.isFile()) {
- line(String.format("@copy \"%s\" \"%s\"", srcFileStr, dstFileStr));
+ lineWithLenCheck(String.format("@copy \"%s\" \"%s\"", srcFileStr, dstFileStr));
+ errorCheck();
} else {
- line(String.format("@%s symlink \"%s\" \"%s\"", Shell.WINUTILS,
+ lineWithLenCheck(String.format("@%s symlink \"%s\" \"%s\"", Shell.WINUTILS,
dstFileStr, srcFileStr));
+ errorCheck();
}
}
@Override
- protected void mkdir(Path path) {
- line("@if not exist ", path.toString(), " mkdir ", path.toString());
+ protected void mkdir(Path path) throws IOException {
+ lineWithLenCheck(String.format("@if not exist \"%s\" mkdir \"%s\"",
+ path.toString(), path.toString()));
+ errorCheck();
}
}
@@ -730,8 +781,7 @@ public class ContainerLaunch implements
Map<String,String> environment, Map<Path,List<String>> resources,
List<String> command)
throws IOException {
- ShellScriptBuilder sb = Shell.WINDOWS ? new WindowsShellScriptBuilder() :
- new UnixShellScriptBuilder();
+ ShellScriptBuilder sb = ShellScriptBuilder.create();
if (environment != null) {
for (Map.Entry<String,String> env : environment.entrySet()) {
sb.env(env.getKey().toString(), env.getValue().toString());
@@ -758,4 +808,7 @@ public class ContainerLaunch implements
}
}
+ /**
+ * Derives the exit-code file name from a pid file name by appending
+ * EXIT_CODE_FILE_SUFFIX (".exitcode"). This is the same name that container
+ * cleanup deletes alongside the pid file.
+ *
+ * @param pidFile the pid file path as a string
+ * @return {@code pidFile + ".exitcode"}
+ */
+ public static String getExitCodeFile(String pidFile) {
+ return pidFile + EXIT_CODE_FILE_SUFFIX;
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Tue Aug 19 23:49:39 2014
@@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,21 +31,16 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import com.google.common.annotations.VisibleForTesting;
@@ -107,7 +101,6 @@ public class ContainersLauncher extends
super.serviceStop();
}
- @SuppressWarnings("unchecked")
@Override
public void handle(ContainersLauncherEvent event) {
// TODO: ContainersLauncher launches containers one by one!!
@@ -125,6 +118,14 @@ public class ContainersLauncher extends
containerLauncher.submit(launch);
running.put(containerId, launch);
break;
+ case RECOVER_CONTAINER:
+ app = context.getApplications().get(
+ containerId.getApplicationAttemptId().getApplicationId());
+ launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher,
+ exec, app, event.getContainer(), dirsHandler, containerManager);
+ containerLauncher.submit(launch);
+ running.put(containerId, launch);
+ break;
case CLEANUP_CONTAINER:
ContainerLaunch launcher = running.remove(containerId);
if (launcher == null) {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java Tue Aug 19 23:49:39 2014
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.no
public enum ContainersLauncherEventType {
LAUNCH_CONTAINER,
+ RECOVER_CONTAINER,
CLEANUP_CONTAINER, // The process(grp) itself.
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java Tue Aug 19 23:49:39 2014
@@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* {@link LocalCacheDirectoryManager} is used for managing hierarchical
* directories for local cache. It will allow to restrict the number of files in
@@ -99,6 +101,57 @@ public class LocalCacheDirectoryManager
}
}
+ /**
+ * Increment the file count for a relative directory within the cache.
+ * Used to re-register existing cache entries, e.g. when
+ * LocalResourcesTrackerImpl recovers a localized resource.
+ *
+ * A null path is treated as the cache root (""), and surrounding whitespace
+ * is trimmed. If the directory is not yet known, it is created from its
+ * directory number, totalSubDirectories is raised to at least that number,
+ * and the directory is tracked as non-full. Once the directory's count
+ * reaches perDirectoryFileLimit, it is removed from nonFullDirectories.
+ * NOTE(review): relPath is assumed to use the same format getRelativePath
+ * produces (no leading or trailing separator); confirm with callers.
+ *
+ * @param relPath the relative path
+ */
+ public synchronized void incrementFileCountForPath(String relPath) {
+ relPath = relPath == null ? "" : relPath.trim();
+ Directory subDir = knownDirectories.get(relPath);
+ if (subDir == null) {
+ int dirnum = Directory.getDirectoryNumber(relPath);
+ // keep the high-water mark so new sub directories are allocated past it
+ totalSubDirectories = Math.max(dirnum, totalSubDirectories);
+ subDir = new Directory(dirnum);
+ nonFullDirectories.add(subDir);
+ knownDirectories.put(subDir.getRelativePath(), subDir);
+ }
+ if (subDir.incrementAndGetCount() >= perDirectoryFileLimit) {
+ nonFullDirectories.remove(subDir);
+ }
+ }
+
+ /**
+ * Given a path to a directory within a local cache tree return the
+ * root of the cache directory.
+ *
+ * Walks up from {@code path} while the last component is a single
+ * character that parses as a base-DIRECTORIES_PER_LEVEL digit below
+ * DIRECTORIES_PER_LEVEL, i.e. looks like a hierarchical cache sub-directory.
+ * The first component that does not look like one is returned as the root.
+ *
+ * @param path the directory within a cache directory
+ * @return the local cache directory root, or null if the walk runs past
+ * the top of the path (getParent() returns null), e.g. for a null input
+ */
+ public static Path getCacheDirectoryRoot(Path path) {
+ while (path != null) {
+ String name = path.getName();
+ if (name.length() != 1) {
+ return path;
+ }
+ int dirnum = DIRECTORIES_PER_LEVEL;
+ try {
+ dirnum = Integer.parseInt(name, DIRECTORIES_PER_LEVEL);
+ } catch (NumberFormatException e) {
+ // intentionally ignored: not a cache sub-dir digit, so dirnum stays at
+ // DIRECTORIES_PER_LEVEL and the check below returns this path as root
+ }
+ if (dirnum >= DIRECTORIES_PER_LEVEL) {
+ return path;
+ }
+ path = path.getParent();
+ }
+ return path;
+ }
+
+ /**
+ * Test hook: looks up a known directory by its relative path.
+ *
+ * @param relPath relative path within the cache ("" for the root)
+ * @return the tracked Directory, or null if none is known for that path
+ */
+ @VisibleForTesting
+ synchronized Directory getDirectory(String relPath) {
+ return knownDirectories.get(relPath);
+ }
+
/*
* It limits the number of files and sub directories in the directory to the
* limit LocalCacheDirectoryManager#perDirectoryFileLimit.
@@ -108,11 +161,9 @@ public class LocalCacheDirectoryManager
private final String relativePath;
private int fileCount;
- public Directory(int directoryNo) {
- fileCount = 0;
- if (directoryNo == 0) {
- relativePath = "";
- } else {
+ static String getRelativePath(int directoryNo) {
+ String relativePath = "";
+ if (directoryNo > 0) {
String tPath = Integer.toString(directoryNo - 1, DIRECTORIES_PER_LEVEL);
StringBuffer sb = new StringBuffer();
if (tPath.length() == 1) {
@@ -128,6 +179,27 @@ public class LocalCacheDirectoryManager
}
relativePath = sb.toString();
}
+ return relativePath;
+ }
+
+ /**
+ * Inverse of getRelativePath(int). Strips the "/" separators, undoes the
+ * first-digit adjustment that getRelativePath applies to multi-digit paths,
+ * and parses the result in base DIRECTORIES_PER_LEVEL. It then adds 1,
+ * because getRelativePath encodes {@code directoryNo - 1}.
+ * NOTE(review): relativePath must be non-null; incrementFileCountForPath
+ * normalizes null to "" before calling this.
+ *
+ * @param relativePath a relative path such as produced by getRelativePath
+ * @return the directory number; 0 for the empty (root) path
+ */
+ static int getDirectoryNumber(String relativePath) {
+ String numStr = relativePath.replace("/", "");
+ if (relativePath.isEmpty()) {
+ return 0;
+ }
+ if (numStr.length() > 1) {
+ // undo step from getRelativePath() to reuse 0th sub directory
+ String firstChar = Integer.toString(
+ Integer.parseInt(numStr.substring(0, 1),
+ DIRECTORIES_PER_LEVEL) + 1, DIRECTORIES_PER_LEVEL);
+ numStr = firstChar + numStr.substring(1);
+ }
+ return Integer.parseInt(numStr, DIRECTORIES_PER_LEVEL) + 1;
+ }
+
+ public Directory(int directoryNo) {
+ fileCount = 0;
+ relativePath = getRelativePath(directoryNo);
}
public int incrementAndGetCount() {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Tue Aug 19 23:49:39 2014
@@ -18,15 +18,12 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* Component tracking resources all of the same {@link LocalResourceVisibility}
*
@@ -34,18 +31,11 @@ import com.google.common.annotations.Vis
interface LocalResourcesTracker
extends EventHandler<ResourceEvent>, Iterable<LocalizedResource> {
- // TODO: Not used at all!!
- boolean contains(LocalResourceRequest resource);
-
boolean remove(LocalizedResource req, DeletionService delService);
Path getPathForLocalization(LocalResourceRequest req, Path localDirPath);
String getUser();
- long nextUniqueNumber();
-
- @VisibleForTesting
- @Private
LocalizedResource getLocalizedResource(LocalResourceRequest request);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.io.File;
+import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -27,14 +28,21 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import com.google.common.annotations.VisibleForTesting;
@@ -53,6 +61,7 @@ class LocalResourcesTrackerImpl implemen
.compile(RANDOM_DIR_REGEX);
private final String user;
+ private final ApplicationId appId;
private final Dispatcher dispatcher;
private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
private Configuration conf;
@@ -77,17 +86,22 @@ class LocalResourcesTrackerImpl implemen
* per APPLICATION, USER and PUBLIC cache.
*/
private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
+ private NMStateStoreService stateStore;
- public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
- boolean useLocalCacheDirectoryManager, Configuration conf) {
- this(user, dispatcher,
+ public LocalResourcesTrackerImpl(String user, ApplicationId appId,
+ Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
+ Configuration conf, NMStateStoreService stateStore) {
+ this(user, appId, dispatcher,
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
- useLocalCacheDirectoryManager, conf);
+ useLocalCacheDirectoryManager, conf, stateStore);
}
- LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+ LocalResourcesTrackerImpl(String user, ApplicationId appId,
+ Dispatcher dispatcher,
ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
- boolean useLocalCacheDirectoryManager, Configuration conf) {
+ boolean useLocalCacheDirectoryManager, Configuration conf,
+ NMStateStoreService stateStore) {
+ this.appId = appId;
this.user = user;
this.dispatcher = dispatcher;
this.localrsrc = localrsrc;
@@ -98,6 +112,7 @@ class LocalResourcesTrackerImpl implemen
new ConcurrentHashMap<LocalResourceRequest, Path>();
}
this.conf = conf;
+ this.stateStore = stateStore;
}
/*
@@ -119,8 +134,7 @@ class LocalResourcesTrackerImpl implemen
if (rsrc != null && (!isResourcePresent(rsrc))) {
LOG.info("Resource " + rsrc.getLocalPath()
+ " is missing, localizing it again");
- localrsrc.remove(req);
- decrementFileCountForLocalCacheDirectory(req, rsrc);
+ removeResource(req);
rsrc = null;
}
if (null == rsrc) {
@@ -141,15 +155,102 @@ class LocalResourcesTrackerImpl implemen
}
break;
case LOCALIZATION_FAILED:
- decrementFileCountForLocalCacheDirectory(req, null);
/*
* If resource localization fails then Localized resource will be
* removed from local cache.
*/
- localrsrc.remove(req);
+ removeResource(req);
+ break;
+ case RECOVERED:
+ if (rsrc != null) {
+ LOG.warn("Ignoring attempt to recover existing resource " + rsrc);
+ return;
+ }
+ rsrc = recoverResource(req, (ResourceRecoveredEvent) event);
+ localrsrc.put(req, rsrc);
break;
}
+
rsrc.handle(event);
+
+ if (event.getType() == ResourceEventType.LOCALIZED) {
+ if (rsrc.getLocalPath() != null) {
+ try {
+ stateStore.finishResourceLocalization(user, appId,
+ buildLocalizedResourceProto(rsrc));
+ } catch (IOException ioe) {
+ LOG.error("Error storing resource state for " + rsrc, ioe);
+ }
+ } else {
+ LOG.warn("Resource " + rsrc + " localized without a location");
+ }
+ }
+ }
+
+ /**
+ * Creates the in-memory LocalizedResource for a resource reported by a
+ * RECOVERED event. The resource's state is then driven by the subsequent
+ * rsrc.handle(event) call in handle().
+ *
+ * getPathForLocalization places each resource under a directory named by
+ * uniqueNumberGenerator, so the parent of the recovered local path is that
+ * unique-number directory. Its name is parsed back to a long, and the
+ * generator is advanced to at least that value so new localizations do not
+ * reuse it. The file count of the enclosing cache directory is also
+ * re-incremented.
+ * NOTE(review): Long.parseLong throws NumberFormatException if the
+ * recovered path does not follow this layout.
+ *
+ * @param req the resource request being recovered
+ * @param event the recovery event carrying the resource's local path
+ * @return a new LocalizedResource for req
+ */
+ private LocalizedResource recoverResource(LocalResourceRequest req,
+ ResourceRecoveredEvent event) {
+ // unique number for a resource is the directory of the resource
+ Path localDir = event.getLocalPath().getParent();
+ long rsrcId = Long.parseLong(localDir.getName());
+
+ // update ID generator to avoid conflicts with existing resources
+ // (CAS loop implementing an atomic "max" on uniqueNumberGenerator)
+ while (true) {
+ long currentRsrcId = uniqueNumberGenerator.get();
+ long nextRsrcId = Math.max(currentRsrcId, rsrcId);
+ if (uniqueNumberGenerator.compareAndSet(currentRsrcId, nextRsrcId)) {
+ break;
+ }
+ }
+
+ incrementFileCountForLocalCacheDirectory(localDir.getParent());
+
+ return new LocalizedResource(req, dispatcher);
+ }
+
+ /**
+ * Builds the state-store record for a localized resource. The record holds
+ * the resource request (as a LocalResourceProto), the local path as a
+ * string, and the size.
+ * Callers must ensure rsrc.getLocalPath() is non-null; handle() checks this
+ * before storing.
+ *
+ * @param rsrc the localized resource
+ * @return the protobuf record passed to finishResourceLocalization
+ */
+ private LocalizedResourceProto buildLocalizedResourceProto(
+ LocalizedResource rsrc) {
+ return LocalizedResourceProto.newBuilder()
+ .setResource(buildLocalResourceProto(rsrc.getRequest()))
+ .setLocalPath(rsrc.getLocalPath().toString())
+ .setSize(rsrc.getSize())
+ .build();
+ }
+
+ /**
+ * Converts any LocalResource, such as a LocalResourceRequest, to its
+ * protobuf form. A non-PB implementation is first copied field by field
+ * (resource, type, visibility, size, timestamp, pattern) into a new record
+ * via LocalResource.newInstance.
+ * NOTE(review): the cast assumes LocalResource.newInstance returns a
+ * LocalResourcePBImpl; confirm, otherwise this throws ClassCastException.
+ *
+ * @param lr the resource to convert
+ * @return the protobuf representation of lr
+ */
+ private LocalResourceProto buildLocalResourceProto(LocalResource lr) {
+ LocalResourcePBImpl lrpb;
+ if (!(lr instanceof LocalResourcePBImpl)) {
+ lr = LocalResource.newInstance(lr.getResource(), lr.getType(),
+ lr.getVisibility(), lr.getSize(), lr.getTimestamp(),
+ lr.getPattern());
+ }
+ lrpb = (LocalResourcePBImpl) lr;
+ return lrpb.getProto();
+ }
+
+ /**
+ * Records one more entry in the hierarchical cache directory cacheDir.
+ * Used when rebuilding directory-manager counts for recovered resources.
+ * Does nothing unless useLocalCacheDirectoryManager is set.
+ *
+ * The cache root is located with getCacheDirectoryRoot, and a
+ * LocalCacheDirectoryManager is found or created for that root. The count
+ * is then incremented for cacheDir's path relative to the root ("" when
+ * cacheDir is the root itself).
+ * NOTE(review): this assumes the derived root equals the localDirPath key
+ * that getPathForLocalization uses for directoryManagers; confirm.
+ *
+ * @param cacheDir the directory containing the resource's unique-number dir
+ */
+ public void incrementFileCountForLocalCacheDirectory(Path cacheDir) {
+ if (useLocalCacheDirectoryManager) {
+ Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot(
+ cacheDir);
+ if (cacheRoot != null) {
+ LocalCacheDirectoryManager dir = directoryManagers.get(cacheRoot);
+ if (dir == null) {
+ dir = new LocalCacheDirectoryManager(conf);
+ // putIfAbsent: if another manager was registered first, use that one
+ LocalCacheDirectoryManager otherDir =
+ directoryManagers.putIfAbsent(cacheRoot, dir);
+ if (otherDir != null) {
+ dir = otherDir;
+ }
+ }
+ if (cacheDir.equals(cacheRoot)) {
+ dir.incrementFileCountForPath("");
+ } else {
+ // cacheRoot is an ancestor of cacheDir, so strip "<root>/" to get
+ // the relative sub-directory path
+ String dirStr = cacheDir.toUri().getRawPath();
+ String rootStr = cacheRoot.toUri().getRawPath();
+ dir.incrementFileCountForPath(
+ dirStr.substring(rootStr.length() + 1));
+ }
+ }
+ }
}
/*
@@ -217,11 +318,6 @@ class LocalResourcesTrackerImpl implemen
}
@Override
- public boolean contains(LocalResourceRequest resource) {
- return localrsrc.containsKey(resource);
- }
-
- @Override
public boolean remove(LocalizedResource rem, DeletionService delService) {
// current synchronization guaranteed by crude RLS event for cleanup
LocalizedResource rsrc = localrsrc.get(rem.getRequest());
@@ -237,16 +333,31 @@ class LocalResourcesTrackerImpl implemen
+ " with non-zero refcount");
return false;
} else { // ResourceState is LOCALIZED or INIT
- localrsrc.remove(rem.getRequest());
if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
}
- decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
+ removeResource(rem.getRequest());
LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
return true;
}
}
+ /**
+ * Drops a resource from this tracker. The steps are:
+ * 1. Remove it from localrsrc.
+ * 2. Decrement the local cache directory file count. This also happens
+ * when the resource was not present, with rsrc passed as null.
+ * 3. If the resource had a local path, delete its record from the state
+ * store. State-store failures are logged and not propagated.
+ *
+ * @param req the request identifying the resource to remove
+ */
+ private void removeResource(LocalResourceRequest req) {
+ LocalizedResource rsrc = localrsrc.remove(req);
+ decrementFileCountForLocalCacheDirectory(req, rsrc);
+ if (rsrc != null) {
+ Path localPath = rsrc.getLocalPath();
+ if (localPath != null) {
+ try {
+ stateStore.removeLocalizedResource(user, appId, localPath);
+ } catch (IOException e) {
+ LOG.error("Unable to remove resource " + rsrc + " from state store",
+ e);
+ }
+ }
+ }
+ }
+
/**
* Returns the path up to the random directory component.
*/
@@ -285,6 +396,7 @@ class LocalResourcesTrackerImpl implemen
@Override
public Path
getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
+ Path rPath = localDirPath;
if (useLocalCacheDirectoryManager && localDirPath != null) {
if (!directoryManagers.containsKey(localDirPath)) {
@@ -293,7 +405,7 @@ class LocalResourcesTrackerImpl implemen
}
LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
- Path rPath = localDirPath;
+ rPath = localDirPath;
String hierarchicalPath = dir.getRelativePathForLocalization();
// For most of the scenarios we will get root path only which
// is an empty string
@@ -301,21 +413,36 @@ class LocalResourcesTrackerImpl implemen
rPath = new Path(localDirPath, hierarchicalPath);
}
inProgressLocalResourcesMap.put(req, rPath);
- return rPath;
- } else {
- return localDirPath;
}
- }
- @Override
- public long nextUniqueNumber() {
- return uniqueNumberGenerator.incrementAndGet();
+ rPath = new Path(rPath,
+ Long.toString(uniqueNumberGenerator.incrementAndGet()));
+ Path localPath = new Path(rPath, req.getPath().getName());
+ LocalizedResource rsrc = localrsrc.get(req);
+ rsrc.setLocalPath(localPath);
+ LocalResource lr = LocalResource.newInstance(req.getResource(),
+ req.getType(), req.getVisibility(), req.getSize(),
+ req.getTimestamp());
+ try {
+ stateStore.startResourceLocalization(user, appId,
+ ((LocalResourcePBImpl) lr).getProto(), localPath);
+ } catch (IOException e) {
+ LOG.error("Unable to record localization start for " + rsrc, e);
+ }
+ return rPath;
}
- @VisibleForTesting
- @Private
+ /**
+ * Returns the resource this tracker holds for the given request.
+ *
+ * @param request the resource request to look up
+ * @return the tracked LocalizedResource; presumably null when nothing is
+ * tracked for {@code request} (plain lookup on localrsrc) -- confirm
+ */
@Override
public LocalizedResource getLocalizedResource(LocalResourceRequest request) {
return localrsrc.get(request);
}
-}
\ No newline at end of file
+
+ /**
+ * Returns the cache directory manager registered for a local directory.
+ *
+ * @param localDirPath local directory root whose manager is wanted
+ * @return the manager, or null when the local cache directory manager is
+ * disabled for this tracker or none is registered for the path
+ */
+ @VisibleForTesting
+ LocalCacheDirectoryManager getDirectoryManager(Path localDirPath) {
+ // managers are only populated when the hierarchical layout is enabled
+ return useLocalCacheDirectoryManager
+ ? directoryManagers.get(localDirPath)
+ : null;
+ }
+}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Tue Aug 19 23:49:39 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -54,8 +55,8 @@ public class LocalizedResource implement
private static final Log LOG = LogFactory.getLog(LocalizedResource.class);
- Path localPath;
- long size = -1;
+ volatile Path localPath;
+ volatile long size = -1;
final LocalResourceRequest rsrc;
final Dispatcher dispatcher;
final StateMachine<ResourceState,ResourceEventType,ResourceEvent>
@@ -76,6 +77,8 @@ public class LocalizedResource implement
// From INIT (ref == 0, awaiting req)
.addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
ResourceEventType.REQUEST, new FetchResourceTransition())
+ .addTransition(ResourceState.INIT, ResourceState.LOCALIZED,
+ ResourceEventType.RECOVERED, new RecoveredTransition())
// From DOWNLOADING (ref > 0, may be localizing)
.addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
@@ -157,6 +160,10 @@ public class LocalizedResource implement
return localPath;
}
+ /**
+ * Records where this resource is (or will be) localized.
+ *
+ * The scheme and authority are stripped so the stored path has the same
+ * form as the path recorded when a ResourceLocalizedEvent is handled.
+ * A null argument clears the stored path instead of failing with a
+ * NullPointerException inside Path.getPathWithoutSchemeAndAuthority.
+ *
+ * @param localPath the local path, or null to clear it
+ */
+ public void setLocalPath(Path localPath) {
+ this.localPath = (localPath == null)
+ ? null
+ : Path.getPathWithoutSchemeAndAuthority(localPath);
+ }
+
public long getTimestamp() {
return timestamp.get();
}
@@ -234,7 +241,8 @@ public class LocalizedResource implement
@Override
public void transition(LocalizedResource rsrc, ResourceEvent event) {
ResourceLocalizedEvent locEvent = (ResourceLocalizedEvent) event;
- rsrc.localPath = locEvent.getLocation();
+ rsrc.localPath =
+ Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation());
rsrc.size = locEvent.getSize();
for (ContainerId container : rsrc.ref) {
rsrc.dispatcher.getEventHandler().handle(
@@ -291,4 +299,13 @@ public class LocalizedResource implement
rsrc.release(relEvent.getContainer());
}
}
+
+ /**
+ * Handles the RECOVERED event (registered for INIT -> LOCALIZED): copies
+ * the local path and size carried by the ResourceRecoveredEvent onto the
+ * resource.
+ */
+ private static class RecoveredTransition extends ResourceTransition {
+ @Override
+ public void transition(LocalizedResource rsrc, ResourceEvent event) {
+ final ResourceRecoveredEvent recovered = (ResourceRecoveredEvent) event;
+ // path first, then size -- same write order as before
+ rsrc.localPath = recovered.getLocalPath();
+ rsrc.size = recovered.getSize();
+ }
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue Aug 19 23:49:39 2014
@@ -74,13 +74,17 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
@@ -109,10 +113,15 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -142,6 +151,7 @@ public class ResourceLocalizationService
private RecordFactory recordFactory;
private final ScheduledExecutorService cacheCleanup;
private LocalizerTokenSecretManager secretManager;
+ private NMStateStoreService stateStore;
private LocalResourcesTracker publicRsrc;
@@ -163,7 +173,7 @@ public class ResourceLocalizationService
public ResourceLocalizationService(Dispatcher dispatcher,
ContainerExecutor exec, DeletionService delService,
- LocalDirsHandlerService dirsHandler) {
+ LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) {
super(ResourceLocalizationService.class.getName());
this.exec = exec;
@@ -175,6 +185,7 @@ public class ResourceLocalizationService
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
.build());
+ this.stateStore = stateStore;
}
FileContext getLocalFileContext(Configuration conf) {
@@ -203,15 +214,17 @@ public class ResourceLocalizationService
@Override
public void serviceInit(Configuration conf) throws Exception {
this.validateConf(conf);
- this.publicRsrc =
- new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
+ this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher,
+ true, conf, stateStore);
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try {
FileContext lfs = getLocalFileContext(conf);
lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
- cleanUpLocalDir(lfs,delService);
+ if (!stateStore.canRecover()) {
+ cleanUpLocalDir(lfs,delService);
+ }
List<String> localDirs = dirsHandler.getLocalDirs();
for (String localDir : localDirs) {
@@ -239,6 +252,7 @@ public class ResourceLocalizationService
cacheCleanupPeriod =
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
localizationServerAddress = conf.getSocketAddr(
+ YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
@@ -249,6 +263,74 @@ public class ResourceLocalizationService
super.serviceInit(conf);
}
+ /**
+ * Recovers localized resources after an NM restart.
+ *
+ * The public tracker is restored first. Then, for every user, the user's
+ * private tracker and each per-application tracker whose recovered state
+ * is non-empty are restored. A tracker already present in privateRsrc or
+ * appRsrc is reused rather than replaced.
+ *
+ * @param state recovered localization state
+ * @throws URISyntaxException propagated from recoverTrackerResources
+ */
+ public void recoverLocalizedResources(RecoveredLocalizationState state)
+ throws URISyntaxException {
+ recoverTrackerResources(publicRsrc, state.getPublicTrackerState());
+
+ for (Map.Entry<String, RecoveredUserResources> userEntry :
+ state.getUserResources().entrySet()) {
+ final String user = userEntry.getKey();
+ final RecoveredUserResources userResources = userEntry.getValue();
+
+ LocalResourceTrackerState privateState =
+ userResources.getPrivateTrackerState();
+ if (!privateState.isEmpty()) {
+ LocalResourcesTracker candidate = new LocalResourcesTrackerImpl(user,
+ null, dispatcher, true, super.getConfig(), stateStore);
+ LocalResourcesTracker existing =
+ privateRsrc.putIfAbsent(user, candidate);
+ recoverTrackerResources(existing == null ? candidate : existing,
+ privateState);
+ }
+
+ for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
+ userResources.getAppTrackerStates().entrySet()) {
+ LocalResourceTrackerState appState = appEntry.getValue();
+ if (appState.isEmpty()) {
+ continue;
+ }
+ ApplicationId appId = appEntry.getKey();
+ String appKey = ConverterUtils.toString(appId);
+ LocalResourcesTracker candidate = new LocalResourcesTrackerImpl(user,
+ appId, dispatcher, false, super.getConfig(), stateStore);
+ LocalResourcesTracker existing = appRsrc.putIfAbsent(appKey, candidate);
+ recoverTrackerResources(existing == null ? candidate : existing,
+ appState);
+ }
+ }
+ }
+
+ /**
+ * Replays recovered resource state into a tracker.
+ *
+ * Completed localizations are re-registered with their recorded path and
+ * size. In-progress localizations are registered with size 0 and then
+ * immediately removed via the deletion service, so containers will have
+ * to request them again.
+ *
+ * @param tracker tracker receiving the ResourceRecoveredEvents
+ * @param state recovered state for that tracker
+ * @throws URISyntaxException if a LocalResourceRequest cannot be built
+ */
+ private void recoverTrackerResources(LocalResourcesTracker tracker,
+ LocalResourceTrackerState state) throws URISyntaxException {
+ for (LocalizedResourceProto completed : state.getLocalizedResources()) {
+ LocalResourceRequest req = toLocalResourceRequest(completed.getResource());
+ LOG.info("Recovering localized resource " + req + " at "
+ + completed.getLocalPath());
+ tracker.handle(new ResourceRecoveredEvent(req,
+ new Path(completed.getLocalPath()), completed.getSize()));
+ }
+
+ for (Map.Entry<LocalResourceProto, Path> pending :
+ state.getInProgressResources().entrySet()) {
+ LocalResourceRequest req = toLocalResourceRequest(pending.getKey());
+ Path localPath = pending.getValue();
+ tracker.handle(new ResourceRecoveredEvent(req, localPath, 0));
+
+ // delete any in-progress localizations, containers will request again
+ LOG.info("Deleting in-progress localization for " + req + " at "
+ + localPath);
+ tracker.remove(tracker.getLocalizedResource(req), delService);
+ }
+
+ // TODO: remove untracked directories in local filesystem
+ }
+
+ /** Wraps a recovered resource proto in a LocalResourceRequest. */
+ private static LocalResourceRequest toLocalResourceRequest(
+ LocalResourceProto proto) throws URISyntaxException {
+ return new LocalResourceRequest(new LocalResourcePBImpl(proto));
+ }
+
@Override
public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
return localizerTracker.processHeartbeat(status);
@@ -261,7 +343,9 @@ public class ResourceLocalizationService
server = createServer();
server.start();
localizationServerAddress =
- getConfig().updateConnectAddr(YarnConfiguration.NM_LOCALIZER_ADDRESS,
+ getConfig().updateConnectAddr(YarnConfiguration.NM_BIND_HOST,
+ YarnConfiguration.NM_LOCALIZER_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
server.getListenerAddress());
LOG.info("Localizer started on port " + server.getPort());
super.serviceStart();
@@ -337,17 +421,10 @@ public class ResourceLocalizationService
// 0) Create application tracking structs
String userName = app.getUser();
privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
- dispatcher, true, super.getConfig()));
- if (null != appRsrc.putIfAbsent(
- ConverterUtils.toString(app.getAppId()),
- new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
- .getConfig()))) {
- LOG.warn("Initializing application " + app + " already present");
- assert false; // TODO: FIXME assert doesn't help
- // ^ The condition is benign. Tests should fail and it
- // should appear in logs, but it's an internal error
- // that should have no effect on applications
- }
+ null, dispatcher, true, super.getConfig(), stateStore));
+ String appIdStr = ConverterUtils.toString(app.getAppId());
+ appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(),
+ app.getAppId(), dispatcher, false, super.getConfig(), stateStore));
// 1) Signal container init
//
// This is handled by the ApplicationImpl state machine and allows
@@ -446,18 +523,28 @@ public class ResourceLocalizationService
@SuppressWarnings({"unchecked"})
private void handleDestroyApplicationResources(Application application) {
- String userName;
- String appIDStr;
+ String userName = application.getUser();
+ ApplicationId appId = application.getAppId();
+ String appIDStr = application.toString();
LocalResourcesTracker appLocalRsrcsTracker =
- appRsrc.remove(ConverterUtils.toString(application.getAppId()));
- if (null == appLocalRsrcsTracker) {
+ appRsrc.remove(ConverterUtils.toString(appId));
+ if (appLocalRsrcsTracker != null) {
+ for (LocalizedResource rsrc : appLocalRsrcsTracker ) {
+ Path localPath = rsrc.getLocalPath();
+ if (localPath != null) {
+ try {
+ stateStore.removeLocalizedResource(userName, appId, localPath);
+ } catch (IOException e) {
+ LOG.error("Unable to remove resource " + rsrc + " for " + appIDStr
+ + " from state store", e);
+ }
+ }
+ }
+ } else {
LOG.warn("Removing uninitialized application " + application);
}
- // TODO: What to do with appLocalRsrcsTracker?
// Delete the application directories
- userName = application.getUser();
- appIDStr = application.toString();
for (String localDir : dirsHandler.getLocalDirs()) {
// Delete the user-owned app-dir
@@ -668,19 +755,15 @@ public class ResourceLocalizationService
if (rsrc.getState().equals(ResourceState.DOWNLOADING)) {
LocalResource resource = request.getResource().getRequest();
try {
- Path publicDirDestPath =
+ Path publicRootPath =
dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
+ ContainerLocalizer.FILECACHE,
ContainerLocalizer.getEstimatedSize(resource), true);
- Path hierarchicalPath =
- publicRsrc.getPathForLocalization(key, publicDirDestPath);
- if (!hierarchicalPath.equals(publicDirDestPath)) {
- publicDirDestPath = hierarchicalPath;
+ Path publicDirDestPath =
+ publicRsrc.getPathForLocalization(key, publicRootPath);
+ if (!publicDirDestPath.getParent().equals(publicRootPath)) {
DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
}
- publicDirDestPath =
- new Path(publicDirDestPath, Long.toString(publicRsrc
- .nextUniqueNumber()));
// explicitly synchronize pending here to avoid future task
// completing and being dequeued before pending updated
synchronized (pending) {
@@ -968,9 +1051,8 @@ public class ResourceLocalizationService
Path dirPath =
dirsHandler.getLocalPathForWrite(cacheDirectory,
ContainerLocalizer.getEstimatedSize(rsrc), false);
- dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
- dirPath);
- return new Path (dirPath, Long.toString(tracker.nextUniqueNumber()));
+ return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
+ dirPath);
}
@Override
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java Tue Aug 19 23:49:39 2014
@@ -31,5 +31,7 @@ public enum ResourceEventType {
/** See {@link ResourceReleaseEvent} */
RELEASE,
/** See {@link ResourceFailedLocalizationEvent} */
- LOCALIZATION_FAILED
+ LOCALIZATION_FAILED,
+ /** See {@link ResourceRecoveredEvent} */
+ RECOVERED
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java Tue Aug 19 23:49:39 2014
@@ -25,5 +25,7 @@ public interface AppLogAggregator extend
void startContainerLogAggregation(ContainerId containerId,
boolean wasContainerSuccessful);
+ void abortLogAggregation();
+
void finishLogAggregation();
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Tue Aug 19 23:49:39 2014
@@ -70,6 +70,7 @@ public class AppLogAggregatorImpl implem
private final BlockingQueue<ContainerId> pendingContainers;
private final AtomicBoolean appFinishing = new AtomicBoolean();
private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
+ private final AtomicBoolean aborted = new AtomicBoolean();
private final Map<ApplicationAccessType, String> appAcls;
private LogWriter writer = null;
@@ -150,7 +151,7 @@ public class AppLogAggregatorImpl implem
private void doAppLogAggregation() {
ContainerId containerId;
- while (!this.appFinishing.get()) {
+ while (!this.appFinishing.get() && !this.aborted.get()) {
synchronized(this) {
try {
wait(THREAD_SLEEP_TIME);
@@ -161,6 +162,10 @@ public class AppLogAggregatorImpl implem
}
}
+ if (this.aborted.get()) {
+ return;
+ }
+
// Application is finished. Finish pending-containers
while ((containerId = this.pendingContainers.poll()) != null) {
uploadLogsForContainer(containerId);
@@ -255,4 +260,11 @@ public class AppLogAggregatorImpl implem
this.appFinishing.set(true);
this.notifyAll();
}
+
+ /**
+ * Stops aggregation without uploading logs for pending containers. The
+ * aggregation loop exits, and returns early, as soon as it sees the
+ * aborted flag.
+ */
+ @Override
+ public synchronized void abortLogAggregation() {
+ LOG.info("Aborting log aggregation for " + this.applicationId);
+ aborted.set(true);
+ // wake the aggregation thread that waits on this object's monitor
+ notifyAll();
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Tue Aug 19 23:49:39 2014
@@ -142,9 +142,17 @@ public class LogAggregationService exten
private void stopAggregators() {
threadPool.shutdown();
+ // if recovery on restart is supported then leave outstanding aggregations
+ // to the next restart
+ boolean shouldAbort = context.getNMStateStore().canRecover()
+ && !context.getDecommissioned();
// politely ask to finish
for (AppLogAggregator aggregator : appLogAggregators.values()) {
- aggregator.finishLogAggregation();
+ if (shouldAbort) {
+ aggregator.abortLogAggregation();
+ } else {
+ aggregator.finishLogAggregation();
+ }
}
while (!threadPool.isTerminated()) { // wait for all threads to finish
for (ApplicationId appId : appLogAggregators.keySet()) {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java Tue Aug 19 23:49:39 2014
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -69,6 +70,8 @@ public class ContainersMonitorImpl exten
private boolean pmemCheckEnabled;
private boolean vmemCheckEnabled;
+ private long maxVCoresAllottedForContainers;
+
private static final long UNKNOWN_MEMORY_LIMIT = -1L;
public ContainersMonitorImpl(ContainerExecutor exec,
@@ -107,10 +110,16 @@ public class ContainersMonitorImpl exten
YarnConfiguration.NM_PMEM_MB,
YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024l;
+ long configuredVCoresForContainers = conf.getLong(
+ YarnConfiguration.NM_VCORES,
+ YarnConfiguration.DEFAULT_NM_VCORES);
+
+
// Setting these irrespective of whether checks are enabled. Required in
// the UI.
// ///////// Physical memory configuration //////
this.maxPmemAllottedForContainers = configuredPMemForContainers;
+ this.maxVCoresAllottedForContainers = configuredVCoresForContainers;
// ///////// Virtual memory configuration //////
float vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
@@ -403,6 +412,7 @@ public class ContainersMonitorImpl exten
boolean isMemoryOverLimit = false;
String msg = "";
+ int containerExitStatus = ContainerExitStatus.INVALID;
if (isVmemCheckEnabled()
&& isProcessTreeOverLimit(containerId.toString(),
currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
@@ -414,6 +424,7 @@ public class ContainersMonitorImpl exten
currentPmemUsage, pmemLimit,
pId, containerId, pTree);
isMemoryOverLimit = true;
+ containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_VMEM;
} else if (isPmemCheckEnabled()
&& isProcessTreeOverLimit(containerId.toString(),
currentPmemUsage, curRssMemUsageOfAgedProcesses,
@@ -426,6 +437,7 @@ public class ContainersMonitorImpl exten
currentPmemUsage, pmemLimit,
pId, containerId, pTree);
isMemoryOverLimit = true;
+ containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM;
}
if (isMemoryOverLimit) {
@@ -440,7 +452,8 @@ public class ContainersMonitorImpl exten
}
// kill the container
eventDispatcher.getEventHandler().handle(
- new ContainerKillEvent(containerId, msg));
+ new ContainerKillEvent(containerId,
+ containerExitStatus, msg));
it.remove();
LOG.info("Removed ProcessTree with root " + pId);
} else {
@@ -513,6 +526,11 @@ public class ContainersMonitorImpl exten
return this.maxPmemAllottedForContainers;
}
+ /**
+ * @return the virtual cores configured for containers, as read from
+ * {@code YarnConfiguration.NM_VCORES} when the monitor was configured
+ */
+ @Override
+ public long getVCoresAllocatedForContainers() {
+ final long configuredVCores = maxVCoresAllottedForContainers;
+ return configuredVCores;
+ }
+
/**
* Is the total virtual memory check enabled?
*
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java Tue Aug 19 23:49:39 2014
@@ -40,6 +40,9 @@ public class NodeManagerMetrics {
@Metric("Current # of allocated containers")
MutableGaugeInt allocatedContainers;
@Metric MutableGaugeInt availableGB;
+ @Metric("Current allocated Virtual Cores")
+ MutableGaugeInt allocatedVCores;
+ @Metric MutableGaugeInt availableVCores;
public static NodeManagerMetrics create() {
return create(DefaultMetricsSystem.instance());
@@ -88,16 +91,21 @@ public class NodeManagerMetrics {
allocatedContainers.incr();
allocatedGB.incr(res.getMemory() / 1024);
availableGB.decr(res.getMemory() / 1024);
+ allocatedVCores.incr(res.getVirtualCores());
+ availableVCores.decr(res.getVirtualCores());
}
+ /**
+ * Moves a released container's resources from the allocated gauges back
+ * to the available gauges, and decrements the allocated container count.
+ * Memory is tracked in whole GB: res.getMemory() / 1024 is integer
+ * division, so any fraction of a GB is dropped (getMemory() presumably
+ * returns MB -- confirm against Resource).
+ */
public void releaseContainer(Resource res) {
allocatedContainers.decr();
allocatedGB.decr(res.getMemory() / 1024);
availableGB.incr(res.getMemory() / 1024);
+ allocatedVCores.decr(res.getVirtualCores());
+ availableVCores.incr(res.getVirtualCores());
}
+ /**
+ * Adds capacity to the available memory (GB) and available vcores gauges.
+ * Memory uses integer division by 1024, so fractions of a GB are dropped.
+ */
public void addResource(Resource res) {
availableGB.incr(res.getMemory() / 1024);
+ availableVCores.incr(res.getVirtualCores());
}
public int getRunningContainers() {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.security;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -33,6 +34,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
@@ -49,14 +53,74 @@ public class NMContainerTokenSecretManag
private MasterKeyData previousMasterKey;
private final TreeMap<Long, List<ContainerId>> recentlyStartedContainerTracker;
-
+ private final NMStateStoreService stateStore;
private String nodeHostAddr;
+ /**
+ * Creates a manager backed by an {@link NMNullStateStoreService}
+ * (presumably a no-op store, so nothing is persisted — confirm).
+ */
public NMContainerTokenSecretManager(Configuration conf) {
+ this(conf, new NMNullStateStoreService());
+ }
+
+ /**
+ * Creates a manager that writes master keys and started-container tokens
+ * to {@code stateStore} so they can be reloaded by {@link #recover()}.
+ */
+ public NMContainerTokenSecretManager(Configuration conf,
+ NMStateStoreService stateStore) {
super(conf);
recentlyStartedContainerTracker =
new TreeMap<Long, List<ContainerId>>();
+ this.stateStore = stateStore;
+ }
+
+ /**
+ * Restores container-token state from the state store. This covers the
+ * current and previous master keys, the serial number, and the expiry
+ * index of recently started containers.
+ *
+ * @throws IOException propagated from loading the state store
+ */
+ public synchronized void recover() throws IOException {
+ RecoveredContainerTokensState recovered =
+ stateStore.loadContainerTokensState();
+
+ MasterKey currentKey = recovered.getCurrentMasterKey();
+ if (currentKey != null) {
+ super.currentMasterKey = new MasterKeyData(currentKey,
+ createSecretKey(currentKey.getBytes().array()));
+ }
+
+ MasterKey prevKey = recovered.getPreviousMasterKey();
+ if (prevKey != null) {
+ previousMasterKey = new MasterKeyData(prevKey,
+ createSecretKey(prevKey.getBytes().array()));
+ }
+
+ // Derive the serial number from whichever current master key is now set.
+ if (super.currentMasterKey != null) {
+ super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
+ }
+
+ // Index each recovered token by its expiry time, ignoring duplicates.
+ for (Entry<ContainerId, Long> token :
+ recovered.getActiveTokens().entrySet()) {
+ Long expiry = token.getValue();
+ List<ContainerId> ids = recentlyStartedContainerTracker.get(expiry);
+ if (ids == null) {
+ ids = new ArrayList<ContainerId>();
+ recentlyStartedContainerTracker.put(expiry, ids);
+ }
+ ContainerId id = token.getKey();
+ if (!ids.contains(id)) {
+ ids.add(id);
+ }
+ }
+ }
+
+ /**
+ * Sets the current container-token master key and writes it to the state
+ * store. A store failure is logged rather than propagated.
+ */
+ private void updateCurrentMasterKey(MasterKeyData key) {
+ super.currentMasterKey = key;
+ MasterKey record = key.getMasterKey();
+ try {
+ stateStore.storeContainerTokenCurrentMasterKey(record);
+ } catch (IOException ioe) {
+ LOG.error("Unable to update current master key in state store", ioe);
+ }
+ }
+
+ /**
+ * Sets the previous container-token master key and writes it to the
+ * state store. A store failure is logged rather than propagated.
+ */
+ private void updatePreviousMasterKey(MasterKeyData key) {
+ previousMasterKey = key;
+ MasterKey record = key.getMasterKey();
+ try {
+ stateStore.storeContainerTokenPreviousMasterKey(record);
+ } catch (IOException ioe) {
+ LOG.error("Unable to update previous master key in state store", ioe);
+ }
}
/**
@@ -68,21 +132,16 @@ public class NMContainerTokenSecretManag
*/
@Private
public synchronized void setMasterKey(MasterKey masterKeyRecord) {
- LOG.info("Rolling master-key for container-tokens, got key with id "
- + masterKeyRecord.getKeyId());
- if (super.currentMasterKey == null) {
- super.currentMasterKey =
- new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord
- .getBytes().array()));
- } else {
- if (super.currentMasterKey.getMasterKey().getKeyId() != masterKeyRecord
- .getKeyId()) {
- // Update keys only if the key has changed.
- this.previousMasterKey = super.currentMasterKey;
- super.currentMasterKey =
- new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord
- .getBytes().array()));
+ // Update keys only if the key has changed.
+ if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey()
+ .getKeyId() != masterKeyRecord.getKeyId()) {
+ LOG.info("Rolling master-key for container-tokens, got key with id "
+ + masterKeyRecord.getKeyId());
+ // Demote the outgoing key; updatePreviousMasterKey also persists it.
+ if (super.currentMasterKey != null) {
+ updatePreviousMasterKey(super.currentMasterKey);
}
+ // Install and persist the new key (store failures are only logged).
+ updateCurrentMasterKey(new MasterKeyData(masterKeyRecord,
+ createSecretKey(masterKeyRecord.getBytes().array())));
}
}
@@ -137,14 +196,19 @@ public class NMContainerTokenSecretManag
removeAnyContainerTokenIfExpired();
+ ContainerId containerId = tokenId.getContainerID();
Long expTime = tokenId.getExpiryTimeStamp();
// We might have multiple containers with same expiration time.
if (!recentlyStartedContainerTracker.containsKey(expTime)) {
recentlyStartedContainerTracker
.put(expTime, new ArrayList<ContainerId>());
}
- recentlyStartedContainerTracker.get(expTime).add(tokenId.getContainerID());
-
+ recentlyStartedContainerTracker.get(expTime).add(containerId);
+ try {
+ stateStore.storeContainerToken(containerId, expTime);
+ } catch (IOException e) {
+ LOG.error("Unable to store token for container " + containerId, e);
+ }
}
protected synchronized void removeAnyContainerTokenIfExpired() {
@@ -155,6 +219,13 @@ public class NMContainerTokenSecretManag
while (containersI.hasNext()) {
Entry<Long, List<ContainerId>> containerEntry = containersI.next();
if (containerEntry.getKey() < currTime) {
+ for (ContainerId container : containerEntry.getValue()) {
+ try {
+ stateStore.removeContainerToken(container);
+ } catch (IOException e) {
+ LOG.error("Unable to remove token for container " + container, e);
+ }
+ }
containersI.remove();
} else {
break;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.security;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -31,6 +32,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
@@ -45,16 +49,79 @@ public class NMTokenSecretManagerInNM ex
private final Map<ApplicationAttemptId, MasterKeyData> oldMasterKeys;
private final Map<ApplicationId, List<ApplicationAttemptId>> appToAppAttemptMap;
+ private final NMStateStoreService stateStore;
private NodeId nodeId;
-
+ /**
+ * Creates a manager backed by an {@link NMNullStateStoreService}
+ * (presumably a no-op store, so nothing is persisted — confirm).
+ */
public NMTokenSecretManagerInNM() {
+ this(new NMNullStateStoreService());
+ }
+
+ /**
+ * Creates a manager that writes NM-token master keys and per-attempt keys
+ * to {@code stateStore} so they can be reloaded by {@link #recover()}.
+ */
+ public NMTokenSecretManagerInNM(NMStateStoreService stateStore) {
this.oldMasterKeys =
new HashMap<ApplicationAttemptId, MasterKeyData>();
appToAppAttemptMap =
new HashMap<ApplicationId, List<ApplicationAttemptId>>();
+ this.stateStore = stateStore;
}
+ /**
+ * Reloads the NM-token master keys and the per-attempt keys from the state
+ * store, then rebuilds the application-to-attempts index from the contents
+ * of oldMasterKeys.
+ *
+ * @throws IOException propagated from loading the state store
+ */
+ public synchronized void recover() throws IOException {
+ RecoveredNMTokensState recovered = stateStore.loadNMTokensState();
+
+ MasterKey currentKey = recovered.getCurrentMasterKey();
+ if (currentKey != null) {
+ super.currentMasterKey = new MasterKeyData(currentKey,
+ createSecretKey(currentKey.getBytes().array()));
+ }
+
+ MasterKey prevKey = recovered.getPreviousMasterKey();
+ if (prevKey != null) {
+ previousMasterKey = new MasterKeyData(prevKey,
+ createSecretKey(prevKey.getBytes().array()));
+ }
+
+ // Derive the serial number from whichever current master key is now set.
+ if (super.currentMasterKey != null) {
+ super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
+ }
+
+ for (Map.Entry<ApplicationAttemptId, MasterKey> appKey :
+ recovered.getApplicationMasterKeys().entrySet()) {
+ MasterKey record = appKey.getValue();
+ MasterKeyData data =
+ new MasterKeyData(record, createSecretKey(record.getBytes().array()));
+ oldMasterKeys.put(appKey.getKey(), data);
+ }
+
+ // Rebuild the application -> attempts index from oldMasterKeys.
+ appToAppAttemptMap.clear();
+ for (ApplicationAttemptId attemptId : oldMasterKeys.keySet()) {
+ ApplicationId appId = attemptId.getApplicationId();
+ if (!appToAppAttemptMap.containsKey(appId)) {
+ appToAppAttemptMap.put(appId, new ArrayList<ApplicationAttemptId>());
+ }
+ appToAppAttemptMap.get(appId).add(attemptId);
+ }
+ }
+
+ /**
+ * Sets the current NM-token master key and writes it to the state store.
+ * A store failure is logged rather than propagated.
+ */
+ private void updateCurrentMasterKey(MasterKeyData key) {
+ super.currentMasterKey = key;
+ MasterKey record = key.getMasterKey();
+ try {
+ stateStore.storeNMTokenCurrentMasterKey(record);
+ } catch (IOException ioe) {
+ LOG.error("Unable to update current master key in state store", ioe);
+ }
+ }
+
+ /**
+ * Sets the previous NM-token master key and writes it to the state store.
+ * A store failure is logged rather than propagated.
+ */
+ private void updatePreviousMasterKey(MasterKeyData key) {
+ previousMasterKey = key;
+ MasterKey record = key.getMasterKey();
+ try {
+ stateStore.storeNMTokenPreviousMasterKey(record);
+ } catch (IOException ioe) {
+ LOG.error("Unable to update previous master key in state store", ioe);
+ }
+ }
+
/**
* Used by NodeManagers to create a token-secret-manager with the key
* obtained from the RM. This can happen during registration or when the RM
@@ -62,20 +129,16 @@ public class NMTokenSecretManagerInNM ex
*/
@Private
public synchronized void setMasterKey(MasterKey masterKey) {
- LOG.info("Rolling master-key for nm-tokens, got key with id :"
- + masterKey.getKeyId());
- if (super.currentMasterKey == null) {
- super.currentMasterKey =
- new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes()
- .array()));
- } else {
- if (super.currentMasterKey.getMasterKey().getKeyId() != masterKey
- .getKeyId()) {
- this.previousMasterKey = super.currentMasterKey;
- super.currentMasterKey =
- new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes()
- .array()));
+ // Update keys only if the key has changed.
+ if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey()
+ .getKeyId() != masterKey.getKeyId()) {
+ // This manager rolls NM-token keys; say so, so the message is not
+ // confused with the container-token rollover logged elsewhere.
+ LOG.info("Rolling master-key for nm-tokens, got key with id "
+ + masterKey.getKeyId());
+ // Demote the outgoing key; updatePreviousMasterKey also persists it.
+ if (super.currentMasterKey != null) {
+ updatePreviousMasterKey(super.currentMasterKey);
}
+ // Install and persist the new key (store failures are only logged).
+ updateCurrentMasterKey(new MasterKeyData(masterKey,
+ createSecretKey(masterKey.getBytes().array())));
}
}
@@ -128,7 +191,7 @@ public class NMTokenSecretManagerInNM ex
LOG.debug("Removing application attempts NMToken keys for application "
+ appId);
for (ApplicationAttemptId appAttemptId : appAttemptList) {
- this.oldMasterKeys.remove(appAttemptId);
+ removeAppAttemptKey(appAttemptId);
}
appToAppAttemptMap.remove(appId);
} else {
@@ -164,11 +227,11 @@ public class NMTokenSecretManagerInNM ex
+ identifier.getApplicationAttemptId().toString());
if (identifier.getKeyId() == currentMasterKey.getMasterKey()
.getKeyId()) {
- oldMasterKeys.put(appAttemptId, currentMasterKey);
+ updateAppAttemptKey(appAttemptId, currentMasterKey);
} else if (previousMasterKey != null
&& identifier.getKeyId() == previousMasterKey.getMasterKey()
.getKeyId()) {
- oldMasterKeys.put(appAttemptId, previousMasterKey);
+ updateAppAttemptKey(appAttemptId, previousMasterKey);
} else {
throw new InvalidToken(
"Older NMToken should not be used while starting the container.");
@@ -193,4 +256,24 @@ public class NMTokenSecretManagerInNM ex
+ /**
+ * @return the stored node id; the field has no initializer, so this is
+ * {@code null} until a value has been assigned
+ */
public synchronized NodeId getNodeId() {
return this.nodeId;
}
+
+ /**
+ * Records {@code key} as the master key for {@code attempt} and persists
+ * the mapping. A store failure is logged rather than propagated.
+ */
+ private void updateAppAttemptKey(ApplicationAttemptId attempt,
+ MasterKeyData key) {
+ oldMasterKeys.put(attempt, key);
+ MasterKey record = key.getMasterKey();
+ try {
+ stateStore.storeNMTokenApplicationMasterKey(attempt, record);
+ } catch (IOException ioe) {
+ LOG.error("Unable to store master key for application " + attempt, ioe);
+ }
+ }
+
+ /**
+ * Forgets the master key for {@code attempt}, both in memory and in the
+ * state store. A store failure is logged rather than propagated.
+ */
+ private void removeAppAttemptKey(ApplicationAttemptId attempt) {
+ oldMasterKeys.remove(attempt);
+ try {
+ stateStore.removeNMTokenApplicationMasterKey(attempt);
+ } catch (IOException ioe) {
+ LOG.error("Unable to remove master key for application " + attempt, ioe);
+ }
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java Tue Aug 19 23:49:39 2014
@@ -85,6 +85,7 @@ public class ContainerPage extends NMVie
._("Diagnostics", info.getDiagnostics())
._("User", info.getUser())
._("TotalMemoryNeeded", info.getMemoryNeeded())
+ ._("TotalVCoresNeeded", info.getVCoresNeeded())
._("logs", info.getShortLogLink(), "Link to logs");
html._(InfoBlock.class);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java Tue Aug 19 23:49:39 2014
@@ -72,7 +72,9 @@ public class NodePage extends NMView {
._("Total Pmem allocated for Container",
StringUtils.byteDesc(info.getTotalPmemAllocated() * BYTES_IN_MB))
._("Pmem enforcement enabled",
- info.isVmemCheckEnabled())
+ info.isPmemCheckEnabled())
+ ._("Total VCores allocated for Containers",
+ String.valueOf(info.getTotalVCoresAllocated()))
._("NodeHealthyStatus",
info.getHealthStatus())
._("LastNodeHealthTime", new Date(
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java Tue Aug 19 23:49:39 2014
@@ -55,7 +55,9 @@ public class WebServer extends AbstractS
@Override
protected void serviceStart() throws Exception {
- String bindAddress = WebAppUtils.getNMWebAppURLWithoutScheme(getConfig());
+ String bindAddress = WebAppUtils.getWebAppBindURL(getConfig(),
+ YarnConfiguration.NM_BIND_HOST,
+ WebAppUtils.getNMWebAppURLWithoutScheme(getConfig()));
LOG.info("Instantiating NMWebApp at " + bindAddress);
try {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java Tue Aug 19 23:49:39 2014
@@ -42,6 +42,7 @@ public class ContainerInfo {
protected String diagnostics;
protected String user;
protected long totalMemoryNeededMB;
+ protected long totalVCoresNeeded;
protected String containerLogsLink;
protected String nodeId;
@XmlTransient
@@ -76,6 +77,7 @@ public class ContainerInfo {
Resource res = container.getResource();
if (res != null) {
this.totalMemoryNeededMB = res.getMemory();
+ this.totalVCoresNeeded = res.getVirtualCores();
}
this.containerLogsShortLink = ujoin("containerlogs", this.id,
container.getUser());
@@ -130,4 +132,8 @@ public class ContainerInfo {
return this.totalMemoryNeededMB;
}
+ /**
+ * @return the virtual cores from the container's resource, or 0 if the
+ * container had no resource when this info was built
+ */
+ public long getVCoresNeeded() {
+ return totalVCoresNeeded;
+ }
+
}