You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/09/14 09:26:38 UTC
svn commit: r1170459 [1/2] - in
/hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/...
Author: mahadev
Date: Wed Sep 14 07:26:37 2011
New Revision: 1170459
URL: http://svn.apache.org/viewvc?rev=1170459&view=rev
Log:
MAPREDUCE-2899. Replace major parts of ApplicationSubmissionContext with a ContainerLaunchContext (Arun Murthy via mahadev)
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Sep 14 07:26:37 2011
@@ -289,6 +289,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2676. MR-279: JobHistory Job page needs reformatted. (Robert Evans via
mahadev)
+ MAPREDUCE-2899. Replace major parts of ApplicationSubmissionContext with a
+ ContainerLaunchContext (Arun Murthy via mahadev)
+
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Wed Sep 14 07:26:37 2011
@@ -579,13 +579,12 @@ public abstract class TaskAttemptImpl im
+ remoteJobConfPath.toUri().toASCIIString());
// //////////// End of JobConf setup
-
// Setup DistributedCache
- setupDistributedCache(remoteFS, conf, localResources, environment);
+ MRApps.setupDistributedCache(conf, localResources, environment);
// Set local-resources and environment
container.setLocalResources(localResources);
- container.setEnv(environment);
+ container.setEnvironment(environment);
// Setup up tokens
Credentials taskCredentials = new Credentials();
@@ -618,7 +617,7 @@ public abstract class TaskAttemptImpl im
ShuffleHandler.serializeServiceData(jobToken));
container.setServiceData(serviceData);
- MRApps.addToClassPath(container.getEnv(), getInitialClasspath());
+ MRApps.addToClassPath(container.getEnvironment(), getInitialClasspath());
} catch (IOException e) {
throw new YarnException(e);
}
@@ -645,7 +644,7 @@ public abstract class TaskAttemptImpl im
taskAttemptListener.getAddress(), remoteTask, javaHome,
workDir.toString(), containerLogDir, childTmpDir, jvmID));
- MapReduceChildJVM.setVMEnv(container.getEnv(), classPaths,
+ MapReduceChildJVM.setVMEnv(container.getEnvironment(), classPaths,
workDir.toString(), containerLogDir, nmLdLibraryPath, remoteTask,
localizedApplicationTokensFile);
@@ -656,116 +655,6 @@ public abstract class TaskAttemptImpl im
return container;
}
- private static long[] parseTimeStamps(String[] strs) {
- if (null == strs) {
- return null;
- }
- long[] result = new long[strs.length];
- for(int i=0; i < strs.length; ++i) {
- result[i] = Long.parseLong(strs[i]);
- }
- return result;
- }
-
- private void setupDistributedCache(FileSystem remoteFS,
- Configuration conf,
- Map<String, LocalResource> localResources,
- Map<String, String> env)
- throws IOException {
-
- // Cache archives
- parseDistributedCacheArtifacts(remoteFS, localResources, env,
- LocalResourceType.ARCHIVE,
- DistributedCache.getCacheArchives(conf),
- parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
- getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
- DistributedCache.getArchiveVisibilities(conf),
- DistributedCache.getArchiveClassPaths(conf));
-
- // Cache files
- parseDistributedCacheArtifacts(remoteFS,
- localResources, env,
- LocalResourceType.FILE,
- DistributedCache.getCacheFiles(conf),
- parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
- getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
- DistributedCache.getFileVisibilities(conf),
- DistributedCache.getFileClassPaths(conf));
- }
-
- // TODO - Move this to MR!
- // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
- // long[], boolean[], Path[], FileType)
- private void parseDistributedCacheArtifacts(
- FileSystem remoteFS,
- Map<String, LocalResource> localResources,
- Map<String, String> env,
- LocalResourceType type,
- URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
- Path[] pathsToPutOnClasspath) throws IOException {
-
- if (uris != null) {
- // Sanity check
- if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
- (uris.length != visibilities.length)) {
- throw new IllegalArgumentException("Invalid specification for " +
- "distributed-cache artifacts of type " + type + " :" +
- " #uris=" + uris.length +
- " #timestamps=" + timestamps.length +
- " #visibilities=" + visibilities.length
- );
- }
-
- Map<String, Path> classPaths = new HashMap<String, Path>();
- if (pathsToPutOnClasspath != null) {
- for (Path p : pathsToPutOnClasspath) {
- p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory()));
- classPaths.put(p.toUri().getPath().toString(), p);
- }
- }
- for (int i = 0; i < uris.length; ++i) {
- URI u = uris[i];
- Path p = new Path(u);
- p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory()));
- // Add URI fragment or just the filename
- Path name = new Path((null == u.getFragment())
- ? p.getName()
- : u.getFragment());
- if (name.isAbsolute()) {
- throw new IllegalArgumentException("Resource name must be relative");
- }
- String linkName = name.toUri().getPath();
- localResources.put(
- linkName,
- BuilderUtils.newLocalResource(
- p.toUri(), type,
- visibilities[i]
- ? LocalResourceVisibility.PUBLIC
- : LocalResourceVisibility.PRIVATE,
- sizes[i], timestamps[i])
- );
- if (classPaths.containsKey(u.getPath())) {
- MRApps.addToClassPath(env, linkName);
- }
- }
- }
- }
-
- // TODO - Move this to MR!
- private static long[] getFileSizes(Configuration conf, String key) {
- String[] strs = conf.getStrings(key);
- if (strs == null) {
- return null;
- }
- long[] result = new long[strs.length];
- for(int i=0; i < strs.length; ++i) {
- result[i] = Long.parseLong(strs[i]);
- }
- return result;
- }
-
@Override
public ContainerId getAssignedContainerID() {
readLock.lock();
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Wed Sep 14 07:26:37 2011
@@ -25,14 +25,20 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.net.URI;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -42,12 +48,18 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.BuilderUtils;
/**
* Helper class for MR applications
*/
+@Private
+@Unstable
public class MRApps extends Apps {
public static final String JOB = "job";
public static final String TASK = "task";
@@ -232,4 +244,121 @@ public class MRApps extends Apps {
jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE);
return jobFile.toString();
}
+
+
+
+ private static long[] parseTimeStamps(String[] strs) {
+ if (null == strs) {
+ return null;
+ }
+ long[] result = new long[strs.length];
+ for(int i=0; i < strs.length; ++i) {
+ result[i] = Long.parseLong(strs[i]);
+ }
+ return result;
+ }
+
+ public static void setupDistributedCache(
+ Configuration conf,
+ Map<String, LocalResource> localResources,
+ Map<String, String> env)
+ throws IOException {
+
+ // Cache archives
+ parseDistributedCacheArtifacts(conf, localResources, env,
+ LocalResourceType.ARCHIVE,
+ DistributedCache.getCacheArchives(conf),
+ parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
+ getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
+ DistributedCache.getArchiveVisibilities(conf),
+ DistributedCache.getArchiveClassPaths(conf));
+
+ // Cache files
+ parseDistributedCacheArtifacts(conf,
+ localResources, env,
+ LocalResourceType.FILE,
+ DistributedCache.getCacheFiles(conf),
+ parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
+ getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
+ DistributedCache.getFileVisibilities(conf),
+ DistributedCache.getFileClassPaths(conf));
+ }
+
+ // TODO - Move this to MR!
+ // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
+ // long[], boolean[], Path[], FileType)
+ private static void parseDistributedCacheArtifacts(
+ Configuration conf,
+ Map<String, LocalResource> localResources,
+ Map<String, String> env,
+ LocalResourceType type,
+ URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
+ Path[] pathsToPutOnClasspath) throws IOException {
+
+ if (uris != null) {
+ // Sanity check
+ if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
+ (uris.length != visibilities.length)) {
+ throw new IllegalArgumentException("Invalid specification for " +
+ "distributed-cache artifacts of type " + type + " :" +
+ " #uris=" + uris.length +
+ " #timestamps=" + timestamps.length +
+ " #visibilities=" + visibilities.length
+ );
+ }
+
+ Map<String, Path> classPaths = new HashMap<String, Path>();
+ if (pathsToPutOnClasspath != null) {
+ for (Path p : pathsToPutOnClasspath) {
+ FileSystem remoteFS = p.getFileSystem(conf);
+ p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory()));
+ classPaths.put(p.toUri().getPath().toString(), p);
+ }
+ }
+ for (int i = 0; i < uris.length; ++i) {
+ URI u = uris[i];
+ Path p = new Path(u);
+ FileSystem remoteFS = p.getFileSystem(conf);
+ p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+ remoteFS.getWorkingDirectory()));
+ // Add URI fragment or just the filename
+ Path name = new Path((null == u.getFragment())
+ ? p.getName()
+ : u.getFragment());
+ if (name.isAbsolute()) {
+ throw new IllegalArgumentException("Resource name must be relative");
+ }
+ String linkName = name.toUri().getPath();
+ localResources.put(
+ linkName,
+ BuilderUtils.newLocalResource(
+ p.toUri(), type,
+ visibilities[i]
+ ? LocalResourceVisibility.PUBLIC
+ : LocalResourceVisibility.PRIVATE,
+ sizes[i], timestamps[i])
+ );
+ if (classPaths.containsKey(u.getPath())) {
+ MRApps.addToClassPath(env, linkName);
+ }
+ }
+ }
+ }
+
+ // TODO - Move this to MR!
+ private static long[] getFileSizes(Configuration conf, String key) {
+ String[] strs = conf.getStrings(key);
+ if (strs == null) {
+ return null;
+ }
+ long[] result = new long[strs.length];
+ for(int i=0; i < strs.length; ++i) {
+ result[i] = Long.parseLong(strs[i]);
+ }
+ return result;
+ }
+
+
+
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Wed Sep 14 07:26:37 2011
@@ -19,7 +19,6 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
-import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -33,7 +32,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -55,7 +53,6 @@ import org.apache.hadoop.mapreduce.TaskR
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.MRConstants;
@@ -72,6 +69,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -237,7 +235,6 @@ public class YARNRunner implements Clien
// Construct necessary information to start the MR AM
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
- setupDistributedCache(conf, appContext);
// XXX Remove
in.close();
@@ -273,16 +270,18 @@ public class YARNRunner implements Clien
public ApplicationSubmissionContext createApplicationSubmissionContext(
Configuration jobConf,
String jobSubmitDir, Credentials ts) throws IOException {
- ApplicationSubmissionContext appContext =
- recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
ApplicationId applicationId = resMgrDelegate.getApplicationId();
- appContext.setApplicationId(applicationId);
+
+ // Setup resource requirements
Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
MRJobConfig.DEFAULT_MR_AM_VMEM_MB));
LOG.info("AppMaster capability = " + capability);
- appContext.setMasterCapability(capability);
+ // Setup LocalResources
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+
Path jobConfPath = new Path(jobSubmitDir, MRConstants.JOB_CONF_FILE);
URL yarnUrlForJobSubmitDir = ConverterUtils
@@ -292,14 +291,11 @@ public class YARNRunner implements Clien
LOG.debug("Creating setup context, jobSubmitDir url is "
+ yarnUrlForJobSubmitDir);
- appContext.setResource(MRConstants.JOB_SUBMIT_DIR,
- yarnUrlForJobSubmitDir);
-
- appContext.setResourceTodo(MRConstants.JOB_CONF_FILE,
+ localResources.put(MRConstants.JOB_CONF_FILE,
createApplicationResource(defaultFileContext,
jobConfPath));
if (jobConf.get(MRJobConfig.JAR) != null) {
- appContext.setResourceTodo(MRConstants.JOB_JAR,
+ localResources.put(MRConstants.JOB_JAR,
createApplicationResource(defaultFileContext,
new Path(jobSubmitDir, MRConstants.JOB_JAR)));
} else {
@@ -312,30 +308,21 @@ public class YARNRunner implements Clien
// TODO gross hack
for (String s : new String[] { "job.split", "job.splitmetainfo",
MRConstants.APPLICATION_TOKENS_FILE }) {
- appContext.setResourceTodo(
+ localResources.put(
MRConstants.JOB_SUBMIT_DIR + "/" + s,
- createApplicationResource(defaultFileContext, new Path(jobSubmitDir, s)));
- }
-
- // TODO: Only if security is on.
- List<String> fsTokens = new ArrayList<String>();
- for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
- fsTokens.add(token.encodeToUrlString());
+ createApplicationResource(defaultFileContext,
+ new Path(jobSubmitDir, s)));
}
- // TODO - Remove this!
- appContext.addAllFsTokens(fsTokens);
- DataOutputBuffer dob = new DataOutputBuffer();
- ts.writeTokenStorageToStream(dob);
- appContext.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+ // Setup security tokens
+ ByteBuffer securityTokens = null;
+ if (UserGroupInformation.isSecurityEnabled()) {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ ts.writeTokenStorageToStream(dob);
+ securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ }
- // Add queue information
- appContext.setQueue(jobConf.get(JobContext.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME));
-
- // Add job name
- appContext.setApplicationName(jobConf.get(JobContext.JOB_NAME, "N/A"));
-
- // Add the command line
+ // Setup the command to run the AM
String javaHome = "$JAVA_HOME";
Vector<CharSequence> vargs = new Vector<CharSequence>(8);
vargs.add(javaHome + "/bin/java");
@@ -346,13 +333,6 @@ public class YARNRunner implements Clien
vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
- // Add { job jar, MR app jar } to classpath.
- Map<String, String> environment = new HashMap<String, String>();
- MRApps.setInitialClasspath(environment);
- MRApps.addToClassPath(environment, MRConstants.JOB_JAR);
- MRApps.addToClassPath(environment,
- MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
- appContext.addAllEnvironment(environment);
vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
vargs.add(String.valueOf(applicationId.getClusterTimestamp()));
vargs.add(String.valueOf(applicationId.getId()));
@@ -370,140 +350,43 @@ public class YARNRunner implements Clien
LOG.info("Command to launch container for ApplicationMaster is : "
+ mergedCommand);
+
+ // Setup the environment - Add { job jar, MR app jar } to classpath.
+ Map<String, String> environment = new HashMap<String, String>();
+ MRApps.setInitialClasspath(environment);
+ MRApps.addToClassPath(environment, MRConstants.JOB_JAR);
+ MRApps.addToClassPath(environment,
+ MRConstants.YARN_MAPREDUCE_APP_JAR_PATH);
- appContext.addAllCommands(vargsFinal);
- // TODO: RM should get this from RPC.
- appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
- return appContext;
- }
+ // Parse distributed cache
+ MRApps.setupDistributedCache(jobConf, localResources, environment);
- /**
- * * TODO: Copied for now from TaskAttemptImpl.java ... fixme
- * @param strs
- * @return
- */
- private static long[] parseTimeStamps(String[] strs) {
- if (null == strs) {
- return null;
- }
- long[] result = new long[strs.length];
- for(int i=0; i < strs.length; ++i) {
- result[i] = Long.parseLong(strs[i]);
- }
- return result;
- }
+ // Setup ContainerLaunchContext for AM container
+ ContainerLaunchContext amContainer =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+ amContainer.setResource(capability); // Resource (mem) required
+ amContainer.setLocalResources(localResources); // Local resources
+ amContainer.setEnvironment(environment); // Environment
+ amContainer.setCommands(vargsFinal); // Command for AM
+ amContainer.setContainerTokens(securityTokens); // Security tokens
- /**
- * TODO: Copied for now from TaskAttemptImpl.java ... fixme
- *
- * TODO: This is currently needed in YarnRunner as user code like setupJob,
- * cleanupJob may need access to dist-cache. Once we separate distcache for
- * maps, reduces, setup etc, this can include only a subset of artificats.
- * This is also needed for uberAM case where we run everything inside AM.
- */
- private void setupDistributedCache(Configuration conf,
- ApplicationSubmissionContext container) throws IOException {
-
- // Cache archives
- parseDistributedCacheArtifacts(conf, container, LocalResourceType.ARCHIVE,
- DistributedCache.getCacheArchives(conf),
- parseTimeStamps(DistributedCache.getArchiveTimestamps(conf)),
- getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
- DistributedCache.getArchiveVisibilities(conf),
- DistributedCache.getArchiveClassPaths(conf));
-
- // Cache files
- parseDistributedCacheArtifacts(conf, container, LocalResourceType.FILE,
- DistributedCache.getCacheFiles(conf),
- parseTimeStamps(DistributedCache.getFileTimestamps(conf)),
- getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
- DistributedCache.getFileVisibilities(conf),
- DistributedCache.getFileClassPaths(conf));
- }
-
- // TODO - Move this to MR!
- // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], long[], boolean[], Path[], FileType)
- private void parseDistributedCacheArtifacts(Configuration conf,
- ApplicationSubmissionContext container, LocalResourceType type,
- URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
- Path[] pathsToPutOnClasspath) throws IOException {
-
- if (uris != null) {
- // Sanity check
- if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
- (uris.length != visibilities.length)) {
- throw new IllegalArgumentException("Invalid specification for " +
- "distributed-cache artifacts of type " + type + " :" +
- " #uris=" + uris.length +
- " #timestamps=" + timestamps.length +
- " #visibilities=" + visibilities.length
- );
- }
-
- Map<String, Path> classPaths = new HashMap<String, Path>();
- if (pathsToPutOnClasspath != null) {
- for (Path p : pathsToPutOnClasspath) {
- FileSystem fs = p.getFileSystem(conf);
- p = p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
- classPaths.put(p.toUri().getPath().toString(), p);
- }
- }
- for (int i = 0; i < uris.length; ++i) {
- URI u = uris[i];
- Path p = new Path(u);
- FileSystem fs = p.getFileSystem(conf);
- p = fs.resolvePath(
- p.makeQualified(fs.getUri(), fs.getWorkingDirectory()));
- // Add URI fragment or just the filename
- Path name = new Path((null == u.getFragment())
- ? p.getName()
- : u.getFragment());
- if (name.isAbsolute()) {
- throw new IllegalArgumentException("Resource name must be relative");
- }
- String linkName = name.toUri().getPath();
- container.setResourceTodo(
- linkName,
- createLocalResource(
- p.toUri(), type,
- visibilities[i]
- ? LocalResourceVisibility.PUBLIC
- : LocalResourceVisibility.PRIVATE,
- sizes[i], timestamps[i])
- );
- if (classPaths.containsKey(u.getPath())) {
- Map<String, String> environment = container.getAllEnvironment();
- MRApps.addToClassPath(environment, linkName);
- }
- }
- }
- }
+ // Set up the ApplicationSubmissionContext
+ ApplicationSubmissionContext appContext =
+ recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ appContext.setApplicationId(applicationId); // ApplicationId
+ appContext.setUser( // User name
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ appContext.setQueue( // Queue name
+ jobConf.get(JobContext.QUEUE_NAME,
+ YarnConfiguration.DEFAULT_QUEUE_NAME));
+ appContext.setApplicationName( // Job name
+ jobConf.get(JobContext.JOB_NAME,
+ YarnConfiguration.DEFAULT_APPLICATION_NAME));
+ appContext.setAMContainerSpec(amContainer); // AM Container
- // TODO - Move this to MR!
- private static long[] getFileSizes(Configuration conf, String key) {
- String[] strs = conf.getStrings(key);
- if (strs == null) {
- return null;
- }
- long[] result = new long[strs.length];
- for(int i=0; i < strs.length; ++i) {
- result[i] = Long.parseLong(strs[i]);
- }
- return result;
- }
-
- private LocalResource createLocalResource(URI uri,
- LocalResourceType type, LocalResourceVisibility visibility,
- long size, long timestamp) throws IOException {
- LocalResource resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(LocalResource.class);
- resource.setResource(ConverterUtils.getYarnUrlFromURI(uri));
- resource.setType(type);
- resource.setVisibility(visibility);
- resource.setSize(size);
- resource.setTimestamp(timestamp);
- return resource;
+ return appContext;
}
-
+
@Override
public void setJobPriority(JobID arg0, String arg1) throws IOException,
InterruptedException {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java Wed Sep 14 07:26:37 2011
@@ -18,14 +18,8 @@
package org.apache.hadoop.yarn.api.records;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
/**
@@ -36,26 +30,17 @@ import org.apache.hadoop.yarn.api.Client
* <p>It includes details such as:
* <ul>
* <li>{@link ApplicationId} of the application.</li>
- * <li>
- * {@link Resource} necessary to run the <code>ApplicationMaster</code>.
- * </li>
* <li>Application user.</li>
* <li>Application name.</li>
* <li>{@link Priority} of the application.</li>
- * <li>Security tokens (if security is enabled).</li>
- * <li>
- * {@link LocalResource} necessary for running the
- * <code>ApplicationMaster</code> container such
- * as binaries, jar, shared-objects, side-files etc.
- * </li>
* <li>
- * Environment variables for the launched <code>ApplicationMaster</code>
- * process.
+ * {@link ContainerLaunchContext} of the container in which the
+ * <code>ApplicationMaster</code> is executed.
* </li>
- * <li>Command to launch the <code>ApplicationMaster</code>.</li>
* </ul>
* </p>
*
+ * @see ContainerLaunchContext
* @see ClientRMProtocol#submitApplication(org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest)
*/
@Public
@@ -143,198 +128,25 @@ public interface ApplicationSubmissionCo
public void setUser(String user);
/**
- * Get the <code>Resource</code> required to run the
- * <code>ApplicationMaster</code>.
- * @return <code>Resource</code> required to run the
- * <code>ApplicationMaster</code>
- */
- @Public
- @Stable
- public Resource getMasterCapability();
-
- /**
- * Set <code>Resource</code> required to run the
- * <code>ApplicationMaster</code>.
- * @param masterCapability <code>Resource</code> required to run the
- * <code>ApplicationMaster</code>
- */
- @Public
- @Stable
- public void setMasterCapability(Resource masterCapability);
-
- @Private
- @Unstable
- public Map<String, URL> getAllResources();
-
- @Private
- @Unstable
- public URL getResource(String key);
-
- @Private
- @Unstable
- public void addAllResources(Map<String, URL> resources);
-
- @Private
- @Unstable
- public void setResource(String key, URL url);
-
- @Private
- @Unstable
- public void removeResource(String key);
-
- @Private
- @Unstable
- public void clearResources();
-
- /**
- * Get all the <code>LocalResource</code> required to run the
- * <code>ApplicationMaster</code>.
- * @return <code>LocalResource</code> required to run the
- * <code>ApplicationMaster</code>
- */
- @Public
- @Stable
- public Map<String, LocalResource> getAllResourcesTodo();
-
- @Private
- @Unstable
- public LocalResource getResourceTodo(String key);
-
- /**
- * Add all the <code>LocalResource</code> required to run the
- * <code>ApplicationMaster</code>.
- * @param resources all <code>LocalResource</code> required to run the
- * <code>ApplicationMaster</code>
- */
- @Public
- @Stable
- public void addAllResourcesTodo(Map<String, LocalResource> resources);
-
- @Private
- @Unstable
- public void setResourceTodo(String key, LocalResource localResource);
-
- @Private
- @Unstable
- public void removeResourceTodo(String key);
-
- @Private
- @Unstable
- public void clearResourcesTodo();
-
- @Private
- @Unstable
- public List<String> getFsTokenList();
-
- @Private
- @Unstable
- public String getFsToken(int index);
-
- @Private
- @Unstable
- public int getFsTokenCount();
-
- @Private
- @Unstable
- public void addAllFsTokens(List<String> fsTokens);
-
- @Private
- @Unstable
- public void addFsToken(String fsToken);
-
- @Private
- @Unstable
- public void removeFsToken(int index);
-
- @Private
- @Unstable
- public void clearFsTokens();
-
- /**
- * Get <em>file-system tokens</em> for the <code>ApplicationMaster</code>.
- * @return file-system tokens for the <code>ApplicationMaster</code>
- */
- @Public
- @Stable
- public ByteBuffer getFsTokensTodo();
-
- /**
- * Set <em>file-system tokens</em> for the <code>ApplicationMaster</code>.
- * @param fsTokens file-system tokens for the <code>ApplicationMaster</code>
+ * Get the <code>ContainerLaunchContext</code> to describe the
+ * <code>Container</code> with which the <code>ApplicationMaster</code> is
+ * launched.
+ * @return <code>ContainerLaunchContext</code> for the
+ * <code>ApplicationMaster</code> container
*/
@Public
@Stable
- public void setFsTokensTodo(ByteBuffer fsTokens);
-
- /**
- * Get the <em>environment variables</em> for the
- * <code>ApplicationMaster</code>.
- * @return environment variables for the <code>ApplicationMaster</code>
- */
- @Public
- @Stable
- public Map<String, String> getAllEnvironment();
-
- @Private
- @Unstable
- public String getEnvironment(String key);
+ public ContainerLaunchContext getAMContainerSpec();
/**
- * Add all of the <em>environment variables</em> for the
- * <code>ApplicationMaster</code>.
- * @param environment environment variables for the
- * <code>ApplicationMaster</code>
+ * Set the <code>ContainerLaunchContext</code> to describe the
+ * <code>Container</code> with which the <code>ApplicationMaster</code> is
+ * launched.
+ * @param amContainer <code>ContainerLaunchContext</code> for the
+ * <code>ApplicationMaster</code> container
*/
@Public
@Stable
- public void addAllEnvironment(Map<String, String> environment);
+ public void setAMContainerSpec(ContainerLaunchContext amContainer);
- @Private
- @Unstable
- public void setEnvironment(String key, String env);
-
- @Private
- @Unstable
- public void removeEnvironment(String key);
-
- @Private
- @Unstable
- public void clearEnvironment();
-
- /**
- * Get the <em>commands</em> to launch the <code>ApplicationMaster</code>.
- * @return commands to launch the <code>ApplicationMaster</code>
- */
- @Public
- @Stable
- public List<String> getCommandList();
-
- @Private
- @Unstable
- public String getCommand(int index);
-
- @Private
- @Unstable
- public int getCommandCount();
-
- /**
- * Add all of the <em>commands</em> to launch the
- * <code>ApplicationMaster</code>.
- * @param commands commands to launch the <code>ApplicationMaster</code>
- */
- @Public
- @Stable
- public void addAllCommands(List<String> commands);
-
- @Private
- @Unstable
- public void addCommand(String command);
-
- @Private
- @Unstable
- public void removeCommand(int index);
-
- @Private
- @Unstable
- public void clearCommands();
}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java Wed Sep 14 07:26:37 2011
@@ -156,7 +156,7 @@ public interface ContainerLaunchContext
*/
@Public
@Stable
- Map<String, String> getEnv();
+ Map<String, String> getEnvironment();
/**
* Add <em>environment variables</em> for the container.
@@ -164,7 +164,7 @@ public interface ContainerLaunchContext
*/
@Public
@Stable
- void setEnv(Map<String, String> environment);
+ void setEnvironment(Map<String, String> environment);
/**
* Get the list of <em>commands</em> for launching the container.
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java Wed Sep 14 07:26:37 2011
@@ -18,56 +18,35 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder;
-import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.StringURLMapProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.URLProto;
-
-
-public class ApplicationSubmissionContextPBImpl extends ProtoBase<ApplicationSubmissionContextProto> implements ApplicationSubmissionContext {
- ApplicationSubmissionContextProto proto = ApplicationSubmissionContextProto.getDefaultInstance();
+public class ApplicationSubmissionContextPBImpl
+extends ProtoBase<ApplicationSubmissionContextProto>
+implements ApplicationSubmissionContext {
+ ApplicationSubmissionContextProto proto =
+ ApplicationSubmissionContextProto.getDefaultInstance();
ApplicationSubmissionContextProto.Builder builder = null;
boolean viaProto = false;
private ApplicationId applicationId = null;
- private Resource masterCapability = null;
- private Map<String, URL> resources = null;
- private Map<String, LocalResource> resourcesTodo = null;
- private List<String> fsTokenList = null;
- private ByteBuffer fsTokenTodo = null;
- private Map<String, String> environment = null;
- private List<String> commandList = null;
private Priority priority = null;
-
-
+ private ContainerLaunchContext amContainer = null;
public ApplicationSubmissionContextPBImpl() {
builder = ApplicationSubmissionContextProto.newBuilder();
}
- public ApplicationSubmissionContextPBImpl(ApplicationSubmissionContextProto proto) {
+ public ApplicationSubmissionContextPBImpl(
+ ApplicationSubmissionContextProto proto) {
this.proto = proto;
viaProto = true;
}
@@ -83,30 +62,12 @@ public class ApplicationSubmissionContex
if (this.applicationId != null) {
builder.setApplicationId(convertToProtoFormat(this.applicationId));
}
- if (this.masterCapability != null) {
- builder.setMasterCapability(convertToProtoFormat(this.masterCapability));
- }
- if (this.resources != null) {
- addResourcesToProto();
- }
- if (this.resourcesTodo != null) {
- addResourcesTodoToProto();
- }
- if (this.fsTokenList != null) {
- addFsTokenListToProto();
- }
- if (this.fsTokenTodo != null) {
- builder.setFsTokensTodo(convertToProtoFormat(this.fsTokenTodo));
- }
- if (this.environment != null) {
- addEnvironmentToProto();
- }
- if (this.commandList != null) {
- addCommandsToProto();
- }
if (this.priority != null) {
builder.setPriority(convertToProtoFormat(this.priority));
}
+ if (this.amContainer != null) {
+ builder.setAmContainerSpec(convertToProtoFormat(this.amContainer));
+ }
}
private void mergeLocalToProto() {
@@ -145,6 +106,7 @@ public class ApplicationSubmissionContex
builder.clearPriority();
this.priority = priority;
}
+
@Override
public ApplicationId getApplicationId() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@@ -165,6 +127,7 @@ public class ApplicationSubmissionContex
builder.clearApplicationId();
this.applicationId = applicationId;
}
+
@Override
public String getApplicationName() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@@ -183,403 +146,7 @@ public class ApplicationSubmissionContex
}
builder.setApplicationName((applicationName));
}
- @Override
- public Resource getMasterCapability() {
- ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
- if (this.masterCapability != null) {
- return masterCapability;
- } // Else via proto
- if (!p.hasMasterCapability()) {
- return null;
- }
- masterCapability = convertFromProtoFormat(p.getMasterCapability());
- return this.masterCapability;
- }
-
- @Override
- public void setMasterCapability(Resource masterCapability) {
- maybeInitBuilder();
- if (masterCapability == null)
- builder.clearMasterCapability();
- this.masterCapability = masterCapability;
- }
- @Override
- public Map<String, URL> getAllResources() {
- initResources();
- return this.resources;
- }
- @Override
- public URL getResource(String key) {
- initResources();
- return this.resources.get(key);
- }
-
- private void initResources() {
- if (this.resources != null) {
- return;
- }
- ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
- List<StringURLMapProto> mapAsList = p.getResourcesList();
- this.resources = new HashMap<String, URL>();
-
- for (StringURLMapProto c : mapAsList) {
- this.resources.put(c.getKey(), convertFromProtoFormat(c.getValue()));
- }
- }
-
- @Override
- public void addAllResources(final Map<String, URL> resources) {
- if (resources == null)
- return;
- initResources();
- this.resources.putAll(resources);
- }
-
- private void addResourcesToProto() {
- maybeInitBuilder();
- builder.clearResources();
- if (this.resources == null)
- return;
- Iterable<StringURLMapProto> iterable = new Iterable<StringURLMapProto>() {
-
- @Override
- public Iterator<StringURLMapProto> iterator() {
- return new Iterator<StringURLMapProto>() {
-
- Iterator<String> keyIter = resources.keySet().iterator();
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public StringURLMapProto next() {
- String key = keyIter.next();
- return StringURLMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(resources.get(key))).build();
- }
-
- @Override
- public boolean hasNext() {
- return keyIter.hasNext();
- }
- };
- }
- };
- builder.addAllResources(iterable);
- }
- @Override
- public void setResource(String key, URL val) {
- initResources();
- this.resources.put(key, val);
- }
- @Override
- public void removeResource(String key) {
- initResources();
- this.resources.remove(key);
- }
- @Override
- public void clearResources() {
- initResources();
- this.resources.clear();
- }
- @Override
- public Map<String, LocalResource> getAllResourcesTodo() {
- initResourcesTodo();
- return this.resourcesTodo;
- }
- @Override
- public LocalResource getResourceTodo(String key) {
- initResourcesTodo();
- return this.resourcesTodo.get(key);
- }
-
- private void initResourcesTodo() {
- if (this.resourcesTodo != null) {
- return;
- }
- ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
- List<StringLocalResourceMapProto> mapAsList = p.getResourcesTodoList();
- this.resourcesTodo = new HashMap<String, LocalResource>();
-
- for (StringLocalResourceMapProto c : mapAsList) {
- this.resourcesTodo.put(c.getKey(), convertFromProtoFormat(c.getValue()));
- }
- }
-
- @Override
- public void addAllResourcesTodo(final Map<String, LocalResource> resourcesTodo) {
- if (resourcesTodo == null)
- return;
- initResourcesTodo();
- this.resourcesTodo.putAll(resourcesTodo);
- }
-
- private void addResourcesTodoToProto() {
- maybeInitBuilder();
- builder.clearResourcesTodo();
- if (resourcesTodo == null)
- return;
- Iterable<StringLocalResourceMapProto> iterable = new Iterable<StringLocalResourceMapProto>() {
-
- @Override
- public Iterator<StringLocalResourceMapProto> iterator() {
- return new Iterator<StringLocalResourceMapProto>() {
-
- Iterator<String> keyIter = resourcesTodo.keySet().iterator();
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public StringLocalResourceMapProto next() {
- String key = keyIter.next();
- return StringLocalResourceMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(resourcesTodo.get(key))).build();
- }
-
- @Override
- public boolean hasNext() {
- return keyIter.hasNext();
- }
- };
- }
- };
- builder.addAllResourcesTodo(iterable);
- }
- @Override
- public void setResourceTodo(String key, LocalResource val) {
- initResourcesTodo();
- this.resourcesTodo.put(key, val);
- }
- @Override
- public void removeResourceTodo(String key) {
- initResourcesTodo();
- this.resourcesTodo.remove(key);
- }
- @Override
- public void clearResourcesTodo() {
- initResourcesTodo();
- this.resourcesTodo.clear();
- }
- @Override
- public List<String> getFsTokenList() {
- initFsTokenList();
- return this.fsTokenList;
- }
- @Override
- public String getFsToken(int index) {
- initFsTokenList();
- return this.fsTokenList.get(index);
- }
- @Override
- public int getFsTokenCount() {
- initFsTokenList();
- return this.fsTokenList.size();
- }
-
- private void initFsTokenList() {
- if (this.fsTokenList != null) {
- return;
- }
- ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
- List<String> list = p.getFsTokensList();
- this.fsTokenList = new ArrayList<String>();
-
- for (String c : list) {
- this.fsTokenList.add(c);
- }
- }
-
- @Override
- public void addAllFsTokens(final List<String> fsTokens) {
- if (fsTokens == null)
- return;
- initFsTokenList();
- this.fsTokenList.addAll(fsTokens);
- }
-
- private void addFsTokenListToProto() {
- maybeInitBuilder();
- builder.clearFsTokens();
- builder.addAllFsTokens(this.fsTokenList);
- }
-
- @Override
- public void addFsToken(String fsTokens) {
- initFsTokenList();
- this.fsTokenList.add(fsTokens);
- }
- @Override
- public void removeFsToken(int index) {
- initFsTokenList();
- this.fsTokenList.remove(index);
- }
- @Override
- public void clearFsTokens() {
- initFsTokenList();
- this.fsTokenList.clear();
- }
- @Override
- public ByteBuffer getFsTokensTodo() {
- ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
- if (this.fsTokenTodo != null) {
- return this.fsTokenTodo;
- }
- if (!p.hasFsTokensTodo()) {
- return null;
- }
- this.fsTokenTodo = convertFromProtoFormat(p.getFsTokensTodo());
- return this.fsTokenTodo;
- }
-
- @Override
- public void setFsTokensTodo(ByteBuffer fsTokensTodo) {
- maybeInitBuilder();
- if (fsTokensTodo == null)
- builder.clearFsTokensTodo();
- this.fsTokenTodo = fsTokensTodo;
- }
- @Override
- public Map<String, String> getAllEnvironment() {
- initEnvironment();
- return this.environment;
- }
- @Override
- public String getEnvironment(String key) {
- initEnvironment();
- return this.environment.get(key);
- }
-
- private void initEnvironment() {
- if (this.environment != null) {
- return;
- }
- ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
- List<StringStringMapProto> mapAsList = p.getEnvironmentList();
- this.environment = new HashMap<String, String>();
-
- for (StringStringMapProto c : mapAsList) {
- this.environment.put(c.getKey(), c.getValue());
- }
- }
-
- @Override
- public void addAllEnvironment(Map<String, String> environment) {
- if (environment == null)
- return;
- initEnvironment();
- this.environment.putAll(environment);
- }
-
- private void addEnvironmentToProto() {
- maybeInitBuilder();
- builder.clearEnvironment();
- if (environment == null)
- return;
- Iterable<StringStringMapProto> iterable = new Iterable<StringStringMapProto>() {
-
- @Override
- public Iterator<StringStringMapProto> iterator() {
- return new Iterator<StringStringMapProto>() {
-
- Iterator<String> keyIter = environment.keySet().iterator();
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public StringStringMapProto next() {
- String key = keyIter.next();
- return StringStringMapProto.newBuilder().setKey(key).setValue((environment.get(key))).build();
- }
-
- @Override
- public boolean hasNext() {
- return keyIter.hasNext();
- }
- };
- }
- };
- builder.addAllEnvironment(iterable);
- }
- @Override
- public void setEnvironment(String key, String val) {
- initEnvironment();
- this.environment.put(key, val);
- }
- @Override
- public void removeEnvironment(String key) {
- initEnvironment();
- this.environment.remove(key);
- }
- @Override
- public void clearEnvironment() {
- initEnvironment();
- this.environment.clear();
- }
- @Override
- public List<String> getCommandList() {
- initCommandList();
- return this.commandList;
- }
- @Override
- public String getCommand(int index) {
- initCommandList();
- return this.commandList.get(index);
- }
- @Override
- public int getCommandCount() {
- initCommandList();
- return this.commandList.size();
- }
-
- private void initCommandList() {
- if (this.commandList != null) {
- return;
- }
- ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
- List<String> list = p.getCommandList();
- this.commandList = new ArrayList<String>();
- for (String c : list) {
- this.commandList.add(c);
- }
- }
-
- @Override
- public void addAllCommands(final List<String> command) {
- if (command == null)
- return;
- initCommandList();
- this.commandList.addAll(command);
- }
-
- private void addCommandsToProto() {
- maybeInitBuilder();
- builder.clearCommand();
- if (this.commandList == null)
- return;
- builder.addAllCommand(this.commandList);
- }
- @Override
- public void addCommand(String command) {
- initCommandList();
- this.commandList.add(command);
- }
- @Override
- public void removeCommand(int index) {
- initCommandList();
- this.commandList.remove(index);
- }
- @Override
- public void clearCommands() {
- initCommandList();
- this.commandList.clear();
- }
@Override
public String getQueue() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@@ -598,6 +165,7 @@ public class ApplicationSubmissionContex
}
builder.setQueue((queue));
}
+
@Override
public String getUser() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
@@ -617,6 +185,28 @@ public class ApplicationSubmissionContex
builder.setUser((user));
}
+ @Override
+ public ContainerLaunchContext getAMContainerSpec() {
+ ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.amContainer != null) {
+ return amContainer;
+ } // Else via proto
+ if (!p.hasAmContainerSpec()) {
+ return null;
+ }
+ amContainer = convertFromProtoFormat(p.getAmContainerSpec());
+ return amContainer;
+ }
+
+ @Override
+ public void setAMContainerSpec(ContainerLaunchContext amContainer) {
+ maybeInitBuilder();
+ if (amContainer == null) {
+ builder.clearAmContainerSpec();
+ }
+ this.amContainer = amContainer;
+ }
+
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p);
}
@@ -633,28 +223,12 @@ public class ApplicationSubmissionContex
return ((ApplicationIdPBImpl)t).getProto();
}
- private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
- return new ResourcePBImpl(p);
+ private ContainerLaunchContextPBImpl convertFromProtoFormat(
+ ContainerLaunchContextProto p) {
+ return new ContainerLaunchContextPBImpl(p);
}
- private ResourceProto convertToProtoFormat(Resource t) {
- return ((ResourcePBImpl)t).getProto();
+ private ContainerLaunchContextProto convertToProtoFormat(ContainerLaunchContext t) {
+ return ((ContainerLaunchContextPBImpl)t).getProto();
}
-
- private URLPBImpl convertFromProtoFormat(URLProto p) {
- return new URLPBImpl(p);
- }
-
- private URLProto convertToProtoFormat(URL t) {
- return ((URLPBImpl)t).getProto();
- }
-
- private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) {
- return new LocalResourcePBImpl(p);
- }
-
- private LocalResourceProto convertToProtoFormat(LocalResource t) {
- return ((LocalResourcePBImpl)t).getProto();
- }
-
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java Wed Sep 14 07:26:37 2011
@@ -39,8 +39,6 @@ import org.apache.hadoop.yarn.proto.Yarn
import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
-
-
public class ContainerLaunchContextPBImpl
extends ProtoBase<ContainerLaunchContextProto>
implements ContainerLaunchContext {
@@ -54,10 +52,9 @@ implements ContainerLaunchContext {
private Map<String, LocalResource> localResources = null;
private ByteBuffer containerTokens = null;
private Map<String, ByteBuffer> serviceData = null;
- private Map<String, String> env = null;
+ private Map<String, String> environment = null;
private List<String> commands = null;
-
public ContainerLaunchContextPBImpl() {
builder = ContainerLaunchContextProto.newBuilder();
}
@@ -94,7 +91,7 @@ implements ContainerLaunchContext {
if (this.serviceData != null) {
addServiceDataToProto();
}
- if (this.env != null) {
+ if (this.environment != null) {
addEnvToProto();
}
if (this.commands != null) {
@@ -364,37 +361,37 @@ implements ContainerLaunchContext {
}
@Override
- public Map<String, String> getEnv() {
+ public Map<String, String> getEnvironment() {
initEnv();
- return this.env;
+ return this.environment;
}
private void initEnv() {
- if (this.env != null) {
+ if (this.environment != null) {
return;
}
ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
- List<StringStringMapProto> list = p.getEnvList();
- this.env = new HashMap<String, String>();
+ List<StringStringMapProto> list = p.getEnvironmentList();
+ this.environment = new HashMap<String, String>();
for (StringStringMapProto c : list) {
- this.env.put(c.getKey(), c.getValue());
+ this.environment.put(c.getKey(), c.getValue());
}
}
@Override
- public void setEnv(final Map<String, String> env) {
+ public void setEnvironment(final Map<String, String> env) {
if (env == null)
return;
initEnv();
- this.env.clear();
- this.env.putAll(env);
+ this.environment.clear();
+ this.environment.putAll(env);
}
private void addEnvToProto() {
maybeInitBuilder();
- builder.clearEnv();
- if (env == null)
+ builder.clearEnvironment();
+ if (environment == null)
return;
Iterable<StringStringMapProto> iterable =
new Iterable<StringStringMapProto>() {
@@ -403,7 +400,7 @@ implements ContainerLaunchContext {
public Iterator<StringStringMapProto> iterator() {
return new Iterator<StringStringMapProto>() {
- Iterator<String> keyIter = env.keySet().iterator();
+ Iterator<String> keyIter = environment.keySet().iterator();
@Override
public void remove() {
@@ -414,7 +411,7 @@ implements ContainerLaunchContext {
public StringStringMapProto next() {
String key = keyIter.next();
return StringStringMapProto.newBuilder().setKey(key).setValue(
- (env.get(key))).build();
+ (environment.get(key))).build();
}
@Override
@@ -424,7 +421,7 @@ implements ContainerLaunchContext {
};
}
};
- builder.addAllEnv(iterable);
+ builder.addAllEnvironment(iterable);
}
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto Wed Sep 14 07:26:37 2011
@@ -188,17 +188,11 @@ message AMResponseProto {
////////////////////////////////////////////////////////////////////////
message ApplicationSubmissionContextProto {
optional ApplicationIdProto application_id = 1;
- optional string application_name = 2;
- optional ResourceProto master_capability = 3;
- repeated StringURLMapProto resources = 4;
- repeated StringLocalResourceMapProto resources_todo = 5;
- repeated string fs_tokens = 6;
- optional bytes fs_tokens_todo = 7;
- repeated StringStringMapProto environment = 8;
- repeated string command = 9;
- optional string queue = 10;
- optional PriorityProto priority = 11;
- optional string user = 12;
+ optional string application_name = 2 [default = "N/A"];
+ optional string user = 3;
+ optional string queue = 4 [default = "default"];
+ optional PriorityProto priority = 5;
+ optional ContainerLaunchContextProto am_container_spec = 6;
}
message YarnClusterMetricsProto {
@@ -242,7 +236,7 @@ message ContainerLaunchContextProto {
repeated StringLocalResourceMapProto localResources = 4;
optional bytes container_tokens = 5;
repeated StringBytesMapProto service_data = 6;
- repeated StringStringMapProto env = 7;
+ repeated StringStringMapProto environment = 7;
repeated string command = 8;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Wed Sep 14 07:26:37 2011
@@ -219,6 +219,12 @@ public class YarnConfiguration extends C
RM_PREFIX + "max-completed-applications";
public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000;
+ /** Default application name */
+ public static final String DEFAULT_APPLICATION_NAME = "N/A";
+
+ /** Default queue name */
+ public static final String DEFAULT_QUEUE_NAME = "default";
+
////////////////////////////////
// Node Manager Configs
////////////////////////////////
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Wed Sep 14 07:26:37 2011
@@ -89,7 +89,7 @@ public class ContainerLaunch implements
final Map<Path,String> localResources = container.getLocalizedResources();
String containerIdStr = ConverterUtils.toString(container.getContainerID());
final String user = launchContext.getUser();
- final Map<String,String> env = launchContext.getEnv();
+ final Map<String,String> env = launchContext.getEnvironment();
final List<String> command = launchContext.getCommands();
int ret = -1;
@@ -109,7 +109,7 @@ public class ContainerLaunch implements
}
launchContext.setCommands(newCmds);
- Map<String, String> envs = launchContext.getEnv();
+ Map<String, String> envs = launchContext.getEnvironment();
Map<String, String> newEnvs = new HashMap<String, String>(envs.size());
for (Entry<String, String> entry : envs.entrySet()) {
newEnvs.put(
@@ -118,7 +118,7 @@ public class ContainerLaunch implements
ApplicationConstants.LOG_DIR_EXPANSION_VAR,
containerLogDir.toUri().getPath()));
}
- launchContext.setEnv(newEnvs);
+ launchContext.setEnvironment(newEnvs);
// /////////////////////////// End of variable expansion
FileContext lfs = FileContext.getLocalFSFileContext();
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Wed Sep 14 07:26:37 2011
@@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -90,7 +89,6 @@ public class ClientRMService extends Abs
final private AtomicInteger applicationCounter = new AtomicInteger(0);
final private YarnScheduler scheduler;
final private RMContext rmContext;
- private final AMLivelinessMonitor amLivelinessMonitor;
private final RMAppManager rmAppManager;
private String clientServiceBindAddress;
@@ -106,7 +104,6 @@ public class ClientRMService extends Abs
super(ClientRMService.class.getName());
this.scheduler = scheduler;
this.rmContext = rmContext;
- this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rmAppManager = rmAppManager;
}
@@ -195,15 +192,18 @@ public class ClientRMService extends Abs
SubmitApplicationRequest request) throws YarnRemoteException {
ApplicationSubmissionContext submissionContext = request
.getApplicationSubmissionContext();
- ApplicationId applicationId = null;
- String user = null;
+ ApplicationId applicationId = submissionContext.getApplicationId();
+ String user = submissionContext.getUser();
try {
user = UserGroupInformation.getCurrentUser().getShortUserName();
- applicationId = submissionContext.getApplicationId();
if (rmContext.getRMApps().get(applicationId) != null) {
throw new IOException("Application with id " + applicationId
+ " is already present! Cannot add a duplicate!");
}
+
+ // Safety: replace any client-supplied user with the caller's short user name
+ submissionContext.setUser(user);
+
// This needs to be synchronous as the client can query
// immediately following the submission to get the application status.
// So call handle directly and do not send an event.
@@ -226,6 +226,7 @@ public class ClientRMService extends Abs
return response;
}
+ @SuppressWarnings("unchecked")
@Override
public FinishApplicationResponse finishApplication(
FinishApplicationRequest request) throws YarnRemoteException {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Wed Sep 14 07:26:37 2011
@@ -210,7 +210,9 @@ public class RMAppManager implements Eve
}
}
- protected synchronized void submitApplication(ApplicationSubmissionContext submissionContext) {
+ @SuppressWarnings("unchecked")
+ protected synchronized void submitApplication(
+ ApplicationSubmissionContext submissionContext) {
ApplicationId applicationId = submissionContext.getApplicationId();
RMApp application = null;
try {
@@ -224,27 +226,37 @@ public class RMAppManager implements Eve
clientTokenStr = clientToken.encodeToUrlString();
LOG.debug("Sending client token as " + clientTokenStr);
}
- submissionContext.setQueue(submissionContext.getQueue() == null
- ? "default" : submissionContext.getQueue());
- submissionContext.setApplicationName(submissionContext
- .getApplicationName() == null ? "N/A" : submissionContext
- .getApplicationName());
+
+ // Fill in defaults for an unset queue or application name
+ if (submissionContext.getQueue() == null) {
+ submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+ }
+ if (submissionContext.getApplicationName() == null) {
+ submissionContext.setApplicationName(
+ YarnConfiguration.DEFAULT_APPLICATION_NAME);
+ }
+
+ // Store application for recovery
ApplicationStore appStore = rmContext.getApplicationsStore()
.createApplicationStore(submissionContext.getApplicationId(),
submissionContext);
+
+ // Create RMApp
application = new RMAppImpl(applicationId, rmContext,
this.conf, submissionContext.getApplicationName(), user,
submissionContext.getQueue(), submissionContext, clientTokenStr,
- appStore, rmContext.getAMLivelinessMonitor(), this.scheduler,
+ appStore, this.scheduler,
this.masterService);
- if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) {
+ if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
+ null) {
LOG.info("Application with id " + applicationId +
" is already present! Cannot add a duplicate!");
- // don't send event through dispatcher as it will be handled by app already
- // present with this id.
+ // don't send event through dispatcher as it will be handled by app
+ // already present with this id.
application.handle(new RMAppRejectedEvent(applicationId,
- "Application with this id is already present! Cannot add a duplicate!"));
+ "Application with this id is already present! " +
+ "Cannot add a duplicate!"));
} else {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.START));
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerSubmitEvent.java Wed Sep 14 07:26:37 2011
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
public class RMAppManagerSubmitEvent extends RMAppManagerEvent {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Wed Sep 14 07:26:37 2011
@@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -120,7 +119,8 @@ public class AMLauncher implements Runna
+ " for AM " + application.getAppAttemptId());
ContainerLaunchContext launchContext =
createAMContainerLaunchContext(applicationContext, masterContainerID);
- StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
+ StartContainerRequest request =
+ recordFactory.newRecordInstance(StartContainerRequest.class);
request.setContainerLaunchContext(launchContext);
containerMgrProxy.startContainer(request);
LOG.info("Done launching container " + application.getMasterContainer()
@@ -130,7 +130,8 @@ public class AMLauncher implements Runna
private void cleanup() throws IOException {
connect();
ContainerId containerId = application.getMasterContainer().getId();
- StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
+ StopContainerRequest stopRequest =
+ recordFactory.newRecordInstance(StopContainerRequest.class);
stopRequest.setContainerId(containerId);
containerMgrProxy.stopContainer(stopRequest);
}
@@ -145,7 +146,7 @@ public class AMLauncher implements Runna
final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again.
UserGroupInformation currentUser =
- UserGroupInformation.createRemoteUser("TODO"); // TODO
+ UserGroupInformation.createRemoteUser("yarn"); // TODO
if (UserGroupInformation.isSecurityEnabled()) {
ContainerToken containerToken = container.getContainerToken();
Token<ContainerTokenIdentifier> token =
@@ -170,8 +171,8 @@ public class AMLauncher implements Runna
ContainerId containerID) throws IOException {
// Construct the actual Container
- ContainerLaunchContext container = recordFactory.newRecordInstance(ContainerLaunchContext.class);
- container.setCommands(applicationMasterContext.getCommandList());
+ ContainerLaunchContext container =
+ applicationMasterContext.getAMContainerSpec();
StringBuilder mergedCommand = new StringBuilder();
String failCount = Integer.toString(application.getAppAttemptId()
.getAttemptId());
@@ -189,34 +190,28 @@ public class AMLauncher implements Runna
LOG.info("Command to launch container " +
containerID + " : " + mergedCommand);
- Map<String, String> environment =
- applicationMasterContext.getAllEnvironment();
- environment.putAll(setupTokensInEnv(applicationMasterContext));
- container.setEnv(environment);
-
- // Construct the actual Container
+
+ // Finalize the container
container.setContainerId(containerID);
container.setUser(applicationMasterContext.getUser());
- container.setResource(applicationMasterContext.getMasterCapability());
- container.setLocalResources(applicationMasterContext.getAllResourcesTodo());
- container.setContainerTokens(applicationMasterContext.getFsTokensTodo());
+ setupTokensAndEnv(container);
+
return container;
}
- private Map<String, String> setupTokensInEnv(
- ApplicationSubmissionContext asc)
+ private void setupTokensAndEnv(
+ ContainerLaunchContext container)
throws IOException {
- Map<String, String> env =
- new HashMap<String, String>();
+ Map<String, String> environment = container.getEnvironment();
if (UserGroupInformation.isSecurityEnabled()) {
// TODO: Security enabled/disabled info should come from RM.
Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
- if (asc.getFsTokensTodo() != null) {
+ if (container.getContainerTokens() != null) {
// TODO: Don't do this kind of checks everywhere.
- dibb.reset(asc.getFsTokensTodo());
+ dibb.reset(container.getContainerTokens());
credentials.readTokenStorageStream(dibb);
}
@@ -236,14 +231,16 @@ public class AMLauncher implements Runna
token.setService(new Text(resolvedAddr));
String appMasterTokenEncoded = token.encodeToUrlString();
LOG.debug("Putting appMaster token in env : " + appMasterTokenEncoded);
- env.put(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME,
+ environment.put(
+ ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME,
appMasterTokenEncoded);
// Add the RM token
credentials.addToken(new Text(resolvedAddr), token);
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
- asc.setFsTokensTodo(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+ container.setContainerTokens(
+ ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
ApplicationTokenIdentifier identifier = new ApplicationTokenIdentifier(
application.getAppAttemptId().getApplicationId());
@@ -252,9 +249,10 @@ public class AMLauncher implements Runna
String encoded =
Base64.encodeBase64URLSafeString(clientSecretKey.getEncoded());
LOG.debug("The encoded client secret-key to be put in env : " + encoded);
- env.put(ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME, encoded);
+ environment.put(
+ ApplicationConstants.APPLICATION_CLIENT_SECRET_ENV_NAME,
+ encoded);
}
- return env;
}
@SuppressWarnings("unchecked")
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1170459&r1=1170458&r2=1170459&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Sep 14 07:26:37 2011
@@ -86,7 +86,6 @@ public class RMAppImpl implements RMApp
// Mutable fields
private long startTime;
private long finishTime;
- private AMLivelinessMonitor amLivelinessMonitor;
private RMAppAttempt currentAttempt;
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
@@ -163,7 +162,7 @@ public class RMAppImpl implements RMApp
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
Configuration config, String name, String user, String queue,
ApplicationSubmissionContext submissionContext, String clientTokenStr,
- ApplicationStore appStore, AMLivelinessMonitor amLivelinessMonitor,
+ ApplicationStore appStore,
YarnScheduler scheduler, ApplicationMasterService masterService) {
this.applicationId = applicationId;
@@ -176,7 +175,6 @@ public class RMAppImpl implements RMApp
this.submissionContext = submissionContext;
this.clientTokenStr = clientTokenStr;
this.appStore = appStore;
- this.amLivelinessMonitor = amLivelinessMonitor;
this.scheduler = scheduler;
this.masterService = masterService;
this.startTime = System.currentTimeMillis();
@@ -380,6 +378,7 @@ public class RMAppImpl implements RMApp
}
}
+ @SuppressWarnings("unchecked")
private void createNewAttempt() {
ApplicationAttemptId appAttemptId = Records
.newRecord(ApplicationAttemptId.class);
@@ -434,6 +433,7 @@ public class RMAppImpl implements RMApp
return nodes;
}
+ @SuppressWarnings("unchecked")
public void transition(RMAppImpl app, RMAppEvent event) {
Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
for (NodeId nodeId : nodes) {