You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by as...@apache.org on 2017/09/11 09:46:27 UTC

oozie git commit: OOZIE-2916 Set a job name for the MR Action's child job (asasvari)

Repository: oozie
Updated Branches:
  refs/heads/master 71d3ddc31 -> 82925e4d2


OOZIE-2916 Set a job name for the MR Action's child job (asasvari)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/82925e4d
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/82925e4d
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/82925e4d

Branch: refs/heads/master
Commit: 82925e4d21796fd4dc1c9648f00677b98d7dbb81
Parents: 71d3ddc
Author: Attila Sasvari <as...@cloudera.com>
Authored: Mon Sep 11 11:46:09 2017 +0200
Committer: Attila Sasvari <as...@cloudera.com>
Committed: Mon Sep 11 11:46:09 2017 +0200

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java |  22 +++-
 .../action/hadoop/MapReduceActionExecutor.java  |  10 ++
 .../action/hadoop/TestJavaActionExecutor.java   |   1 -
 release-log.txt                                 |   1 +
 .../hadoop/TestMapReduceActionExecutor.java     | 109 +++++++++----------
 5 files changed, 77 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/82925e4d/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index bca79aa..2b1cc7d 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -147,6 +147,7 @@ public class JavaActionExecutor extends ActionExecutor {
     private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain";
     private static final String HADOOP_JOB_NAME = "mapred.job.name";
     private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
+    public static final String OOZIE_ACTION_NAME = "oozie.action.name";
 
     private static int maxActionOutputLen;
     private static int maxExternalStatsSize;
@@ -944,6 +945,7 @@ public class JavaActionExecutor extends ActionExecutor {
 
             // action job configuration
             Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
+            addAppNameContext(action, context);
             setupActionConf(actionConf, context, actionXml, appPathRoot);
             LOG.debug("Setting LibFilesArchives ");
             setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
@@ -1072,6 +1074,19 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
+    protected void addAppNameContext(WorkflowAction action, Context context) {
+        String oozieActionName = String.format("oozie:launcher:T=%s:W=%s:A=%s:ID=%s",
+                getType(),
+                context.getWorkflow().getAppName(),
+                action.getName(),
+                context.getWorkflow().getId());
+        context.setVar(OOZIE_ACTION_NAME, oozieActionName);
+    }
+
+    protected String getAppName(Context context) {
+        return context.getVar(OOZIE_ACTION_NAME);
+    }
+
     private Credentials ensureCredentials(final Credentials credentials) {
         if (credentials == null) {
             LOG.debug("No credentials present, creating a new one.");
@@ -1129,13 +1144,10 @@ public class JavaActionExecutor extends ActionExecutor {
 
         ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
 
-        String jobName = XLog.format(
-                "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
-                context.getWorkflow().getAppName(), actionName,
-                context.getWorkflow().getId());
+        String appName = getAppName(context);
 
         appContext.setApplicationId(appId);
-        appContext.setApplicationName(jobName);
+        appContext.setApplicationName(appName);
         appContext.setApplicationType("Oozie Launcher");
         Priority pri = Records.newRecord(Priority.class);
         int priority = 0; // TODO: OYA: Add a constant or a config

http://git-wip-us.apache.org/repos/asf/oozie/blob/82925e4d/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index 338e508..22d5526 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,6 +51,7 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
     public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
     private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
     public static final String JOB_END_NOTIFICATION_URL = "job.end.notification.url";
+    private static final String MAPREDUCE_JOB_NAME = "mapreduce.job.name";
     private XLog log = XLog.getLog(getClass());
 
     public MapReduceActionExecutor() {
@@ -161,6 +163,7 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
                 regularMR = true;
             }
         }
+        setJobName(actionConf, context);
         actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
 
         // For "regular" (not streaming or pipes) MR jobs
@@ -205,6 +208,13 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
         return actionConf;
     }
 
+    private void setJobName(Configuration actionConf, Context context) {
+        String jobName = getAppName(context);
+        if (jobName != null) {
+            actionConf.set(MAPREDUCE_JOB_NAME, jobName.replace("oozie:launcher", "oozie:action"));
+        }
+    }
+
     @Override
     public void end(Context context, WorkflowAction action) throws ActionExecutorException {
         super.end(context, action);

http://git-wip-us.apache.org/repos/asf/oozie/blob/82925e4d/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index ce674ad..d1d78fd 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -33,7 +33,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;

http://git-wip-us.apache.org/repos/asf/oozie/blob/82925e4d/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 82a10aa..56e955b 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.0.0 release (trunk - unreleased)
 
+OOZIE-2916 Set a job name for the MR Action's child job (asasvari)
 OOZIE-2858 HiveMain, ShellMain and SparkMain should not overwrite properties and config files locally (gezapeti)
 OOZIE-3035 HDFS HA and log aggregation: getting HDFS delegation token from YARN renewer within JavaActionExecutor (andras.piros via pbacsko)
 OOZIE-3026 fix openjpa enhancer stage during build for logging (dbdist13, andras.piros via pbacsko)

http://git-wip-us.apache.org/repos/asf/oozie/blob/82925e4d/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
index 2c92f41..7237769 100644
--- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
+++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.streaming.StreamJob;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -468,7 +469,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
         Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
         String user = conf.get("user.name");
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+        JobClient jobClient = getHadoopAccessorService().createJobClient(user, conf);
         org.apache.hadoop.mapreduce.JobID jobID = TypeConverter.fromYarn(
                 ConverterUtils.toApplicationId(externalChildIDs));
         final RunningJob mrJob = jobClient.getJob(JobID.downgrade(jobID));
@@ -515,10 +516,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>"
@@ -535,10 +533,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
         w.write("dummy\n");
         w.write("dummy\n");
-        Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, "data.txt")));
-        ow.write("dummy\n");
-        ow.write("dummy\n");
-        ow.close();
+        writeDummyInput(fs, outputDir);
 
         String actionXml = "<map-reduce>" +
                 "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
@@ -562,10 +557,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         Path jobXml = new Path(getFsTestCaseDir(), "action.xml");
         XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString());
