You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by te...@apache.org on 2017/02/22 23:39:51 UTC

hadoop git commit: MAPREDUCE-6825. YARNRunner#createApplicationSubmissionContext method is longer than 150 lines (Contributed by Gergely Novák via Daniel Templeton)

Repository: hadoop
Updated Branches:
  refs/heads/trunk d150f061f -> 732ee6f0b


MAPREDUCE-6825. YARNRunner#createApplicationSubmissionContext method is longer than 150 lines (Contributed by Gergely Novák via Daniel Templeton)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/732ee6f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/732ee6f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/732ee6f0

Branch: refs/heads/trunk
Commit: 732ee6f0b58a12500198c0d934cc570c7490b520
Parents: d150f06
Author: Daniel Templeton <te...@apache.org>
Authored: Wed Feb 22 15:38:11 2017 -0800
Committer: Daniel Templeton <te...@apache.org>
Committed: Wed Feb 22 15:38:11 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/mapred/YARNRunner.java    | 141 +++++++++++--------
 1 file changed, 86 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/732ee6f0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 98fe553..228c6af 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -291,8 +291,7 @@ public class YARNRunner implements ClientProtocol {
   throws IOException, InterruptedException {
     
     addHistoryToken(ts);
-    
-    // Construct necessary information to start the MR AM
+
     ApplicationSubmissionContext appContext =
       createApplicationSubmissionContext(conf, jobSubmitDir, ts);
 
@@ -331,34 +330,15 @@ public class YARNRunner implements ClientProtocol {
     return rsrc;
   }
 
-  public ApplicationSubmissionContext createApplicationSubmissionContext(
-      Configuration jobConf,
-      String jobSubmitDir, Credentials ts) throws IOException {
-    ApplicationId applicationId = resMgrDelegate.getApplicationId();
-
-    // Setup resource requirements
-    Resource capability = recordFactory.newRecordInstance(Resource.class);
-    capability.setMemorySize(
-        conf.getInt(
-            MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
-            )
-        );
-    capability.setVirtualCores(
-        conf.getInt(
-            MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
-            )
-        );
-    LOG.debug("AppMaster capability = " + capability);
-
-    // Setup LocalResources
-    Map<String, LocalResource> localResources =
-        new HashMap<String, LocalResource>();
+  private Map<String, LocalResource> setupLocalResources(Configuration jobConf,
+      String jobSubmitDir) throws IOException {
+    Map<String, LocalResource> localResources = new HashMap<>();
 
     Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
 
-    URL yarnUrlForJobSubmitDir = URL.fromPath(defaultFileContext.getDefaultFileSystem()
-            .resolvePath(
-                defaultFileContext.makeQualified(new Path(jobSubmitDir))));
+    URL yarnUrlForJobSubmitDir = URL.fromPath(defaultFileContext
+        .getDefaultFileSystem().resolvePath(
+            defaultFileContext.makeQualified(new Path(jobSubmitDir))));
     LOG.debug("Creating setup context, jobSubmitDir url is "
         + yarnUrlForJobSubmitDir);
 
@@ -371,7 +351,7 @@ public class YARNRunner implements ClientProtocol {
           FileContext.getFileContext(jobJarPath.toUri(), jobConf),
           jobJarPath,
           LocalResourceType.PATTERN);
-      String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, 
+      String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
           JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
       rc.setPattern(pattern);
       localResources.put(MRJobConfig.JOB_JAR, rc);
@@ -392,13 +372,11 @@ public class YARNRunner implements ClientProtocol {
               new Path(jobSubmitDir, s), LocalResourceType.FILE));
     }
 
-    // Setup security tokens
-    DataOutputBuffer dob = new DataOutputBuffer();
-    ts.writeTokenStorageToStream(dob);
-    ByteBuffer securityTokens  = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    return localResources;
+  }
 
-    // Setup the command to run the AM
-    List<String> vargs = new ArrayList<String>(8);
+  private List<String> setupAMCommand(Configuration jobConf) {
+    List<String> vargs = new ArrayList<>(8);
     vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
         + "/bin/java");
 
@@ -409,27 +387,35 @@ public class YARNRunner implements ClientProtocol {
     MRApps.addLog4jSystemProperties(null, vargs, conf);
 
     // Check for Java Lib Path usage in MAP and REDUCE configs
-    warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map", 
-        MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
-    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), "map", 
-        MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
-    warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), "reduce", 
-        MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
-    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), "reduce", 
-        MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
+    warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS, ""),
+        "map",
+        MRJobConfig.MAP_JAVA_OPTS,
+        MRJobConfig.MAP_ENV);
+    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, ""),
+        "map",
+        MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
+        MRJobConfig.MAPRED_ADMIN_USER_ENV);
+    warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""),
+        "reduce",
+        MRJobConfig.REDUCE_JAVA_OPTS,
+        MRJobConfig.REDUCE_ENV);
+    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, ""),
+        "reduce",
+        MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
+        MRJobConfig.MAPRED_ADMIN_USER_ENV);
 
     // Add AM admin command opts before user command opts
     // so that it can be overridden by user
     String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
         MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
