You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by sh...@apache.org on 2015/04/27 07:20:49 UTC
oozie git commit: OOZIE-2129 Duplicate child jobs per instance
(jaydeepvishwakarma via shwethags)
Repository: oozie
Updated Branches:
refs/heads/master 6aefbc710 -> 3e20533b0
OOZIE-2129 Duplicate child jobs per instance (jaydeepvishwakarma via shwethags)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/3e20533b
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/3e20533b
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/3e20533b
Branch: refs/heads/master
Commit: 3e20533b0fc75ee91ed7e6bad9eff07a63dba35c
Parents: 6aefbc7
Author: Shwetha GS <ss...@hortonworks.com>
Authored: Mon Apr 27 10:50:45 2015 +0530
Committer: Shwetha GS <ss...@hortonworks.com>
Committed: Mon Apr 27 10:50:45 2015 +0530
----------------------------------------------------------------------
.../oozie/action/hadoop/JavaActionExecutor.java | 11 +++++-
.../action/hadoop/LauncherMapperHelper.java | 14 +++----
.../action/oozie/SubWorkflowActionExecutor.java | 8 ++++
.../oozie/command/wf/ActionStartXCommand.java | 16 ++++++++
.../action/hadoop/LauncherMainHadoopUtils.java | 4 --
.../action/hadoop/LauncherMainHadoopUtils.java | 4 --
.../action/hadoop/LauncherMainHadoopUtils.java | 34 ++++++++---------
.../action/hadoop/LauncherMainHadoopUtils.java | 34 ++++++++---------
release-log.txt | 1 +
.../apache/oozie/action/hadoop/HiveMain.java | 2 +
.../apache/oozie/action/hadoop/Hive2Main.java | 6 +++
sharelib/oozie/pom.xml | 1 -
.../apache/oozie/action/hadoop/JavaMain.java | 5 +--
.../oozie/action/hadoop/LauncherMain.java | 15 +++++++-
.../oozie/action/hadoop/LauncherMapper.java | 40 ++++++++++++++++++--
.../oozie/action/hadoop/MapReduceMain.java | 28 +++++---------
.../org/apache/oozie/action/hadoop/PigMain.java | 1 +
sharelib/spark/pom.xml | 6 ++-
.../SparkMain.java | 3 +-
.../apache/oozie/action/hadoop/SqoopMain.java | 1 +
20 files changed, 151 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index eb2dbdb..695853e 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -57,6 +57,7 @@ import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.command.wf.ActionStartXCommand;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
@@ -884,8 +885,16 @@ public class JavaActionExecutor extends ActionExecutor {
launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context);
+ String launcherTag = null;
+ // Extracting tag and appending action name to maintain the uniqueness.
+ if (context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG) != null) {
+ launcherTag = context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG);
+ } else { //Keeping it to maintain backward compatibility with test cases.
+ launcherTag = action.getId();
+ }
+
// Properties for when a launcher job's AM gets restarted
- LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, action.getId());
+ LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, launcherTag);
String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
if (actionShareLibProperty != null) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
index 069a734..6a93232 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
@@ -23,8 +23,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@@ -164,17 +162,19 @@ public class LauncherMapperHelper {
launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
}
- public static void setupYarnRestartHandling(JobConf launcherJobConf, Configuration actionConf, String actionId)
+ public static void setupYarnRestartHandling(JobConf launcherJobConf, Configuration actionConf, String launcherTag)
throws NoSuchAlgorithmException {
launcherJobConf.setLong("oozie.job.launch.time", System.currentTimeMillis());
// Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length)
- String tag = getTag(actionId);
- actionConf.set("mapreduce.job.tags", tag);
+ String tag = getTag(launcherTag);
+ // Using oozie.child.mapreduce.job.tags instead of mapreduce.job.tags to avoid killing the launcher itself.
+ // mapreduce.job.tags should only be set on child jobs launched by the launcher.
+ actionConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, tag);
}
- private static String getTag(String actionId) throws NoSuchAlgorithmException {
+ private static String getTag(String launcherTag) throws NoSuchAlgorithmException {
MessageDigest digest = MessageDigest.getInstance("MD5");
- digest.update(actionId.getBytes(), 0, actionId.length());
+ digest.update(launcherTag.getBytes(), 0, launcherTag.length());
String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16);
return md5;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
index 527a5e2..854d621 100644
--- a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.DagEngine;
import org.apache.oozie.LocalOozieClient;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.command.wf.ActionStartXCommand;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.DagEngineService;
import org.apache.oozie.client.WorkflowAction;
@@ -181,6 +182,13 @@ public class SubWorkflowActionExecutor extends ActionExecutor {
//TODO: this has to be refactored later to be done in a single place for REST calls and this
JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(),
subWorkflowConf);
+
+ // Pushing the tag into the conf for use by the Launcher.
+ if(context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG) != null) {
+ subWorkflowConf.set(ActionStartXCommand.OOZIE_ACTION_YARN_TAG,
+ context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG));
+ }
+
// if the rerun failed node option is provided during the time of rerun command, old subworkflow will
// rerun again.
if(action.getExternalId() != null && parentConf.getBoolean(OozieClient.RERUN_FAIL_NODES, false)) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
index d4048a1..e06649c 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
@@ -68,6 +68,7 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
public static final String COULD_NOT_START = "COULD_NOT_START";
public static final String START_DATA_MISSING = "START_DATA_MISSING";
public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
+ public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
private String jobId = null;
private String actionId = null;
@@ -231,6 +232,21 @@ public class ActionStartXCommand extends ActionXCommand<Void> {
Instrumentation.Cron cron = new Instrumentation.Cron();
cron.start();
context.setStartTime();
+ /*
+ Creating and forwarding the tag. It is useful during repeated attempts of the Launcher, to ensure only
+ one child job is running. The tag is formed as follows:
+ For workflow job, tag = action-id
+ For Coord job, tag = coord-action-id@action-name (if not part of sub flow), else
+ coord-action-id@subflow-action-name@action-name.
+ */
+ if (conf.get(OOZIE_ACTION_YARN_TAG) != null) {
+ context.setVar(OOZIE_ACTION_YARN_TAG, conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName());
+ } else if (wfJob.getParentId() != null) {
+ context.setVar(OOZIE_ACTION_YARN_TAG, wfJob.getParentId() + "@" + wfAction.getName());
+ } else {
+ context.setVar(OOZIE_ACTION_YARN_TAG, wfAction.getId());
+ }
+
executor.start(context, wfAction);
cron.stop();
FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git a/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index 46c2fbd..9e34d0b 100644
--- a/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ b/hadooplibs/hadoop-utils-0.23/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -26,10 +26,6 @@ public class LauncherMainHadoopUtils {
private LauncherMainHadoopUtils() {
}
- public static String getYarnJobForMapReduceAction(Configuration actionConf) {
- return null;
- }
-
public static void killChildYarnJobs(Configuration actionConf) {
// no-op
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git a/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index 46c2fbd..9e34d0b 100644
--- a/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ b/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -26,10 +26,6 @@ public class LauncherMainHadoopUtils {
private LauncherMainHadoopUtils() {
}
- public static String getYarnJobForMapReduceAction(Configuration actionConf) {
- return null;
- }
-
public static void killChildYarnJobs(Configuration actionConf) {
// no-op
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git a/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index f6bb6a4..9331c13 100644
--- a/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -23,6 +23,8 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
@@ -33,29 +35,35 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.mapreduce.TypeConverter;
public class LauncherMainHadoopUtils {
+ public static final String CHILD_MAPREDUCE_JOB_TAGS = "oozie.child.mapreduce.job.tags";
+
private LauncherMainHadoopUtils() {
}
private static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) {
- Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+ System.out.println("Fetching child yarn jobs");
long startTime = 0L;
try {
startTime = Long.parseLong((System.getProperty("oozie.job.launch.time")));
} catch(NumberFormatException nfe) {
throw new RuntimeException("Could not find Oozie job launch time", nfe);
}
- String tag = actionConf.get("mapreduce.job.tags");
- if (tag == null) {
- throw new RuntimeException("Could not find Yarn tags property (mapreduce.job.tags)");
+
+ Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+ if (actionConf.get(CHILD_MAPREDUCE_JOB_TAGS) == null) {
+ System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS);
+ return childYarnJobs;
}
+
+ String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
+ System.out.println("tag id : " + tag);
GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
gar.setScope(ApplicationsRequestScope.OWN);
- gar.setStartRange(startTime, System.currentTimeMillis());
gar.setApplicationTags(Collections.singleton(tag));
+ gar.setStartRange(startTime, System.currentTimeMillis());
try {
ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
GetApplicationsResponse apps = proxy.getApplications(gar);
@@ -68,19 +76,9 @@ public class LauncherMainHadoopUtils {
} catch (YarnException ye) {
throw new RuntimeException("Exception occurred while finding child jobs", ye);
}
- return childYarnJobs;
- }
- public static String getYarnJobForMapReduceAction(Configuration actionConf) {
- Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf);
- String childJobId = null;
- if (!childYarnJobs.isEmpty()) {
- ApplicationId childJobYarnId = childYarnJobs.iterator().next();
- System.out.println("Found Map-Reduce job [" + childJobYarnId + "] already running");
- // Need the JobID version for Oozie
- childJobId = TypeConverter.fromYarn(childJobYarnId).toString();
- }
- return childJobId;
+ System.out.println("Child yarn jobs are found - " + StringUtils.join(childYarnJobs, ","));
+ return childYarnJobs;
}
public static void killChildYarnJobs(Configuration actionConf) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index f6bb6a4..211ba09 100644
--- a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -33,29 +33,35 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.mapreduce.TypeConverter;
public class LauncherMainHadoopUtils {
+ public static final String CHILD_MAPREDUCE_JOB_TAGS = "oozie.child.mapreduce.job.tags";
+
private LauncherMainHadoopUtils() {
}
private static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) {
- Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+ System.out.println("Fetching child yarn jobs");
long startTime = 0L;
try {
startTime = Long.parseLong((System.getProperty("oozie.job.launch.time")));
} catch(NumberFormatException nfe) {
throw new RuntimeException("Could not find Oozie job launch time", nfe);
}
- String tag = actionConf.get("mapreduce.job.tags");
- if (tag == null) {
- throw new RuntimeException("Could not find Yarn tags property (mapreduce.job.tags)");
+
+ Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+ if (actionConf.get(CHILD_MAPREDUCE_JOB_TAGS) == null) {
+ System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS);
+ return childYarnJobs;
}
+
+ String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
+ System.out.println("tag id : " + tag);
GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
gar.setScope(ApplicationsRequestScope.OWN);
- gar.setStartRange(startTime, System.currentTimeMillis());
gar.setApplicationTags(Collections.singleton(tag));
+ gar.setStartRange(startTime, System.currentTimeMillis());
try {
ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
GetApplicationsResponse apps = proxy.getApplications(gar);
@@ -68,19 +74,9 @@ public class LauncherMainHadoopUtils {
} catch (YarnException ye) {
throw new RuntimeException("Exception occurred while finding child jobs", ye);
}
- return childYarnJobs;
- }
- public static String getYarnJobForMapReduceAction(Configuration actionConf) {
- Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf);
- String childJobId = null;
- if (!childYarnJobs.isEmpty()) {
- ApplicationId childJobYarnId = childYarnJobs.iterator().next();
- System.out.println("Found Map-Reduce job [" + childJobYarnId + "] already running");
- // Need the JobID version for Oozie
- childJobId = TypeConverter.fromYarn(childJobYarnId).toString();
- }
- return childJobId;
+ System.out.println("Child yarn jobs are found - " + StringUtils.join(childYarnJobs, ","));
+ return childYarnJobs;
}
public static void killChildYarnJobs(Configuration actionConf) {
@@ -106,4 +102,4 @@ public class LauncherMainHadoopUtils {
throw new RuntimeException("Exception occurred while killing child job(s)", ioe);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 1b5cccd..a0c4557 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.2.0 release (trunk - unreleased)
+OOZIE-2129 Duplicate child jobs per instance (jaydeepvishwakarma via shwethags)
OOZIE-2214 fix test case TestCoordRerunXCommand.testCoordRerunDateNeg (ryota)
OOZIE-2213 oozie-setup.ps1 should use "start-process" rather than "cmd /c" to invoke OozieSharelibCLI or OozieDBCLI commands (bzhang)
OOZIE-2210 Update extjs 2.2 link (bzhang)
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
----------------------------------------------------------------------
diff --git a/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
index 5ea4e1a..84bdb79 100644
--- a/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
+++ b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
@@ -85,6 +85,8 @@ public class HiveMain extends LauncherMain {
hiveConf.addResource(new Path("file:///", actionXml));
+ setYarnTag(hiveConf);
+
// Propagate delegation related props from launcher job to Hive job
String delegationToken = getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION");
if (delegationToken != null) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
----------------------------------------------------------------------
diff --git a/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
index 304e391..557969e 100644
--- a/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
+++ b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
@@ -72,6 +72,7 @@ public class Hive2Main extends LauncherMain {
}
actionConf.addResource(new Path("file:///", actionXml));
+ setYarnTag(actionConf);
// Propagate delegation related props from launcher job to Hive job
String delegationToken = getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION");
@@ -199,6 +200,11 @@ public class Hive2Main extends LauncherMain {
arguments.add(beelineArg);
}
+ if (actionConf.get(LauncherMain.MAPREDUCE_JOB_TAGS) != null ) {
+ arguments.add("--hiveconf");
+ arguments.add("mapreduce.job.tags=" + actionConf.get(LauncherMain.MAPREDUCE_JOB_TAGS));
+ }
+
System.out.println("Beeline command arguments :");
for (String arg : arguments) {
System.out.println(" " + arg);
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/oozie/pom.xml b/sharelib/oozie/pom.xml
index 087b6de..484fb45 100644
--- a/sharelib/oozie/pom.xml
+++ b/sharelib/oozie/pom.xml
@@ -139,6 +139,5 @@
</plugin>
</plugins>
</build>
-
</project>
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
index f58ff1d..10a1b12 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
@@ -20,10 +20,7 @@
package org.apache.oozie.action.hadoop;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import java.io.File;
-import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -43,6 +40,8 @@ public class JavaMain extends LauncherMain {
Configuration actionConf = loadActionConf();
+ setYarnTag(actionConf);
+
LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class);
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
index 0860484..2288ed0 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.JobConf;
public abstract class LauncherMain {
public static final String HADOOP_JOBS = "hadoopJobs";
+ public static final String MAPREDUCE_JOB_TAGS = "mapreduce.job.tags";
protected static void run(Class<? extends LauncherMain> klass, String[] args) throws Exception {
LauncherMain main = klass.newInstance();
@@ -181,7 +182,7 @@ public abstract class LauncherMain {
* @return action Configuration
* @throws IOException
*/
- protected Configuration loadActionConf() throws IOException {
+ public static Configuration loadActionConf() throws IOException {
// loading action conf prepared by Oozie
Configuration actionConf = new Configuration(false);
@@ -197,6 +198,18 @@ public abstract class LauncherMain {
actionConf.addResource(new Path("file:///", actionXml));
return actionConf;
}
+
+ protected static void setYarnTag(Configuration actionConf) {
+ if(actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS) != null) {
+ // in case the user set their own tags, appending the launcher tag.
+ if(actionConf.get(MAPREDUCE_JOB_TAGS) != null) {
+ actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(MAPREDUCE_JOB_TAGS) + ","
+ + actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS));
+ } else {
+ actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS));
+ }
+ }
+ }
}
class LauncherMainException extends Exception {
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
index 9c3128f..fe38976 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
@@ -21,12 +21,15 @@ package org.apache.oozie.action.hadoop;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.io.OutputStream;
+import java.io.FileOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.security.Permission;
@@ -78,6 +81,8 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
static final String ACTION_DATA_NEW_ID = "newId";
static final String ACTION_DATA_ERROR_PROPS = "error.properties";
public static final String HADOOP2_WORKAROUND_DISTRIBUTED_CACHE = "oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache";
+ public static final String PROPAGATION_CONF_XML = "propagation-conf.xml";
+ public static final String OOZIE_LAUNCHER_JOB_ID = "oozie.launcher.job.id";
private void setRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) throws LauncherException {
try {
@@ -171,6 +176,9 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
setupMainConfiguration();
+ // Propagating the conf for use by the child job.
+ propagateToHadoopConf();
+
try {
System.out.println("Starting the execution of prepare actions");
executePrepare();
@@ -322,6 +330,34 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
System.out.println();
}
+ /**
+ * Pushing all important conf to hadoop conf for the action
+ */
+ private void propagateToHadoopConf() throws IOException {
+ Configuration propagationConf = new Configuration(false);
+ if (System.getProperty(OOZIE_ACTION_ID) != null) {
+ propagationConf.set(OOZIE_ACTION_ID, System.getProperty(OOZIE_ACTION_ID));
+ }
+ if (System.getProperty(OOZIE_JOB_ID) != null) {
+ propagationConf.set(OOZIE_JOB_ID, System.getProperty(OOZIE_JOB_ID));
+ }
+ if(System.getProperty(OOZIE_LAUNCHER_JOB_ID) != null) {
+ propagationConf.set(OOZIE_LAUNCHER_JOB_ID, System.getProperty(OOZIE_LAUNCHER_JOB_ID));
+ }
+
+ // loading action conf prepared by Oozie
+ Configuration actionConf = LauncherMain.loadActionConf();
+
+ if(actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS) != null) {
+ propagationConf.set(LauncherMain.MAPREDUCE_JOB_TAGS,
+ actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS));
+ }
+
+ propagationConf.writeXml(new FileWriter(PROPAGATION_CONF_XML));
+ Configuration.dumpConfiguration(propagationConf, new OutputStreamWriter(System.out));
+ Configuration.addDefaultResource(PROPAGATION_CONF_XML);
+ }
+
protected JobConf getJobConf() {
return jobConf;
}
@@ -421,8 +457,7 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
Path pathNew = new Path(new Path(actionDir, ACTION_CONF_XML),
new Path(new File(ACTION_CONF_XML).getAbsolutePath()));
FileSystem fs = FileSystem.get(pathNew.toUri(), getJobConf());
- fs.copyToLocalFile(new Path(actionDir, ACTION_CONF_XML),
- new Path(new File(ACTION_CONF_XML).getAbsolutePath()));
+ fs.copyToLocalFile(new Path(actionDir, ACTION_CONF_XML), new Path(new File(ACTION_CONF_XML).getAbsolutePath()));
System.setProperty("oozie.launcher.job.id", getJobConf().get("mapred.job.id"));
System.setProperty(OOZIE_JOB_ID, getJobConf().get(OOZIE_JOB_ID));
@@ -434,7 +469,6 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
System.setProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID, new File(ACTION_DATA_NEW_ID).getAbsolutePath());
System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath());
System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new File(ACTION_DATA_ERROR_PROPS).getAbsolutePath());
- System.setProperty("oozie.job.launch.time", getJobConf().get("oozie.job.launch.time"));
String actionConfigClass = getJobConf().get(OOZIE_ACTION_CONFIG_CLASS);
if (actionConfigClass != null) {
System.setProperty(OOZIE_ACTION_CONFIG_CLASS, actionConfigClass);
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
index 61cec7e..23447cf 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
@@ -46,35 +46,25 @@ public class MapReduceMain extends LauncherMain {
// loading action conf prepared by Oozie
Configuration actionConf = new Configuration(false);
actionConf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
+ setYarnTag(actionConf);
JobConf jobConf = new JobConf();
addActionConf(jobConf, actionConf);
+ LauncherMainHadoopUtils.killChildYarnJobs(jobConf);
// Run a config class if given to update the job conf
runConfigClass(jobConf);
logMasking("Map-Reduce job configuration:", new HashSet<String>(), jobConf);
- String jobId = LauncherMainHadoopUtils.getYarnJobForMapReduceAction(jobConf);
File idFile = new File(System.getProperty(LauncherMapper.ACTION_PREFIX + LauncherMapper.ACTION_DATA_NEW_ID));
- if (jobId != null) {
- if (!idFile.exists()) {
- System.out.print("JobId file is mising: writing now... ");
- writeJobIdFile(idFile, jobId);
- System.out.print("Done");
- }
- System.out.println("Exiting launcher");
- System.out.println();
- }
- else {
- System.out.println("Submitting Oozie action Map-Reduce job");
- System.out.println();
- // submitting job
- RunningJob runningJob = submitJob(jobConf);
-
- jobId = runningJob.getID().toString();
- writeJobIdFile(idFile, jobId);
- }
+ System.out.println("Submitting Oozie action Map-Reduce job");
+ System.out.println();
+ // submitting job
+ RunningJob runningJob = submitJob(jobConf);
+
+ String jobId = runningJob.getID().toString();
+ writeJobIdFile(idFile, jobId);
System.out.println("=======================");
System.out.println();
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
----------------------------------------------------------------------
diff --git a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
index 129022a..8228e88 100644
--- a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
+++ b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
@@ -95,6 +95,7 @@ public class PigMain extends LauncherMain {
}
actionConf.addResource(new Path("file:///", actionXml));
+ setYarnTag(actionConf);
Properties pigProperties = new Properties();
for (Map.Entry<String, String> entry : actionConf) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/spark/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml
index c532532..51a4251 100644
--- a/sharelib/spark/pom.xml
+++ b/sharelib/spark/pom.xml
@@ -49,7 +49,11 @@
<artifactId>commons-lang</artifactId>
<scope>compile</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.oozie</groupId>
+ <artifactId>oozie-sharelib-oozie</artifactId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
index dcf3868..b18a0b9 100644
--- a/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
@@ -22,8 +22,6 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.deploy.SparkSubmit;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
@@ -43,6 +41,7 @@ public class SparkMain extends LauncherMain {
@Override
protected void run(String[] args) throws Exception {
Configuration actionConf = loadActionConf();
+ setYarnTag(actionConf);
LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
List<String> sparkArgs = new ArrayList<String>();
http://git-wip-us.apache.org/repos/asf/oozie/blob/3e20533b/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
----------------------------------------------------------------------
diff --git a/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java b/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
index 1ffaf10..6ba7238 100644
--- a/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
+++ b/sharelib/sqoop/src/main/java/org/apache/oozie/action/hadoop/SqoopMain.java
@@ -60,6 +60,7 @@ public class SqoopMain extends LauncherMain {
}
sqoopConf.addResource(new Path("file:///", actionXml));
+ setYarnTag(sqoopConf);
String delegationToken = getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION");
if (delegationToken != null) {