@@ -590,10 +582,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>"
@@ -618,10 +607,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString());
         conf.setBoolean("oozie.test.throw.exception", true);        // causes OozieActionConfiguratorForTest to throw an exception
@@ -647,10 +633,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>"
@@ -675,10 +658,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>"
@@ -739,10 +719,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>"
@@ -799,6 +776,37 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         }
     }
 
+    public void testJobNameSetForMapReduceChildren() throws Exception {
+        Services serv = Services.get();
+        serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", true);
+
+        final FileSystem fs = getFileSystem();
+        final Path inputDir = new Path(getFsTestCaseDir(), "input");
+        final Path outputDir = new Path(getFsTestCaseDir(), "output");
+        writeDummyInput(fs, inputDir);
+
+        final String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+                + getNameNodeUri() + "</name-node>"
+                + getMapReduceUberJarConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>";
+
+        final String extId = _testSubmit(MAP_REDUCE, actionXml);
+        final ApplicationId appId = ConverterUtils.toApplicationId(extId);
+        final Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri());
+        final String name = getHadoopAccessorService().createYarnClient(getTestUser(), conf).getApplicationReport(appId).getName();
+        assertTrue(name.contains("oozie:action"));
+    }
+
+    private void writeDummyInput(FileSystem fs, Path inputDir) throws IOException {
+        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+        w.write("dummy\n");
+        w.write("dummy\n");
+        w.close();
+    }
+
+    private HadoopAccessorService getHadoopAccessorService() {
+        return Services.get().get(HadoopAccessorService.class);
+    }
+
     public void testMapReduceWithUberJarEnabled() throws Exception {
         Services serv = Services.get();
         boolean originalUberJarDisabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
@@ -827,10 +835,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         OutputStream os = fs.create(new Path(getAppPath(), streamingJar));
         IOUtils.copyStream(is, os);
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>" + "      <streaming>" + "        <mapper>cat</mapper>"
@@ -917,10 +922,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
             Path inputDir = new Path(getFsTestCaseDir(), "input");
             Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-            Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-            w.write("dummy\n");
-            w.write("dummy\n");
-            w.close();
+            writeDummyInput(fs, inputDir);
 
             String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                     + getNameNodeUri() + "</name-node>" + "      <pipes>" + "        <program>" + programPath
@@ -944,10 +946,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         // set user stats write property as true explicitly in the
         // configuration.
@@ -1006,10 +1005,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         // set user stats write property as false explicitly in the
         // configuration.
@@ -1062,10 +1058,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         // set user stats write property as false explicitly in the
         // configuration.
@@ -1132,11 +1125,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir,
-                "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         XConfiguration mrConfig = getMapReduceConfig(inputDir.toString(),
                 outputDir.toString());
@@ -1172,8 +1161,8 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
         ae.end(context, context.getAction());
         assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
 
-        Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri());
-        final YarnClient yarnClient = Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), conf);
+        Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri());
+        final YarnClient yarnClient = getHadoopAccessorService().createYarnClient(getTestUser(), conf);
         ApplicationReport report = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(externalChildIDs));
         // Assert Mapred job name has been set
         assertEquals(mapredJobName, report.getName());