You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by sh...@apache.org on 2015/04/27 07:20:49 UTC

oozie git commit: OOZIE-2129 Duplicate child jobs per instance (jaydeepvishwakarma via shwethags)

Repository: oozie
Updated Branches:
  refs/heads/master 6aefbc710 -> 3e20533b0


OOZIE-2129 Duplicate child jobs per instance (jaydeepvishwakarma via shwethags)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/3e20533b
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/3e20533b
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/3e20533b

Branch: refs/heads/master
Commit: 3e20533b0fc75ee91ed7e6bad9eff07a63dba35c
Parents: 6aefbc7
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Mon Apr 27 10:50:45 2015 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Mon Apr 27 10:50:45 2015 +0530

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java | 11 +++++-
 .../action/hadoop/LauncherMapperHelper.java     | 14 +++----
 .../action/oozie/SubWorkflowActionExecutor.java |  8 ++++
 .../oozie/command/wf/ActionStartXCommand.java   | 16 ++++++++
 .../action/hadoop/LauncherMainHadoopUtils.java  |  4 --
 .../action/hadoop/LauncherMainHadoopUtils.java  |  4 --
 .../action/hadoop/LauncherMainHadoopUtils.java  | 34 ++++++++---------
 .../action/hadoop/LauncherMainHadoopUtils.java  | 34 ++++++++---------
 release-log.txt                                 |  1 +
 .../apache/oozie/action/hadoop/HiveMain.java    |  2 +
 .../apache/oozie/action/hadoop/Hive2Main.java   |  6 +++
 sharelib/oozie/pom.xml                          |  1 -
 .../apache/oozie/action/hadoop/JavaMain.java    |  5 +--
 .../oozie/action/hadoop/LauncherMain.java       | 15 +++++++-
 .../oozie/action/hadoop/LauncherMapper.java     | 40 ++++++++++++++++++--
 .../oozie/action/hadoop/MapReduceMain.java      | 28 +++++---------
 .../org/apache/oozie/action/hadoop/PigMain.java |  1 +
 sharelib/spark/pom.xml                          |  6 ++-
 .../SparkMain.java                              |  3 +-
 .../apache/oozie/action/hadoop/SqoopMain.java   |  1 +
 20 files changed, 151 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index eb2dbdb..695853e 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -57,6 +57,7 @@ import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.command.wf.ActionStartXCommand;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
@@ -884,8 +885,16 @@ public class JavaActionExecutor extends ActionExecutor {
             launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
             setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
 
+            String launcherTag = null;
+            // Extracting tag and appending action name to maintain the uniqueness.
+            if (context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG) != null) {
+                launcherTag = context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG);
+            } else { //Keeping it to maintain backward compatibly with test cases.
+                launcherTag = action.getId();
+            }
+
             // Properties for when a launcher job's AM gets restarted
-            LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, action.getId());
+            LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, launcherTag);
 
             String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
             if (actionShareLibProperty != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
index 069a734..6a93232 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
@@ -23,8 +23,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
 import java.math.BigInteger;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -164,17 +162,19 @@ public class LauncherMapperHelper {
         launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
     }
 
-    public static void setupYarnRestartHandling(JobConf launcherJobConf, Configuration actionConf, String actionId)
+    public static void setupYarnRestartHandling(JobConf launcherJobConf, Configuration actionConf, String launcherTag)
             throws NoSuchAlgorithmException {
         launcherJobConf.setLong("oozie.job.launch.time", System.currentTimeMillis());
         // Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length)
-        String tag = getTag(actionId);
-        actionConf.set("mapreduce.job.tags", tag);
+        String tag = getTag(launcherTag);
+        // keeping the oozie.child.mapreduce.job.tags instead of mapreduce.job.tags to avoid killing launcher itself.
+        // mapreduce.job.tags should only go to child job launch by launcher.
+        actionConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, tag);
     }
 
