You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by te...@apache.org on 2017/02/22 23:39:51 UTC
hadoop git commit: MAPREDUCE-6825. YARNRunner#createApplicationSubmissionContext method is longer than 150 lines (Contributed by Gergely Novák via Daniel Templeton)
Repository: hadoop
Updated Branches:
refs/heads/trunk d150f061f -> 732ee6f0b
MAPREDUCE-6825. YARNRunner#createApplicationSubmissionContext method is longer than 150 lines (Contributed by Gergely Novák via Daniel Templeton)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/732ee6f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/732ee6f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/732ee6f0
Branch: refs/heads/trunk
Commit: 732ee6f0b58a12500198c0d934cc570c7490b520
Parents: d150f06
Author: Daniel Templeton <te...@apache.org>
Authored: Wed Feb 22 15:38:11 2017 -0800
Committer: Daniel Templeton <te...@apache.org>
Committed: Wed Feb 22 15:38:11 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/mapred/YARNRunner.java | 141 +++++++++++--------
1 file changed, 86 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/732ee6f0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 98fe553..228c6af 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -291,8 +291,7 @@ public class YARNRunner implements ClientProtocol {
throws IOException, InterruptedException {
addHistoryToken(ts);
-
- // Construct necessary information to start the MR AM
+
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
@@ -331,34 +330,15 @@ public class YARNRunner implements ClientProtocol {
return rsrc;
}
- public ApplicationSubmissionContext createApplicationSubmissionContext(
- Configuration jobConf,
- String jobSubmitDir, Credentials ts) throws IOException {
- ApplicationId applicationId = resMgrDelegate.getApplicationId();
-
- // Setup resource requirements
- Resource capability = recordFactory.newRecordInstance(Resource.class);
- capability.setMemorySize(
- conf.getInt(
- MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
- )
- );
- capability.setVirtualCores(
- conf.getInt(
- MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
- )
- );
- LOG.debug("AppMaster capability = " + capability);
-
- // Setup LocalResources
- Map<String, LocalResource> localResources =
- new HashMap<String, LocalResource>();
+ private Map<String, LocalResource> setupLocalResources(Configuration jobConf,
+ String jobSubmitDir) throws IOException {
+ Map<String, LocalResource> localResources = new HashMap<>();
Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
- URL yarnUrlForJobSubmitDir = URL.fromPath(defaultFileContext.getDefaultFileSystem()
- .resolvePath(
- defaultFileContext.makeQualified(new Path(jobSubmitDir))));
+ URL yarnUrlForJobSubmitDir = URL.fromPath(defaultFileContext
+ .getDefaultFileSystem().resolvePath(
+ defaultFileContext.makeQualified(new Path(jobSubmitDir))));
LOG.debug("Creating setup context, jobSubmitDir url is "
+ yarnUrlForJobSubmitDir);
@@ -371,7 +351,7 @@ public class YARNRunner implements ClientProtocol {
FileContext.getFileContext(jobJarPath.toUri(), jobConf),
jobJarPath,
LocalResourceType.PATTERN);
- String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
+ String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
rc.setPattern(pattern);
localResources.put(MRJobConfig.JOB_JAR, rc);
@@ -392,13 +372,11 @@ public class YARNRunner implements ClientProtocol {
new Path(jobSubmitDir, s), LocalResourceType.FILE));
}
- // Setup security tokens
- DataOutputBuffer dob = new DataOutputBuffer();
- ts.writeTokenStorageToStream(dob);
- ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ return localResources;
+ }
- // Setup the command to run the AM
- List<String> vargs = new ArrayList<String>(8);
+ private List<String> setupAMCommand(Configuration jobConf) {
+ List<String> vargs = new ArrayList<>(8);
vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
+ "/bin/java");
@@ -409,27 +387,35 @@ public class YARNRunner implements ClientProtocol {
MRApps.addLog4jSystemProperties(null, vargs, conf);
// Check for Java Lib Path usage in MAP and REDUCE configs
- warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map",
- MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
- warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), "map",
- MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
- warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), "reduce",
- MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
- warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), "reduce",
- MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
+ warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS, ""),
+ "map",
+ MRJobConfig.MAP_JAVA_OPTS,
+ MRJobConfig.MAP_ENV);
+ warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, ""),
+ "map",
+ MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
+ MRJobConfig.MAPRED_ADMIN_USER_ENV);
+ warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""),
+ "reduce",
+ MRJobConfig.REDUCE_JAVA_OPTS,
+ MRJobConfig.REDUCE_ENV);
+ warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, ""),
+ "reduce",
+ MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
+ MRJobConfig.MAPRED_ADMIN_USER_ENV);
// Add AM admin command opts before user command opts
// so that it can be overridden by user
String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
- warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
+ warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
vargs.add(mrAppMasterAdminOptions);
-
+
// Add AM user command opts
String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
- warnForJavaLibPath(mrAppMasterUserOptions, "app master",
+ warnForJavaLibPath(mrAppMasterUserOptions, "app master",
MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
vargs.add(mrAppMasterUserOptions);
@@ -449,9 +435,14 @@ public class YARNRunner implements ClientProtocol {
Path.SEPARATOR + ApplicationConstants.STDOUT);
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
Path.SEPARATOR + ApplicationConstants.STDERR);
+ return vargs;
+ }
+ private ContainerLaunchContext setupContainerLaunchContextForAM(
+ Configuration jobConf, Map<String, LocalResource> localResources,
+ ByteBuffer securityTokens, List<String> vargs) throws IOException {
- Vector<String> vargsFinal = new Vector<String>(8);
+ Vector<String> vargsFinal = new Vector<>(8);
// Final command
StringBuilder mergedCommand = new StringBuilder();
for (CharSequence str : vargs) {
@@ -464,7 +455,7 @@ public class YARNRunner implements ClientProtocol {
// Setup the CLASSPATH in environment
// i.e. add { Hadoop jars, job jar, CWD } to classpath.
- Map<String, String> environment = new HashMap<String, String>();
+ Map<String, String> environment = new HashMap<>();
MRApps.setClasspath(environment, conf);
// Shell
@@ -477,28 +468,68 @@ public class YARNRunner implements ClientProtocol {
MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);
// Setup the environment variables for Admin first
- MRApps.setEnvFromInputString(environment,
+ MRApps.setEnvFromInputString(environment,
conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV,
MRJobConfig.DEFAULT_MR_AM_ADMIN_USER_ENV), conf);
// Setup the environment variables (LD_LIBRARY_PATH, etc)
- MRApps.setEnvFromInputString(environment,
+ MRApps.setEnvFromInputString(environment,
conf.get(MRJobConfig.MR_AM_ENV), conf);
// Parse distributed cache
MRApps.setupDistributedCache(jobConf, localResources);
- Map<ApplicationAccessType, String> acls
- = new HashMap<ApplicationAccessType, String>(2);
+ Map<ApplicationAccessType, String> acls = new HashMap<>(2);
acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
MRJobConfig.JOB_ACL_MODIFY_JOB,
MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
+ return ContainerLaunchContext.newInstance(localResources, environment,
+ vargsFinal, null, securityTokens, acls);
+ }
+
+ /**
+ * Constructs all the necessary information to start the MR AM.
+ * @param jobConf the configuration for the MR job
+ * @param jobSubmitDir the directory path for the job
+ * @param ts the security credentials for the job
+ * @return ApplicationSubmissionContext
+ * @throws IOException on IO error (e.g. path resolution)
+ */
+ public ApplicationSubmissionContext createApplicationSubmissionContext(
+ Configuration jobConf, String jobSubmitDir, Credentials ts)
+ throws IOException {
+ ApplicationId applicationId = resMgrDelegate.getApplicationId();
+
+ // Setup resource requirements
+ Resource capability = recordFactory.newRecordInstance(Resource.class);
+ capability.setMemorySize(
+ conf.getInt(
+ MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
+ )
+ );
+ capability.setVirtualCores(
+ conf.getInt(
+ MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
+ )
+ );
+ LOG.debug("AppMaster capability = " + capability);
+
+ // Setup LocalResources
+ Map<String, LocalResource> localResources =
+ setupLocalResources(jobConf, jobSubmitDir);
+
+ // Setup security tokens
+ DataOutputBuffer dob = new DataOutputBuffer();
+ ts.writeTokenStorageToStream(dob);
+ ByteBuffer securityTokens =
+ ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
// Setup ContainerLaunchContext for AM container
- ContainerLaunchContext amContainer =
- ContainerLaunchContext.newInstance(localResources, environment,
- vargsFinal, null, securityTokens, acls);
+ List<String> vargs = setupAMCommand(jobConf);
+ ContainerLaunchContext amContainer = setupContainerLaunchContextForAM(
+ jobConf, localResources, securityTokens, vargs);
String regex = conf.get(MRJobConfig.MR_JOB_SEND_TOKEN_CONF);
if (regex != null && !regex.isEmpty()) {
@@ -566,7 +597,7 @@ public class YARNRunner implements ClientProtocol {
appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
- appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
+ appContext.setApplicationTags(new HashSet<>(tagsFromConf));
}
String jobPriority = jobConf.get(MRJobConfig.PRIORITY);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org