You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by tu...@apache.org on 2012/10/04 18:36:20 UTC
svn commit: r1394147 - in /oozie/trunk: ./
core/src/main/java/org/apache/oozie/action/hadoop/
core/src/test/java/org/apache/oozie/action/hadoop/
Author: tucu
Date: Thu Oct 4 16:36:20 2012
New Revision: 1394147
URL: http://svn.apache.org/viewvc?rev=1394147&view=rev
Log:
OOZIE-949 Allow the user to set 'mapred.job.name' (jrkinley via tucu)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
oozie/trunk/release-log.txt
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1394147&r1=1394146&r2=1394147&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java Thu Oct 4 16:36:20 2012
@@ -82,6 +82,7 @@ public class JavaActionExecutor extends
private static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
private static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
private static final String HADOOP_NAME_NODE = "fs.default.name";
+ private static final String HADOOP_JOB_NAME = "mapred.job.name";
public static final String OOZIE_COMMON_LIBDIR = "oozie";
public static final int MAX_EXTERNAL_STATS_SIZE_DEFAULT = Integer.MAX_VALUE;
private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
@@ -543,9 +544,15 @@ public class JavaActionExecutor extends
launcherJobConf.set(ACTION_SHARELIB_FOR + getType(), actionShareLibProperty);
}
setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
- String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow()
- .getAppName(), action.getName(), context.getWorkflow().getId());
+
+ String jobName = launcherJobConf.get(HADOOP_JOB_NAME);
+ if (jobName == null || jobName.isEmpty()) {
+ jobName = XLog.format(
+ "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
+ context.getWorkflow().getAppName(), action.getName(),
+ context.getWorkflow().getId());
launcherJobConf.setJobName(jobName);
+ }
String jobId = context.getWorkflow().getId();
String actionId = action.getId();
@@ -649,9 +656,15 @@ public class JavaActionExecutor extends
setupActionConf(actionConf, context, actionXml, appPathRoot);
XLog.getLog(getClass()).debug("Setting LibFilesArchives ");
setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
- String jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", getType(), context.getWorkflow()
- .getAppName(), action.getName(), context.getWorkflow().getId());
- actionConf.set("mapred.job.name", jobName);
+
+ String jobName = actionConf.get(HADOOP_JOB_NAME);
+ if (jobName == null || jobName.isEmpty()) {
+ jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}",
+ getType(), context.getWorkflow().getAppName(),
+ action.getName(), context.getWorkflow().getId());
+ actionConf.set(HADOOP_JOB_NAME, jobName);
+ }
+
injectActionCallback(context, actionConf);
if (context.getWorkflow().getAcl() != null) {
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java?rev=1394147&r1=1394146&r2=1394147&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java Thu Oct 4 16:36:20 2012
@@ -181,12 +181,6 @@ public class MapReduceActionExecutor ext
.getExternalId(), action.getId());
}
- // TODO this has to be done in a better way
- if (!runningJob.getJobName().startsWith("oozie:action:")) {
- throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "MR001",
- "ID swap should have happened in launcher job [{0}]", action.getExternalId());
- }
-
Counters counters = runningJob.getCounters();
if (counters != null) {
ActionStats stats = new MRStats(counters);
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java?rev=1394147&r1=1394146&r2=1394147&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java Thu Oct 4 16:36:20 2012
@@ -795,6 +795,97 @@ public class TestMapReduceActionExecutor
assertTrue(counters.contains("Counter"));
}
+ /**
+ * Test that "oozie.launcher.mapred.job.name" and "mapred.job.name" can be set
+ * in the action configuration and are not overridden by the action executor.
+ *
+ * @throws Exception
+ */
+ public void testSetMapredJobName() throws Exception {
+ final String launcherJobName = "MapReduceLauncherTest";
+ final String mapredJobName = "MapReduceTest";
+
+ FileSystem fs = getFileSystem();
+
+ Path inputDir = new Path(getFsTestCaseDir(), "input");
+ Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir,
+ "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
+
+ XConfiguration mrConfig = getMapReduceConfig(inputDir.toString(),
+ outputDir.toString());
+ mrConfig.set("oozie.launcher.mapred.job.name", launcherJobName);
+ mrConfig.set("mapred.job.name", mapredJobName);
+
+ StringBuilder sb = new StringBuilder("<map-reduce>")
+ .append("<job-tracker>").append(getJobTrackerUri())
+ .append("</job-tracker>").append("<name-node>")
+ .append(getNameNodeUri()).append("</name-node>")
+ .append(mrConfig.toXmlString(false)).append("</map-reduce>");
+ String actionXml = sb.toString();
+
+ Context context = createContext("map-reduce", actionXml);
+ final RunningJob launcherJob = submitAction(context);
+ String launcherId = context.getAction().getExternalId();
+ waitFor(120 * 2000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ return launcherJob.isComplete();
+ }
+ });
+
+ assertTrue(launcherJob.isSuccessful());
+ assertTrue(LauncherMapper.hasIdSwap(launcherJob));
+ // Assert launcher job name has been set
+ System.out.println("Launcher job name: " + launcherJob.getJobName());
+ assertTrue(launcherJob.getJobName().equals(launcherJobName));
+
+ MapReduceActionExecutor ae = new MapReduceActionExecutor();
+ ae.check(context, context.getAction());
+ assertFalse(launcherId.equals(context.getAction().getExternalId()));
+
+ JobConf conf = ae.createBaseHadoopConf(context,
+ XmlUtils.parseXml(actionXml));
+ String user = conf.get("user.name");
+
+ JobClient jobClient = Services.get().get(HadoopAccessorService.class)
+ .createJobClient(user, conf);
+ final RunningJob mrJob = jobClient.getJob(JobID.forName(context
+ .getAction().getExternalId()));
+
+ waitFor(120 * 1000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ return mrJob.isComplete();
+ }
+ });
+ assertTrue(mrJob.isSuccessful());
+ ae.check(context, context.getAction());
+
+ assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+ assertNull(context.getAction().getData());
+
+ ae.end(context, context.getAction());
+ assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
+
+ // Assert Mapred job name has been set
+ System.out.println("Mapred job name: " + mrJob.getJobName());
+ assertTrue(mrJob.getJobName().equals(mapredJobName));
+
+ // Assert that no stats info is stored in the context.
+ assertNull(context.getExecutionStats());
+
+ // External Child IDs will always be null in case of MR action.
+ assertNull(context.getExternalChildIDs());
+
+ // hadoop.counters will always be set in case of MR action.
+ assertNotNull(context.getVar("hadoop.counters"));
+ String counters = context.getVar("hadoop.counters");
+ assertTrue(counters.contains("Counter"));
+ }
+
public void testDefaultShareLibName() {
MapReduceActionExecutor ae = new MapReduceActionExecutor();
Element e = new Element("mapreduce");
Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1394147&r1=1394146&r2=1394147&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Thu Oct 4 16:36:20 2012
@@ -1,5 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+OOZIE-949 Allow the user to set 'mapred.job.name' (jrkinley via tucu)
OOZIE-1009 Documentation pages should use default ports for Oozie/JT/NN (tucu)
OOZIE-669 Deprecate oozie-start.sh, oozie-stop.sh & oozie-run.sh scripts (rkanter via tucu)
OOZIE-1005 Tests from OOZIE-994 use wrong condition in waitFor (rkanter via virag)