You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/10 21:11:08 UTC

[6/9] oozie git commit: OOZIE-2874 Make the Launcher Mapper map-only job's InputFormat class pluggable (andras.piros via gezapeti)

OOZIE-2874 Make the Launcher Mapper map-only job's InputFormat class pluggable (andras.piros via gezapeti)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/4cb29f81
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/4cb29f81
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/4cb29f81

Branch: refs/heads/oya
Commit: 4cb29f81492d3b4250f43898a1b3ffae2b28e012
Parents: f5554dd
Author: Gezapeti Cseh <ge...@gmail.com>
Authored: Tue May 9 10:43:54 2017 -0700
Committer: Gezapeti Cseh <ge...@gmail.com>
Committed: Tue May 9 10:43:54 2017 -0700

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java | 22 +++++++++++++-------
 .../action/hadoop/LauncherMapperHelper.java     |  4 +++-
 core/src/main/resources/oozie-default.xml       |  8 +++++++
 release-log.txt                                 |  1 +
 4 files changed, 26 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/4cb29f81/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index f62c997..d60a5c7 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -103,7 +102,9 @@ public class JavaActionExecutor extends ActionExecutor {
     public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
     public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled";
     public static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
-    public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART = "oozie.action.launcher.am.restart.kill.childjobs";
+    public static final String OOZIE_ACTION_LAUNCHER_PREFIX = ActionExecutor.CONF_PREFIX  + "launcher.";
+    public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART =
+            OOZIE_ACTION_LAUNCHER_PREFIX + "am.restart.kill.childjobs";
     public static final String HADOOP_MAP_MEMORY_MB = "mapreduce.map.memory.mb";
     public static final String HADOOP_CHILD_JAVA_OPTS = "mapred.child.java.opts";
     public static final String HADOOP_MAP_JAVA_OPTS = "mapreduce.map.java.opts";
@@ -125,10 +126,11 @@ public class JavaActionExecutor extends ActionExecutor {
     protected XLog LOG = XLog.getLog(getClass());
     private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
     private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
-    public static final String CONF_HADOOP_YARN_UBER_MODE = "oozie.action.launcher." + HADOOP_YARN_UBER_MODE;
+    public static final String CONF_HADOOP_YARN_UBER_MODE = OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_UBER_MODE;
     public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader";
     public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
     public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
+    private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator();
 
     public XConfiguration workflowConf = null;
 
@@ -151,7 +153,7 @@ public class JavaActionExecutor extends ActionExecutor {
     public static List<Class> getCommonLauncherClasses() {
         List<Class> classes = new ArrayList<Class>();
         classes.add(LauncherMapper.class);
-        classes.add(OozieLauncherInputFormat.class);
+        classes.add(launcherInputFormatClassLocator.locateOrGet());
         classes.add(OozieLauncherOutputFormat.class);
         classes.add(OozieLauncherOutputCommitter.class);
         classes.add(LauncherMainHadoopUtils.class);
@@ -297,12 +299,12 @@ public class JavaActionExecutor extends ActionExecutor {
         // 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable
         // 3. oozie.action.launcher.mapreduce.job.ubertask.enable
         if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
-            if (ConfigurationService.get("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE).length() > 0) {
-                if (ConfigurationService.getBoolean("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE)) {
+            if (ConfigurationService.get(getActionTypeLauncherPrefix() + HADOOP_YARN_UBER_MODE).length() > 0) {
+                if (ConfigurationService.getBoolean(getActionTypeLauncherPrefix() + HADOOP_YARN_UBER_MODE)) {
                     launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
                 }
             } else {
-                if (ConfigurationService.getBoolean("oozie.action.launcher." + HADOOP_YARN_UBER_MODE)) {
+                if (ConfigurationService.getBoolean(OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_UBER_MODE)) {
                     launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
                 }
             }
@@ -312,7 +314,7 @@ public class JavaActionExecutor extends ActionExecutor {
     void injectLauncherTimelineServiceEnabled(Configuration launcherConf, Configuration actionConf) {
         // Getting delegation token for ATS. If tez-site.xml is present in distributed cache, turn on timeline service.
         if (actionConf.get("oozie.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED) == null
-                && ConfigurationService.getBoolean("oozie.action.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED)) {
+                && ConfigurationService.getBoolean(OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_TIMELINE_SERVICE_ENABLED)) {
             String cacheFiles = launcherConf.get("mapred.cache.files");
             if (cacheFiles != null && cacheFiles.contains("tez-site.xml")) {
                 launcherConf.setBoolean(HADOOP_YARN_TIMELINE_SERVICE_ENABLED, true);
@@ -1778,4 +1780,8 @@ public class JavaActionExecutor extends ActionExecutor {
         return workflowConf;
 
     }
+
+    private String getActionTypeLauncherPrefix() {
+        return "oozie.action." + getType() + ".launcher.";
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4cb29f81/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
index 9609fdc..72ed2f1 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
@@ -57,6 +57,8 @@ public class LauncherMapperHelper {
 
     public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
 
+    private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator();
+
     public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
             throws HadoopAccessorException, IOException {
         String jobId = null;
@@ -160,7 +162,7 @@ public class LauncherMapperHelper {
             IOUtils.closeSafely(os);
         }
 
-        launcherConf.setInputFormat(OozieLauncherInputFormat.class);
+        launcherConf.setInputFormat(launcherInputFormatClassLocator.locateOrGet());
         launcherConf.setOutputFormat(OozieLauncherOutputFormat.class);
         launcherConf.setOutputCommitter(OozieLauncherOutputCommitter.class);
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4cb29f81/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index e7a48a0..076401d 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -1860,6 +1860,14 @@ will be the requeue interval for the actions which are waiting for a long time w
     </property>
 
     <property>
+        <name>oozie.action.launcher.mapreduce.input.format.class</name>
+        <value>org.apache.oozie.action.hadoop.OozieLauncherInputFormat</value>
+        <description>
+            Make the Launcher Mapper map-only job's InputFormat class pluggable in order to provide alternative implementations.
+        </description>
+    </property>
+
+    <property>
         <name>oozie.action.spark.setup.hadoop.conf.dir</name>
         <value>false</value>
         <description>

http://git-wip-us.apache.org/repos/asf/oozie/blob/4cb29f81/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index cb141cb..ee65cd1 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.4.0 release (trunk - unreleased)
 
+OOZIE-2874 Make the Launcher Mapper map-only job's InputFormat class pluggable (andras.piros via gezapeti)
 OOZIE-2751 LocalOozieClient is missing methods from OozieClient (abhishekbafna via rkanter)
 OOZIE-2870 non working examples in oozie documentation coordinator spec (andras.piros via pbacsko)
 OOZIE-2827 amend More directly view of the coordinator’s history from perspective of workflow action. (Alonzo Zhou via pbacsko)