-    warnForJavaLibPath(mrAppMasterAdminOptions, "app master", 
+    warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
         MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
     vargs.add(mrAppMasterAdminOptions);
-    
+
     // Add AM user command opts
     String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
         MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
-    warnForJavaLibPath(mrAppMasterUserOptions, "app master", 
+    warnForJavaLibPath(mrAppMasterUserOptions, "app master",
         MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
     vargs.add(mrAppMasterUserOptions);
 
@@ -449,9 +435,14 @@ public class YARNRunner implements ClientProtocol {
         Path.SEPARATOR + ApplicationConstants.STDOUT);
     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
         Path.SEPARATOR + ApplicationConstants.STDERR);
+    return vargs;
+  }
 
+  private ContainerLaunchContext setupContainerLaunchContextForAM(
+      Configuration jobConf, Map<String, LocalResource> localResources,
+      ByteBuffer securityTokens, List<String> vargs) throws IOException {
 
-    Vector<String> vargsFinal = new Vector<String>(8);
+    Vector<String> vargsFinal = new Vector<>(8);
     // Final command
     StringBuilder mergedCommand = new StringBuilder();
     for (CharSequence str : vargs) {
@@ -464,7 +455,7 @@ public class YARNRunner implements ClientProtocol {
 
     // Setup the CLASSPATH in environment
     // i.e. add { Hadoop jars, job jar, CWD } to classpath.
-    Map<String, String> environment = new HashMap<String, String>();
+    Map<String, String> environment = new HashMap<>();
     MRApps.setClasspath(environment, conf);
 
     // Shell
@@ -477,28 +468,68 @@ public class YARNRunner implements ClientProtocol {
         MRApps.crossPlatformifyMREnv(conf, Environment.PWD), conf);
 
     // Setup the environment variables for Admin first
-    MRApps.setEnvFromInputString(environment, 
+    MRApps.setEnvFromInputString(environment,
         conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV,
             MRJobConfig.DEFAULT_MR_AM_ADMIN_USER_ENV), conf);
     // Setup the environment variables (LD_LIBRARY_PATH, etc)
-    MRApps.setEnvFromInputString(environment, 
+    MRApps.setEnvFromInputString(environment,
         conf.get(MRJobConfig.MR_AM_ENV), conf);
 
     // Parse distributed cache
     MRApps.setupDistributedCache(jobConf, localResources);
 
-    Map<ApplicationAccessType, String> acls
-        = new HashMap<ApplicationAccessType, String>(2);
+    Map<ApplicationAccessType, String> acls = new HashMap<>(2);
     acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
         MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
     acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
         MRJobConfig.JOB_ACL_MODIFY_JOB,
         MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
 
+    return ContainerLaunchContext.newInstance(localResources, environment,
+        vargsFinal, null, securityTokens, acls);
+  }
+
+  /**
+   * Constructs all the necessary information to start the MR AM.
+   * @param jobConf the configuration for the MR job
+   * @param jobSubmitDir the directory path for the job
+   * @param ts the security credentials for the job
+   * @return ApplicationSubmissionContext
+   * @throws IOException on IO error (e.g. path resolution)
+   */
+  public ApplicationSubmissionContext createApplicationSubmissionContext(
+      Configuration jobConf, String jobSubmitDir, Credentials ts)
+      throws IOException {
+    ApplicationId applicationId = resMgrDelegate.getApplicationId();
+
+    // Setup resource requirements
+    Resource capability = recordFactory.newRecordInstance(Resource.class);
+    capability.setMemorySize(
+        conf.getInt(
+            MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
+        )
+    );
+    capability.setVirtualCores(
+        conf.getInt(
+            MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
+        )
+    );
+    LOG.debug("AppMaster capability = " + capability);
+
+    // Setup LocalResources
+    Map<String, LocalResource> localResources =
+        setupLocalResources(jobConf, jobSubmitDir);
+
+    // Setup security tokens
+    DataOutputBuffer dob = new DataOutputBuffer();
+    ts.writeTokenStorageToStream(dob);
+    ByteBuffer securityTokens =
+        ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
     // Setup ContainerLaunchContext for AM container
-    ContainerLaunchContext amContainer =
-        ContainerLaunchContext.newInstance(localResources, environment,
-          vargsFinal, null, securityTokens, acls);
+    List<String> vargs = setupAMCommand(jobConf);
+    ContainerLaunchContext amContainer = setupContainerLaunchContextForAM(
+        jobConf, localResources, securityTokens, vargs);
 
     String regex = conf.get(MRJobConfig.MR_JOB_SEND_TOKEN_CONF);
     if (regex != null && !regex.isEmpty()) {
@@ -566,7 +597,7 @@ public class YARNRunner implements ClientProtocol {
 
     appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
     if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
-      appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
+      appContext.setApplicationTags(new HashSet<>(tagsFromConf));
     }
 
     String jobPriority = jobConf.get(MRJobConfig.PRIORITY);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org