Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/10 21:11:11 UTC

[9/9] oozie git commit: Merge branch 'master' into oya

Merge branch 'master' into oya


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/80854d15
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/80854d15
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/80854d15

Branch: refs/heads/oya
Commit: 80854d151d67666132a02797de4efdbf3b2af175
Parents: 40ee320 1103882
Author: Gezapeti Cseh <ge...@gmail.com>
Authored: Wed May 10 14:09:31 2017 -0700
Committer: Gezapeti Cseh <ge...@gmail.com>
Committed: Wed May 10 14:09:31 2017 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/client/OozieClient.java    |   4 +-
 .../apache/oozie/client/rest/RestConstants.java |   2 +-
 .../main/java/org/apache/oozie/BaseEngine.java  |  18 +-
 .../org/apache/oozie/BaseLocalOozieClient.java  | 601 +++++++++++++++++++
 .../java/org/apache/oozie/LocalOozieClient.java | 245 +-------
 .../apache/oozie/LocalOozieClientBundle.java    |  93 +++
 .../org/apache/oozie/LocalOozieClientCoord.java | 328 ++--------
 .../oozie/OozieClientOperationHandler.java      | 173 ++++++
 .../java/org/apache/oozie/OozieJsonFactory.java |  55 ++
 .../oozie/action/hadoop/JavaActionExecutor.java |  15 +-
 .../oozie/action/hadoop/LauncherHelper.java     |   2 +
 .../hadoop/LauncherInputFormatClassLocator.java |  84 +++
 .../command/coord/CoordSubmitXCommand.java      |  25 -
 .../java/org/apache/oozie/local/LocalOozie.java |  93 ++-
 .../org/apache/oozie/servlet/V0JobsServlet.java |  12 +-
 .../org/apache/oozie/servlet/V1JobsServlet.java | 184 ++----
 core/src/main/resources/oozie-default.xml       |   8 +
 .../apache/oozie/TestLocalOozieClientCoord.java |  72 ++-
 .../TestLauncherInputFormatClassLocator.java    |  57 ++
 .../command/coord/TestCoordSubmitXCommand.java  |  30 -
 .../resources/coord-invalid-el-function.xml     |  35 --
 .../resources/coord-invalid-output-instance.xml |  58 --
 .../coord-multiple-input-instance4.xml          |   2 +-
 .../coord-multiple-input-start-instance2.xml    |   2 +-
 .../coord-multiple-output-instance4.xml         |   2 +-
 docs/src/site/twiki/AG_Install.twiki            |   4 +-
 docs/src/site/twiki/AG_Monitoring.twiki         |   2 +-
 .../site/twiki/CoordinatorFunctionalSpec.twiki  |   2 +-
 .../site/twiki/DG_ActionAuthentication.twiki    |   4 +-
 docs/src/site/twiki/DG_SLAMonitoring.twiki      |   2 +-
 docs/src/site/twiki/WebServicesAPI.twiki        |   6 +-
 .../src/site/twiki/WorkflowFunctionalSpec.twiki |   2 +-
 release-log.txt                                 |   4 +-
 33 files changed, 1412 insertions(+), 814 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/80854d15/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index f3e5e32,d60a5c7..7a762ab
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@@ -49,14 -48,13 +49,15 @@@ import org.apache.hadoop.fs.FileStatus
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.permission.AccessControlException;
 -import org.apache.oozie.hadoop.utils.HadoopShims;
 +import org.apache.hadoop.io.DataOutputBuffer;
  import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.ipc.RemoteException;
  import org.apache.hadoop.mapred.JobClient;
 -import org.apache.hadoop.mapred.JobConf;
 -import org.apache.hadoop.mapred.JobID;
 -import org.apache.hadoop.mapred.RunningJob;
 +import org.apache.hadoop.mapred.TaskLog;
 +import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 +import org.apache.hadoop.mapreduce.v2.util.MRApps;
 +import org.apache.hadoop.security.Credentials;
+ import org.apache.hadoop.security.UserGroupInformation;
  import org.apache.hadoop.security.token.Token;
  import org.apache.hadoop.security.token.TokenIdentifier;
  import org.apache.hadoop.util.DiskChecker;
@@@ -144,11 -119,18 +148,12 @@@ public class JavaActionExecutor extend
      private static int maxActionOutputLen;
      private static int maxExternalStatsSize;
      private static int maxFSGlobMax;
 -    private static final String SUCCEEDED = "SUCCEEDED";
 -    private static final String KILLED = "KILLED";
 -    private static final String FAILED = "FAILED";
 -    private static final String FAILED_KILLED = "FAILED/KILLED";
 +
 +    protected static final String HADOOP_USER = "user.name";
 +
      protected XLog LOG = XLog.getLog(getClass());
 -    private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
      private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
 -    public static final String CONF_HADOOP_YARN_UBER_MODE = OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_UBER_MODE;
 -    public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader";
 -    public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
 -    public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
+     private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator();
  
      public XConfiguration workflowConf = null;
  
@@@ -166,15 -150,15 +171,15 @@@
          super(type);
      }
  
 -    public static List<Class> getCommonLauncherClasses() {
 -        List<Class> classes = new ArrayList<Class>();
 -        classes.add(LauncherMapper.class);
 +    public static List<Class<?>> getCommonLauncherClasses() {
 +        List<Class<?>> classes = new ArrayList<Class<?>>();
-         classes.add(OozieLauncherInputFormat.class);
 +        classes.add(LauncherMain.class);
+         classes.add(launcherInputFormatClassLocator.locateOrGet());
          classes.add(OozieLauncherOutputFormat.class);
          classes.add(OozieLauncherOutputCommitter.class);
 -        classes.add(LauncherMainHadoopUtils.class);
 -        classes.add(HadoopShims.class);
          classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
 +        classes.add(LauncherAM.class);
 +        classes.add(LauncherAMCallbackNotifier.class);
          return classes;
      }
  

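Note: the LauncherInputFormatClassLocator whose locateOrGet() is called above is a new class in this commit (84 lines per the diffstat) and its body is not part of this excerpt. Below is only a rough sketch of what such a locator could look like, assuming it resolves the oozie.action.launcher.mapreduce.input.format.class property added to oozie-default.xml further down and falls back to OozieLauncherInputFormat. The real locateOrGet() takes no arguments and presumably reads the server configuration itself, so the Configuration parameter, class name, and error handling here are illustrative only.

    // Illustrative sketch only -- not the source of LauncherInputFormatClassLocator.
    // Assumes the configuration key and default value shown in the oozie-default.xml hunk below.
    import org.apache.hadoop.conf.Configuration;

    public class InputFormatClassLocatorSketch {
        private static final String CONF_KEY = "oozie.action.launcher.mapreduce.input.format.class";
        private static final String DEFAULT_CLASS = "org.apache.oozie.action.hadoop.OozieLauncherInputFormat";

        private Class<?> cached;

        // Resolve the configured launcher InputFormat class once and cache it for later calls.
        public synchronized Class<?> locateOrGet(Configuration conf) {
            if (cached == null) {
                String className = conf.get(CONF_KEY, DEFAULT_CLASS);
                try {
                    cached = Class.forName(className);
                }
                catch (ClassNotFoundException e) {
                    throw new IllegalStateException("Cannot load launcher InputFormat class " + className, e);
                }
            }
            return cached;
        }
    }
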
http://git-wip-us.apache.org/repos/asf/oozie/blob/80854d15/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java
index 5ac1a16,0000000..442f9a3
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java
@@@ -1,321 -1,0 +1,323 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.oozie.action.hadoop;
 +
 +import java.io.BufferedReader;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.InputStreamReader;
 +import java.math.BigInteger;
 +import java.security.MessageDigest;
 +import java.security.NoSuchAlgorithmException;
 +import java.security.PrivilegedExceptionAction;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Properties;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.SequenceFile;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.mapred.Counters;
 +import org.apache.hadoop.mapred.RunningJob;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.apache.oozie.client.OozieClient;
 +import org.apache.oozie.client.WorkflowAction;
 +import org.apache.oozie.service.HadoopAccessorException;
 +import org.apache.oozie.service.HadoopAccessorService;
 +import org.apache.oozie.service.Services;
 +import org.apache.oozie.service.URIHandlerService;
 +import org.apache.oozie.service.UserGroupInformationService;
 +import org.apache.oozie.util.IOUtils;
 +import org.apache.oozie.util.PropertiesUtils;
 +
 +// TODO: we're no longer using Launcher Mapper -- give this class a better name
 +public class LauncherHelper {
 +
 +    public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
 +
++    private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator();
++
 +    public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
 +            throws HadoopAccessorException, IOException {
 +        String jobId = null;
 +        Path recoveryFile = new Path(actionDir, recoveryId);
 +        FileSystem fs = Services.get().get(HadoopAccessorService.class)
 +                .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
 +
 +        if (fs.exists(recoveryFile)) {
 +            InputStream is = fs.open(recoveryFile);
 +            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
 +            jobId = reader.readLine();
 +            reader.close();
 +        }
 +        return jobId;
 +
 +    }
 +
 +    public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
 +        // Only set the javaMainClass if its not null or empty string, this way the user can override the action's main class via
 +        // <configuration> property
 +        if (javaMainClass != null && !javaMainClass.equals("")) {
 +            launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
 +        }
 +    }
 +
 +    public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
 +        for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
 +            launcherConf.set(entry.getKey(), entry.getValue());
 +        }
 +    }
 +
 +    public static void setupMainArguments(Configuration launcherConf, String[] args) {
 +        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
 +        for (int i = 0; i < args.length; i++) {
 +            launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
 +        }
 +    }
 +
 +    public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
 +        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
 +    }
 +
 +    /**
 +     * Set the maximum value of stats data
 +     *
 +     * @param launcherConf the oozie launcher configuration
 +     * @param maxStatsData the maximum allowed size of stats data
 +     */
 +    public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
 +        launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
 +    }
 +
 +    /**
 +     * Set the maximum number of globbed files/dirs
 +     *
 +     * @param launcherConf the oozie launcher configuration
 +     * @param fsGlobMax the maximum number of files/dirs for FS operation
 +     */
 +    public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax){
 +        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax);
 +    }
 +
 +    public static void setupLauncherInfo(Configuration launcherConf, String jobId, String actionId, Path actionDir,
 +            String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
 +
 +        launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
 +        launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
 +        launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString());
 +        launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId);
 +        launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML);
 +
 +        actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
 +        actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
 +
 +        if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
 +          List<String> purgedEntries = new ArrayList<String>();
 +          Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files");
 +          for (String entry : entries) {
 +            if (entry.contains("#")) {
 +              purgedEntries.add(entry);
 +            }
 +          }
 +          actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()]));
 +          launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
 +        }
 +    }
 +
 +    public static void setupYarnRestartHandling(Configuration launcherJobConf, Configuration actionConf, String launcherTag,
 +                                                long launcherTime)
 +            throws NoSuchAlgorithmException {
 +        launcherJobConf.setLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME, launcherTime);
 +        // Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length)
 +        String tag = getTag(launcherTag);
 +        // keeping the oozie.child.mapreduce.job.tags instead of mapreduce.job.tags to avoid killing launcher itself.
 +        // mapreduce.job.tags should only go to child job launch by launcher.
 +        actionConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, tag);
 +    }
 +
 +    public static String getTag(String launcherTag) throws NoSuchAlgorithmException {
 +        MessageDigest digest = MessageDigest.getInstance("MD5");
 +        digest.update(launcherTag.getBytes(), 0, launcherTag.length());
 +        String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16);
 +        return md5;
 +    }
 +
 +    public static boolean isMainDone(RunningJob runningJob) throws IOException {
 +        return runningJob.isComplete();
 +    }
 +
 +    public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
 +        boolean succeeded = runningJob.isSuccessful();
 +        if (succeeded) {
 +            Counters counters = runningJob.getCounters();
 +            if (counters != null) {
 +                Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
 +                if (group != null) {
 +                    succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0;
 +                }
 +            }
 +        }
 +        return succeeded;
 +    }
 +
 +    /**
 +     * Determine whether action has external child jobs or not
 +     * @param actionData
 +     * @return true/false
 +     * @throws IOException
 +     */
 +    public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException {
 +        return actionData.containsKey(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
 +    }
 +
 +    /**
 +     * Determine whether action has output data or not
 +     * @param actionData
 +     * @return true/false
 +     * @throws IOException
 +     */
 +    public static boolean hasOutputData(Map<String, String> actionData) throws IOException {
 +        return actionData.containsKey(LauncherMapper.ACTION_DATA_OUTPUT_PROPS);
 +    }
 +
 +    /**
 +     * Determine whether action has external stats or not
 +     * @param actionData
 +     * @return true/false
 +     * @throws IOException
 +     */
 +    public static boolean hasStatsData(Map<String, String> actionData) throws IOException{
 +        return actionData.containsKey(LauncherMapper.ACTION_DATA_STATS);
 +    }
 +
 +    /**
 +     * Determine whether action has new id (id swap) or not
 +     * @param actionData
 +     * @return true/false
 +     * @throws IOException
 +     */
 +    public static boolean hasIdSwap(Map<String, String> actionData) throws IOException {
 +        return actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID);
 +    }
 +
 +    /**
 +     * Get the sequence file path storing all action data
 +     * @param actionDir
 +     * @return
 +     */
 +    public static Path getActionDataSequenceFilePath(Path actionDir) {
 +        return new Path(actionDir, LauncherMapper.ACTION_DATA_SEQUENCE_FILE);
 +    }
 +
 +    /**
 +     * Utility function to load the contents of action data sequence file into
 +     * memory object
 +     *
 +     * @param fs Action Filesystem
 +     * @param actionDir Path
 +     * @param conf Configuration
 +     * @return Map action data
 +     * @throws IOException
 +     * @throws InterruptedException
 +     */
 +    public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf)
 +            throws IOException, InterruptedException {
 +        UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
 +        UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME));
 +
 +        return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() {
 +            @Override
 +            public Map<String, String> run() throws IOException {
 +                Map<String, String> ret = new HashMap<String, String>();
 +                Path seqFilePath = getActionDataSequenceFilePath(actionDir);
 +                if (fs.exists(seqFilePath)) {
 +                    SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf);
 +                    Text key = new Text(), value = new Text();
 +                    while (seqFile.next(key, value)) {
 +                        ret.put(key.toString(), value.toString());
 +                    }
 +                    seqFile.close();
 +                }
 +                else { // maintain backward-compatibility. to be deprecated
 +                    org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir);
 +                    InputStream is;
 +                    BufferedReader reader = null;
 +                    Properties props;
 +                    if (files != null && files.length > 0) {
 +                        for (int x = 0; x < files.length; x++) {
 +                            Path file = files[x].getPath();
 +                            if (file.equals(new Path(actionDir, "externalChildIds.properties"))) {
 +                                is = fs.open(file);
 +                                reader = new BufferedReader(new InputStreamReader(is));
 +                                ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS,
 +                                        IOUtils.getReaderAsString(reader, -1));
 +                            }
 +                            else if (file.equals(new Path(actionDir, "newId.properties"))) {
 +                                is = fs.open(file);
 +                                reader = new BufferedReader(new InputStreamReader(is));
 +                                props = PropertiesUtils.readProperties(reader, -1);
 +                                ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id"));
 +                            }
 +                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) {
 +                                int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
 +                                        2 * 1024);
 +                                is = fs.open(file);
 +                                reader = new BufferedReader(new InputStreamReader(is));
 +                                ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils
 +                                        .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData)));
 +                            }
 +                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_STATS))) {
 +                                int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
 +                                        Integer.MAX_VALUE);
 +                                is = fs.open(file);
 +                                reader = new BufferedReader(new InputStreamReader(is));
 +                                ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils
 +                                        .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData)));
 +                            }
 +                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS))) {
 +                                is = fs.open(file);
 +                                reader = new BufferedReader(new InputStreamReader(is));
 +                                ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1));
 +                            }
 +                        }
 +                    }
 +                }
 +                return ret;
 +            }
 +        });
 +    }
 +
 +    public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) {
 +        String tag;
 +        if ( conf != null && conf.get(OOZIE_ACTION_YARN_TAG) != null) {
 +            tag = conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName();
 +        } else if (parentId != null) {
 +            tag = parentId + "@" + wfAction.getName();
 +        } else {
 +            tag = wfAction.getId();
 +        }
 +        return tag;
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/oozie/blob/80854d15/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --cc core/src/main/resources/oozie-default.xml
index 4ab7512,076401d..61b1d88
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@@ -1835,6 -1835,39 +1835,14 @@@ will be the requeue interval for the ac
      </property>
  
      <property>
 -        <name>oozie.action.launcher.mapreduce.job.ubertask.enable</name>
 -        <value>true</value>
 -        <description>
 -            Enables Uber Mode for the launcher job in YARN/Hadoop 2 (no effect in Hadoop 1) for all action types by default.
 -            This can be overridden on a per-action-type basis by setting
 -            oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site.xml (where #action-type# is the action
 -            type; for example, "pig").  And that can be overridden on a per-action basis by setting
 -            oozie.launcher.mapreduce.job.ubertask.enable in an action's configuration section in a workflow.  In summary, the
 -            priority is this:
 -            1. action's configuration section in a workflow
 -            2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site
 -            3. oozie.action.launcher.mapreduce.job.ubertask.enable in oozie-site
 -        </description>
 -    </property>
 -
 -    <property>
 -        <name>oozie.action.shell.launcher.mapreduce.job.ubertask.enable</name>
 -        <value>false</value>
 -        <description>
 -            The Shell action may have issues with the $PATH environment when using Uber Mode, and so Uber Mode is disabled by
 -            default for it.  See oozie.action.launcher.mapreduce.job.ubertask.enable
 -        </description>
 -    </property>
 -
 -    <property>
+         <name>oozie.action.launcher.mapreduce.input.format.class</name>
+         <value>org.apache.oozie.action.hadoop.OozieLauncherInputFormat</value>
+         <description>
+             Make the Launcher Mapper map-only job's InputFormat class pluggable in order to provide alternative implementations.
+         </description>
+     </property>
+ 
+     <property>
          <name>oozie.action.spark.setup.hadoop.conf.dir</name>
          <value>false</value>
          <description>

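The property above makes the launcher job's InputFormat pluggable: a deployment would set it in oozie-site.xml to the fully qualified name of an alternative class available on the Oozie server classpath. A minimal, hypothetical example of such an alternative follows, assuming the launcher only needs a single empty split yielding a single record; the class name and behaviour are illustrative and are not the source of the default OozieLauncherInputFormat.

    // Hypothetical plug-in candidate for oozie.action.launcher.mapreduce.input.format.class.
    import java.io.DataInput;
    import java.io.DataOutput;
    import org.apache.hadoop.mapred.InputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    public class SingleRecordLauncherInputFormat implements InputFormat<Object, Object> {

        // A split with no real data; the launcher does its work in the task itself.
        public static class EmptySplit implements InputSplit {
            @Override public long getLength() { return 0; }
            @Override public String[] getLocations() { return new String[0]; }
            @Override public void write(DataOutput out) { }
            @Override public void readFields(DataInput in) { }
        }

        @Override
        public InputSplit[] getSplits(JobConf job, int numSplits) {
            return new InputSplit[] { new EmptySplit() };
        }

        @Override
        public RecordReader<Object, Object> getRecordReader(InputSplit split, JobConf job, Reporter reporter) {
            return new RecordReader<Object, Object>() {
                private boolean consumed = false;

                // Hand the single (empty) record to the launcher exactly once.
                @Override public boolean next(Object key, Object value) {
                    if (consumed) {
                        return false;
                    }
                    consumed = true;
                    return true;
                }
                @Override public Object createKey() { return new Object(); }
                @Override public Object createValue() { return new Object(); }
                @Override public long getPos() { return 0; }
                @Override public void close() { }
                @Override public float getProgress() { return consumed ? 1.0f : 0.0f; }
            };
        }
    }
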
http://git-wip-us.apache.org/repos/asf/oozie/blob/80854d15/release-log.txt
----------------------------------------------------------------------