You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by tu...@apache.org on 2012/08/30 01:28:05 UTC
svn commit: r1378769 - in /incubator/oozie/trunk: ./
core/src/main/java/org/apache/oozie/action/hadoop/ core/src/main/resources/
core/src/test/java/org/apache/oozie/action/hadoop/ docs/src/site/twiki/
Author: tucu
Date: Wed Aug 29 23:28:04 2012
New Revision: 1378769
URL: http://svn.apache.org/viewvc?rev=1378769&view=rev
Log:
OOZIE-654 Provide a way to use 'uber' jars with Oozie MR actions (rkanter via tucu)
Added:
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerUberJarForTest.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorUberJar.java
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
incubator/oozie/trunk/docs/src/site/twiki/AG_Install.twiki
incubator/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
incubator/oozie/trunk/pom.xml
incubator/oozie/trunk/release-log.txt
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java Wed Aug 29 23:28:04 2012
@@ -22,6 +22,7 @@ import java.io.StringReader;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
@@ -30,6 +31,7 @@ import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.Services;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
@@ -41,6 +43,8 @@ public class MapReduceActionExecutor ext
public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
public static final String HADOOP_COUNTERS = "hadoop.counters";
+ public static final String OOZIE_MAPREDUCE_UBER_JAR = "oozie.mapreduce.uber.jar";
+ public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
private XLog log = XLog.getLog(getClass());
public MapReduceActionExecutor() {
@@ -86,6 +90,7 @@ public class MapReduceActionExecutor ext
@SuppressWarnings("unchecked")
Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
throws ActionExecutorException {
+ boolean regularMR = false;
Namespace ns = actionXml.getNamespace();
if (actionXml.getChild("streaming", ns) != null) {
Element streamingXml = actionXml.getChild("streaming", ns);
@@ -115,8 +120,47 @@ public class MapReduceActionExecutor ext
String program = pipesXml.getChildTextTrim("program", ns);
PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
}
+ else {
+ regularMR = true;
+ }
}
actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
+
+ // For "regular" (not streaming or pipes) MR jobs
+ if (regularMR) {
+ // Resolve uber jar path (has to be done after super because oozie.mapreduce.uber.jar is under <configuration>)
+ String uberJar = actionConf.get(OOZIE_MAPREDUCE_UBER_JAR);
+ if (uberJar != null) {
+ if (!Services.get().getConf().getBoolean(OOZIE_MAPREDUCE_UBER_JAR_ENABLE, false)) {
+ throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "MR003",
+ "{0} property is not allowed. Set {1} to true in oozie-site to enable.", OOZIE_MAPREDUCE_UBER_JAR,
+ OOZIE_MAPREDUCE_UBER_JAR_ENABLE);
+ }
+ String nameNode = actionXml.getChildTextTrim("name-node", ns);
+ if (nameNode != null) {
+ Path uberJarPath = new Path(uberJar);
+ if (uberJarPath.toUri().getScheme() == null || uberJarPath.toUri().getAuthority() == null) {
+ if (uberJarPath.isAbsolute()) { // absolute path without namenode --> prepend namenode
+ Path nameNodePath = new Path(nameNode);
+ String nameNodeSchemeAuthority = nameNodePath.toUri().getScheme()
+ + "://" + nameNodePath.toUri().getAuthority();
+ actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, new Path(nameNodeSchemeAuthority + uberJarPath).toString());
+ }
+ else { // relative path --> prepend app path
+ actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, new Path(appPath, uberJarPath).toString());
+ }
+ }
+ }
+ }
+ }
+ else {
+ if (actionConf.get(OOZIE_MAPREDUCE_UBER_JAR) != null) {
+ log.warn("The " + OOZIE_MAPREDUCE_UBER_JAR + " property is only applicable for MapReduce (not streaming nor pipes)"
+ + " workflows, ignoring");
+ actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, "");
+ }
+ }
+
return actionConf;
}
@@ -236,4 +280,22 @@ public class MapReduceActionExecutor ext
return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
}
+    /**
+     * Builds the launcher job configuration and, for a regular (neither streaming nor pipes) MapReduce action, sets
+     * the launcher's jar to the uber jar named by {@code oozie.mapreduce.uber.jar}.
+     * <p>
+     * Checking that uber jars are enabled and resolving the jar path are expected to have been done already by
+     * {@code setupActionConf()}, so the value is read from {@code actionConf} unchanged. A missing or blank value
+     * leaves the launcher jar untouched.
+     */
+    @Override
+    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
+            Configuration actionConf) throws ActionExecutorException {
+        JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
+        Namespace ns = actionXml.getNamespace();
+        if (actionXml.getChild("streaming", ns) != null || actionXml.getChild("pipes", ns) != null) {
+            // uber jars only apply to regular MapReduce jobs
+            return launcherJobConf;
+        }
+        String uberJar = actionConf.get(OOZIE_MAPREDUCE_UBER_JAR);
+        boolean hasUberJar = uberJar != null && uberJar.trim().length() > 0;
+        if (hasUberJar) {
+            launcherJobConf.setJar(uberJar);
+        }
+        return launcherJobConf;
+    }
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java Wed Aug 29 23:28:04 2012
@@ -79,6 +79,12 @@ public class MapReduceMain extends Launc
JobConf jobConf = new JobConf();
addActionConf(jobConf, actionConf);
+ // Set for uber jar
+ String uberJar = actionConf.get(MapReduceActionExecutor.OOZIE_MAPREDUCE_UBER_JAR);
+ if (uberJar != null && uberJar.trim().length() > 0) {
+ jobConf.setJar(uberJar);
+ }
+
// propagate delegation related props from launcher job to MR job
if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
jobConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
Modified: incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ incubator/oozie/trunk/core/src/main/resources/oozie-default.xml Wed Aug 29 23:28:04 2012
@@ -25,6 +25,16 @@
<!-- ************************** VERY IMPORTANT ************************** -->
<property>
+ <name>oozie.action.mapreduce.uber.jar.enable</name>
+ <value>false</value>
+ <description>
+ If true, enables the oozie.mapreduce.uber.jar mapreduce workflow configuration property, which is used to specify an
+ uber jar in HDFS. Submitting a workflow with an uber jar requires at least Hadoop 2.2.0 or 1.2.0. If false, workflows
+ which specify the oozie.mapreduce.uber.jar configuration property will fail.
+ </description>
+ </property>
+
+ <property>
<name>oozie.processing.timezone</name>
<value>UTC</value>
<description>
Added: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerUberJarForTest.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerUberJarForTest.java?rev=1378769&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerUberJarForTest.java (added)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerUberJarForTest.java Wed Aug 29 23:28:04 2012
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Iterator;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Test-only {@link Mapper}/{@link Reducer} used to check uber jar support.
+ * <p>
+ * The mapper emits one record per URL on its class loader's search path and increments the {@link #GROUP}/
+ * {@link #NAME} counter. The reducer writes each of those URLs out as a key. Tests then scan the job output to see
+ * which jars were on the task classpath.
+ */
+public class MapperReducerUberJarForTest implements Mapper, Reducer {
+    public static final String GROUP = "g";
+    public static final String NAME = "c";
+
+    /** Single constant map-output key, so every emitted URL lands in the same reduce group. */
+    private static final LongWritable ZERO = new LongWritable(0);
+
+    public static void main(String[] args) {
+        System.out.println("hello!");
+    }
+
+    public void configure(JobConf jobConf) {
+    }
+
+    public void close() throws IOException {
+    }
+
+    @SuppressWarnings("unchecked")
+    public void map(Object key, Object value, OutputCollector collector, Reporter reporter) throws IOException {
+        ClassLoader applicationClassLoader = this.getClass().getClassLoader();
+        if (applicationClassLoader == null) {
+            applicationClassLoader = ClassLoader.getSystemClassLoader();
+        }
+        // NOTE(review): this cast assumes the loader is a URLClassLoader; that no longer holds for the JDK
+        // application class loader on Java 9+.
+        URL[] urls = ((URLClassLoader) applicationClassLoader).getURLs();
+        for (URL url : urls) {
+            collector.collect(ZERO, new Text(url.toString()));
+        }
+        // Upper-case suffix: a lowercase 'l' is easily misread as the digit 1
+        reporter.incrCounter(GROUP, NAME, 5L);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void reduce(Object key, Iterator values, OutputCollector collector, Reporter reporter)
+            throws IOException {
+        while (values.hasNext()) {
+            collector.collect(values.next(), NullWritable.get());
+        }
+    }
+}
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java Wed Aug 29 23:28:04 2012
@@ -41,11 +41,18 @@ import java.io.File;
import java.io.OutputStream;
import java.io.InputStream;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.Writer;
import java.io.OutputStreamWriter;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
+import java.util.Scanner;
+import java.util.jar.JarOutputStream;
+import java.util.regex.Pattern;
+import java.util.zip.ZipEntry;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.oozie.action.ActionExecutorException;
public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
@@ -62,6 +69,13 @@ public class TestMapReduceActionExecutor
assertTrue(new File(jar.toString()).exists());
}
+ public Element createUberJarActionXML(String uberJarPath, String additional) throws Exception{
+ return XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
+ + "<name-node>" + getNameNodeUri() + "</name-node>" + additional + "<configuration>"
+ + "<property><name>oozie.mapreduce.uber.jar</name><value>" + uberJarPath + "</value></property>"
+ + "</configuration>" + "</map-reduce>");
+ }
+
public void testSetupMethods() throws Exception {
MapReduceActionExecutor ae = new MapReduceActionExecutor();
@@ -104,6 +118,69 @@ public class TestMapReduceActionExecutor
ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
assertEquals("IN", conf.get("mapred.input.dir"));
+ // Enable uber jars to test that MapReduceActionExecutor picks up the oozie.mapreduce.uber.jar property correctly
+ Services serv = Services.get();
+ boolean originalUberJarDisabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
+ serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", true);
+
+ actionXml = createUberJarActionXML(getNameNodeUri() + "/app/job.jar", "");
+ conf = ae.createBaseHadoopConf(context, actionXml);
+ ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+ assertEquals(getNameNodeUri() + "/app/job.jar", conf.get("oozie.mapreduce.uber.jar")); // absolute path with namenode
+ JobConf launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
+ assertEquals(getNameNodeUri() + "/app/job.jar", launcherJobConf.getJar()); // same for launcher conf
+
+ actionXml = createUberJarActionXML("/app/job.jar", "");
+ conf = ae.createBaseHadoopConf(context, actionXml);
+ ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+ assertEquals(getNameNodeUri() + "/app/job.jar", conf.get("oozie.mapreduce.uber.jar")); // absolute path without namenode
+ launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
+ assertEquals(getNameNodeUri() + "/app/job.jar", launcherJobConf.getJar()); // same for launcher conf
+
+ actionXml = createUberJarActionXML("job.jar", "");
+ conf = ae.createBaseHadoopConf(context, actionXml);
+ ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+ assertEquals(getFsTestCaseDir() + "/job.jar", conf.get("oozie.mapreduce.uber.jar")); // relative path
+ launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
+ assertEquals(getFsTestCaseDir() + "/job.jar", launcherJobConf.getJar()); // same for launcher
+
+ actionXml = createUberJarActionXML("job.jar", "<streaming></streaming>");
+ conf = ae.createBaseHadoopConf(context, actionXml);
+ ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+ assertEquals("", conf.get("oozie.mapreduce.uber.jar")); // ignored for streaming
+ launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
+ assertNull(launcherJobConf.getJar()); // same for launcher conf (not set)
+
+ actionXml = createUberJarActionXML("job.jar", "<pipes></pipes>");
+ conf = ae.createBaseHadoopConf(context, actionXml);
+ ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+ assertEquals("", conf.get("oozie.mapreduce.uber.jar")); // ignored for pipes
+ launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
+ assertNull(launcherJobConf.getJar()); // same for launcher conf (not set)
+
+ actionXml = XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
+ + "<name-node>" + getNameNodeUri() + "</name-node>" + "</map-reduce>");
+ conf = ae.createBaseHadoopConf(context, actionXml);
+ ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+ assertNull(conf.get("oozie.mapreduce.uber.jar")); // doesn't resolve if not set
+ launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
+ assertNull(launcherJobConf.getJar()); // same for launcher conf
+
+ // Disable uber jars to test that MapReduceActionExecutor won't allow the oozie.mapreduce.uber.jar property
+ serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", false);
+ try {
+ actionXml = createUberJarActionXML(getNameNodeUri() + "/app/job.jar", "");
+ conf = ae.createBaseHadoopConf(context, actionXml);
+ ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+ assertEquals(getNameNodeUri() + "/app/job.jar", conf.get("oozie.mapreduce.uber.jar"));
+ } catch (ActionExecutorException aee) {
+ assertEquals("MR003", aee.getErrorCode());
+ assertEquals(ActionExecutorException.ErrorType.ERROR, aee.getErrorType());
+ assertTrue(aee.getMessage().contains("oozie.action.mapreduce.uber.jar.enable"));
+ assertTrue(aee.getMessage().contains("oozie.mapreduce.uber.jar"));
+ }
+ serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", originalUberJarDisabled);
+
actionXml = XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
+ "<name-node>" + getNameNodeUri() + "</name-node>" + "<streaming>" + "<mapper>M</mapper>"
+ "<reducer>R</reducer>" + "<record-reader>RR</record-reader>"
@@ -221,7 +298,7 @@ public class TestMapReduceActionExecutor
return runningJob;
}
- private void _testSubmit(String name, String actionXml) throws Exception {
+ private String _testSubmit(String name, String actionXml) throws Exception {
Context context = createContext(name, actionXml);
final RunningJob launcherJob = submitAction(context);
@@ -266,6 +343,8 @@ public class TestMapReduceActionExecutor
//External Child IDs will always be null in case of MR action.
assertNull(context.getExternalChildIDs());
+
+ return mrJob.getID().toString();
}
private void _testSubmitWithCredentials(String name, String actionXml) throws Exception {
@@ -327,6 +406,16 @@ public class TestMapReduceActionExecutor
return conf;
}
+    /**
+     * Creates the action configuration for a regular MapReduce job that uses {@link MapperReducerUberJarForTest} as
+     * both mapper and reducer and runs from a freshly built uber jar.
+     * <p>
+     * Side effect: builds the uber jar locally and uploads it to the application path via
+     * {@code createAndUploadUberJar()}.
+     *
+     * @param inputDir value for {@code mapred.input.dir}
+     * @param outputDir value for {@code mapred.output.dir}
+     * @return the populated configuration
+     */
+    protected XConfiguration getMapReduceUberJarConfig(String inputDir, String outputDir) throws Exception {
+        String taskClassName = MapperReducerUberJarForTest.class.getName();
+        XConfiguration conf = new XConfiguration();
+        conf.set("mapred.mapper.class", taskClassName);
+        conf.set("mapred.reducer.class", taskClassName);
+        conf.set("mapred.input.dir", inputDir);
+        conf.set("mapred.output.dir", outputDir);
+        Path uploadedUberJar = createAndUploadUberJar();
+        conf.set("oozie.mapreduce.uber.jar", uploadedUberJar.toUri().toString());
+        return conf;
+    }
+
public void testMapReduce() throws Exception {
FileSystem fs = getFileSystem();
@@ -362,6 +451,121 @@ public class TestMapReduceActionExecutor
_testSubmitWithCredentials("map-reduce", actionXml);
}
+    /**
+     * Builds the test uber jar in the local test directory and moves it into the application path on the test
+     * file system.
+     *
+     * @return the remote path of the uploaded uber jar
+     */
+    protected Path createAndUploadUberJar() throws Exception {
+        Path localJobJarPath = makeUberJarWithLib(getTestCaseDir());
+        Path remoteJobJarPath = new Path(getAppPath(), localJobJarPath.getName());
+        getFileSystem().moveFromLocalFile(localJobJarPath, remoteJobJarPath);
+        // Use the decoded URI path (the same form makeUberJarWithLib wrote to). toUri().toString() would keep URI
+        // escapes such as %20 and name a file that does not exist, so the cleanup below would silently do nothing.
+        File localJobJarFile = new File(localJobJarPath.toUri().getPath());
+        if (localJobJarFile.exists()) { // moveFromLocalFile should already have removed it; just to make sure
+            localJobJarFile.delete();
+        }
+        return remoteJobJarPath;
+    }
+
+    /**
+     * Writes {@code uber.jar} into {@code testDir}. It contains two small jars, {@code lib/lib1.jar} and
+     * {@code lib/lib2.jar}, which is the "jars inside a lib folder" layout of an uber jar.
+     *
+     * @param testDir local directory in which to create the jar
+     * @return the local path of the created uber jar
+     */
+    private Path makeUberJarWithLib(String testDir) throws Exception {
+        Path jobJarPath = new Path(testDir, "uber.jar");
+        FileOutputStream fos = new FileOutputStream(new File(jobJarPath.toUri().getPath()));
+        JarOutputStream jos = new JarOutputStream(fos);
+        try {
+            // Have to put in real jar files or it will complain
+            createAndAddJarToJar(jos, new File(new Path(testDir, "lib1.jar").toUri().getPath()));
+            createAndAddJarToJar(jos, new File(new Path(testDir, "lib2.jar").toUri().getPath()));
+        } finally {
+            // Closing the jar stream also closes fos, even when adding an entry failed
+            jos.close();
+        }
+        return jobJarPath;
+    }
+
+    /**
+     * Creates a minimal jar at {@code jarFile}, copies it into {@code jos} as the entry
+     * {@code lib/<jarFile name>}, and then deletes the local copy.
+     *
+     * @param jos the outer (uber) jar being written; the new entry is closed but the stream itself is left open
+     * @param jarFile local scratch location for the inner jar
+     */
+    private void createAndAddJarToJar(JarOutputStream jos, File jarFile) throws Exception {
+        JarOutputStream innerJar = new JarOutputStream(new FileOutputStream(jarFile));
+        try {
+            // Have to have at least one entry or it will complain
+            innerJar.putNextEntry(new ZipEntry(jarFile.getName() + ".inside"));
+            innerJar.closeEntry();
+        } finally {
+            innerJar.close();
+        }
+
+        jos.putNextEntry(new ZipEntry("lib/" + jarFile.getName()));
+        FileInputStream in = new FileInputStream(jarFile);
+        try {
+            byte[] buf = new byte[1024];
+            int numRead;
+            while ((numRead = in.read(buf)) != -1) {
+                jos.write(buf, 0, numRead);
+            }
+        } finally {
+            in.close();
+        }
+        jos.closeEntry();
+        jarFile.delete();
+    }
+
+ public void _testMapReduceWithUberJar() throws Exception {
+ FileSystem fs = getFileSystem();
+
+ Path inputDir = new Path(getFsTestCaseDir(), "input");
+ Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+ Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+ w.write("dummy\n");
+ w.write("dummy\n");
+ w.close();
+
+ String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ + getNameNodeUri() + "</name-node>"
+ + getMapReduceUberJarConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>";
+ String jobID = _testSubmit("map-reduce", actionXml);
+
+ boolean containsLib1Jar = false;
+ String lib1JarStr = "jobcache/" + jobID + "/jars/lib/lib1.jar";
+ Pattern lib1JarPatYarn = Pattern.compile(
+ ".*appcache/application_" + jobID.replaceFirst("job_", "") + "/filecache/.*/uber.jar/lib/lib1.jar");
+ boolean containsLib2Jar = false;
+ String lib2JarStr = "jobcache/" + jobID + "/jars/lib/lib1.jar";
+ Pattern lib2JarPatYarn = Pattern.compile(
+ ".*appcache/application_" + jobID.replaceFirst("job_", "") + "/filecache/.*/uber.jar/lib/lib2.jar");
+
+ FileStatus[] fstats = getFileSystem().listStatus(outputDir);
+ for (FileStatus fstat : fstats) {
+ Path p = fstat.getPath();
+ if (getFileSystem().isFile(p) && p.getName().startsWith("part-")) {
+ InputStream is = getFileSystem().open(p);
+ Scanner sc = new Scanner(is);
+ while (sc.hasNextLine()) {
+ String line = sc.nextLine();
+ containsLib1Jar = (containsLib1Jar || line.endsWith(lib1JarStr) || lib1JarPatYarn.matcher(line).matches());
+ containsLib2Jar = (containsLib2Jar || line.endsWith(lib2JarStr) || lib2JarPatYarn.matcher(line).matches());
+ }
+ sc.close();
+ is.close();
+ }
+ }
+
+ assertTrue("lib/lib1.jar should have been unzipped from the uber jar and added to the classpath but was not",
+ containsLib1Jar);
+ assertTrue("lib/lib2.jar should have been unzipped from the uber jar and added to the classpath but was not",
+ containsLib2Jar);
+ }
+
+    /**
+     * With {@code oozie.action.mapreduce.uber.jar.enable} set to false, a workflow that specifies an uber jar must be
+     * rejected with error MR003. The rejection happens before the job reaches Hadoop, so the Hadoop version does not
+     * matter here. TestMapReduceActionExecutorUberJar.testMapReduceWithUberJarEnabled() tests the actual uber jar
+     * functionality, but that test is excluded by default.
+     */
+    public void testMapReduceWithUberJarDisabled() throws Exception {
+        Services serv = Services.get();
+        boolean originalUberJarEnabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
+        try {
+            serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", false);
+            _testMapReduceWithUberJar();
+            // Reaching this point means the disabled setting was ignored
+            fail("Expected ActionExecutorException MR003 because uber jars are disabled");
+        } catch (ActionExecutorException aee) {
+            assertEquals("MR003", aee.getErrorCode());
+            assertEquals(ActionExecutorException.ErrorType.ERROR, aee.getErrorType());
+            assertTrue(aee.getMessage().contains("oozie.action.mapreduce.uber.jar.enable"));
+            assertTrue(aee.getMessage().contains("oozie.mapreduce.uber.jar"));
+        } finally {
+            // restore the server-wide setting so later tests see the original value
+            serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", originalUberJarEnabled);
+        }
+    }
+
protected XConfiguration getStreamingConfig(String inputDir, String outputDir) {
XConfiguration conf = new XConfiguration();
conf.set("mapred.input.dir", inputDir);
Added: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorUberJar.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorUberJar.java?rev=1378769&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorUberJar.java (added)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorUberJar.java Wed Aug 29 23:28:04 2012
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.oozie.service.Services;
+
+/**
+ * Runs the end-to-end uber jar test ({@link #testMapReduceWithUberJarEnabled()}).
+ * <p>
+ * This needs at least Hadoop 2.2.x or 1.2.x, so the class is excluded from the default test run in pom.xml; run it
+ * explicitly with {@code -Dtest=TestMapReduceActionExecutorUberJar}. The inherited tests below are overridden as
+ * no-ops because {@link TestMapReduceActionExecutor} already runs them.
+ */
+public class TestMapReduceActionExecutorUberJar extends TestMapReduceActionExecutor {
+
+    public void testMapReduceWithUberJarEnabled() throws Exception {
+        Services serv = Services.get();
+        boolean originalUberJarEnabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
+        try {
+            serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", true);
+            _testMapReduceWithUberJar();
+        } finally {
+            // restore the server-wide setting so later tests see the original value
+            serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", originalUberJarEnabled);
+        }
+    }
+
+    @Override
+    public void testDefaultShareLibName() {
+        // skip test
+    }
+
+    @Override
+    public void testMapReduce() throws Exception {
+        // skip test
+    }
+
+    @Override
+    public void testLauncherJar() throws Exception {
+        // skip test
+    }
+
+    @Override
+    public void testMapReduceWithCredentials() throws Exception {
+        // skip test
+    }
+
+    @Override
+    public void testMapReduceWithUberJarDisabled() throws Exception {
+        // skip test
+    }
+
+    @Override
+    public void testSetExecutionStats_when_user_has_specified_stats_write_FALSE() throws Exception {
+        // skip test
+    }
+
+    @Override
+    public void testSetExecutionStats_when_user_has_specified_stats_write_TRUE() throws Exception {
+        // skip test
+    }
+
+    @Override
+    public void testSetupMethods() throws Exception {
+        // skip test
+    }
+
+    @Override
+    public void testStreaming() throws Exception {
+        // skip test
+    }
+
+    @Override
+    public void testPipes() throws Exception {
+        // skip test
+    }
+}
\ No newline at end of file
Modified: incubator/oozie/trunk/docs/src/site/twiki/AG_Install.twiki
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/docs/src/site/twiki/AG_Install.twiki?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/docs/src/site/twiki/AG_Install.twiki (original)
+++ incubator/oozie/trunk/docs/src/site/twiki/AG_Install.twiki Wed Aug 29 23:28:04 2012
@@ -425,6 +425,24 @@ be expressed in the corresponding timezo
For more details on using an alternate Oozie processing timezone, please refer to the
[[CoordinatorFunctionalSpec#datetime][Coordinator Functional Specification, section '4. Datetime']]
+#UberJar
+---++ MapReduce Workflow Uber Jars
+For Map-Reduce jobs (not including streaming or pipes), additional jar files can also be included via an uber jar. An uber jar is a
+jar file that contains additional jar files within a "lib" folder (see
+[[WorkflowFunctionalSpec#AppDeployment][Workflow Functional Specification]] for more information). Submitting a workflow with an uber jar
+requires at least Hadoop 2.2.0 or 1.2.0. As such, using uber jars in a workflow is disabled by default. To enable this feature, set
+the =oozie.action.mapreduce.uber.jar.enable= property to =true= in =oozie-site.xml= (and make sure to use a supported version of Hadoop).
+
+<verbatim>
+<configuration>
+ <property>
+ <name>oozie.action.mapreduce.uber.jar.enable</name>
+ <value>true</value>
+ </property>
+</configuration>
+</verbatim>
+
+
---++ Advanced/Custom Environment Settings
Oozie can be configured to use Unix standard filesystem hierarchy for its different files
Modified: incubator/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki (original)
+++ incubator/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki Wed Aug 29 23:28:04 2012
@@ -2059,6 +2059,29 @@ map-reduce and pig jobs =classpath= and
Additional JAR files and native libraries not present in the application 'lib/' directory can be specified in
map-reduce and pig actions with the 'file' element (refer to the map-reduce and pig documentation).
+For Map-Reduce jobs (not including streaming or pipes), additional jar files can also be included via an uber jar. An uber jar is a
+jar file that contains additional jar files within a "lib" folder. To let Oozie know about an uber jar, simply specify it with
+the =oozie.mapreduce.uber.jar= configuration property and Oozie will tell Hadoop MapReduce that it is an uber jar. The ability to
+specify an uber jar is governed by the =oozie.action.mapreduce.uber.jar.enable= property in =oozie-site.xml=. See
+[[AG_Install#UberJar][Oozie Install]] for more information.
+
+<verbatim>
+<action name="mr-node">
+ <map-reduce>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property>
+ <name>oozie.mapreduce.uber.jar</name>
+ <value>${nameNode}/user/${wf:user()}/my-uber-jar.jar</value>
+ </property>
+ </configuration>
+ </map-reduce>
+ <ok to="end"/>
+ <error to="fail"/>
+</action>
+</verbatim>
+
The =config-default.xml= file defines, if any, default values for the workflow job parameters. This file must be in
the Hadoop Configuration XML format. EL expressions are not supported and =user.name= property cannot be specified in
this file.
Modified: incubator/oozie/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/pom.xml?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/pom.xml (original)
+++ incubator/oozie/trunk/pom.xml Wed Aug 29 23:28:04 2012
@@ -822,6 +822,11 @@
<!-- See 'testSqoop' profile in core/pom.xml and the Building doc-->
<exclude>**/TestSqoop*.java</exclude>
+
+ <!-- Explicitly use -Dtest=TestMapReduceActionExecutorUberJar to test the uber jar functionality.
+ Requires at least Hadoop 1.2.0 or 2.2.0.
+ -->
+ <exclude>**/TestMapReduceActionExecutorUberJar.java</exclude>
</excludes>
<!-- DO NOT CHANGE THIS VALUES, TESTCASES CANNOT RUN IN PARALLEL -->
<parallel>classes</parallel>
Modified: incubator/oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/release-log.txt?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/release-log.txt (original)
+++ incubator/oozie/trunk/release-log.txt Wed Aug 29 23:28:04 2012
@@ -1,5 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+OOZIE-654 Provide a way to use 'uber' jars with Oozie MR actions (rkanter via tucu)
OOZIE-979 bump up trunk version to 3.4.0-SNAPSHOT (tucu)
-- Oozie 3.3.0 release (unreleased)