-    private static String getTag(String actionId) throws NoSuchAlgorithmException {
+    private static String getTag(String launcherTag) throws NoSuchAlgorithmException {
         MessageDigest digest = MessageDigest.getInstance("MD5");
-        digest.update(actionId.getBytes(), 0, actionId.length());
+        digest.update(launcherTag.getBytes(), 0, launcherTag.length());
         String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16);
         return md5;
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
index 527a5e2..854d621 100644
--- a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.DagEngine;
 import org.apache.oozie.LocalOozieClient;
 import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.command.wf.ActionStartXCommand;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.DagEngineService;
 import org.apache.oozie.client.WorkflowAction;
@@ -181,6 +182,13 @@ public class SubWorkflowActionExecutor extends ActionExecutor {
                 //TODO: this has to be refactored later to be done in a single place for REST calls and this
                 JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(),
                                           subWorkflowConf);
+
+                // pushing the tag to conf for using by Launcher.
+                if(context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG) != null) {
+                    subWorkflowConf.set(ActionStartXCommand.OOZIE_ACTION_YARN_TAG,
+                            context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG));
+                }
+
                 // if the rerun failed node option is provided during the time of rerun command, old subworkflow will
                 // rerun again.
                 if(action.getExternalId() != null && parentConf.getBoolean(OozieClient.RERUN_FAIL_NODES, false)) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index d4048a1..e06649c 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -68,6 +68,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
     public static final String COULD_NOT_START = "COULD_NOT_START";
     public static final String START_DATA_MISSING = "START_DATA_MISSING";
     public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
+    public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
 
     private String jobId = null;
     private String actionId = null;
@@ -231,6 +232,21 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
                 Instrumentation.Cron cron = new Instrumentation.Cron();
                 cron.start();
                 context.setStartTime();
+                /*
+                Creating and forwarding the tag, It will be useful during repeat attempts of Launcher, to ensure only
+                one child job is running. Tag is formed as follows:
+                For workflow job, tag = action-id
+                For Coord job, tag = coord-action-id@action-name (if not part of sub flow), else
+                coord-action-id@subflow-action-name@action-name.
+                 */
+                if (conf.get(OOZIE_ACTION_YARN_TAG) != null) {
+                    context.setVar(OOZIE_ACTION_YARN_TAG, conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName());
+                } else if (wfJob.getParentId() != null) {
+                    context.setVar(OOZIE_ACTION_YARN_TAG, wfJob.getParentId() + "@" + wfAction.getName());
+                } else {
+                    context.setVar(OOZIE_ACTION_YARN_TAG, wfAction.getId());
+                }
+
                 executor.start(context, wfAction);
                 cron.stop();
                 FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git a/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index 46c2fbd..9e34d0b 100644
