You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/26 09:28:00 UTC

[08/10] oozie git commit: OOZIE-1770 Create Oozie Application Master for YARN (asasvari, pbacsko, rkanter, gezapeti)

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
deleted file mode 100644
index 72ed2f1..0000000
--- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.action.hadoop;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.math.BigInteger;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.service.HadoopAccessorException;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.service.URIHandlerService;
-import org.apache.oozie.service.UserGroupInformationService;
-import org.apache.oozie.util.IOUtils;
-import org.apache.oozie.util.PropertiesUtils;
-
-public class LauncherMapperHelper {
-
-    public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
-
-    private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator();
-
-    public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
-            throws HadoopAccessorException, IOException {
-        String jobId = null;
-        Path recoveryFile = new Path(actionDir, recoveryId);
-        FileSystem fs = Services.get().get(HadoopAccessorService.class)
-                .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
-
-        if (fs.exists(recoveryFile)) {
-            InputStream is = fs.open(recoveryFile);
-            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-            jobId = reader.readLine();
-            reader.close();
-        }
-        return jobId;
-
-    }
-
-    public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
-        // Only set the javaMainClass if its not null or empty string, this way the user can override the action's main class via
-        // <configuration> property
-        if (javaMainClass != null && !javaMainClass.equals("")) {
-            launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
-        }
-    }
-
-    public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
-        for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
-            launcherConf.set(entry.getKey(), entry.getValue());
-        }
-    }
-
-    public static void setupMainArguments(Configuration launcherConf, String[] args) {
-        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
-        for (int i = 0; i < args.length; i++) {
-            launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
-        }
-    }
-
-    public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
-        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
-    }
-
-    /**
-     * Set the maximum value of stats data
-     *
-     * @param launcherConf the oozie launcher configuration
-     * @param maxStatsData the maximum allowed size of stats data
-     */
-    public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
-        launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
-    }
-
-    /**
-     * Set the maximum number of globbed files/dirs
-     *
-     * @param launcherConf the oozie launcher configuration
-     * @param fsGlobMax the maximum number of files/dirs for FS operation
-     */
-    public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax){
-        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax);
-    }
-
-    public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
-            String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
-
-        launcherConf.setMapperClass(LauncherMapper.class);
-        launcherConf.setSpeculativeExecution(false);
-        launcherConf.setNumMapTasks(1);
-        launcherConf.setNumReduceTasks(0);
-
-        launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
-        launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
-        launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString());
-        launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId);
-        launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML);
-
-        actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
-        actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
-
-        if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) {
-          List<String> purgedEntries = new ArrayList<String>();
-          Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files");
-          for (String entry : entries) {
-            if (entry.contains("#")) {
-              purgedEntries.add(entry);
-            }
-          }
-          actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()]));
-          launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
-        }
-
-        FileSystem fs =
-          Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
-                                                                           actionDir.toUri(), launcherConf);
-        fs.mkdirs(actionDir);
-
-        OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML));
-        try {
-            actionConf.writeXml(os);
-        } finally {
-            IOUtils.closeSafely(os);
-        }
-
-        launcherConf.setInputFormat(launcherInputFormatClassLocator.locateOrGet());
-        launcherConf.setOutputFormat(OozieLauncherOutputFormat.class);
-        launcherConf.setOutputCommitter(OozieLauncherOutputCommitter.class);
-    }
-
-    public static void setupYarnRestartHandling(JobConf launcherJobConf, Configuration actionConf, String launcherTag,
-                                                long launcherTime)
-            throws NoSuchAlgorithmException {
-        launcherJobConf.setLong(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, launcherTime);
-        // Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length)
-        String tag = getTag(launcherTag);
-        // keeping the oozie.child.mapreduce.job.tags instead of mapreduce.job.tags to avoid killing launcher itself.
-        // mapreduce.job.tags should only go to child job launch by launcher.
-        actionConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, tag);
-    }
-
-    public static String getTag(String launcherTag) throws NoSuchAlgorithmException {
-        MessageDigest digest = MessageDigest.getInstance("MD5");
-        digest.update(launcherTag.getBytes(), 0, launcherTag.length());
-        String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16);
-        return md5;
-    }
-
-    public static boolean isMainDone(RunningJob runningJob) throws IOException {
-        return runningJob.isComplete();
-    }
-
-    public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
-        boolean succeeded = runningJob.isSuccessful();
-        if (succeeded) {
-            Counters counters = runningJob.getCounters();
-            if (counters != null) {
-                Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP);
-                if (group != null) {
-                    succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0;
-                }
-            }
-        }
-        return succeeded;
-    }
-
-    /**
-     * Determine whether action has external child jobs or not
-     * @param actionData
-     * @return true/false
-     * @throws IOException
-     */
-    public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException {
-        return actionData.containsKey(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
-    }
-
-    /**
-     * Determine whether action has output data or not
-     * @param actionData
-     * @return true/false
-     * @throws IOException
-     */
-    public static boolean hasOutputData(Map<String, String> actionData) throws IOException {
-        return actionData.containsKey(LauncherMapper.ACTION_DATA_OUTPUT_PROPS);
-    }
-
-    /**
-     * Determine whether action has external stats or not
-     * @param actionData
-     * @return true/false
-     * @throws IOException
-     */
-    public static boolean hasStatsData(Map<String, String> actionData) throws IOException{
-        return actionData.containsKey(LauncherMapper.ACTION_DATA_STATS);
-    }
-
-    /**
-     * Determine whether action has new id (id swap) or not
-     * @param actionData
-     * @return true/false
-     * @throws IOException
-     */
-    public static boolean hasIdSwap(Map<String, String> actionData) throws IOException {
-        return actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID);
-    }
-
-    /**
-     * Get the sequence file path storing all action data
-     * @param actionDir
-     * @return
-     */
-    public static Path getActionDataSequenceFilePath(Path actionDir) {
-        return new Path(actionDir, LauncherMapper.ACTION_DATA_SEQUENCE_FILE);
-    }
-
-    /**
-     * Utility function to load the contents of action data sequence file into
-     * memory object
-     *
-     * @param fs Action Filesystem
-     * @param actionDir Path
-     * @param conf Configuration
-     * @return Map action data
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf)
-            throws IOException, InterruptedException {
-        UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
-        UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME));
-
-        return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() {
-            @Override
-            public Map<String, String> run() throws IOException {
-                Map<String, String> ret = new HashMap<String, String>();
-                Path seqFilePath = getActionDataSequenceFilePath(actionDir);
-                if (fs.exists(seqFilePath)) {
-                    SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf);
-                    Text key = new Text(), value = new Text();
-                    while (seqFile.next(key, value)) {
-                        ret.put(key.toString(), value.toString());
-                    }
-                    seqFile.close();
-                }
-                else { // maintain backward-compatibility. to be deprecated
-                    org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir);
-                    InputStream is;
-                    BufferedReader reader = null;
-                    Properties props;
-                    if (files != null && files.length > 0) {
-                        for (int x = 0; x < files.length; x++) {
-                            Path file = files[x].getPath();
-                            if (file.equals(new Path(actionDir, "externalChildIds.properties"))) {
-                                is = fs.open(file);
-                                reader = new BufferedReader(new InputStreamReader(is));
-                                ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS,
-                                        IOUtils.getReaderAsString(reader, -1));
-                            }
-                            else if (file.equals(new Path(actionDir, "newId.properties"))) {
-                                is = fs.open(file);
-                                reader = new BufferedReader(new InputStreamReader(is));
-                                props = PropertiesUtils.readProperties(reader, -1);
-                                ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id"));
-                            }
-                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) {
-                                int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
-                                        2 * 1024);
-                                is = fs.open(file);
-                                reader = new BufferedReader(new InputStreamReader(is));
-                                ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils
-                                        .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData)));
-                            }
-                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_STATS))) {
-                                int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
-                                        Integer.MAX_VALUE);
-                                is = fs.open(file);
-                                reader = new BufferedReader(new InputStreamReader(is));
-                                ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils
-                                        .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData)));
-                            }
-                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS))) {
-                                is = fs.open(file);
-                                reader = new BufferedReader(new InputStreamReader(is));
-                                ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1));
-                            }
-                        }
-                    }
-                }
-                return ret;
-            }
-        });
-    }
-
-    public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) {
-        String tag;
-        if ( conf != null && conf.get(OOZIE_ACTION_YARN_TAG) != null) {
-            tag = conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName();
-        } else if (parentId != null) {
-            tag = parentId + "@" + wfAction.getName();
-        } else {
-            tag = wfAction.getId();
-        }
-        return tag;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index 55c9372..634a1cb 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -21,7 +21,11 @@ package org.apache.oozie.action.hadoop;
 import java.io.IOException;
 import java.io.StringReader;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nonnull;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,6 +35,14 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.service.ConfigurationService;
@@ -39,7 +51,11 @@ import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
 import org.jdom.Namespace;
-import org.json.simple.JSONObject;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.io.Closeables;
 
 public class MapReduceActionExecutor extends JavaActionExecutor {
 
@@ -47,16 +63,16 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
     public static final String HADOOP_COUNTERS = "hadoop.counters";
     public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
     private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
+    public static final String JOB_END_NOTIFICATION_URL = "job.end.notification.url";
     private XLog log = XLog.getLog(getClass());
 
     public MapReduceActionExecutor() {
         super("map-reduce");
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
-    public List<Class> getLauncherClasses() {
-        List<Class> classes = new ArrayList<Class>();
+    public List<Class<?>> getLauncherClasses() {
+        List<Class<?>> classes = new ArrayList<Class<?>>();
         try {
             classes.add(Class.forName(STREAMING_MAIN_CLASS_NAME));
         }
@@ -97,9 +113,25 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
     }
 
     @Override
-    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) throws ActionExecutorException {
+    Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
+            throws ActionExecutorException {
         super.setupLauncherConf(conf, actionXml, appPath, context);
         conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
+
+        return conf;
+    }
+
+    private void injectConfigClass(Configuration conf, Element actionXml) {
+        // Inject config-class for launcher to use for action
+        Element e = actionXml.getChild("config-class", actionXml.getNamespace());
+        if (e != null) {
+            conf.set(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim());
+        }
+    }
+
+    @Override
+    protected Configuration createBaseHadoopConf(Context context, Element actionXml, boolean loadResources) {
+        Configuration conf = super.createBaseHadoopConf(context, actionXml, loadResources);
         return conf;
     }
 
@@ -108,6 +140,8 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
     Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
             throws ActionExecutorException {
         boolean regularMR = false;
+
+        injectConfigClass(actionConf, actionXml);
         Namespace ns = actionXml.getNamespace();
         if (actionXml.getChild("streaming", ns) != null) {
             Element streamingXml = actionXml.getChild("streaming", ns);
@@ -193,7 +227,7 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
         try {
             if (action.getStatus() == WorkflowAction.Status.OK) {
                 Element actionXml = XmlUtils.parseXml(action.getConf());
-                JobConf jobConf = createBaseHadoopConf(context, actionXml);
+                Configuration jobConf = createBaseHadoopConf(context, actionXml);
                 jobClient = createJobClient(context, jobConf);
                 RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalChildIDs()));
                 if (runningJob == null) {
@@ -248,7 +282,8 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
     }
 
     // Return the value of the specified configuration property
-    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue) throws ActionExecutorException {
+    private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue)
+            throws ActionExecutorException {
         try {
             String ret = defaultValue;
             if (actionConf != null) {
@@ -267,26 +302,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private JSONObject counterstoJson(Counters counters) {
-
-        if (counters == null) {
-            return null;
-        }
-
-        JSONObject groups = new JSONObject();
-        for (String gName : counters.getGroupNames()) {
-            JSONObject group = new JSONObject();
-            for (Counters.Counter counter : counters.getGroup(gName)) {
-                String cName = counter.getName();
-                Long cValue = counter.getCounter();
-                group.put(cName, cValue);
-            }
-            groups.put(gName, group);
-        }
-        return groups;
-    }
-
     /**
      * Return the sharelib name for the action.
      *
@@ -299,25 +314,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
         return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
     }
 
-    @Override
-    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
-            Configuration actionConf) throws ActionExecutorException {
-        // If the user is using a regular MapReduce job and specified an uber jar, we need to also set it for the launcher;
-        // so we override createLauncherConf to call super and then to set the uber jar if specified. At this point, checking that
-        // uber jars are enabled and resolving the uber jar path is already done by setupActionConf() when it parsed the actionConf
-        // argument and we can just look up the uber jar in the actionConf argument.
-        JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
-        Namespace ns = actionXml.getNamespace();
-        if (actionXml.getChild("streaming", ns) == null && actionXml.getChild("pipes", ns) == null) {
-            // Set for uber jar
-            String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR);
-            if (uberJar != null && uberJar.trim().length() > 0) {
-                launcherJobConf.setJar(uberJar);
-            }
-        }
-        return launcherJobConf;
-    }
-
     public static void setStreaming(Configuration conf, String mapper, String reducer, String recordReader,
                                     String[] recordReaderMapping, String[] env) {
         if (mapper != null) {
@@ -329,18 +325,93 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
         if (recordReader != null) {
             conf.set("oozie.streaming.record-reader", recordReader);
         }
-        MapReduceMain.setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping);
-        MapReduceMain.setStrings(conf, "oozie.streaming.env", env);
+        ActionUtils.setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping);
+        ActionUtils.setStrings(conf, "oozie.streaming.env", env);
     }
 
     @Override
-    protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
-        RunningJob runningJob = null;
-        String jobId = getActualExternalId(action);
-        if (jobId != null) {
-            runningJob = jobClient.getJob(JobID.forName(jobId));
+    protected void injectCallback(Context context, Configuration conf) {
+        // add callback for the MapReduce job
+        String callback = context.getCallbackUrl("$jobStatus");
+        String originalCallbackURL = conf.get(JOB_END_NOTIFICATION_URL);
+        if (originalCallbackURL != null) {
+            LOG.warn("Overriding the action job end notification URI. Original value: {0}", originalCallbackURL);
+        }
+        conf.set(JOB_END_NOTIFICATION_URL, callback);
+
+        super.injectCallback(context, conf);
+    }
+
+    @Override
+    protected boolean needToAddMapReduceToClassPath() {
+        return true;
+    }
+
+    @Override
+    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
+        Map<String, String> actionData = Collections.emptyMap();
+        Configuration jobConf = null;
+
+        try {
+            FileSystem actionFs = context.getAppFileSystem();
+            Element actionXml = XmlUtils.parseXml(action.getConf());
+            jobConf = createBaseHadoopConf(context, actionXml);
+            Path actionDir = context.getActionDir();
+            actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);
+        } catch (Exception e) {
+            LOG.warn("Exception in check(). Message[{0}]", e.getMessage(), e);
+            throw convertException(e);
+        }
+
+        final String newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID);
+
+        // check the Hadoop job if newID is defined (which should be the case here) - otherwise perform the normal check()
+        if (newId != null) {
+            boolean jobCompleted;
+            JobClient jobClient = null;
+            boolean exception = false;
+
+            try {
+                jobClient = createJobClient(context, new JobConf(jobConf));
+                RunningJob runningJob = jobClient.getJob(JobID.forName(newId));
+
+                if (runningJob == null) {
+                    context.setExternalStatus(FAILED);
+                    throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
+                            "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
+                            action.getId());
+                }
+
+                jobCompleted = runningJob.isComplete();
+            } catch (Exception e) {
+                LOG.warn("Unable to check the state of a running MapReduce job -"
+                        + " please check the health of the Job History Server!", e);
+                exception = true;
+                throw convertException(e);
+            } finally {
+                if (jobClient != null) {
+                    try {
+                        jobClient.close();
+                    } catch (Exception e) {
+                        if (exception) {
+                            LOG.error("JobClient error (not re-throwing due to a previous error): ", e);
+                        } else {
+                            throw convertException(e);
+                        }
+                    }
+                }
+            }
+
+            // run original check() if the MR action is completed or there are errors - otherwise mark it as RUNNING
+            if (jobCompleted || actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) {
+                super.check(context, action);
+            } else {
+                context.setExternalStatus(RUNNING);
+                context.setExternalChildIDs(newId);
+            }
+        } else {
+            super.check(context, action);
         }
-        return runningJob;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
index 581d3b3..d8b1f03 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java
@@ -29,9 +29,7 @@ import org.apache.oozie.action.ActionExecutor.Context;
 import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.command.wf.JobXCommand;
 import org.apache.oozie.service.ConfigurationService;
-import org.apache.oozie.service.Services;
 import org.apache.oozie.util.XConfiguration;
 
 import com.google.common.annotations.VisibleForTesting;

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java
index 8b2dc16..8a24ac3 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java
@@ -18,25 +18,21 @@
 
 package org.apache.oozie.action.hadoop;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.oozie.action.ActionExecutorException;
-import org.apache.oozie.action.ActionExecutor.Context;
-import org.apache.oozie.client.XOozieClient;
 import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.XOozieClient;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.service.WorkflowAppService;
 import org.jdom.Element;
-import org.jdom.Namespace;
 import org.jdom.JDOMException;
+import org.jdom.Namespace;
 import org.json.simple.parser.JSONParser;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public class PigActionExecutor extends ScriptLanguageActionExecutor {
 
     private static final String PIG_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.PigMain";
@@ -48,10 +44,9 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor {
         super("pig");
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
-    public List<Class> getLauncherClasses() {
-        List<Class> classes = new ArrayList<Class>();
+    public List<Class<?>> getLauncherClasses() {
+        List<Class<?>> classes = new ArrayList<Class<?>>();
         try {
             classes.add(Class.forName(PIG_MAIN_CLASS_NAME));
             classes.add(JSONParser.class);
@@ -73,7 +68,6 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
             throws ActionExecutorException {
         super.setupActionConf(actionConf, context, actionXml, appPath);
@@ -82,12 +76,14 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor {
         String script = actionXml.getChild("script", ns).getTextTrim();
         String pigName = new Path(script).getName();
 
-        List<Element> params = (List<Element>) actionXml.getChildren("param", ns);
+        @SuppressWarnings("unchecked")
+        List<Element> params = actionXml.getChildren("param", ns);
         String[] strParams = new String[params.size()];
         for (int i = 0; i < params.size(); i++) {
             strParams[i] = params.get(i).getTextTrim();
         }
         String[] strArgs = null;
+        @SuppressWarnings("unchecked")
         List<Element> eArgs = actionXml.getChildren("argument", ns);
         if (eArgs != null && eArgs.size() > 0) {
             strArgs = new String[eArgs.size()];
@@ -101,8 +97,8 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor {
 
     public static void setPigScript(Configuration conf, String script, String[] params, String[] args) {
         conf.set(PIG_SCRIPT, script);
-        MapReduceMain.setStrings(conf, PIG_PARAMS, params);
-        MapReduceMain.setStrings(conf, PIG_ARGS, args);
+        ActionUtils.setStrings(conf, PIG_PARAMS, params);
+        ActionUtils.setStrings(conf, PIG_ARGS, args);
     }
 
 
@@ -127,10 +123,15 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor {
     }
 
     @Override
-    protected JobConf loadHadoopDefaultResources(Context context, Element actionXml) {
+    protected boolean needToAddMapReduceToClassPath() {
+        return true;
+    }
+
+    @Override
+    protected Configuration loadHadoopDefaultResources(Context context, Element actionXml) {
         boolean loadDefaultResources = ConfigurationService
                 .getBoolean(HadoopAccessorService.ACTION_CONFS_LOAD_DEFAULT_RESOURCES);
-        JobConf conf = super.createBaseHadoopConf(context, actionXml, loadDefaultResources);
+        Configuration conf = super.createBaseHadoopConf(context, actionXml, loadDefaultResources);
         return conf;
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java
index 92e149d..196f0b7 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java
@@ -37,9 +37,8 @@ public abstract class ScriptLanguageActionExecutor extends JavaActionExecutor {
         super(type);
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
-    public List<Class> getLauncherClasses() {
+    public List<Class<?>> getLauncherClasses() {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
index b9ffa7a..d44bbc5 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java
@@ -19,10 +19,13 @@
 
 package org.apache.oozie.action.hadoop;
 
+import java.io.File;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.util.Apps;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.service.ConfigurationService;
 import org.jdom.Element;
@@ -30,19 +33,12 @@ import org.jdom.Namespace;
 
 public class ShellActionExecutor extends JavaActionExecutor {
 
-    /**
-     * Config property name to set the child environment
-     */
-    public String OOZIE_LAUNCHER_CHILD_ENV = "mapred.child.env";
-    public String OOZIE_LAUNCHER_MAP_ENV = "mapreduce.map.env";
-
     public ShellActionExecutor() {
         super("shell");
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
-    public List<Class> getLauncherClasses() {
+    public List<Class<?>> getLauncherClasses() {
         return null;
     }
 
@@ -51,7 +47,6 @@ public class ShellActionExecutor extends JavaActionExecutor {
         return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, ShellMain.class.getName());
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
             throws ActionExecutorException {
@@ -103,6 +98,7 @@ public class ShellActionExecutor extends JavaActionExecutor {
             boolean checkKeyValue) throws ActionExecutorException {
         String[] strTagValue = null;
         Namespace ns = actionXml.getNamespace();
+        @SuppressWarnings("unchecked")
         List<Element> eTags = actionXml.getChildren(tag, ns);
         if (eTags != null && eTags.size() > 0) {
             strTagValue = new String[eTags.size()];
@@ -113,7 +109,7 @@ public class ShellActionExecutor extends JavaActionExecutor {
                 }
             }
         }
-        MapReduceMain.setStrings(actionConf, key, strTagValue);
+        ActionUtils.setStrings(actionConf, key, strTagValue);
     }
 
     /**
@@ -130,23 +126,8 @@ public class ShellActionExecutor extends JavaActionExecutor {
     }
 
     @Override
-    protected Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
-            throws ActionExecutorException {
-        super.setupLauncherConf(conf, actionXml, appPath, context);
-        addDefaultChildEnv(conf);
-        return conf;
-    }
-
-    /**
-     * This method sets the PATH to current working directory for the launched
-     * map task from where shell command will run.
-     *
-     * @param conf
-     */
-    protected void addDefaultChildEnv(Configuration conf) {
-        String envValues = "PATH=.:$PATH";
-        updateProperty(conf, OOZIE_LAUNCHER_MAP_ENV, envValues);
-        updateProperty(conf, OOZIE_LAUNCHER_CHILD_ENV, envValues);
+    protected void addActionSpecificEnvVars(Map<String, String> env) {
+        Apps.setEnvFromInputString(env, "PATH=.:$PATH", File.pathSeparator);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
index 1a3197a..00497a7 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
@@ -21,7 +21,6 @@ package org.apache.oozie.action.hadoop;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.service.ConfigurationService;
@@ -32,12 +31,11 @@ import org.jdom.Namespace;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 public class SparkActionExecutor extends JavaActionExecutor {
     public static final String SPARK_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SparkMain";
-    public static final String TASK_USER_PRECEDENCE = "mapreduce.task.classpath.user.precedence"; // hadoop-2
-    public static final String TASK_USER_CLASSPATH_PRECEDENCE = "mapreduce.user.classpath.first";  // hadoop-1
     public static final String SPARK_MASTER = "oozie.spark.master";
     public static final String SPARK_MODE = "oozie.spark.mode";
     public static final String SPARK_OPTS = "oozie.spark.spark-opts";
@@ -78,7 +76,7 @@ public class SparkActionExecutor extends JavaActionExecutor {
 
         StringBuilder sparkOptsSb = new StringBuilder();
         if (master.startsWith("yarn")) {
-            String resourceManager = actionConf.get(HADOOP_JOB_TRACKER);
+            String resourceManager = actionConf.get(HADOOP_YARN_RM);
             Properties sparkConfig =
                     Services.get().get(SparkConfigurationService.class).getSparkConfig(resourceManager);
             for (String property : sparkConfig.stringPropertyNames()) {
@@ -102,20 +100,6 @@ public class SparkActionExecutor extends JavaActionExecutor {
     }
 
     @Override
-    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
-                               Configuration actionConf) throws ActionExecutorException {
-
-        JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
-        if (launcherJobConf.get("oozie.launcher." + TASK_USER_PRECEDENCE) == null) {
-            launcherJobConf.set(TASK_USER_PRECEDENCE, "true");
-        }
-        if (launcherJobConf.get("oozie.launcher." + TASK_USER_CLASSPATH_PRECEDENCE) == null) {
-            launcherJobConf.set(TASK_USER_CLASSPATH_PRECEDENCE, "true");
-        }
-        return launcherJobConf;
-    }
-
-    @Override
     Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context)
             throws ActionExecutorException {
         super.setupLauncherConf(conf, actionXml, appPath, context);
@@ -136,8 +120,8 @@ public class SparkActionExecutor extends JavaActionExecutor {
     }
 
     @Override
-    public List<Class> getLauncherClasses() {
-        List<Class> classes = new ArrayList<Class>();
+    public List<Class<?>> getLauncherClasses() {
+        List<Class<?>> classes = new ArrayList<Class<?>>();
         try {
             classes.add(Class.forName(SPARK_MAIN_CLASS_NAME));
         } catch (ClassNotFoundException e) {
@@ -159,6 +143,16 @@ public class SparkActionExecutor extends JavaActionExecutor {
     }
 
     @Override
+    protected boolean needToAddMapReduceToClassPath() {
+        return true;
+    }
+
+    @Override
+    protected void addActionSpecificEnvVars(Map<String, String> env) {
+        env.put("SPARK_HOME", ".");
+    }
+
+    @Override
     protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
         return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SPARK_MAIN_CLASS_NAME);
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
index 22e2874..955f3b7 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
@@ -18,6 +18,12 @@
 
 package org.apache.oozie.action.hadoop;
 
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
@@ -33,12 +39,6 @@ import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
 import org.jdom.Namespace;
 
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.StringTokenizer;
-
 public class SqoopActionExecutor extends JavaActionExecutor {
 
   public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
@@ -51,8 +51,8 @@ public class SqoopActionExecutor extends JavaActionExecutor {
     }
 
     @Override
-    public List<Class> getLauncherClasses() {
-        List<Class> classes = new ArrayList<Class>();
+    public List<Class<?>> getLauncherClasses() {
+        List<Class<?>> classes = new ArrayList<Class<?>>();
         try {
             classes.add(Class.forName(SQOOP_MAIN_CLASS_NAME));
         }
@@ -68,7 +68,6 @@ public class SqoopActionExecutor extends JavaActionExecutor {
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
             throws ActionExecutorException {
         super.setupActionConf(actionConf, context, actionXml, appPath);
@@ -96,6 +95,7 @@ public class SqoopActionExecutor extends JavaActionExecutor {
             }
         }
         else {
+            @SuppressWarnings("unchecked")
             List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns);
             for (Element elem : eArgs) {
                 argList.add(elem.getTextTrim());
@@ -119,7 +119,7 @@ public class SqoopActionExecutor extends JavaActionExecutor {
     }
 
     private void setSqoopCommand(Configuration conf, String[] args) {
-        MapReduceMain.setStrings(conf, SQOOP_ARGS, args);
+        ActionUtils.setStrings(conf, SQOOP_ARGS, args);
     }
 
     /**
@@ -141,7 +141,7 @@ public class SqoopActionExecutor extends JavaActionExecutor {
         try {
             if (action.getStatus() == WorkflowAction.Status.OK) {
                 Element actionXml = XmlUtils.parseXml(action.getConf());
-                JobConf jobConf = createBaseHadoopConf(context, actionXml);
+                Configuration jobConf = createBaseHadoopConf(context, actionXml);
                 jobClient = createJobClient(context, jobConf);
 
                 // Cumulative counters for all Sqoop mapreduce jobs
@@ -236,6 +236,11 @@ public class SqoopActionExecutor extends JavaActionExecutor {
         }
     }
 
+    @Override
+    protected boolean needToAddMapReduceToClassPath() {
+        return true;
+    }
+
     /**
      * Return the sharelib name for the action.
      *

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java b/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
index fb021bd..374c6ef 100644
--- a/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
+++ b/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
@@ -18,7 +18,6 @@
 
 package org.apache.oozie.client.rest;
 
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -29,6 +28,8 @@ import org.apache.oozie.client.BulkResponse;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
+import com.google.common.collect.ImmutableSet;
+
 /**
  * Server-side implementation class of the client interface BulkResponse
  * Declares all the bulk request specific user parameters and handling as JSON object
@@ -48,20 +49,14 @@ public class BulkResponseImpl implements BulkResponse, JsonBean {
     public static final String BULK_FILTER_END_NOMINAL_EPOCH = "endscheduledtime";
     public static final String BULK_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:SS'Z'";
 
-    public static final Set<String> BULK_FILTER_NAMES = new HashSet<String>();
-
-    static {
-
-        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_BUNDLE);
-        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_COORD);
-        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_LEVEL);
-        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_STATUS);
-        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH);
-        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH);
-        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH);
-        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH);
-
-    }
+    public static final Set<String> BULK_FILTER_NAMES = ImmutableSet.of(BulkResponseImpl.BULK_FILTER_BUNDLE,
+            BulkResponseImpl.BULK_FILTER_COORD,
+            BulkResponseImpl.BULK_FILTER_LEVEL,
+            BulkResponseImpl.BULK_FILTER_STATUS,
+            BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH,
+            BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH,
+            BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH,
+            BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH);
 
     /**
      * Construct JSON object using the bulk request object and the associated tags

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/XCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/XCommand.java b/core/src/main/java/org/apache/oozie/command/XCommand.java
index bdf13f6..7b8f47c 100644
--- a/core/src/main/java/org/apache/oozie/command/XCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/XCommand.java
@@ -244,7 +244,10 @@ public abstract class XCommand<T> implements XCallable<T> {
     @Override
     public final T call() throws CommandException {
         setLogInfo();
-        if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) {
+        CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
+        Set<String> interruptTypes = callableQueueService.getInterruptTypes();
+
+        if (interruptTypes.contains(this.getType()) && used.get()) {
             LOG.debug("Command [{0}] key [{1}]  already used for [{2}]", getName(), getEntityKey(), this.toString());
             return null;
         }
@@ -271,7 +274,7 @@ public abstract class XCommand<T> implements XCallable<T> {
                 }
 
                 if (!isLockRequired() || (lock != null) || this.inInterruptMode()) {
-                    if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
+                    if (interruptTypes.contains(this.getType())
                             && !used.compareAndSet(false, true)) {
                         LOG.debug("Command [{0}] key [{1}]  already executed for [{2}]", getName(), getEntityKey(),
                                 this.toString());
@@ -289,7 +292,6 @@ public abstract class XCommand<T> implements XCallable<T> {
                     instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
                 }
                 if (commandQueue != null) {
-                    CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
                     for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
                         LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
                         if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java
index d2a2742..98d0f3c 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java
@@ -46,6 +46,8 @@ import org.apache.oozie.client.XOozieClient;
 import org.jdom.Element;
 import org.jdom.Namespace;
 
+import com.google.common.collect.ImmutableSet;
+
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -54,17 +56,8 @@ import java.util.HashSet;
 
 public abstract class SubmitHttpXCommand extends WorkflowXCommand<String> {
 
-    protected static final Set<String> MANDATORY_OOZIE_CONFS = new HashSet<String>();
-    protected static final Set<String> OPTIONAL_OOZIE_CONFS = new HashSet<String>();
-
-    static {
-        MANDATORY_OOZIE_CONFS.add(XOozieClient.JT);
-        MANDATORY_OOZIE_CONFS.add(XOozieClient.NN);
-        MANDATORY_OOZIE_CONFS.add(OozieClient.LIBPATH);
-
-        OPTIONAL_OOZIE_CONFS.add(XOozieClient.FILES);
-        OPTIONAL_OOZIE_CONFS.add(XOozieClient.ARCHIVES);
-    }
+    static final Set<String> MANDATORY_OOZIE_CONFS = ImmutableSet.of(XOozieClient.RM, XOozieClient.NN, OozieClient.LIBPATH);
+    static final Set<String> OPTIONAL_OOZIE_CONFS = ImmutableSet.of(XOozieClient.FILES, XOozieClient.ARCHIVES);
 
     private Configuration conf;
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java
index cc61d3d..05e7595 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java
@@ -41,11 +41,10 @@ public class SubmitMRXCommand extends SubmitHttpXCommand {
 
     static {
         SKIPPED_CONFS.add(WorkflowAppService.HADOOP_USER);
-        SKIPPED_CONFS.add(XOozieClient.JT);
+        SKIPPED_CONFS.add(XOozieClient.RM);
         SKIPPED_CONFS.add(XOozieClient.NN);
 
         DEPRECATE_MAP.put(XOozieClient.NN, XOozieClient.NN_2);
-        DEPRECATE_MAP.put(XOozieClient.JT, XOozieClient.JT_2);
         DEPRECATE_MAP.put(WorkflowAppService.HADOOP_USER, "mapreduce.job.user.name");
     }
 
@@ -93,8 +92,7 @@ public class SubmitMRXCommand extends SubmitHttpXCommand {
     protected Element generateSection(Configuration conf, Namespace ns) {
         Element mapreduce = new Element("map-reduce", ns);
         Element jt = new Element("job-tracker", ns);
-        String newJTVal = conf.get(DEPRECATE_MAP.get(XOozieClient.JT));
-        jt.addContent(newJTVal != null ? newJTVal : (conf.get(XOozieClient.JT)));
+        jt.addContent(conf.get(XOozieClient.RM));
         mapreduce.addContent(jt);
         Element nn = new Element("name-node", ns);
         String newNNVal = conf.get(DEPRECATE_MAP.get(XOozieClient.NN));

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java
index 9d41305..fab4398 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java
@@ -19,7 +19,7 @@
 package org.apache.oozie.command.wf;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.action.hadoop.MapReduceMain;
+import org.apache.oozie.action.hadoop.ActionUtils;
 import org.apache.oozie.client.XOozieClient;
 import org.apache.oozie.command.CommandException;
 import org.jdom.Element;
@@ -50,7 +50,7 @@ public abstract class SubmitScriptLanguageXCommand extends SubmitHttpXCommand {
         String name = getWorkflowName();
         Element ele = new Element(name, ns);
         Element jt = new Element("job-tracker", ns);
-        jt.addContent(conf.get(XOozieClient.JT));
+        jt.addContent(conf.get(XOozieClient.RM));
         ele.addContent(jt);
         Element nn = new Element("name-node", ns);
         nn.addContent(conf.get(XOozieClient.NN));
@@ -58,7 +58,7 @@ public abstract class SubmitScriptLanguageXCommand extends SubmitHttpXCommand {
 
         List<String> Dargs = new ArrayList<String>();
         List<String> otherArgs = new ArrayList<String>();
-        String[] args = MapReduceMain.getStrings(conf, getOptions());
+        String[] args = ActionUtils.getStrings(conf, getOptions());
         for (String arg : args) {
             if (arg.startsWith("-D")) {
                 Dargs.add(arg);
@@ -67,7 +67,7 @@ public abstract class SubmitScriptLanguageXCommand extends SubmitHttpXCommand {
                 otherArgs.add(arg);
             }
         }
-        String [] params = MapReduceMain.getStrings(conf, getScriptParamters());
+        String [] params = ActionUtils.getStrings(conf, getScriptParamters());
 
         // configuration section
         if (Dargs.size() > 0) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java
index 51b739e..c5574c5 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java
@@ -22,7 +22,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.XOozieClient;
 import org.apache.oozie.command.CommandException;
-import org.apache.oozie.action.hadoop.MapReduceMain;
+import org.apache.oozie.action.hadoop.ActionUtils;
 import org.jdom.Namespace;
 import org.jdom.Element;
 
@@ -50,14 +50,14 @@ public class SubmitSqoopXCommand extends SubmitHttpXCommand {
         String name = "sqoop";
         Element ele = new Element(name, ns);
         Element jt = new Element("job-tracker", ns);
-        jt.addContent(conf.get(XOozieClient.JT));
+        jt.addContent(conf.get(XOozieClient.RM));
         ele.addContent(jt);
         Element nn = new Element("name-node", ns);
         nn.addContent(conf.get(XOozieClient.NN));
         ele.addContent(nn);
 
         List<String> Dargs = new ArrayList<String>();
-        String[] args = MapReduceMain.getStrings(conf, getOptions());
+        String[] args = ActionUtils.getStrings(conf, getOptions());
         for (String arg : args) {
             if (arg.startsWith("-D")) {
                 Dargs.add(arg);

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java
index ace120d..79355eb 100644
--- a/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java
+++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java
@@ -111,15 +111,16 @@ public abstract class AbstractCoordInputDependency implements Writable, CoordInp
             missingDependenciesSet = new HashMap<String, List<String>>();
             availableDependenciesSet = new HashMap<String, List<String>>();
 
-            Set<String> keySets = dependencyMap.keySet();
-            for (String key : keySets) {
-                for (CoordInputInstance coordInputInstance : dependencyMap.get(key))
+            for (Entry<String, List<CoordInputInstance>> entry : dependencyMap.entrySet()) {
+                String key = entry.getKey();
+                for (CoordInputInstance coordInputInstance : entry.getValue()) {
                     if (coordInputInstance.isAvailable()) {
                         addToAvailableDependencies(key, coordInputInstance);
                     }
                     else {
                         addToMissingDependencies(key, coordInputInstance);
                     }
+                }
             }
         }
         catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
index 6f0abf6..a8b58d5 100644
--- a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
+++ b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java
@@ -492,13 +492,13 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve
 
     @Override
     public void removeNonWaitingCoordActions(Set<String> staleActions) {
-        Iterator<String> serverItr = missingDepsByServer.keySet().iterator();
-        while (serverItr.hasNext()) {
-            String server = serverItr.next();
-            Cache missingCache = missingDepsByServer.get(server);
+        for (Entry<String, Cache> entry : missingDepsByServer.entrySet()) {
+            Cache missingCache = entry.getValue();
+
             if (missingCache == null) {
                 continue;
             }
+
             synchronized (missingCache) {
                 for (Object key : missingCache.getKeys()) {
                     Element element = missingCache.get(key);

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
index a86a8d0..cfd208a 100644
--- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
+++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java
@@ -48,6 +48,8 @@ import org.apache.oozie.util.PriorityDelayQueue.QueueElement;
 import org.apache.oozie.util.XCallable;
 import org.apache.oozie.util.XLog;
 
+import com.google.common.collect.ImmutableSet;
+
 /**
  * The callable queue service queues {@link XCallable}s for asynchronous execution.
  * <p>
@@ -95,9 +97,9 @@ public class CallableQueueService implements Service, Instrumentable {
 
     private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
 
-    private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>();
+    private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<>();
 
-    public static final HashSet<String> INTERRUPT_TYPES = new HashSet<String>();
+    private Set<String> interruptTypes;
 
     private int interruptMapMaxSize;
 
@@ -452,10 +454,12 @@ public class CallableQueueService implements Service, Instrumentable {
         int threads = ConfigurationService.getInt(conf, CONF_THREADS);
         boolean callableNextEligible = ConfigurationService.getBoolean(conf, CONF_CALLABLE_NEXT_ELIGIBLE);
 
+        interruptTypes = new HashSet<>();
         for (String type : ConfigurationService.getStrings(conf, CONF_CALLABLE_INTERRUPT_TYPES)) {
             log.debug("Adding interrupt type [{0}]", type);
-            INTERRUPT_TYPES.add(type);
+            interruptTypes.add(type);
         }
+        interruptTypes = ImmutableSet.copyOf(interruptTypes);
 
         if (!callableNextEligible) {
             queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) {
@@ -720,12 +724,12 @@ public class CallableQueueService implements Service, Instrumentable {
     public void checkInterruptTypes(XCallable<?> callable) {
         if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) {
             for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) {
-                if (INTERRUPT_TYPES.contains(singleCallable.getType())) {
+                if (interruptTypes.contains(singleCallable.getType())) {
                     insertCallableIntoInterruptMap(singleCallable);
                 }
             }
         }
-        else if (INTERRUPT_TYPES.contains(callable.getType())) {
+        else if (interruptTypes.contains(callable.getType())) {
             insertCallableIntoInterruptMap(callable);
         }
     }
@@ -791,4 +795,8 @@ public class CallableQueueService implements Service, Instrumentable {
         return executor.invokeAll(tasks);
     }
 
+    public Set<String> getInterruptTypes() {
+        return interruptTypes;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
index 22c6fb0..a68f94f 100644
--- a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
+++ b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -170,14 +171,16 @@ public class EventHandlerService implements Service {
     @Override
     public void destroy() {
         eventsEnabled = false;
-        for (MessageType type : listenerMap.keySet()) {
-            Iterator<?> iter = listenerMap.get(type).iterator();
-            while (iter.hasNext()) {
+
+        for (Entry<MessageType, List<?>> entry : listenerMap.entrySet()) {
+            List<?> listeners = entry.getValue();
+            MessageType type = entry.getKey();
+
+            for (Object listener : listeners) {
                 if (type == MessageType.JOB) {
-                    ((JobEventListener) iter.next()).destroy();
-                }
-                else if (type == MessageType.SLA) {
-                    ((SLAEventListener) iter.next()).destroy();
+                    ((JobEventListener) listener).destroy();
+                } else if (type == MessageType.SLA) {
+                    ((SLAEventListener) listener).destroy();
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
index 23a9d92..9624104 100644
--- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
+++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
@@ -18,17 +18,22 @@
 
 package org.apache.oozie.service;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.action.hadoop.JavaActionExecutor;
 import org.apache.oozie.util.IOUtils;
@@ -43,7 +48,7 @@ import java.io.FileInputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.InvocationTargetException;
+import java.io.OutputStream;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.URI;
@@ -79,19 +84,16 @@ public class HadoopAccessorService implements Service {
     public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal";
     public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token");
 
-    protected static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
     /** The Kerberos principal for the job tracker.*/
     protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal";
     /** The Kerberos principal for the resource manager.*/
     protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal";
-    protected static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
-    protected static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
     protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
-    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();
-
-    private static Configuration cachedConf;
 
+    private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created";
+    private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>();
     private static final String DEFAULT_ACTIONNAME = "default";
+    private static Configuration cachedConf;
 
     private Set<String> jobTrackerWhitelist = new HashSet<String>();
     private Set<String> nameNodeWhitelist = new HashSet<String>();
@@ -406,18 +408,20 @@ public class HadoopAccessorService implements Service {
             public boolean accept(File dir, String name) {
                 return ActionConfFileType.isSupportedFileType(name);
             }});
-        Arrays.sort(actionConfFiles, new Comparator<File>() {
-            @Override
-            public int compare(File o1, File o2) {
-                return o1.getName().compareTo(o2.getName());
-            }
-        });
-        for (File f : actionConfFiles) {
-            if (f.isFile() && f.canRead()) {
-                updateActionConfigWithFile(actionConf, f);
+
+        if (actionConfFiles != null) {
+            Arrays.sort(actionConfFiles, new Comparator<File>() {
+                @Override
+                public int compare(File o1, File o2) {
+                    return o1.getName().compareTo(o2.getName());
+                }
+            });
+            for (File f : actionConfFiles) {
+                if (f.isFile() && f.canRead()) {
+                    updateActionConfigWithFile(actionConf, f);
+                }
             }
         }
-
     }
 
     private Configuration readActionConfFile(File file) throws IOException {
@@ -505,7 +509,7 @@ public class HadoopAccessorService implements Service {
         if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
             throw new HadoopAccessorException(ErrorCode.E0903);
         }
-        String jobTracker = conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER);
+        String jobTracker = conf.get(JavaActionExecutor.HADOOP_YARN_RM);
         validateJobTracker(jobTracker);
         try {
             UserGroupInformation ugi = getUGI(user);
@@ -516,39 +520,60 @@ public class HadoopAccessorService implements Service {
             });
             return jobClient;
         }
-        catch (InterruptedException ex) {
-            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
-        }
-        catch (IOException ex) {
+        catch (IOException | InterruptedException ex) {
             throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
         }
     }
 
     /**
-     * Get the RM delegation token using jobClient and add it to conf
+     * Return a JobClient created with the provided user/group.
      *
-     * @param jobClient
-     * @param conf
-     * @throws HadoopAccessorException
+     * @param user The username to impersonate
+     * @param conf Configuration with all necessary information to create the
+     *        JobClient.
+     * @return JobClient created with the provided user/group.
+     * @throws HadoopAccessorException if the client could not be created.
      */
-    public void addRMDelegationToken(JobClient jobClient, JobConf conf) throws HadoopAccessorException {
-        Token<DelegationTokenIdentifier> mrdt;
-        try {
-            mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf));
-        }
-        catch (IOException e) {
-            throw new HadoopAccessorException(ErrorCode.E0902, e.getMessage(), e);
+    public JobClient createJobClient(String user, Configuration conf) throws HadoopAccessorException {
+        return createJobClient(user, new JobConf(conf));
+    }
+
+    /**
+     * Return a YarnClient created with the provided user and configuration. The caller is responsible for closing it when done.
+     *
+     * @param user The username to impersonate
+     * @param conf The conf
+     * @return a YarnClient with the provided user and configuration
+     * @throws HadoopAccessorException if the client could not be created.
+     */
+    public YarnClient createYarnClient(String user, final Configuration conf) throws HadoopAccessorException {
+        ParamChecker.notEmpty(user, "user");
+        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
+            throw new HadoopAccessorException(ErrorCode.E0903);
         }
-        catch (InterruptedException e) {
-            throw new HadoopAccessorException(ErrorCode.E0902, e.getMessage(), e);
+        String rm = conf.get(JavaActionExecutor.HADOOP_YARN_RM);
+        validateJobTracker(rm);
+        try {
+            UserGroupInformation ugi = getUGI(user);
+            YarnClient yarnClient = ugi.doAs(new PrivilegedExceptionAction<YarnClient>() {
+                @Override
+                public YarnClient run() throws Exception {
+                    YarnClient yarnClient = YarnClient.createYarnClient();
+                    yarnClient.init(conf);
+                    yarnClient.start();
+                    return yarnClient;
+                }
+            });
+            return yarnClient;
+        } catch (IOException | InterruptedException ex) {
+            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
         }
-        conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt);
     }
 
     /**
      * Return a FileSystem created with the provided user for the specified URI.
      *
-     *
+     * @param user The username to impersonate
      * @param uri file system URI.
      * @param conf Configuration with all necessary information to create the FileSystem.
      * @return FileSystem created with the provided user/group.
@@ -556,8 +581,14 @@ public class HadoopAccessorService implements Service {
      */
     public FileSystem createFileSystem(String user, final URI uri, final Configuration conf)
             throws HadoopAccessorException {
+       return createFileSystem(user, uri, conf, true);
+    }
+
+    private FileSystem createFileSystem(String user, final URI uri, final Configuration conf, boolean checkAccessorProperty)
+            throws HadoopAccessorException {
         ParamChecker.notEmpty(user, "user");
-        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
+
+        if (checkAccessorProperty && !conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
             throw new HadoopAccessorException(ErrorCode.E0903);
         }
 
@@ -585,10 +616,7 @@ public class HadoopAccessorService implements Service {
                 }
             });
         }
-        catch (InterruptedException ex) {
-            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
-        }
-        catch (IOException ex) {
+        catch (IOException | InterruptedException ex) {
             throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
         }
     }
@@ -639,10 +667,7 @@ public class HadoopAccessorService implements Service {
             renewer = mrTokenRenewers.get(servicePrincipal);
             if (renewer == null) {
                 // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal()
-                String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2));
-                if (target == null) {
-                    target = jobConf.get(HADOOP_JOB_TRACKER);
-                }
+                String target = jobConf.get(HADOOP_YARN_RM);
                 try {
                     String addr = NetUtils.createSocketAddr(target).getHostName();
                     renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr));
@@ -705,4 +730,48 @@ public class HadoopAccessorService implements Service {
         return supportedSchemes;
     }
 
-}
+    /**
+     * Creates a {@link LocalResource} for the Configuration to localize it for a Yarn Container. This also involves writing it
+     * to HDFS.
+     * Example usage:
+     * <pre>
+     * {@code
+     * LocalResource res1 = createLocalResourceForConfigurationFile(filename1, user, conf, uri, dir);
+     * LocalResource res2 = createLocalResourceForConfigurationFile(filename2, user, conf, uri, dir);
+     * ...
+     * Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+     * localResources.put(filename1, res1);
+     * localResources.put(filename2, res2);
+     * ...
+     * containerLaunchContext.setLocalResources(localResources);
+     * }
+     * </pre>
+     *
+     * @param filename The filename to use on the remote filesystem and once it has been localized.
+     * @param user The user
+     * @param conf The configuration to process
+     * @param uri The URI of the remote filesystem (e.g. HDFS)
+     * @param dir The directory on the remote filesystem to write the file to
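+     * @return a {@link LocalResource} of type FILE with APPLICATION visibility that points to the file written under dir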
+     * @throws IOException A problem occurred writing the file
+     * @throws HadoopAccessorException A problem occurred with Hadoop
+     * @throws URISyntaxException A problem occurred parsing the URI
+     */
+    public LocalResource createLocalResourceForConfigurationFile(String filename, String user, Configuration conf, URI uri,
+                                                                 Path dir)
+            throws IOException, HadoopAccessorException, URISyntaxException {
+        Path dst = new Path(dir, filename);
+        FileSystem fs = createFileSystem(user, uri, conf, false);
+        try (OutputStream os = fs.create(dst)){
+            conf.writeXml(os);
+        }
+        LocalResource localResource = Records.newRecord(LocalResource.class);
+        localResource.setType(LocalResourceType.FILE); localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+        localResource.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+        FileStatus destStatus = fs.getFileStatus(dst);
+        localResource.setTimestamp(destStatus.getModificationTime());
+        localResource.setSize(destStatus.getLen());
+        return localResource;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/service/Services.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/Services.java b/core/src/main/java/org/apache/oozie/service/Services.java
index 829d5f5..7f47f88 100644
--- a/core/src/main/java/org/apache/oozie/service/Services.java
+++ b/core/src/main/java/org/apache/oozie/service/Services.java
@@ -204,7 +204,6 @@ public class Services {
      *
      * @throws ServiceException thrown if any of the services could not initialize.
      */
-    @SuppressWarnings("unchecked")
     public void init() throws ServiceException {
         XLog log = new XLog(LogFactory.getLog(getClass()));
         log.trace("Initializing");
@@ -255,9 +254,9 @@ public class Services {
      *                configuration.
      * @throws ServiceException thrown if a service class could not be loaded.
      */
-    private void loadServices(Class[] classes, List<Service> list) throws ServiceException {
+    private void loadServices(Class<?>[] classes, List<Service> list) throws ServiceException {
         XLog log = new XLog(LogFactory.getLog(getClass()));
-        for (Class klass : classes) {
+        for (Class<?> klass : classes) {
             try {
                 Service service = (Service) klass.newInstance();
                 log.debug("Loading service [{0}] implementation [{1}]", service.getInterface(),
@@ -284,10 +283,10 @@ public class Services {
     private void loadServices() throws ServiceException {
         XLog log = new XLog(LogFactory.getLog(getClass()));
         try {
-            Map<Class, Service> map = new LinkedHashMap<Class, Service>();
-            Class[] classes = ConfigurationService.getClasses(conf, CONF_SERVICE_CLASSES);
+            Map<Class<?>, Service> map = new LinkedHashMap<Class<?>, Service>();
+            Class<?>[] classes = ConfigurationService.getClasses(conf, CONF_SERVICE_CLASSES);
             log.debug("Services list obtained from property '" + CONF_SERVICE_CLASSES + "'");
-            Class[] classesExt = ConfigurationService.getClasses(conf, CONF_SERVICE_EXT_CLASSES);
+            Class<?>[] classesExt = ConfigurationService.getClasses(conf, CONF_SERVICE_EXT_CLASSES);
             log.debug("Services list obtained from property '" + CONF_SERVICE_EXT_CLASSES + "'");
             List<Service> list = new ArrayList<Service>();
             loadServices(classes, list);
@@ -301,11 +300,12 @@ public class Services {
                 }
                 map.put(service.getInterface(), service);
             }
-            for (Map.Entry<Class, Service> entry : map.entrySet()) {
+            for (Map.Entry<Class<?>, Service> entry : map.entrySet()) {
                 setService(entry.getValue().getClass());
             }
         } catch (RuntimeException rex) {
-            log.fatal("Runtime Exception during Services Load. Check your list of '" + CONF_SERVICE_CLASSES + "' or '" + CONF_SERVICE_EXT_CLASSES + "'");
+            log.fatal("Runtime Exception during Services Load. Check your list of '{0}' or '{1}'",
+                    CONF_SERVICE_CLASSES, CONF_SERVICE_EXT_CLASSES, rex);
             throw new ServiceException(ErrorCode.E0103, rex.getMessage(), rex);
         }
     }