You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/05/10 17:18:59 UTC
svn commit: r1101502 [1/2] - in /hadoop/mapreduce/branches/MR-279: ./
yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/
yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/
yarn/ya...
Author: vinodkv
Date: Tue May 10 15:18:58 2011
New Revision: 1101502
URL: http://svn.apache.org/viewvc?rev=1101502&view=rev
Log:
Fix NodeManager to use multiple disks for the local files and the logs. Contributed by Vinod Kumar Vavilapalli.
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.h
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerStatus.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEvent.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Tue May 10 15:18:58 2011
@@ -3,6 +3,8 @@ Hadoop MapReduce Change Log
Trunk (unreleased changes)
MAPREDUCE-279
+ Fix NM to use multiple disks for local files and the userlogs. (vinodkv)
+
Improved TestJobHistoryEvents and TestJobHistoryParsing. (sharad)
MAPREDUCE-2478. Improve history server. (Siddharth Seth via sharad)
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c Tue May 10 15:18:58 2011
@@ -261,24 +261,11 @@ char *get_task_credentials_file(const ch
}
/**
- * Get the job log directory.
- * Ensures that the result is a realpath and that it is underneath the
- * tt log root.
- */
-char* get_job_log_directory(const char* jobid) {
- char* log_dir = get_value(TT_LOG_DIR_KEY);
- if (log_dir == NULL) {
- fprintf(LOGFILE, "Log directory %s is not configured.\n", TT_LOG_DIR_KEY);
- return NULL;
- }
- char *result = concatenate("%s/%s", "job log dir", 2, log_dir,
+ * Get the job log directory under the given log_root
+ */
+char* get_job_log_directory(const char *log_root, const char* jobid) {
+ return concatenate("%s/%s", "job log dir", 2, log_root,
jobid);
- if (result == NULL) {
- fprintf(LOGFILE, "failed to get memory in get_job_log_directory for %s"
- " and %s\n", log_dir, jobid);
- }
- free(log_dir);
- return result;
}
/*
@@ -345,7 +332,8 @@ static int create_attempt_directories(co
"Either task_id is null or the user passed is null.\n");
return -1;
}
- int result = 0;
+
+ int result = -1;
char **local_dir = get_values(TT_SYS_DIR_KEY);
@@ -366,11 +354,16 @@ static int create_attempt_directories(co
// continue on to create other task directories
free(task_dir);
} else {
+ result = 0;
free(task_dir);
}
}
free_values(local_dir);
+ if (result != 0) {
+ return result;
+ }
+ result = -1;
// also make the directory for the task logs
char *job_task_name = malloc(strlen(job_id) + strlen(task_id) + 2);
if (job_task_name == NULL) {
@@ -378,14 +371,28 @@ static int create_attempt_directories(co
result = -1;
} else {
sprintf(job_task_name, "%s/%s", job_id, task_id);
- char *log_dir = get_job_log_directory(job_task_name);
- free(job_task_name);
+
+ char **log_dir = get_values(TT_LOG_DIR_KEY);
if (log_dir == NULL) {
- result = -1;
- } else if (mkdirs(log_dir, perms) != 0) {
- result = -1;
+ fprintf(LOGFILE, "%s is not configured.\n", TT_LOG_DIR_KEY);
+ return -1;
+ }
+
+ char **log_dir_ptr;
+ for(log_dir_ptr = log_dir; *log_dir_ptr != NULL; ++log_dir_ptr) {
+ char *job_log_dir = get_job_log_directory(*log_dir_ptr, job_task_name);
+ if (job_log_dir == NULL) {
+ free_values(log_dir);
+ return -1;
+ } else if (mkdirs(job_log_dir, perms) != 0) {
+ free(job_log_dir);
+ } else {
+ result = 0;
+ free(job_log_dir);
+ }
}
- free(log_dir);
+ free(job_task_name);
+ free_values(log_dir);
}
return result;
}
@@ -654,22 +661,39 @@ int initialize_job(const char *user, con
return INVALID_ARGUMENT_NUMBER;
}
- // create the user directory
+ // create the user directory on all disks
int result = initialize_user(user);
if (result != 0) {
return result;
}
- // create the log directory for the job
- char *job_log_dir = get_job_log_directory(jobid);
- if (job_log_dir == NULL) {
- return -1;
+ ////////////// create the log directories for the app on all disks
+ char **log_roots = get_values(TT_LOG_DIR_KEY);
+ if (log_roots == NULL) {
+ return INVALID_CONFIG_FILE;
}
- result = create_directory_for_user(job_log_dir);
- free(job_log_dir);
- if (result != 0) {
+ char **log_root;
+ char *any_one_job_log_dir = NULL;
+ for(log_root=log_roots; *log_root != NULL; ++log_root) {
+ char *job_log_dir = get_job_log_directory(*log_root, jobid);
+ if (job_log_dir == NULL) {
+ // try the next one
+ } else if (create_directory_for_user(job_log_dir) != 0) {
+ free(job_log_dir);
+ return -1;
+ } else if (any_one_job_log_dir == NULL) {
+ any_one_job_log_dir = job_log_dir;
+ } else {
+ free(job_log_dir);
+ }
+ }
+ free_values(log_roots);
+ if (any_one_job_log_dir == NULL) {
+ fprintf(LOGFILE, "Did not create any job-log directories\n");
return -1;
}
+ free(any_one_job_log_dir);
+ ////////////// End of creating the log directories for the app on all disks
// open up the credentials file
int cred_file = open_file_as_task_tracker(nmPrivate_credentials_file);
@@ -711,6 +735,8 @@ int initialize_job(const char *user, con
}
char *nmPrivate_credentials_file_copy = strdup(nmPrivate_credentials_file);
+ // TODO: FIXME. The user's copy of creds should go to a path selected by
+ // localDirAllocatoir
char *cred_file_name = concatenate("%s/%s", "cred file", 2,
primary_job_dir, basename(nmPrivate_credentials_file_copy));
if (cred_file_name == NULL) {
@@ -1014,6 +1040,7 @@ int delete_as_user(const char *user,
char** ptr;
+ // TODO: No switching user? !!!!
if (baseDirs == NULL || *baseDirs == NULL) {
return delete_path(subdir, strlen(subdir) == 0);
}
@@ -1038,7 +1065,8 @@ int delete_as_user(const char *user,
* delete a given log directory
*/
int delete_log_directory(const char *subdir) {
- char* log_subdir = get_job_log_directory(subdir);
+ // TODO: FIXME. This isn't used at all!!!!!
+ char* log_subdir = get_job_log_directory(subdir, subdir); // TODO: FIX
int ret = -1;
if (log_subdir != NULL) {
ret = delete_path(log_subdir, strchr(subdir, '/') == NULL);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.h?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.h (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.h Tue May 10 15:18:58 2011
@@ -134,11 +134,9 @@ char *get_task_launcher_file(const char*
char *get_task_credentials_file(const char* work_dir);
/**
- * Get the job log directory.
- * Ensures that the result is a realpath and that it is underneath the
- * tt log root.
+ * Get the job log directory under log_root
*/
-char* get_job_log_directory(const char* jobid);
+char* get_job_log_directory(const char* log_root, const char* jobid);
char *get_task_log_dir(const char *log_dir, const char *job_id,
const char *attempt_id);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Tue May 10 15:18:58 2011
@@ -67,14 +67,14 @@ public abstract class ContainerExecutor
* Copy $rsrc -> $N/$user/$appId/filecache/idef
* @param user user name of application owner
* @param appId id of the application
- * @param nmLocal path to localized credentials, rsrc by NM
+ * @param nmPrivateContainerTokens path to localized credentials, rsrc by NM
* @param nmAddr RPC address to contact NM
* @throws IOException For most application init failures
* @throws InterruptedException If application init thread is halted by NM
*/
- public abstract void startLocalizer(Path nmLocal,
+ public abstract void startLocalizer(Path nmPrivateContainerTokens,
InetSocketAddress nmAddr, String user, String appId, String locId,
- Path logDir, List<Path> localDirs)
+ List<Path> localDirs)
throws IOException, InterruptedException;
/**
@@ -83,9 +83,9 @@ public abstract class ContainerExecutor
*
* @param launchCtxt
*/
- public abstract int launchContainer(Container container, Path nmLocal,
- String user, String appId, Path appLogDir, List<Path> appDirs)
- throws IOException;
+ public abstract int launchContainer(Container container,
+ Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
+ String user, String appId, Path containerWorkDir) throws IOException;
public abstract boolean signalContainer(String user, String pid,
Signal signal)
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Tue May 10 15:18:58 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.UnsupportedF
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
@@ -56,24 +57,25 @@ public class DefaultContainerExecutor ex
}
@Override
- public void startLocalizer(Path nmLocal, InetSocketAddress nmAddr,
- String user, String appId, String locId, Path logDir,
+ public void startLocalizer(Path nmPrivateContainerTokensPath,
+ InetSocketAddress nmAddr, String user, String appId, String locId,
List<Path> localDirs) throws IOException, InterruptedException {
ContainerLocalizer localizer =
- new ContainerLocalizer(user, appId, locId, logDir, localDirs);
+ new ContainerLocalizer(this.lfs, user, appId, locId,
+ localDirs, RecordFactoryProvider.getRecordFactory(getConf()));
createUserLocalDirs(localDirs, user);
createUserCacheDirs(localDirs, user);
createAppDirs(localDirs, user, appId);
- createAppLogDir(logDir, appId);
+ createAppLogDirs(appId);
- Path appStorageDir = getApplicationDir(localDirs, user, appId);
+ // TODO: Why pick first app dir. The same in LCE why not random?
+ Path appStorageDir = getFirstApplicationDir(localDirs, user, appId);
String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_FMT, locId);
- Path appTokens = new Path(nmLocal, tokenFn);
Path tokenDst = new Path(appStorageDir, tokenFn);
- lfs.util().copy(appTokens, tokenDst);
+ lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
lfs.setWorkingDirectory(appStorageDir);
// TODO: DO it over RPC for maintaining similarity?
@@ -81,29 +83,39 @@ public class DefaultContainerExecutor ex
}
@Override
- public int launchContainer(Container container, Path nmLocal, String user,
- String appId, Path appLogDir, List<Path> appDirs) throws IOException {
- // create container dirs
+ public int launchContainer(Container container,
+ Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
+ String userName, String appId, Path containerWorkDir)
+ throws IOException {
+
+ // create container dirs on all disks
String containerIdStr = ConverterUtils.toString(container.getContainerID());
- for (Path p : appDirs) {
- lfs.mkdir(new Path(p, containerIdStr), null, false);
+ String appIdStr =
+ ConverterUtils.toString(container.getContainerID().getAppId());
+ String[] sLocalDirs =
+ getConf().getStrings(NMConfig.NM_LOCAL_DIR, NMConfig.DEFAULT_NM_LOCAL_DIR);
+ for (String sLocalDir : sLocalDirs) {
+ Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
+ Path userdir = new Path(usersdir, userName);
+ Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
+ Path appDir = new Path(appCacheDir, appIdStr);
+ Path containerDir = new Path(appDir, containerIdStr);
+ lfs.mkdir(containerDir, null, false);
}
- lfs.mkdir(new Path(appLogDir, containerIdStr), null, false);
+
+ // Create the container log-dirs on all disks
+ createContainerLogDirs(appIdStr, containerIdStr);
+
// copy launch script to work dir
- // TODO: ROUND_ROBIN Below
- Path appWorkDir = new Path(appDirs.get(0), containerIdStr);
- Path launchScript = new Path(nmLocal, ContainerLaunch.CONTAINER_SCRIPT);
- Path launchDst = new Path(appWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
- lfs.util().copy(launchScript, launchDst);
- // copy container tokens to work dir
- Path appTokens = new Path(nmLocal, String.format(
- ContainerLocalizer.TOKEN_FILE_FMT,
- containerIdStr));
+ Path launchDst =
+ new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
+ lfs.util().copy(nmPrivateContainerScriptPath, launchDst);
+ // copy container tokens to work dir
Path tokenDst =
- new Path(appWorkDir, ContainerLaunch.CONTAINER_TOKENS);
+ new Path(containerWorkDir, ContainerLaunch.CONTAINER_TOKENS);
+ lfs.util().copy(nmPrivateTokensPath, tokenDst);
- lfs.util().copy(appTokens, tokenDst);
// create log dir under app
// fork script
ShellCommandExecutor shExec = null;
@@ -113,7 +125,7 @@ public class DefaultContainerExecutor ex
String[] command =
new String[] { "bash", "-c", launchDst.toUri().getPath().toString() };
shExec = new ShellCommandExecutor(command,
- new File(appWorkDir.toUri().getPath()));
+ new File(containerWorkDir.toUri().getPath()));
launchCommandObjs.put(container.getLaunchContext().getContainerId(), shExec);
shExec.execute();
} catch (Exception e) {
@@ -199,7 +211,7 @@ public class DefaultContainerExecutor ex
* $logdir/$user/$appId */
private static final short LOGDIR_PERM = (short)0710;
- private Path getApplicationDir(List<Path> localDirs, String user,
+ private Path getFirstApplicationDir(List<Path> localDirs, String user,
String appId) {
return getApplicationDir(localDirs.get(0), user, appId);
}
@@ -233,7 +245,7 @@ public class DefaultContainerExecutor ex
boolean userDirStatus = false;
FsPermission userperms = new FsPermission(USER_PERM);
for (Path localDir : localDirs) {
- // create $local.dir/usercache/$user
+ // create $local.dir/usercache/$user and its immediate parent
try {
lfs.mkdir(getUserCacheDir(localDir, user), userperms, true);
} catch (IOException e) {
@@ -328,26 +340,65 @@ public class DefaultContainerExecutor ex
+ "in any of the configured local directories for app "
+ appId.toString());
}
- // pick random work dir for compatibility
- // create $local.dir/usercache/$user/appcache/$appId/work
- Path workDir =
- new Path(getApplicationDir(localDirs, user, appId),
- ContainerLocalizer.WORKDIR);
- lfs.mkdir(workDir, null, true);
}
/**
- * Create application log directory.
+ * Create application log directories on all disks.
*/
- private void createAppLogDir(Path logDir, String appId)
+ private void createAppLogDirs(String appId)
throws IOException {
- Path appUserLogDir = new Path(logDir, appId);
- try {
- lfs.mkdir(appUserLogDir, new FsPermission(LOGDIR_PERM), true);
- } catch (IOException e) {
- throw new IOException(
- "Could not create app user log directory: " + appUserLogDir, e);
+ String[] rootLogDirs =
+ getConf()
+ .getStrings(NMConfig.NM_LOG_DIR, NMConfig.DEFAULT_NM_LOG_DIR);
+
+ boolean appLogDirStatus = false;
+ FsPermission appLogDirPerms = new FsPermission(LOGDIR_PERM);
+ for (String rootLogDir : rootLogDirs) {
+ // create $log.dir/$appid
+ Path appLogDir = new Path(rootLogDir, appId);
+ try {
+ lfs.mkdir(appLogDir, appLogDirPerms, true);
+ } catch (IOException e) {
+ LOG.warn("Unable to create the app-log directory : " + appLogDir, e);
+ continue;
+ }
+ appLogDirStatus = true;
+ }
+ if (!appLogDirStatus) {
+ throw new IOException("Not able to initialize app-log directories "
+ + "in any of the configured local directories for app " + appId);
}
}
+ /**
+ * Create application log directories on all disks.
+ */
+ private void createContainerLogDirs(String appId, String containerId)
+ throws IOException {
+ String[] rootLogDirs =
+ getConf()
+ .getStrings(NMConfig.NM_LOG_DIR, NMConfig.DEFAULT_NM_LOG_DIR);
+
+ boolean containerLogDirStatus = false;
+ FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM);
+ for (String rootLogDir : rootLogDirs) {
+ // create $log.dir/$appid/$containerid
+ Path appLogDir = new Path(rootLogDir, appId);
+ Path containerLogDir = new Path(appLogDir, containerId);
+ try {
+ lfs.mkdir(containerLogDir, containerLogDirPerms, true);
+ } catch (IOException e) {
+ LOG.warn("Unable to create the container-log directory : "
+ + appLogDir, e);
+ continue;
+ }
+ containerLogDirStatus = true;
+ }
+ if (!containerLogDirStatus) {
+ throw new IOException(
+ "Not able to initialize container-log directories "
+ + "in any of the configured local directories for container "
+ + containerId);
+ }
+ }
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Tue May 10 15:18:58 2011
@@ -100,17 +100,15 @@ public class LinuxContainerExecutor exte
}
@Override
- public void startLocalizer(Path nmLocal, InetSocketAddress nmAddr,
- String user, String appId, String locId, Path logDir,
+ public void startLocalizer(Path nmPrivateContainerTokensPath,
+ InetSocketAddress nmAddr, String user, String appId, String locId,
List<Path> localDirs) throws IOException, InterruptedException {
- Path appTokens = new Path(nmLocal, String.format(
- ContainerLocalizer.TOKEN_FILE_FMT, locId));
List<String> command = new ArrayList<String>(
Arrays.asList(containerExecutorExe,
user,
Integer.toString(Commands.INITIALIZE_JOB.getValue()),
appId,
- appTokens.toUri().getPath().toString()));
+ nmPrivateContainerTokensPath.toUri().getPath().toString()));
File jvm = // use same jvm as parent
new File(new File(System.getProperty("java.home"), "bin"), "java");
command.add(jvm.toString());
@@ -122,7 +120,6 @@ public class LinuxContainerExecutor exte
command.add(locId);
command.add(nmAddr.getHostName());
command.add(Integer.toString(nmAddr.getPort()));
- command.add(logDir.toUri().getPath().toString());
for (Path p : localDirs) {
command.add(p.toUri().getPath().toString());
}
@@ -147,22 +144,19 @@ public class LinuxContainerExecutor exte
}
@Override
- public int launchContainer(Container container, Path nmLocal, String user,
- String appId, Path appLogDir, List<Path> appDirs) throws IOException {
- Path appWorkDir = new Path(appDirs.get(0), container.toString()); // TODO: Use ROUND_ROBIN
- Path launchScript = new Path(nmLocal, ContainerLaunch.CONTAINER_SCRIPT);
- Path nmPrivateAppTokenFile = new Path(nmLocal, String.format(
- ContainerLocalizer.TOKEN_FILE_FMT,
- ConverterUtils.toString(container.getContainerID())));
+ public int launchContainer(Container container,
+ Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath,
+ String user, String appId, Path containerWorkDir) throws IOException {
+
List<String> command = new ArrayList<String>(
Arrays.asList(containerExecutorExe,
user,
Integer.toString(Commands.LAUNCH_CONTAINER.getValue()),
appId,
container.toString(),
- appWorkDir.toString(),
- launchScript.toUri().getPath().toString(),
- nmPrivateAppTokenFile.toUri().getPath().toString()));
+ containerWorkDir.toString(),
+ nmPrivateCotainerScriptPath.toUri().getPath().toString(),
+ nmPrivateTokensPath.toUri().getPath().toString()));
String[] commandArray = command.toArray(new String[command.size()]);
ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
launchCommandObjs.put(container.getLaunchContext().getContainerId(), shExec);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Tue May 10 15:18:58 2011
@@ -51,10 +51,38 @@ public class NodeManager extends Composi
public NodeManager() {
super(NodeManager.class.getName());
+ }
+
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker);
+ }
+
+ protected NodeResourceMonitor createNodeResourceMonitor() {
+ return new NodeResourceMonitorImpl();
+ }
+
+ protected ContainerManagerImpl createContainerManager(Context context,
+ ContainerExecutor exec, DeletionService del,
+ NodeStatusUpdater nodeStatusUpdater) {
+ return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater);
+ }
+
+ protected WebServer createWebServer(Context nmContext,
+ ResourceView resourceView) {
+ return new WebServer(nmContext, resourceView);
+ }
+
+ protected void doSecureLogin() throws IOException {
+ SecurityUtil.login(getConfig(), NM_KEYTAB,
+ YarnServerConfig.NM_SERVER_PRINCIPAL_KEY);
+ }
+
+ @Override
+ public void init(Configuration conf) {
Context context = new NMContext();
- YarnConfiguration conf = new YarnConfiguration();
ContainerExecutor exec = ReflectionUtils.newInstance(
conf.getClass(NM_CONTAINER_EXECUTOR_CLASS,
DefaultContainerExecutor.class, ContainerExecutor.class), conf);
@@ -90,35 +118,7 @@ public class NodeManager extends Composi
dispatcher.register(ContainerManagerEventType.class, containerManager);
addService(dispatcher);
- }
-
- protected NodeStatusUpdater createNodeStatusUpdater(Context context,
- Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
- return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker);
- }
- protected NodeResourceMonitor createNodeResourceMonitor() {
- return new NodeResourceMonitorImpl();
- }
-
- protected ContainerManagerImpl createContainerManager(Context context,
- ContainerExecutor exec, DeletionService del,
- NodeStatusUpdater nodeStatusUpdater) {
- return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater);
- }
-
- protected WebServer createWebServer(Context nmContext,
- ResourceView resourceView) {
- return new WebServer(nmContext, resourceView);
- }
-
- protected void doSecureLogin() throws IOException {
- SecurityUtil.login(getConfig(), NM_KEYTAB,
- YarnServerConfig.NM_SERVER_PRINCIPAL_KEY);
- }
-
- @Override
- public void init(Configuration conf) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerStatus.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerStatus.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerStatus.java Tue May 10 15:18:58 2011
@@ -20,13 +20,14 @@ package org.apache.hadoop.yarn.server.no
import java.util.List;
public interface LocalizerStatus {
- public String getLocalizerId();
- public List<LocalResourceStatus> getResources();
- public void setLocalizerId(String id);
- public void addAllResources(List<LocalResourceStatus> resources);
- public void addResourceStatus(LocalResourceStatus resource);
- public LocalResourceStatus getResourceStatus(int index);
- public void removeResource(int index);
- public void clearResources();
+ String getLocalizerId();
+ void setLocalizerId(String id);
+
+ List<LocalResourceStatus> getResources();
+ void addAllResources(List<LocalResourceStatus> resources);
+ void addResourceStatus(LocalResourceStatus resource);
+ LocalResourceStatus getResourceStatus(int index);
+ void removeResource(int index);
+ void clearResources();
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Tue May 10 15:18:58 2011
@@ -22,6 +22,9 @@ import static org.apache.hadoop.fs.Creat
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
@@ -30,21 +33,27 @@ import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class ContainerLaunch implements Callable<Integer> {
@@ -58,20 +67,17 @@ public class ContainerLaunch implements
private final ContainerExecutor exec;
private final Application app;
private final Container container;
- private final Path sysDir;
- private final Path appLogDir;
- private final List<Path> appDirs;
-
- public ContainerLaunch(Dispatcher dispatcher, ContainerExecutor exec,
- Application app, Container container, Path sysDir, Path appLogDir,
- List<Path> appDirs) {
+ private final Configuration conf;
+ private final LocalDirAllocator logDirsSelector;
+
+ public ContainerLaunch(Configuration configuration, Dispatcher dispatcher,
+ ContainerExecutor exec, Application app, Container container) {
+ this.conf = configuration;
this.app = app;
this.exec = exec;
- this.sysDir = sysDir;
- this.appLogDir = appLogDir;
- this.appDirs = appDirs;
this.container = container;
this.dispatcher = dispatcher;
+ this.logDirsSelector = new LocalDirAllocator(NMConfig.NM_LOG_DIR);
}
@Override
@@ -84,51 +90,90 @@ public class ContainerLaunch implements
final List<String> command = launchContext.getCommandList();
int ret = -1;
- // Variable expansion
- // Before the container script gets written out.
- List<String> cmds = container.getLaunchContext().getCommandList();
- List<String> newCmds = new ArrayList<String>(cmds.size());
- String containerIdStr = ConverterUtils.toString(container.getContainerID());
- Path containerLogDir = new Path(appLogDir, containerIdStr);
- for (String str : cmds) {
- newCmds.add(str.replace("<LOG_DIR>", containerLogDir.toUri()
- .getPath()));
- }
- container.getLaunchContext().clearCommands();
- container.getLaunchContext().addAllCommands(newCmds);
-
try {
+ // /////////////////////////// Variable expansion
+ // Before the container script gets written out.
+ List<String> cmds = container.getLaunchContext().getCommandList();
+ List<String> newCmds = new ArrayList<String>(cmds.size());
+ String containerIdStr = ConverterUtils.toString(container.getContainerID());
+ String appIdStr = app.toString();
+ Path containerLogDir =
+ this.logDirsSelector.getLocalPathForWrite(appIdStr + Path.SEPARATOR
+ + containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, this.conf,
+ false);
+ for (String str : cmds) {
+ newCmds.add(str.replace("<LOG_DIR>", containerLogDir.toUri()
+ .getPath()));
+ }
+ container.getLaunchContext().clearCommands();
+ container.getLaunchContext().addAllCommands(newCmds);
+ // /////////////////////////// End of variable expansion
+
FileContext lfs = FileContext.getLocalFSFileContext();
- Path launchSysDir = new Path(sysDir, containerIdStr);
- lfs.mkdir(launchSysDir, null, true);
- Path launchPath = new Path(launchSysDir, CONTAINER_SCRIPT);
- Path tokensPath =
- new Path(launchSysDir, String.format(
- ContainerLocalizer.TOKEN_FILE_FMT, containerIdStr));
- DataOutputStream launchOut = null;
- DataOutputStream tokensOut = null;
-
+ LocalDirAllocator lDirAllocator =
+ new LocalDirAllocator(NMConfig.NM_LOCAL_DIR); // TODO
+ Path nmPrivateContainerScriptPath =
+ lDirAllocator.getLocalPathForWrite(
+ ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
+ + appIdStr + Path.SEPARATOR + containerIdStr
+ + Path.SEPARATOR + CONTAINER_SCRIPT, this.conf);
+ Path nmPrivateTokensPath =
+ lDirAllocator.getLocalPathForWrite(
+ ResourceLocalizationService.NM_PRIVATE_DIR
+ + Path.SEPARATOR
+ + containerIdStr
+ + Path.SEPARATOR
+ + String.format(ContainerLocalizer.TOKEN_FILE_FMT,
+ containerIdStr), this.conf);
+ DataOutputStream containerScriptOutStream = null;
+ DataOutputStream tokensOutStream = null;
+
try {
- launchOut = lfs.create(launchPath, EnumSet.of(CREATE, OVERWRITE));
- ContainerLocalizer.writeLaunchEnv(launchOut, env, localResources,
+ // /////////// Write out the container-script in the nmPrivate space.
+ String[] localDirs =
+ this.conf.getStrings(NMConfig.NM_LOCAL_DIR,
+ NMConfig.DEFAULT_NM_LOCAL_DIR);
+ List<Path> appDirs = new ArrayList<Path>(localDirs.length);
+ for (String localDir : localDirs) {
+ Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
+ Path userdir = new Path(usersdir, user);
+ Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
+ appDirs.add(new Path(appsdir, appIdStr));
+ }
+ containerScriptOutStream =
+ lfs.create(nmPrivateContainerScriptPath,
+ EnumSet.of(CREATE, OVERWRITE));
+ writeLaunchEnv(containerScriptOutStream, env, localResources,
command, appDirs);
-
- tokensOut = lfs.create(tokensPath, EnumSet.of(CREATE, OVERWRITE));
+ // /////////// End of writing out container-script
+
+ // /////////// Write out the container-tokens in the nmPrivate space.
+ tokensOutStream =
+ lfs.create(nmPrivateTokensPath, EnumSet.of(CREATE, OVERWRITE));
Credentials creds = container.getCredentials();
- creds.writeTokenStorageToStream(tokensOut);
+ creds.writeTokenStorageToStream(tokensOutStream);
+ // /////////// End of writing out container-tokens
} finally {
- IOUtils.cleanup(LOG, launchOut, tokensOut);
- if (launchOut != null) {
- launchOut.close();
- }
+ IOUtils.cleanup(LOG, containerScriptOutStream, tokensOutStream);
}
+
+ // Select the working directory for the container
+ Path containerWorkDir =
+ lDirAllocator.getLocalPathForWrite(ContainerLocalizer.USERCACHE
+ + Path.SEPARATOR + user + Path.SEPARATOR
+ + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
+ + Path.SEPARATOR + containerIdStr,
+ LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
+
+ // LaunchContainer is a blocking call. We are here almost means the
+ // container is launched, so send out the event.
dispatcher.getEventHandler().handle(new ContainerEvent(
container.getContainerID(),
ContainerEventType.CONTAINER_LAUNCHED));
ret =
- exec.launchContainer(container, launchSysDir, user, app.toString(),
- appLogDir, appDirs);
+ exec.launchContainer(container, nmPrivateContainerScriptPath,
+ nmPrivateTokensPath, user, appIdStr, containerWorkDir);
if (ret == ExitCode.KILLED.getExitCode()) {
// If the process was killed, Send container_cleanedup_after_kill and
// just break out of this method.
@@ -154,4 +199,99 @@ public class ContainerLaunch implements
return 0;
}
+ private static class ShellScriptBuilder {
+
+ private final StringBuilder sb;
+
+ public ShellScriptBuilder() {
+ this(new StringBuilder("#!/bin/bash\n\n"));
+ }
+
+ protected ShellScriptBuilder(StringBuilder sb) {
+ this.sb = sb;
+ }
+
+ public ShellScriptBuilder env(String key, String value) {
+ line("export ", key, "=\"", value, "\"");
+ return this;
+ }
+
+ public ShellScriptBuilder symlink(Path src, String dst) throws IOException {
+ return symlink(src, new Path(dst));
+ }
+
+ public ShellScriptBuilder symlink(Path src, Path dst) throws IOException {
+ if (!src.isAbsolute()) {
+ throw new IOException("Source must be absolute");
+ }
+ if (dst.isAbsolute()) {
+ throw new IOException("Destination must be relative");
+ }
+ if (dst.toUri().getPath().indexOf('/') != -1) {
+ line("mkdir -p ", dst.getParent().toString());
+ }
+ line("ln -sf ", src.toUri().getPath(), " ", dst.toString());
+ return this;
+ }
+
+ public void write(PrintStream out) throws IOException {
+ out.append(sb);
+ }
+
+ public void line(String... command) {
+ for (String s : command) {
+ sb.append(s);
+ }
+ sb.append("\n");
+ }
+
+ @Override
+ public String toString() {
+ return sb.toString();
+ }
+
+ }
+
+ private static void writeLaunchEnv(OutputStream out,
+ Map<String,String> environment, Map<Path,String> resources,
+ List<String> command, List<Path> appDirs)
+ throws IOException {
+ ShellScriptBuilder sb = new ShellScriptBuilder();
+ if (System.getenv("YARN_HOME") != null) {
+ sb.env("YARN_HOME", System.getenv("YARN_HOME"));
+ }
+ sb.env(YARNApplicationConstants.LOCAL_DIR_ENV,
+ StringUtils.join(",", appDirs));
+ if (environment != null) {
+ for (Map.Entry<String,String> env : environment.entrySet()) {
+ sb.env(env.getKey().toString(), env.getValue().toString());
+ }
+ }
+ if (resources != null) {
+ for (Map.Entry<Path,String> link : resources.entrySet()) {
+ sb.symlink(link.getKey(), link.getValue());
+ }
+ }
+ ArrayList<String> cmd = new ArrayList<String>(2 * command.size() + 5);
+ cmd.add(ContainerExecutor.isSetsidAvailable ? "exec setsid " : "exec ");
+ cmd.add("/bin/bash ");
+ cmd.add("-c ");
+ cmd.add("\"");
+ for (String cs : command) {
+ cmd.add(cs.toString());
+ cmd.add(" ");
+ }
+ cmd.add("\"");
+ sb.line(cmd.toArray(new String[cmd.size()]));
+ PrintStream pout = null;
+ try {
+ pout = new PrintStream(out);
+ sb.write(pout);
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Tue May 10 15:18:58 2011
@@ -18,16 +18,9 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCAL_DIR;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOG_DIR;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOG_DIR;
-
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -35,7 +28,7 @@ import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -45,7 +38,6 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -64,9 +56,6 @@ public class ContainersLauncher extends
private final Dispatcher dispatcher;
private final ExecutorService containerLauncher =
Executors.newCachedThreadPool();
- private List<Path> logDirs;
- private List<Path> localDirs;
- private List<Path> sysDirs;
private final Map<ContainerId,RunningContainer> running =
Collections.synchronizedMap(new HashMap<ContainerId,RunningContainer>());
@@ -91,32 +80,11 @@ public class ContainersLauncher extends
@Override
public void init(Configuration conf) {
- // TODO factor this out of Localizer
try {
FileContext lfs = FileContext.getLocalFSFileContext(conf);
- String[] sLocalDirs = conf.getStrings(NM_LOCAL_DIR, DEFAULT_NM_LOCAL_DIR);
-
- localDirs = new ArrayList<Path>(sLocalDirs.length);
- logDirs = new ArrayList<Path>(sLocalDirs.length);
- sysDirs = new ArrayList<Path>(sLocalDirs.length);
- for (String sLocaldir : sLocalDirs) {
- Path localdir = new Path(sLocaldir);
- localDirs.add(localdir);
- // $local/nmPrivate
- Path sysdir = new Path(localdir, ResourceLocalizationService.NM_PRIVATE_DIR);
- sysDirs.add(sysdir);
- }
- String[] sLogdirs = conf.getStrings(NM_LOG_DIR, DEFAULT_NM_LOG_DIR);
- for (String sLogdir : sLogdirs) {
- Path logdir = new Path(sLogdir);
- logDirs.add(logdir);
- }
- } catch (IOException e) {
+ } catch (UnsupportedFileSystemException e) {
throw new YarnException("Failed to start ContainersLauncher", e);
}
- localDirs = Collections.unmodifiableList(localDirs);
- logDirs = Collections.unmodifiableList(logDirs);
- sysDirs = Collections.unmodifiableList(sysDirs);
super.init(conf);
}
@@ -137,24 +105,10 @@ public class ContainersLauncher extends
Application app =
context.getApplications().get(containerId.getAppId());
String appIdStr = ConverterUtils.toString(app.getAppId());
- List<Path> appDirs = new ArrayList<Path>(localDirs.size());
- for (Path p : localDirs) {
- Path usersdir = new Path(p, ContainerLocalizer.USERCACHE);
- Path userdir = new Path(usersdir, userName);
- Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
- appDirs.add(new Path(appsdir, appIdStr));
- }
- Path appSysDir =
- new Path(sysDirs.get(0), appIdStr);
- Path appLogDir =
- new Path(logDirs.get(0), appIdStr);
- // TODO: ROUND_ROBIN above.
// TODO set in Application
- //Path appLogDir = new Path(logDirs.get(0), app.toString());
- ContainerLaunch launch =
- new ContainerLaunch(dispatcher, exec, app,
- event.getContainer(), appSysDir, appLogDir, appDirs);
- // TODO: ROUND_ROBIN above.
+ ContainerLaunch launch =
+ new ContainerLaunch(getConfig(), dispatcher, exec, app,
+ event.getContainer());
running.put(containerId,
new RunningContainer(userName,
containerLauncher.submit(launch)));
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Tue May 10 15:18:58 2011
@@ -20,14 +20,8 @@ package org.apache.hadoop.yarn.server.no
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
-
import java.net.InetSocketAddress;
-
import java.security.PrivilegedAction;
-import java.security.PrivilegedExceptionAction;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -56,16 +50,12 @@ import org.apache.hadoop.security.Securi
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -91,7 +81,6 @@ public class ContainerLocalizer {
private final String user;
private final String appId;
- private final Path logDir;
private final List<Path> localDirs;
private final String localizerId;
private final FileContext lfs;
@@ -101,16 +90,8 @@ public class ContainerLocalizer {
private final RecordFactory recordFactory;
private final Map<LocalResource,Future<Path>> pendingResources;
- public ContainerLocalizer(String user, String appId, String localizerId,
- Path logDir, List<Path> localDirs) throws IOException {
- this(FileContext.getLocalFSFileContext(), user, appId, localizerId, logDir,
- localDirs, new HashMap<LocalResource,Future<Path>>(),
- RecordFactoryProvider.getRecordFactory(null));
- }
-
- ContainerLocalizer(FileContext lfs, String user, String appId,
- String localizerId, Path logDir, List<Path> localDirs,
- Map<LocalResource,Future<Path>> pendingResources,
+ public ContainerLocalizer(FileContext lfs, String user, String appId,
+ String localizerId, List<Path> localDirs,
RecordFactory recordFactory) throws IOException {
if (null == user) {
throw new IOException("Cannot initialize for null user");
@@ -121,7 +102,6 @@ public class ContainerLocalizer {
this.lfs = lfs;
this.user = user;
this.appId = appId;
- this.logDir = logDir;
this.localDirs = localDirs;
this.localizerId = localizerId;
this.recordFactory = recordFactory;
@@ -130,7 +110,7 @@ public class ContainerLocalizer {
new LocalDirAllocator(String.format(APPCACHE_CTXT_FMT, appId));
this.userDirs =
new LocalDirAllocator(String.format(USERCACHE_CTXT_FMT, appId));
- this.pendingResources = pendingResources;
+ this.pendingResources = new HashMap<LocalResource,Future<Path>>();
}
LocalizationProtocol getProxy(final InetSocketAddress nmAddr) {
@@ -148,11 +128,12 @@ public class ContainerLocalizer {
public int runLocalization(final InetSocketAddress nmAddr)
throws IOException, InterruptedException {
// load credentials
- initDirs(conf, user, appId, lfs, logDir, localDirs);
+ initDirs(conf, user, appId, lfs, localDirs);
final Credentials creds = new Credentials();
DataInputStream credFile = null;
try {
// assume credentials in cwd
+ // TODO: Fix
credFile = lfs.open(
new Path(String.format(TOKEN_FILE_FMT, localizerId)));
creds.readTokenStorageStream(credFile);
@@ -191,6 +172,8 @@ public class ContainerLocalizer {
localizeFiles(nodeManager, exec, ugi);
return 0;
} catch (Throwable e) {
+ // Print traces to stdout so that they can be logged by the NM address
+ // space.
e.printStackTrace(System.out);
return -1;
} finally {
@@ -201,6 +184,7 @@ public class ContainerLocalizer {
}
ExecutorService createDownloadThreadPool() {
+ // TODO: Only Single thread?
return Executors.newSingleThreadExecutor();
}
@@ -213,7 +197,7 @@ public class ContainerLocalizer {
TimeUnit.SECONDS.sleep(duration);
}
- void localizeFiles(LocalizationProtocol nodemanager, ExecutorService exec,
+ private void localizeFiles(LocalizationProtocol nodemanager, ExecutorService exec,
UserGroupInformation ugi) {
while (true) {
try {
@@ -236,6 +220,7 @@ public class ContainerLocalizer {
lda = appDirs;
break;
}
+ // TODO: Synchronization??
pendingResources.put(r, exec.submit(download(lda, r, ugi)));
}
}
@@ -263,9 +248,17 @@ public class ContainerLocalizer {
}
}
- LocalizerStatus createStatus() throws InterruptedException {
+ /**
+ * Create the payload for the HeartBeat. Mainly the list of
+ * {@link LocalResourceStatus}es
+ *
+ * @return a {@link LocalizerStatus} that can be sent via heartbeat.
+ * @throws InterruptedException
+ */
+ private LocalizerStatus createStatus() throws InterruptedException {
final List<LocalResourceStatus> currentResources =
new ArrayList<LocalResourceStatus>();
+ // TODO: Synchronization??
for (Iterator<LocalResource> i = pendingResources.keySet().iterator();
i.hasNext();) {
LocalResource rsrc = i.next();
@@ -315,8 +308,7 @@ public class ContainerLocalizer {
String locId = argv[2];
InetSocketAddress nmAddr =
new InetSocketAddress(argv[3], Integer.parseInt(argv[4]));
- Path logDir = new Path(argv[5]);
- String[] sLocaldirs = Arrays.copyOfRange(argv, 6, argv.length);
+ String[] sLocaldirs = Arrays.copyOfRange(argv, 5, argv.length);
ArrayList<Path> localDirs = new ArrayList<Path>(sLocaldirs.length);
for (String sLocaldir : sLocaldirs) {
localDirs.add(new Path(sLocaldir));
@@ -330,7 +322,9 @@ public class ContainerLocalizer {
}
ContainerLocalizer localizer =
- new ContainerLocalizer(user, appId, locId, logDir, localDirs);
+ new ContainerLocalizer(FileContext.getLocalFSFileContext(), user,
+ appId, locId, localDirs,
+ RecordFactoryProvider.getRecordFactory(null));
System.exit(localizer.runLocalization(nmAddr));
} catch (Throwable e) {
// Print error to stdout so that LCE can use it.
@@ -339,130 +333,33 @@ public class ContainerLocalizer {
}
}
+ // TODO: Won't there be a race for creating these dirs.
private static void initDirs(Configuration conf, String user, String appId,
- FileContext lfs, Path logDir, List<Path> localDirs) throws IOException {
+ FileContext lfs, List<Path> localDirs) throws IOException {
if (null == localDirs || 0 == localDirs.size()) {
throw new IOException("Cannot initialize without local dirs");
}
- String[] appCacheDirs = new String[localDirs.size()];
- String[] userCacheDirs = new String[localDirs.size()];
+ String[] appsFileCacheDirs = new String[localDirs.size()];
+ String[] usersFileCacheDirs = new String[localDirs.size()];
for (int i = 0, n = localDirs.size(); i < n; ++i) {
// $x/usercache/$user
Path base = lfs.makeQualified(
new Path(new Path(localDirs.get(i), USERCACHE), user));
// $x/usercache/$user/filecache
- Path uCacheDir = new Path(base, FILECACHE);
- userCacheDirs[i] = uCacheDir.toString();
- lfs.mkdir(uCacheDir, null, true);
+ Path userFileCacheDir = new Path(base, FILECACHE);
+ usersFileCacheDirs[i] = userFileCacheDir.toString();
+ lfs.mkdir(userFileCacheDir, null, false);
// $x/usercache/$user/appcache/$appId
Path appBase = new Path(base, new Path(APPCACHE, appId));
- lfs.mkdir(appBase, null, true);
// $x/usercache/$user/appcache/$appId/filecache
- Path aCacheDir = new Path(appBase, FILECACHE);
- appCacheDirs[i] = aCacheDir.toString();
- lfs.mkdir(aCacheDir, null, true);
+ Path appFileCacheDir = new Path(appBase, FILECACHE);
+ appsFileCacheDirs[i] = appFileCacheDir.toString();
+ lfs.mkdir(appFileCacheDir, null, false);
// $x/usercache/$user/appcache/$appId/output
- lfs.mkdir(new Path(appBase, OUTPUTDIR), null, true);
- }
- conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appCacheDirs);
- conf.setStrings(String.format(USERCACHE_CTXT_FMT, appId), userCacheDirs);
- Path appLogDir = new Path(logDir, appId);
- lfs.mkdir(appLogDir, null, true);
- }
-
- public static void writeLaunchEnv(OutputStream out,
- Map<String,String> environment, Map<Path,String> resources,
- List<String> command, List<Path> appDirs)
- throws IOException {
- ShellScriptBuilder sb = new ShellScriptBuilder();
- if (System.getenv("YARN_HOME") != null) {
- sb.env("YARN_HOME", System.getenv("YARN_HOME"));
- }
- sb.env(YARNApplicationConstants.LOCAL_DIR_ENV,
- StringUtils.join(",", appDirs));
- if (environment != null) {
- for (Map.Entry<String,String> env : environment.entrySet()) {
- sb.env(env.getKey().toString(), env.getValue().toString());
- }
- }
- if (resources != null) {
- for (Map.Entry<Path,String> link : resources.entrySet()) {
- sb.symlink(link.getKey(), link.getValue());
- }
- }
- ArrayList<String> cmd = new ArrayList<String>(2 * command.size() + 5);
- cmd.add(ContainerExecutor.isSetsidAvailable ? "exec setsid " : "exec ");
- cmd.add("/bin/bash ");
- cmd.add("-c ");
- cmd.add("\"");
- for (String cs : command) {
- cmd.add(cs.toString());
- cmd.add(" ");
- }
- cmd.add("\"");
- sb.line(cmd.toArray(new String[cmd.size()]));
- PrintStream pout = null;
- try {
- pout = new PrintStream(out);
- sb.write(pout);
- } finally {
- if (out != null) {
- out.close();
- }
- }
- }
-
- private static class ShellScriptBuilder {
-
- private final StringBuilder sb;
-
- public ShellScriptBuilder() {
- this(new StringBuilder("#!/bin/bash\n\n"));
- }
-
- protected ShellScriptBuilder(StringBuilder sb) {
- this.sb = sb;
- }
-
- public ShellScriptBuilder env(String key, String value) {
- line("export ", key, "=\"", value, "\"");
- return this;
- }
-
- public ShellScriptBuilder symlink(Path src, String dst) throws IOException {
- return symlink(src, new Path(dst));
- }
-
- public ShellScriptBuilder symlink(Path src, Path dst) throws IOException {
- if (!src.isAbsolute()) {
- throw new IOException("Source must be absolute");
- }
- if (dst.isAbsolute()) {
- throw new IOException("Destination must be relative");
- }
- if (dst.toUri().getPath().indexOf('/') != -1) {
- line("mkdir -p ", dst.getParent().toString());
- }
- line("ln -sf ", src.toUri().getPath(), " ", dst.toString());
- return this;
- }
-
- public void write(PrintStream out) throws IOException {
- out.append(sb);
- }
-
- public void line(String... command) {
- for (String s : command) {
- sb.append(s);
- }
- sb.append("\n");
+ lfs.mkdir(new Path(appBase, OUTPUTDIR), null, false);
}
-
- @Override
- public String toString() {
- return sb.toString();
- }
-
+ conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appsFileCacheDirs);
+ conf.setStrings(String.format(USERCACHE_CTXT_FMT, appId), usersFileCacheDirs);
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Tue May 10 15:18:58 2011
@@ -18,11 +18,17 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
-public interface LocalResourcesTracker extends EventHandler<ResourceEvent> {
+/**
+ * Component tracking resources all of the same {@link LocalResourceVisibility}
+ *
+ */
+interface LocalResourcesTracker extends EventHandler<ResourceEvent> {
- public boolean contains(LocalResourceRequest resource);
+ // TODO: Not used at all!!
+ boolean contains(LocalResourceRequest resource);
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Tue May 10 15:18:58 2011
@@ -21,9 +21,15 @@ import java.util.concurrent.ConcurrentHa
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+/**
+ * A collection of {@link LocalizedResource}s all of same
+ * {@link LocalResourceVisibility}.
+ *
+ */
class LocalResourcesTrackerImpl implements LocalResourcesTracker {
static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
@@ -36,8 +42,9 @@ class LocalResourcesTrackerImpl implemen
this.dispatcher = dispatcher;
}
+ @Override
public void handle(ResourceEvent event) {
- LocalResourceRequest req = event.getLocalResource();
+ LocalResourceRequest req = event.getLocalResourceRequest();
LocalizedResource rsrc = localrsrc.get(req);
switch (event.getType()) {
case REQUEST:
@@ -57,6 +64,7 @@ class LocalResourcesTrackerImpl implemen
rsrc.handle(event);
}
+ @Override
public boolean contains(LocalResourceRequest resource) {
return localrsrc.contains(resource);
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Tue May 10 15:18:58 2011
@@ -41,6 +41,11 @@ import org.apache.hadoop.yarn.state.Sing
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
+/**
+ * Datum representing a localized resource. Holds the statemachine of a
+ * resource. State of the resource is one of {@link ResourceState}.
+ *
+ */
public class LocalizedResource implements EventHandler<ResourceEvent> {
private static final Log LOG = LogFactory.getLog(LocalizedResource.class);
@@ -52,13 +57,15 @@ public class LocalizedResource implement
final StateMachine<ResourceState,ResourceEventType,ResourceEvent>
stateMachine;
final Semaphore sem = new Semaphore(1);
- final Queue<ContainerId> ref;
+ final Queue<ContainerId> ref; // Queue of containers using this localized
+ // resource
final AtomicLong timestamp = new AtomicLong(currentTime());
private static final StateMachineFactory<LocalizedResource,ResourceState,
ResourceEventType,ResourceEvent> stateMachineFactory =
new StateMachineFactory<LocalizedResource,ResourceState,
ResourceEventType,ResourceEvent>(ResourceState.INIT)
+
// From INIT (ref == 0, awaiting req)
.addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
ResourceEventType.REQUEST, new FetchResourceTransition())
@@ -66,14 +73,16 @@ public class LocalizedResource implement
ResourceEventType.LOCALIZED, new FetchDirectTransition())
.addTransition(ResourceState.INIT, ResourceState.INIT,
ResourceEventType.RELEASE, new ReleaseTransition())
+
// From DOWNLOADING (ref > 0, may be localizing)
.addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
- ResourceEventType.REQUEST, new FetchResourceTransition())
+ ResourceEventType.REQUEST, new FetchResourceTransition()) // TODO: Duplicate addition!!
.addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED,
ResourceEventType.LOCALIZED, new FetchSuccessTransition())
.addTransition(ResourceState.DOWNLOADING,
EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT),
ResourceEventType.RELEASE, new ReleasePendingTransition())
+
// From LOCALIZED (ref >= 0, on disk)
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
ResourceEventType.REQUEST, new LocalizedResourceTransition())
@@ -132,8 +141,10 @@ public class LocalizedResource implement
sem.release();
}
+ @Override
public synchronized void handle(ResourceEvent event) {
stateMachine.doTransition(event.getType(), event);
+ // TODO: Invalid transitions?
}
static abstract class ResourceTransition implements