You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by as...@apache.org on 2017/09/11 10:01:55 UTC
[1/2] oozie git commit: Revert "OOZIE-2916 Set a job name for the MR
Action's child job (asasvari)"
Repository: oozie
Updated Branches:
refs/heads/master 82925e4d2 -> 5e1c9d362
Revert "OOZIE-2916 Set a job name for the MR Action's child job (asasvari)"
This reverts commit 82925e4d21796fd4dc1c9648f00677b98d7dbb81.
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/efc7a822
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/efc7a822
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/efc7a822
Branch: refs/heads/master
Commit: efc7a822762453daf8d56c78b7a88343f7b709d3
Parents: 82925e4
Author: Attila Sasvari <as...@cloudera.com>
Authored: Mon Sep 11 11:47:51 2017 +0200
Committer: Attila Sasvari <as...@cloudera.com>
Committed: Mon Sep 11 11:47:51 2017 +0200
----------------------------------------------------------------------
.../oozie/action/hadoop/JavaActionExecutor.java | 22 +---
.../action/hadoop/MapReduceActionExecutor.java | 10 --
.../action/hadoop/TestJavaActionExecutor.java | 1 +
release-log.txt | 1 -
.../hadoop/TestMapReduceActionExecutor.java | 109 ++++++++++---------
5 files changed, 66 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/efc7a822/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 2b1cc7d..bca79aa 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -147,7 +147,6 @@ public class JavaActionExecutor extends ActionExecutor {
private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain";
private static final String HADOOP_JOB_NAME = "mapred.job.name";
private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
- public static final String OOZIE_ACTION_NAME = "oozie.action.name";
private static int maxActionOutputLen;
private static int maxExternalStatsSize;
@@ -945,7 +944,6 @@ public class JavaActionExecutor extends ActionExecutor {
// action job configuration
Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
- addAppNameContext(action, context);
setupActionConf(actionConf, context, actionXml, appPathRoot);
LOG.debug("Setting LibFilesArchives ");
setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
@@ -1074,19 +1072,6 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
- protected void addAppNameContext(WorkflowAction action, Context context) {
- String oozieActionName = String.format("oozie:launcher:T=%s:W=%s:A=%s:ID=%s",
- getType(),
- context.getWorkflow().getAppName(),
- action.getName(),
- context.getWorkflow().getId());
- context.setVar(OOZIE_ACTION_NAME, oozieActionName);
- }
-
- protected String getAppName(Context context) {
- return context.getVar(OOZIE_ACTION_NAME);
- }
-
private Credentials ensureCredentials(final Credentials credentials) {
if (credentials == null) {
LOG.debug("No credentials present, creating a new one.");
@@ -1144,10 +1129,13 @@ public class JavaActionExecutor extends ActionExecutor {
ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
- String appName = getAppName(context);
+ String jobName = XLog.format(
+ "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
+ context.getWorkflow().getAppName(), actionName,
+ context.getWorkflow().getId());
appContext.setApplicationId(appId);
- appContext.setApplicationName(appName);
+ appContext.setApplicationName(jobName);
appContext.setApplicationType("Oozie Launcher");
Priority pri = Records.newRecord(Priority.class);
int priority = 0; // TODO: OYA: Add a constant or a config
http://git-wip-us.apache.org/repos/asf/oozie/blob/efc7a822/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index 22d5526..338e508 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -25,7 +25,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -51,7 +50,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
public static final String JOB_END_NOTIFICATION_URL = "job.end.notification.url";
- private static final String MAPREDUCE_JOB_NAME = "mapreduce.job.name";
private XLog log = XLog.getLog(getClass());
public MapReduceActionExecutor() {
@@ -163,7 +161,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
regularMR = true;
}
}
- setJobName(actionConf, context);
actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
// For "regular" (not streaming or pipes) MR jobs
@@ -208,13 +205,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
return actionConf;
}
- private void setJobName(Configuration actionConf, Context context) {
- String jobName = getAppName(context);
- if (jobName != null) {
- actionConf.set(MAPREDUCE_JOB_NAME, jobName.replace("oozie:launcher", "oozie:action"));
- }
- }
-
@Override
public void end(Context context, WorkflowAction action) throws ActionExecutorException {
super.end(context, action);
http://git-wip-us.apache.org/repos/asf/oozie/blob/efc7a822/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index d1d78fd..ce674ad 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
http://git-wip-us.apache.org/repos/asf/oozie/blob/efc7a822/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 56e955b..82a10aa 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,6 +1,5 @@
-- Oozie 5.0.0 release (trunk - unreleased)
-OOZIE-2916 Set a job name for the MR Action's child job (asasvari)
OOZIE-2858 HiveMain, ShellMain and SparkMain should not overwrite properties and config files locally (gezapeti)
OOZIE-3035 HDFS HA and log aggregation: getting HDFS delegation token from YARN renewer within JavaActionExecutor (andras.piros via pbacsko)
OOZIE-3026 fix openjpa enhancer stage during build for logging (dbdist13, andras.piros via pbacsko)
http://git-wip-us.apache.org/repos/asf/oozie/blob/efc7a822/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
index 7237769..2c92f41 100644
--- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
+++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.streaming.StreamJob;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -469,7 +468,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
String user = conf.get("user.name");
- JobClient jobClient = getHadoopAccessorService().createJobClient(user, conf);
+ JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
org.apache.hadoop.mapreduce.JobID jobID = TypeConverter.fromYarn(
ConverterUtils.toApplicationId(externalChildIDs));
final RunningJob mrJob = jobClient.getJob(JobID.downgrade(jobID));
@@ -516,7 +515,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- writeDummyInput(fs, inputDir);
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -533,7 +535,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
- writeDummyInput(fs, outputDir);
+ Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, "data.txt")));
+ ow.write("dummy\n");
+ ow.write("dummy\n");
+ ow.close();
String actionXml = "<map-reduce>" +
"<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
@@ -557,7 +562,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- writeDummyInput(fs, inputDir);
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
Path jobXml = new Path(getFsTestCaseDir(), "action.xml");
XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString());
@@ -582,7 +590,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- writeDummyInput(fs, inputDir);
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -607,7 +618,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- writeDummyInput(fs, inputDir);
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString());
conf.setBoolean("oozie.test.throw.exception", true); // causes OozieActionConfiguratorForTest to throw an exception
@@ -633,7 +647,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- writeDummyInput(fs, inputDir);
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -658,7 +675,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- writeDummyInput(fs, inputDir);
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -719,7 +739,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- writeDummyInput(fs, inputDir);
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -776,37 +799,6 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
}
}
- public void testJobNameSetForMapReduceChildren() throws Exception {
- Services serv = Services.get();
- serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", true);
-
- final FileSystem fs = getFileSystem();
- final Path inputDir = new Path(getFsTestCaseDir(), "input");
- final Path outputDir = new Path(getFsTestCaseDir(), "output");
- writeDummyInput(fs, inputDir);
-
- final String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
- + getNameNodeUri() + "</name-node>"
- + getMapReduceUberJarConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>";
-
- final String extId = _testSubmit(MAP_REDUCE, actionXml);
- final ApplicationId appId = ConverterUtils.toApplicationId(extId);
- final Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri());
- final String name = getHadoopAccessorService().createYarnClient(getTestUser(), conf).getApplicationReport(appId).getName();
- assertTrue(name.contains("oozie:action"));
- }
-
- private void writeDummyInput(FileSystem fs, Path inputDir) throws IOException {
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
- }
-
- private HadoopAccessorService getHadoopAccessorService() {
- return Services.get().get(HadoopAccessorService.class);
- }
-
public void testMapReduceWithUberJarEnabled() throws Exception {
Services serv = Services.get();
boolean originalUberJarDisabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
@@ -835,7 +827,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
OutputStream os = fs.create(new Path(getAppPath(), streamingJar));
IOUtils.copyStream(is, os);
- writeDummyInput(fs, inputDir);
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>" + " <streaming>" + " <mapper>cat</mapper>"
@@ -922,7 +917,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- writeDummyInput(fs, inputDir);
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>" + " <pipes>" + " <program>" + programPath
@@ -946,7 +944,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- writeDummyInput(fs, inputDir);
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
// set user stats write property as true explicitly in the
// configuration.
@@ -1005,7 +1006,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- writeDummyInput(fs, inputDir);
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
// set user stats write property as false explicitly in the
// configuration.
@@ -1058,7 +1062,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- writeDummyInput(fs, inputDir);
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
// set user stats write property as false explicitly in the
// configuration.
@@ -1125,7 +1132,11 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- writeDummyInput(fs, inputDir);
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir,
+ "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
XConfiguration mrConfig = getMapReduceConfig(inputDir.toString(),
outputDir.toString());
@@ -1161,8 +1172,8 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
ae.end(context, context.getAction());
assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
- Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri());
- final YarnClient yarnClient = getHadoopAccessorService().createYarnClient(getTestUser(), conf);
+ Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri());
+ final YarnClient yarnClient = Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), conf);
ApplicationReport report = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(externalChildIDs));
// Assert Mapred job name has been set
assertEquals(mapredJobName, report.getName());
[2/2] oozie git commit: OOZIE-2916 Set a job name for the MR Action's
child job (asasvari)
Posted by as...@apache.org.
OOZIE-2916 Set a job name for the MR Action's child job (asasvari)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/5e1c9d36
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/5e1c9d36
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/5e1c9d36
Branch: refs/heads/master
Commit: 5e1c9d362afe1b2c6423a386aeac7f04d3337f65
Parents: efc7a82
Author: Attila Sasvari <as...@cloudera.com>
Authored: Mon Sep 11 12:00:58 2017 +0200
Committer: Attila Sasvari <as...@cloudera.com>
Committed: Mon Sep 11 12:00:58 2017 +0200
----------------------------------------------------------------------
.../oozie/action/hadoop/JavaActionExecutor.java | 22 +++-
.../action/hadoop/MapReduceActionExecutor.java | 10 ++
.../action/hadoop/TestJavaActionExecutor.java | 1 -
release-log.txt | 1 +
.../hadoop/TestMapReduceActionExecutor.java | 109 +++++++++----------
5 files changed, 77 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/5e1c9d36/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index bca79aa..49fd4b8 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -147,6 +147,7 @@ public class JavaActionExecutor extends ActionExecutor {
private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain";
private static final String HADOOP_JOB_NAME = "mapred.job.name";
private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
+ private static final String OOZIE_ACTION_NAME = "oozie.action.name";
private static int maxActionOutputLen;
private static int maxExternalStatsSize;
@@ -944,6 +945,7 @@ public class JavaActionExecutor extends ActionExecutor {
// action job configuration
Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
+ addAppNameContext(action, context);
setupActionConf(actionConf, context, actionXml, appPathRoot);
LOG.debug("Setting LibFilesArchives ");
setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
@@ -1072,6 +1074,19 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
+ protected void addAppNameContext(WorkflowAction action, Context context) {
+ String oozieActionName = String.format("oozie:launcher:T=%s:W=%s:A=%s:ID=%s",
+ getType(),
+ context.getWorkflow().getAppName(),
+ action.getName(),
+ context.getWorkflow().getId());
+ context.setVar(OOZIE_ACTION_NAME, oozieActionName);
+ }
+
+ protected String getAppName(Context context) {
+ return context.getVar(OOZIE_ACTION_NAME);
+ }
+
private Credentials ensureCredentials(final Credentials credentials) {
if (credentials == null) {
LOG.debug("No credentials present, creating a new one.");
@@ -1129,13 +1144,10 @@ public class JavaActionExecutor extends ActionExecutor {
ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
- String jobName = XLog.format(
- "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
- context.getWorkflow().getAppName(), actionName,
- context.getWorkflow().getId());
+ String appName = getAppName(context);
appContext.setApplicationId(appId);
- appContext.setApplicationName(jobName);
+ appContext.setApplicationName(appName);
appContext.setApplicationType("Oozie Launcher");
Priority pri = Records.newRecord(Priority.class);
int priority = 0; // TODO: OYA: Add a constant or a config
http://git-wip-us.apache.org/repos/asf/oozie/blob/5e1c9d36/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index 338e508..22d5526 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -50,6 +51,7 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
public static final String JOB_END_NOTIFICATION_URL = "job.end.notification.url";
+ private static final String MAPREDUCE_JOB_NAME = "mapreduce.job.name";
private XLog log = XLog.getLog(getClass());
public MapReduceActionExecutor() {
@@ -161,6 +163,7 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
regularMR = true;
}
}
+ setJobName(actionConf, context);
actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
// For "regular" (not streaming or pipes) MR jobs
@@ -205,6 +208,13 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
return actionConf;
}
+ private void setJobName(Configuration actionConf, Context context) {
+ String jobName = getAppName(context);
+ if (jobName != null) {
+ actionConf.set(MAPREDUCE_JOB_NAME, jobName.replace("oozie:launcher", "oozie:action"));
+ }
+ }
+
@Override
public void end(Context context, WorkflowAction action) throws ActionExecutorException {
super.end(context, action);
http://git-wip-us.apache.org/repos/asf/oozie/blob/5e1c9d36/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index ce674ad..d1d78fd 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -33,7 +33,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
http://git-wip-us.apache.org/repos/asf/oozie/blob/5e1c9d36/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 82a10aa..e1ef0df 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.0.0 release (trunk - unreleased)
+OOZIE-2916 Set a job name for the MR Action's child job (asasvari)
OOZIE-2858 HiveMain, ShellMain and SparkMain should not overwrite properties and config files locally (gezapeti)
OOZIE-3035 HDFS HA and log aggregation: getting HDFS delegation token from YARN renewer within JavaActionExecutor (andras.piros via pbacsko)
OOZIE-3026 fix openjpa enhancer stage during build for logging (dbdist13, andras.piros via pbacsko)
http://git-wip-us.apache.org/repos/asf/oozie/blob/5e1c9d36/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
index 2c92f41..f460b6b 100644
--- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
+++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.streaming.StreamJob;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -468,7 +469,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
String user = conf.get("user.name");
- JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+ JobClient jobClient = getHadoopAccessorService().createJobClient(user, conf);
org.apache.hadoop.mapreduce.JobID jobID = TypeConverter.fromYarn(
ConverterUtils.toApplicationId(externalChildIDs));
final RunningJob mrJob = jobClient.getJob(JobID.downgrade(jobID));
@@ -515,10 +516,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
+ writeDummyInput(fs, inputDir);
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -535,10 +533,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
- Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, "data.txt")));
- ow.write("dummy\n");
- ow.write("dummy\n");
- ow.close();
+ writeDummyInput(fs, outputDir);
String actionXml = "<map-reduce>" +
"<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
@@ -562,10 +557,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
+ writeDummyInput(fs, inputDir);
Path jobXml = new Path(getFsTestCaseDir(), "action.xml");
XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString());
@@ -590,10 +582,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
+ writeDummyInput(fs, inputDir);
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -618,10 +607,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
+ writeDummyInput(fs, inputDir);
XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString());
conf.setBoolean("oozie.test.throw.exception", true); // causes OozieActionConfiguratorForTest to throw an exception
@@ -647,10 +633,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
+ writeDummyInput(fs, inputDir);
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -675,10 +658,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
+ writeDummyInput(fs, inputDir);
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -739,10 +719,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
+ writeDummyInput(fs, inputDir);
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -799,6 +776,37 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
}
}
+ public void testJobNameSetForMapReduceChild() throws Exception {
+ Services serv = Services.get();
+ serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", true);
+
+ final FileSystem fs = getFileSystem();
+ final Path inputDir = new Path(getFsTestCaseDir(), "input");
+ final Path outputDir = new Path(getFsTestCaseDir(), "output");
+ writeDummyInput(fs, inputDir);
+
+ final String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ + getNameNodeUri() + "</name-node>"
+ + getMapReduceUberJarConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>";
+
+ final String extId = _testSubmit(MAP_REDUCE, actionXml);
+ final ApplicationId appId = ConverterUtils.toApplicationId(extId);
+ final Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri());
+ final String name = getHadoopAccessorService().createYarnClient(getTestUser(), conf).getApplicationReport(appId).getName();
+ assertTrue(name.contains("oozie:action"));
+ }
+
+ private void writeDummyInput(FileSystem fs, Path inputDir) throws IOException {
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
+ }
+
+ private HadoopAccessorService getHadoopAccessorService() {
+ return Services.get().get(HadoopAccessorService.class);
+ }
+
public void testMapReduceWithUberJarEnabled() throws Exception {
Services serv = Services.get();
boolean originalUberJarDisabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
@@ -827,10 +835,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
OutputStream os = fs.create(new Path(getAppPath(), streamingJar));
IOUtils.copyStream(is, os);
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
+ writeDummyInput(fs, inputDir);
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>" + " <streaming>" + " <mapper>cat</mapper>"
@@ -917,10 +922,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
+ writeDummyInput(fs, inputDir);
String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>" + " <pipes>" + " <program>" + programPath
@@ -944,10 +946,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
+ writeDummyInput(fs, inputDir);
// set user stats write property as true explicitly in the
// configuration.
@@ -1006,10 +1005,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
+ writeDummyInput(fs, inputDir);
// set user stats write property as false explicitly in the
// configuration.
@@ -1062,10 +1058,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
+ writeDummyInput(fs, inputDir);
// set user stats write property as false explicitly in the
// configuration.
@@ -1132,11 +1125,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir,
- "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- w.close();
+ writeDummyInput(fs, inputDir);
XConfiguration mrConfig = getMapReduceConfig(inputDir.toString(),
outputDir.toString());
@@ -1172,8 +1161,8 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
ae.end(context, context.getAction());
assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
- Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri());
- final YarnClient yarnClient = Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), conf);
+ Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri());
+ final YarnClient yarnClient = getHadoopAccessorService().createYarnClient(getTestUser(), conf);
ApplicationReport report = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(externalChildIDs));
// Assert Mapred job name has been set
assertEquals(mapredJobName, report.getName());