--- a/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ b/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -26,10 +26,6 @@ public class LauncherMainHadoopUtils {
     private LauncherMainHadoopUtils() {
     }
 
-    public static String getYarnJobForMapReduceAction(Configuration actionConf) {
-        return null;
-    }
-
     public static void killChildYarnJobs(Configuration actionConf) {
         // no-op
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git a/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index 46c2fbd..9e34d0b 100644
--- a/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ b/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -26,10 +26,6 @@ public class LauncherMainHadoopUtils {
     private LauncherMainHadoopUtils() {
     }
 
-    public static String getYarnJobForMapReduceAction(Configuration actionConf) {
-        return null;
-    }
-
     public static void killChildYarnJobs(Configuration actionConf) {
         // no-op
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git a/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index f6bb6a4..9331c13 100644
--- a/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -23,6 +23,8 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
@@ -33,29 +35,35 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.mapreduce.TypeConverter;
 
 public class LauncherMainHadoopUtils {
 
+    public static final String CHILD_MAPREDUCE_JOB_TAGS = "oozie.child.mapreduce.job.tags";
+
     private LauncherMainHadoopUtils() {
     }
 
     private static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) {
-        Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+        System.out.println("Fetching child yarn jobs");
         long startTime = 0L;
         try {
             startTime = Long.parseLong((System.getProperty("oozie.job.launch.time")));
         } catch(NumberFormatException nfe) {
             throw new RuntimeException("Could not find Oozie job launch time", nfe);
         }
-        String tag = actionConf.get("mapreduce.job.tags");
-        if (tag == null) {
-            throw new RuntimeException("Could not find Yarn tags property (mapreduce.job.tags)");
+
+        Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+        if (actionConf.get(CHILD_MAPREDUCE_JOB_TAGS) == null) {
+            System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS);
+            return childYarnJobs;
         }
+
+        String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
+        System.out.println("tag id : " + tag);
         GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
         gar.setScope(ApplicationsRequestScope.OWN);
-        gar.setStartRange(startTime, System.currentTimeMillis());
         gar.setApplicationTags(Collections.singleton(tag));
+        gar.setStartRange(startTime, System.currentTimeMillis());
         try {
             ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
             GetApplicationsResponse apps = proxy.getApplications(gar);
@@ -68,19 +76,9 @@ public class LauncherMainHadoopUtils {
         } catch (YarnException ye) {
             throw new RuntimeException("Exception occurred while finding child jobs", ye);
         }
-        return childYarnJobs;
-    }
 
-    public static String getYarnJobForMapReduceAction(Configuration actionConf) {
-        Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf);
-        String childJobId = null;
-        if (!childYarnJobs.isEmpty()) {
-            ApplicationId childJobYarnId = childYarnJobs.iterator().next();
-            System.out.println("Found Map-Reduce job [" + childJobYarnId + "] already running");
-            // Need the JobID version for Oozie
-            childJobId = TypeConverter.fromYarn(childJobYarnId).toString();
-        }
-        return childJobId;
+        System.out.println("Child yarn jobs are found - " + StringUtils.join(childYarnJobs, ","));
+        return childYarnJobs;
     }
 
     public static void killChildYarnJobs(Configuration actionConf) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index f6bb6a4..211ba09 100644
--- a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -33,29 +33,35 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.mapreduce.TypeConverter;
 
 public class LauncherMainHadoopUtils {
 
+    public static final String CHILD_MAPREDUCE_JOB_TAGS = "oozie.child.mapreduce.job.tags";
+
     private LauncherMainHadoopUtils() {
     }
 
     private static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) {
-        Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+        System.out.println("Fetching child yarn jobs");
         long startTime = 0L;
         try {
             startTime = Long.parseLong((System.getProperty("oozie.job.launch.time")));
         } catch(NumberFormatException nfe) {
             throw new RuntimeException("Could not find Oozie job launch time", nfe);
         }
-        String tag = actionConf.get("mapreduce.job.tags");
-        if (tag == null) {
-            throw new RuntimeException("Could not find Yarn tags property (mapreduce.job.tags)");
+
+        Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+        if (actionConf.get(CHILD_MAPREDUCE_JOB_TAGS) == null) {
+            System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS);
+            return childYarnJobs;
         }
+
+        String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
+        System.out.println("tag id : " + tag);
         GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
         gar.setScope(ApplicationsRequestScope.OWN);
-        gar.setStartRange(startTime, System.currentTimeMillis());
         gar.setApplicationTags(Collections.singleton(tag));
+        gar.setStartRange(startTime, System.currentTimeMillis());
         try {
             ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
             GetApplicationsResponse apps = proxy.getApplications(gar);
@@ -68,19 +74,9 @@ public class LauncherMainHadoopUtils {
         } catch (YarnException ye) {
             throw new RuntimeException("Exception occurred while finding child jobs", ye);
         }
-        return childYarnJobs;
-    }
 
-    public static String getYarnJobForMapReduceAction(Configuration actionConf) {
-        Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf);
-        String childJobId = null;
-        if (!childYarnJobs.isEmpty()) {
-            ApplicationId childJobYarnId = childYarnJobs.iterator().next();
-            System.out.println("Found Map-Reduce job [" + childJobYarnId + "] already running");
-            // Need the JobID version for Oozie
-            childJobId = TypeConverter.fromYarn(childJobYarnId).toString();
-        }
-        return childJobId;
+        System.out.println("Child yarn jobs are found - " + StringUtils.join(childYarnJobs, ","));
+        return childYarnJobs;
     }
 
     public static void killChildYarnJobs(Configuration actionConf) {
@@ -106,4 +102,4 @@ public class LauncherMainHadoopUtils {
             throw new RuntimeException("Exception occurred while killing child job(s)", ioe);
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 1b5cccd..a0c4557 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-2129 Duplicate child jobs per instance (jaydeepvishwakarma via shwethags)
 OOZIE-2214 fix test case TestCoordRerunXCommand.testCoordRerunDateNeg (ryota)
 OOZIE-2213 oozie-setup.ps1 should use "start-process" rather than "cmd /c" to invoke OozieSharelibCLI or OozieDBCLI commands (bzhang)
 OOZIE-2210 Update extjs 2.2 link (bzhang)

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
----------------------------------------------------------------------
diff --git a/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
index 5ea4e1a..84bdb79 100644
--- a/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
+++ b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
@@ -85,6 +85,8 @@ public class HiveMain extends LauncherMain {
 
         hiveConf.addResource(new Path("file:///", actionXml));
 
+        setYarnTag(hiveConf);
+
         // Propagate delegation related props from launcher job to Hive job
         String delegationToken = getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION");
         if (delegationToken != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
----------------------------------------------------------------------
diff --git a/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
index 304e391..557969e 100644
--- a/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
+++ b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
@@ -72,6 +72,7 @@ public class Hive2Main extends LauncherMain {
         }
 
         actionConf.addResource(new Path("file:///", actionXml));
+        setYarnTag(actionConf);
 
         // Propagate delegation related props from launcher job to Hive job
         String delegationToken = getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION");
@@ -199,6 +200,11 @@ public class Hive2Main extends LauncherMain {
             arguments.add(beelineArg);
         }
 
+        if (actionConf.get(LauncherMain.MAPREDUCE_JOB_TAGS) != null ) {
+            arguments.add("--hiveconf");
+            arguments.add("mapreduce.job.tags=" + actionConf.get(LauncherMain.MAPREDUCE_JOB_TAGS));
+        }
+
         System.out.println("Beeline command arguments :");
         for (String arg : arguments) {
             System.out.println("             " + arg);

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/oozie/pom.xml b/sharelib/oozie/pom.xml
index 087b6de..484fb45 100644
--- a/sharelib/oozie/pom.xml
+++ b/sharelib/oozie/pom.xml
@@ -139,6 +139,5 @@
             </plugin>
         </plugins>
     </build>
-
 </project>
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
index f58ff1d..10a1b12 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
@@ -20,10 +20,7 @@
 package org.apache.oozie.action.hadoop;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 
-import java.io.File;
-import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
@@ -43,6 +40,8 @@ public class JavaMain extends LauncherMain {
 
         Configuration actionConf = loadActionConf();
 
+        setYarnTag(actionConf);
+
         LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
 
         Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class);

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
index 0860484..2288ed0 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.JobConf;
 public abstract class LauncherMain {
 
     public static final String HADOOP_JOBS = "hadoopJobs";
+    public static final String MAPREDUCE_JOB_TAGS = "mapreduce.job.tags";
 
     protected static void run(Class<? extends LauncherMain> klass, String[] args) throws Exception {
         LauncherMain main = klass.newInstance();
@@ -181,7 +182,7 @@ public abstract class LauncherMain {
      * @return action  Configuration
      * @throws IOException
      */
-    protected Configuration loadActionConf() throws IOException {
+    public static Configuration loadActionConf() throws IOException {
         // loading action conf prepared by Oozie
         Configuration actionConf = new Configuration(false);
 
@@ -197,6 +198,18 @@ public abstract class LauncherMain {
         actionConf.addResource(new Path("file:///", actionXml));
         return actionConf;
     }
+
+    protected static void setYarnTag(Configuration actionConf) {
+        if(actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS) != null) {
+            // in case the user set their own tags, appending the launcher tag.
+            if(actionConf.get(MAPREDUCE_JOB_TAGS) != null) {
+                actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(MAPREDUCE_JOB_TAGS) + ","
+                        + actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS));
+            } else {
+                actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS));
+            }
+        }
+    }
 }
 
 class LauncherMainException extends Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
index 9c3128f..fe38976 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
@@ -21,12 +21,15 @@ package org.apache.oozie.action.hadoop;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.io.OutputStream;
+import java.io.FileOutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.security.Permission;
@@ -78,6 +81,8 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
     static final String ACTION_DATA_NEW_ID = "newId";
     static final String ACTION_DATA_ERROR_PROPS = "error.properties";
     public static final String HADOOP2_WORKAROUND_DISTRIBUTED_CACHE = "oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache";
+    public static final String PROPAGATION_CONF_XML = "propagation-conf.xml";
+    public static final String OOZIE_LAUNCHER_JOB_ID = "oozie.launcher.job.id";
 
     private void setRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) throws LauncherException {
         try {
@@ -171,6 +176,9 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
 
                     setupMainConfiguration();
 
+                    // Propagating the conf to use by child job.
+                    propagateToHadoopConf();
+
                     try {
                         System.out.println("Starting the execution of prepare actions");
                         executePrepare();
@@ -322,6 +330,34 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
         System.out.println();
     }
 
+    /**
+     * Pushing all important conf to hadoop conf for the action
+     */
+    private void propagateToHadoopConf() throws IOException {
+        Configuration propagationConf = new Configuration(false);
+        if (System.getProperty(OOZIE_ACTION_ID) != null) {
+            propagationConf.set(OOZIE_ACTION_ID, System.getProperty(OOZIE_ACTION_ID));
+        }
+        if (System.getProperty(OOZIE_JOB_ID) != null) {
+            propagationConf.set(OOZIE_JOB_ID, System.getProperty(OOZIE_JOB_ID));
+        }
+        if(System.getProperty(OOZIE_LAUNCHER_JOB_ID) != null) {
+            propagationConf.set(OOZIE_LAUNCHER_JOB_ID, System.getProperty(OOZIE_LAUNCHER_JOB_ID));
+        }
+
+        // loading action conf prepared by Oozie
+        Configuration actionConf = LauncherMain.loadActionConf();
+
+        if(actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS) != null) {
+            propagationConf.set(LauncherMain.MAPREDUCE_JOB_TAGS,
+                    actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS));
+        }
+
+        propagationConf.writeXml(new FileWriter(PROPAGATION_CONF_XML));
+        Configuration.dumpConfiguration(propagationConf, new OutputStreamWriter(System.out));
+        Configuration.addDefaultResource(PROPAGATION_CONF_XML);
+    }
+
     protected JobConf getJobConf() {
         return jobConf;
     }
@@ -421,8 +457,7 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
         Path pathNew = new Path(new Path(actionDir, ACTION_CONF_XML),
                 new Path(new File(ACTION_CONF_XML).getAbsolutePath()));
         FileSystem fs = FileSystem.get(pathNew.toUri(), getJobConf());
-        fs.copyToLocalFile(new Path(actionDir, ACTION_CONF_XML),
-                new Path(new File(ACTION_CONF_XML).getAbsolutePath()));
+        fs.copyToLocalFile(new Path(actionDir, ACTION_CONF_XML), new Path(new File(ACTION_CONF_XML).getAbsolutePath()));
 
         System.setProperty("oozie.launcher.job.id", getJobConf().get("mapred.job.id"));
         System.setProperty(OOZIE_JOB_ID, getJobConf().get(OOZIE_JOB_ID));
@@ -434,7 +469,6 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
         System.setProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID, new File(ACTION_DATA_NEW_ID).getAbsolutePath());
         System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath());
         System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new File(ACTION_DATA_ERROR_PROPS).getAbsolutePath());
-        System.setProperty("oozie.job.launch.time", getJobConf().get("oozie.job.launch.time"));
         String actionConfigClass = getJobConf().get(OOZIE_ACTION_CONFIG_CLASS);
         if (actionConfigClass != null) {
             System.setProperty(OOZIE_ACTION_CONFIG_CLASS, actionConfigClass);

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
index 61cec7e..23447cf 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
@@ -46,35 +46,25 @@ public class MapReduceMain extends LauncherMain {
         // loading action conf prepared by Oozie
         Configuration actionConf = new Configuration(false);
         actionConf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
+        setYarnTag(actionConf);
 
         JobConf jobConf = new JobConf();
         addActionConf(jobConf, actionConf);
+        LauncherMainHadoopUtils.killChildYarnJobs(jobConf);
 
         // Run a config class if given to update the job conf
         runConfigClass(jobConf);
 
         logMasking("Map-Reduce job configuration:", new HashSet<String>(), jobConf);
 
-        String jobId = LauncherMainHadoopUtils.getYarnJobForMapReduceAction(jobConf);
         File idFile = new File(System.getProperty(LauncherMapper.ACTION_PREFIX + LauncherMapper.ACTION_DATA_NEW_ID));
-        if (jobId != null) {
-            if (!idFile.exists()) {
-                System.out.print("JobId file is mising: writing now... ");
-                writeJobIdFile(idFile, jobId);
-                System.out.print("Done");
-            }
-            System.out.println("Exiting launcher");
-            System.out.println();
-        }
-        else {
-            System.out.println("Submitting Oozie action Map-Reduce job");
-            System.out.println();
-            // submitting job
-            RunningJob runningJob = submitJob(jobConf);
-
-            jobId = runningJob.getID().toString();
-            writeJobIdFile(idFile, jobId);
-        }
+        System.out.println("Submitting Oozie action Map-Reduce job");
+        System.out.println();
+        // submitting job
+        RunningJob runningJob = submitJob(jobConf);
+
+        String jobId = runningJob.getID().toString();
+        writeJobIdFile(idFile, jobId);
 
         System.out.println("=======================");
         System.out.println();

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
----------------------------------------------------------------------
diff --git a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
index 129022a..8228e88 100644
--- a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
+++ b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
@@ -95,6 +95,7 @@ public class PigMain extends LauncherMain {
         }
 
         actionConf.addResource(new Path("file:///", actionXml));
+        setYarnTag(actionConf);
 
         Properties pigProperties = new Properties();
         for (Map.Entry<String, String> entry : actionConf) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/spark/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml
index c532532..51a4251 100644
--- a/sharelib/spark/pom.xml
+++ b/sharelib/spark/pom.xml
@@ -49,7 +49,11 @@
             <artifactId>commons-lang</artifactId>
             <scope>compile</scope>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.oozie</groupId>
+            <artifactId>oozie-sharelib-oozie</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.10</artifactId>

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
index dcf3868..b18a0b9 100644
--- a/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
@@ -22,8 +22,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.deploy.SparkSubmit;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -43,6 +41,7 @@ public class SparkMain extends LauncherMain {
     @Override
     protected void run(String[] args) throws Exception {
         Configuration actionConf = loadActionConf();
+        setYarnTag(actionConf);
         LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
 
         List<String> sparkArgs = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
----------------------------------------------------------------------
diff --git a/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java b/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
index 1ffaf10..6ba7238 100644
--- a/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
+++ b/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
@@ -60,6 +60,7 @@ public class SqoopMain extends LauncherMain {
         }
 
         sqoopConf.addResource(new Path("file:///", actionXml));
+        setYarnTag(sqoopConf);
 
         String delegationToken = getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION");
         if (delegationToken != null) {