You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by tu...@apache.org on 2012/08/30 01:28:05 UTC

svn commit: r1378769 - in /incubator/oozie/trunk: ./ core/src/main/java/org/apache/oozie/action/hadoop/ core/src/main/resources/ core/src/test/java/org/apache/oozie/action/hadoop/ docs/src/site/twiki/

Author: tucu
Date: Wed Aug 29 23:28:04 2012
New Revision: 1378769

URL: http://svn.apache.org/viewvc?rev=1378769&view=rev
Log:
OOZIE-654 Provide a way to use 'uber' jars with Oozie MR actions (rkanter via tucu)

Added:
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerUberJarForTest.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorUberJar.java
Modified:
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
    incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
    incubator/oozie/trunk/docs/src/site/twiki/AG_Install.twiki
    incubator/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
    incubator/oozie/trunk/pom.xml
    incubator/oozie/trunk/release-log.txt

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java Wed Aug 29 23:28:04 2012
@@ -22,6 +22,7 @@ import java.io.StringReader;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobClient;
@@ -30,6 +31,7 @@ import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.Services;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.XmlUtils;
@@ -41,6 +43,8 @@ public class MapReduceActionExecutor ext
 
     public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
     public static final String HADOOP_COUNTERS = "hadoop.counters";
+    public static final String OOZIE_MAPREDUCE_UBER_JAR = "oozie.mapreduce.uber.jar";
+    public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
     private XLog log = XLog.getLog(getClass());
 
     public MapReduceActionExecutor() {
@@ -86,6 +90,7 @@ public class MapReduceActionExecutor ext
     @SuppressWarnings("unchecked")
     Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath)
             throws ActionExecutorException {
+        boolean regularMR = false;
         Namespace ns = actionXml.getNamespace();
         if (actionXml.getChild("streaming", ns) != null) {
             Element streamingXml = actionXml.getChild("streaming", ns);
@@ -115,8 +120,47 @@ public class MapReduceActionExecutor ext
                 String program = pipesXml.getChildTextTrim("program", ns);
                 PipesMain.setPipes(actionConf, map, reduce, inputFormat, partitioner, writer, program, appPath);
             }
+            else {
+                regularMR = true;
+            }
         }
         actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
+
+        // For "regular" (not streaming or pipes) MR jobs
+        if (regularMR) {
+            // Resolve uber jar path (has to be done after super because oozie.mapreduce.uber.jar is under <configuration>)
+            String uberJar = actionConf.get(OOZIE_MAPREDUCE_UBER_JAR);
+            if (uberJar != null) {
+                if (!Services.get().getConf().getBoolean(OOZIE_MAPREDUCE_UBER_JAR_ENABLE, false)) {
+                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "MR003",
+                            "{0} property is not allowed.  Set {1} to true in oozie-site to enable.", OOZIE_MAPREDUCE_UBER_JAR,
+                            OOZIE_MAPREDUCE_UBER_JAR_ENABLE);
+                }
+                String nameNode = actionXml.getChildTextTrim("name-node", ns);
+                if (nameNode != null) {
+                    Path uberJarPath = new Path(uberJar);
+                    if (uberJarPath.toUri().getScheme() == null || uberJarPath.toUri().getAuthority() == null) {
+                        if (uberJarPath.isAbsolute()) {     // absolute path without namenode --> prepend namenode
+                            Path nameNodePath = new Path(nameNode);
+                            String nameNodeSchemeAuthority = nameNodePath.toUri().getScheme()
+                                    + "://" + nameNodePath.toUri().getAuthority();
+                            actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, new Path(nameNodeSchemeAuthority + uberJarPath).toString());
+                        }
+                        else {                              // relative path --> prepend app path
+                            actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, new Path(appPath, uberJarPath).toString());
+                        }
+                    }
+                }
+            }
+        }
+        else {
+            if (actionConf.get(OOZIE_MAPREDUCE_UBER_JAR) != null) {
+                log.warn("The " + OOZIE_MAPREDUCE_UBER_JAR + " property is only applicable for MapReduce (not streaming nor pipes)"
+                        + " workflows, ignoring");
+                actionConf.set(OOZIE_MAPREDUCE_UBER_JAR, "");
+            }
+        }
+
         return actionConf;
     }
 
@@ -236,4 +280,22 @@ public class MapReduceActionExecutor ext
         return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null;
     }
 
+    @Override
+    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
+            Configuration actionConf) throws ActionExecutorException {
+        // A regular MapReduce action (neither streaming nor pipes) that names an uber jar needs the same jar on its
+        // launcher job, so this override delegates to super and then copies the jar location over. setupActionConf() has
+        // already verified that uber jars are enabled and resolved the jar path while processing actionConf, so the
+        // property only has to be looked up here.
+        JobConf launcherConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf);
+        Namespace xmlNs = actionXml.getNamespace();
+        boolean streamingOrPipes = actionXml.getChild("streaming", xmlNs) != null || actionXml.getChild("pipes", xmlNs) != null;
+        if (!streamingOrPipes) {
+            String uberJarLocation = actionConf.get(OOZIE_MAPREDUCE_UBER_JAR);
+            if (uberJarLocation != null && uberJarLocation.trim().length() > 0) {
+                launcherConf.setJar(uberJarLocation);
+            }
+        }
+        return launcherConf;
+    }
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java Wed Aug 29 23:28:04 2012
@@ -79,6 +79,12 @@ public class MapReduceMain extends Launc
         JobConf jobConf = new JobConf();
         addActionConf(jobConf, actionConf);
 
+        // Set for uber jar
+        String uberJar = actionConf.get(MapReduceActionExecutor.OOZIE_MAPREDUCE_UBER_JAR);
+        if (uberJar != null && uberJar.trim().length() > 0) {
+            jobConf.setJar(uberJar);
+        }
+
         // propagate delegation related props from launcher job to MR job
         if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
             jobConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));

Modified: incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ incubator/oozie/trunk/core/src/main/resources/oozie-default.xml Wed Aug 29 23:28:04 2012
@@ -25,6 +25,16 @@
     <!-- ************************** VERY IMPORTANT  ************************** -->
 
     <property>
+        <name>oozie.action.mapreduce.uber.jar.enable</name>
+        <value>false</value>
+        <description>
+            If true, enables the oozie.mapreduce.uber.jar mapreduce workflow configuration property, which is used to specify an
+            uber jar in HDFS.  Submitting a workflow with an uber jar requires at least Hadoop 2.2.0 or 1.2.0.  If false, workflows
+            which specify the oozie.mapreduce.uber.jar configuration property will fail.
+        </description>
+    </property>
+
+    <property>
         <name>oozie.processing.timezone</name>
         <value>UTC</value>
         <description>

Added: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerUberJarForTest.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerUberJarForTest.java?rev=1378769&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerUberJarForTest.java (added)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/MapperReducerUberJarForTest.java Wed Aug 29 23:28:04 2012
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Iterator;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+
+public class MapperReducerUberJarForTest implements Mapper, Reducer {
+    public static final String GROUP = "g";
+    public static final String NAME = "c";
+    private static final LongWritable zero = new LongWritable(0);
+
+    public static void main(String[] args) {
+        System.out.println("hello!");
+    }
+
+    public void configure(JobConf jobConf) {
+    }
+
+    public void close() throws IOException {
+    }
+
+    /**
+     * Emits one record per URL of this class's class loader (falling back to the system class loader when there is
+     * none), each keyed by zero, and then increments the GROUP/NAME counter by 5.
+     */
+    @SuppressWarnings("unchecked")
+    public void map(Object key, Object value, OutputCollector collector, Reporter reporter) throws IOException {
+        ClassLoader ownLoader = this.getClass().getClassLoader();
+        ClassLoader loader = (ownLoader != null) ? ownLoader : ClassLoader.getSystemClassLoader();
+        for (URL classpathUrl : ((URLClassLoader) loader).getURLs()) {
+            collector.collect(zero, new Text(classpathUrl.toString()));
+        }
+        reporter.incrCounter(GROUP, NAME, 5L);
+    }
+
+    /** Writes every incoming value back out as a key paired with a NullWritable. */
+    @SuppressWarnings("unchecked")
+    public void reduce(Object key, Iterator values, OutputCollector collector, Reporter reporter) throws IOException {
+        while (values.hasNext()) {
+            collector.collect(values.next(), NullWritable.get());
+        }
+    }
+}

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java Wed Aug 29 23:28:04 2012
@@ -41,11 +41,18 @@ import java.io.File;
 import java.io.OutputStream;
 import java.io.InputStream;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.Writer;
 import java.io.OutputStreamWriter;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Scanner;
+import java.util.jar.JarOutputStream;
+import java.util.regex.Pattern;
+import java.util.zip.ZipEntry;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.oozie.action.ActionExecutorException;
 
 public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
 
@@ -62,6 +69,13 @@ public class TestMapReduceActionExecutor
         assertTrue(new File(jar.toString()).exists());
     }
 
+    // Builds a <map-reduce> action whose configuration sets oozie.mapreduce.uber.jar; "additional" goes after <name-node>
+    public Element createUberJarActionXML(String uberJarPath, String additional) throws Exception {
+        String jarProp = "<property><name>oozie.mapreduce.uber.jar</name><value>" + uberJarPath + "</value></property>";
+        return XmlUtils.parseXml("<map-reduce><job-tracker>" + getJobTrackerUri() + "</job-tracker><name-node>" + getNameNodeUri()
+                + "</name-node>" + additional + "<configuration>" + jarProp + "</configuration></map-reduce>");
+    }
+
     public void testSetupMethods() throws Exception {
         MapReduceActionExecutor ae = new MapReduceActionExecutor();
 
@@ -104,6 +118,69 @@ public class TestMapReduceActionExecutor
         ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
         assertEquals("IN", conf.get("mapred.input.dir"));
 
+        // Enable uber jars to test that MapReduceActionExecutor picks up the oozie.mapreduce.uber.jar property correctly
+        Services serv = Services.get();
+        boolean originalUberJarDisabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
+        serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", true);
+
+        actionXml = createUberJarActionXML(getNameNodeUri() + "/app/job.jar", "");
+        conf = ae.createBaseHadoopConf(context, actionXml);
+        ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+        assertEquals(getNameNodeUri() + "/app/job.jar", conf.get("oozie.mapreduce.uber.jar"));  // absolute path with namenode
+        JobConf launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
+        assertEquals(getNameNodeUri() + "/app/job.jar", launcherJobConf.getJar());              // same for launcher conf
+
+        actionXml = createUberJarActionXML("/app/job.jar", "");
+        conf = ae.createBaseHadoopConf(context, actionXml);
+        ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+        assertEquals(getNameNodeUri() + "/app/job.jar", conf.get("oozie.mapreduce.uber.jar"));  // absolute path without namenode
+        launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
+        assertEquals(getNameNodeUri() + "/app/job.jar", launcherJobConf.getJar());              // same for launcher conf
+
+        actionXml = createUberJarActionXML("job.jar", "");
+        conf = ae.createBaseHadoopConf(context, actionXml);
+        ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+        assertEquals(getFsTestCaseDir() + "/job.jar", conf.get("oozie.mapreduce.uber.jar"));    // relative path
+        launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
+        assertEquals(getFsTestCaseDir() + "/job.jar", launcherJobConf.getJar());                // same for launcher
+
+        actionXml = createUberJarActionXML("job.jar", "<streaming></streaming>");
+        conf = ae.createBaseHadoopConf(context, actionXml);
+        ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+        assertEquals("", conf.get("oozie.mapreduce.uber.jar"));                                 // ignored for streaming
+        launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
+        assertNull(launcherJobConf.getJar());                                                   // same for launcher conf (not set)
+
+        actionXml = createUberJarActionXML("job.jar", "<pipes></pipes>");
+        conf = ae.createBaseHadoopConf(context, actionXml);
+        ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+        assertEquals("", conf.get("oozie.mapreduce.uber.jar"));                                 // ignored for pipes
+        launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
+        assertNull(launcherJobConf.getJar());                                                   // same for launcher conf (not set)
+
+        actionXml = XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
+                + "<name-node>" + getNameNodeUri() + "</name-node>" + "</map-reduce>");
+        conf = ae.createBaseHadoopConf(context, actionXml);
+        ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+        assertNull(conf.get("oozie.mapreduce.uber.jar"));                                       // doesn't resolve if not set
+        launcherJobConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, conf);
+        assertNull(launcherJobConf.getJar());                                                   // same for launcher conf
+
+        // Disable uber jars to test that MapReduceActionExecutor won't allow the oozie.mapreduce.uber.jar property
+        serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", false);
+        try {
+            actionXml = createUberJarActionXML(getNameNodeUri() + "/app/job.jar", "");
+            conf = ae.createBaseHadoopConf(context, actionXml);
+            ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+            assertEquals(getNameNodeUri() + "/app/job.jar", conf.get("oozie.mapreduce.uber.jar"));
+        } catch (ActionExecutorException aee) {
+            assertEquals("MR003", aee.getErrorCode());
+            assertEquals(ActionExecutorException.ErrorType.ERROR, aee.getErrorType());
+            assertTrue(aee.getMessage().contains("oozie.action.mapreduce.uber.jar.enable"));
+            assertTrue(aee.getMessage().contains("oozie.mapreduce.uber.jar"));
+        }
+        serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", originalUberJarDisabled);
+
         actionXml = XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
                 + "<name-node>" + getNameNodeUri() + "</name-node>" + "<streaming>" + "<mapper>M</mapper>"
                 + "<reducer>R</reducer>" + "<record-reader>RR</record-reader>"
@@ -221,7 +298,7 @@ public class TestMapReduceActionExecutor
         return runningJob;
     }
 
-    private void _testSubmit(String name, String actionXml) throws Exception {
+    private String _testSubmit(String name, String actionXml) throws Exception {
 
         Context context = createContext(name, actionXml);
         final RunningJob launcherJob = submitAction(context);
@@ -266,6 +343,8 @@ public class TestMapReduceActionExecutor
 
         //External Child IDs will always be null in case of MR action.
         assertNull(context.getExternalChildIDs());
+
+        return mrJob.getID().toString();
     }
 
     private void _testSubmitWithCredentials(String name, String actionXml) throws Exception {
@@ -327,6 +406,16 @@ public class TestMapReduceActionExecutor
         return conf;
     }
 
+    protected XConfiguration getMapReduceUberJarConfig(String inputDir, String outputDir) throws Exception {
+        XConfiguration uberJarConf = new XConfiguration();     // one class acts as both mapper and reducer
+        uberJarConf.set("mapred.mapper.class", MapperReducerUberJarForTest.class.getName());
+        uberJarConf.set("mapred.reducer.class", MapperReducerUberJarForTest.class.getName());
+        uberJarConf.set("mapred.input.dir", inputDir);
+        uberJarConf.set("mapred.output.dir", outputDir);
+        uberJarConf.set("oozie.mapreduce.uber.jar", createAndUploadUberJar().toUri().toString());  // builds + uploads the jar
+        return uberJarConf;
+    }
+
     public void testMapReduce() throws Exception {
         FileSystem fs = getFileSystem();
 
@@ -362,6 +451,121 @@ public class TestMapReduceActionExecutor
         _testSubmitWithCredentials("map-reduce", actionXml);
     }
 
+    // Builds the uber jar in the local test case dir, moves it under the app path and returns its remote location
+    protected Path createAndUploadUberJar() throws Exception {
+        Path localJobJarPath = makeUberJarWithLib(getTestCaseDir());
+        Path remoteJobJarPath = new Path(getAppPath(), localJobJarPath.getName());
+        getFileSystem().moveFromLocalFile(localJobJarPath, remoteJobJarPath);
+        // Use the URI's path component, as makeUberJarWithLib does, so a scheme can never end up in the local file name;
+        // delete() is simply a no-op if the move already removed the local copy
+        new File(localJobJarPath.toUri().getPath()).delete();
+        return remoteJobJarPath;
+    }
+
+    // Writes <testDir>/uber.jar holding lib/lib1.jar and lib/lib2.jar (real jars, or it will complain); returns its path
+    private Path makeUberJarWithLib(String testDir) throws Exception {
+        Path uberJarPath = new Path(testDir, "uber.jar");
+        JarOutputStream uberJarOut = new JarOutputStream(new FileOutputStream(new File(uberJarPath.toUri().getPath())));
+        for (String libName : new String[] {"lib1.jar", "lib2.jar"}) {
+            createAndAddJarToJar(uberJarOut, new File(new Path(testDir, libName).toUri().getPath()));
+        }
+        uberJarOut.close();
+        return uberJarPath;
+    }
+
+    // Creates jarFile as a small real jar with a single entry, copies its bytes into jos as the entry "lib/<file name>",
+    // and finally deletes the local jarFile again.
+    private void createAndAddJarToJar(JarOutputStream jos, File jarFile) throws Exception {
+        JarOutputStream jos2 = new JarOutputStream(new FileOutputStream(jarFile));
+        // Have to have at least one entry or it will complain
+        jos2.putNextEntry(new ZipEntry(jarFile.getName() + ".inside"));
+        jos2.closeEntry();
+        jos2.close();
+        // Stream the freshly written jar into the enclosing jar under lib/
+        jos.putNextEntry(new ZipEntry("lib/" + jarFile.getName()));
+        FileInputStream in = new FileInputStream(jarFile);
+        try {
+            byte[] buf = new byte[1024];
+            int numRead;
+            while ((numRead = in.read(buf)) != -1) {
+                jos.write(buf, 0, numRead);
+            }
+        } finally {
+            in.close();     // release the file handle even if the copy fails
+        }
+        jos.closeEntry();
+        jarFile.delete();
+    }
+
+    // Submits a map-reduce action configured with an uber jar, then scans the part-* output files for lines that name
+    // lib/lib1.jar and lib/lib2.jar in either the jobcache layout or the YARN appcache layout.
+    public void _testMapReduceWithUberJar() throws Exception {
+        FileSystem fs = getFileSystem();
+
+        Path inputDir = new Path(getFsTestCaseDir(), "input");
+        Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+        w.write("dummy\n");
+        w.write("dummy\n");
+        w.close();
+
+        String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+                + getNameNodeUri() + "</name-node>"
+                + getMapReduceUberJarConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>";
+        String jobID = _testSubmit("map-reduce", actionXml);
+
+        boolean containsLib1Jar = false;
+        String lib1JarStr = "jobcache/" + jobID + "/jars/lib/lib1.jar";
+        Pattern lib1JarPatYarn = Pattern.compile(
+                ".*appcache/application_" + jobID.replaceFirst("job_", "") + "/filecache/.*/uber.jar/lib/lib1.jar");
+        boolean containsLib2Jar = false;
+        String lib2JarStr = "jobcache/" + jobID + "/jars/lib/lib2.jar";
+        Pattern lib2JarPatYarn = Pattern.compile(
+                ".*appcache/application_" + jobID.replaceFirst("job_", "") + "/filecache/.*/uber.jar/lib/lib2.jar");
+
+        FileStatus[] fstats = getFileSystem().listStatus(outputDir);
+        for (FileStatus fstat : fstats) {
+            Path p = fstat.getPath();
+            if (getFileSystem().isFile(p) && p.getName().startsWith("part-")) {
+                Scanner sc = new Scanner(getFileSystem().open(p));
+                while (sc.hasNextLine()) {
+                    String line = sc.nextLine();
+                    containsLib1Jar = (containsLib1Jar || line.endsWith(lib1JarStr) || lib1JarPatYarn.matcher(line).matches());
+                    containsLib2Jar = (containsLib2Jar || line.endsWith(lib2JarStr) || lib2JarPatYarn.matcher(line).matches());
+                }
+                sc.close();     // closing the Scanner also closes the stream it reads from
+            }
+        }
+
+        assertTrue("lib/lib1.jar should have been unzipped from the uber jar and added to the classpath but was not",
+                containsLib1Jar);
+        assertTrue("lib/lib2.jar should have been unzipped from the uber jar and added to the classpath but was not",
+                containsLib2Jar);
+    }
+
+    // With the oozie.action.mapreduce.uber.jar.enable property set to false, a workflow with an uber jar should fail
+    // (this happens before we get to Hadoop, so not having the correct version of Hadoop doesn't matter here; the
+    // TestMapReduceActionExecutorUberJar.testMapReduceWithUberJarEnabled() test actually tests the uber jar functionality, but
+    // that test is excluded by default)
+    public void testMapReduceWithUberJarDisabled() throws Exception {
+        Services serv = Services.get();
+        boolean originalUberJarEnabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
+        try {
+            serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", false);
+            _testMapReduceWithUberJar();
+            fail("Expected an ActionExecutorException (MR003) because uber jars are disabled");
+        } catch (ActionExecutorException aee) {
+            assertEquals("MR003", aee.getErrorCode());
+            assertEquals(ActionExecutorException.ErrorType.ERROR, aee.getErrorType());
+            assertTrue(aee.getMessage().contains("oozie.action.mapreduce.uber.jar.enable"));
+            assertTrue(aee.getMessage().contains("oozie.mapreduce.uber.jar"));
+        } finally {
+            // Restore the setting that was in place before this test changed it
+            serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", originalUberJarEnabled);
+        }
+    }
+
     protected XConfiguration getStreamingConfig(String inputDir, String outputDir) {
         XConfiguration conf = new XConfiguration();
         conf.set("mapred.input.dir", inputDir);

Added: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorUberJar.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorUberJar.java?rev=1378769&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorUberJar.java (added)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorUberJar.java Wed Aug 29 23:28:04 2012
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.oozie.service.Services;
+
+public class TestMapReduceActionExecutorUberJar extends TestMapReduceActionExecutor {
+
+    // This test requires at least Hadoop 2.2.x or 1.2.x or it will fail so it is excluded from the default tests in the pom.xml
+    public void testMapReduceWithUberJarEnabled() throws Exception {
+        Services services = Services.get();
+        boolean previouslyEnabled = services.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
+        try {
+            services.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", true);
+            _testMapReduceWithUberJar();
+        } finally {
+            // Put the flag back the way it was found, whether or not the test passed
+            services.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", previouslyEnabled);
+        }
+    }
+
+    // The inherited tests below are overridden with empty bodies so that they are skipped in this class
+    @Override
+    public void testDefaultShareLibName() {
+        // intentionally empty: skipped
+    }
+
+    @Override
+    public void testMapReduce() throws Exception {
+        // intentionally empty: skipped
+    }
+
+    @Override
+    public void testLauncherJar() throws Exception {
+        // intentionally empty: skipped
+    }
+
+    @Override
+    public void testMapReduceWithCredentials() throws Exception {
+        // intentionally empty: skipped
+    }
+
+    @Override
+    public void testMapReduceWithUberJarDisabled() throws Exception {
+        // intentionally empty: skipped
+    }
+
+    @Override
+    public void testSetExecutionStats_when_user_has_specified_stats_write_FALSE() throws Exception {
+        // intentionally empty: skipped
+    }
+
+    @Override
+    public void testSetExecutionStats_when_user_has_specified_stats_write_TRUE() throws Exception {
+        // intentionally empty: skipped
+    }
+
+    @Override
+    public void testSetupMethods() throws Exception {
+        // intentionally empty: skipped
+    }
+
+    @Override
+    public void testStreaming() throws Exception {
+        // intentionally empty: skipped
+    }
+
+    @Override
+    public void testPipes() throws Exception {
+        // intentionally empty: skipped
+    }
+}
\ No newline at end of file

Modified: incubator/oozie/trunk/docs/src/site/twiki/AG_Install.twiki
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/docs/src/site/twiki/AG_Install.twiki?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/docs/src/site/twiki/AG_Install.twiki (original)
+++ incubator/oozie/trunk/docs/src/site/twiki/AG_Install.twiki Wed Aug 29 23:28:04 2012
@@ -425,6 +425,24 @@ be expressed in the corresponding timezo
 For more details on using an alternate Oozie processing timezone, please reffer to the
 [[CoordinatorFunctionalSpec#datetime][Coordinator Fuctional Specification, section '4. Datetime']]
 
+#UberJar
+---++ MapReduce Workflow Uber Jars
+For Map-Reduce jobs (not including streaming or pipes), additional jar files can also be included via an uber jar. An uber jar is a
+jar file that contains additional jar files within a "lib" folder (see
+[[WorkflowFunctionalSpec#AppDeployment][Workflow Functional Specification]] for more information). Submitting a workflow with an uber jar
+requires at least Hadoop 2.2.0 or 1.2.0. As such, using uber jars in a workflow is disabled by default. To enable this feature, use
+the =oozie.action.mapreduce.uber.jar.enable= property in the =oozie-site.xml= (and make sure to use a supported version of Hadoop).
+
+<verbatim>
+<configuration>
+    <property>
+        <name>oozie.action.mapreduce.uber.jar.enable</name>
+        <value>true</value>
+    </property>
+</configuration>
+</verbatim>
+
+
 ---++ Advanced/Custom Environment Settings
 
 Oozie can be configured to use Unix standard filesystem hierarchy for its different files

Modified: incubator/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki (original)
+++ incubator/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki Wed Aug 29 23:28:04 2012
@@ -2059,6 +2059,29 @@ map-reduce and pig jobs =classpath= and 
 Additional JAR files and native libraries not present in the application 'lib/' directory can be specified in
 map-reduce and pig actions with the 'file' element (refer to the map-reduce and pig documentation).
 
+For Map-Reduce jobs (not including streaming or pipes), additional jar files can also be included via an uber jar. An uber jar is a
+jar file that contains additional jar files within a "lib" folder. To let Oozie know about an uber jar, simply specify it with
+the =oozie.mapreduce.uber.jar= configuration property and Oozie will tell Hadoop MapReduce that it is an uber jar. The ability to
+specify an uber jar is governed by the =oozie.action.mapreduce.uber.jar.enable= property in =oozie-site.xml=. See
+[[AG_Install#UberJar][Oozie Install]] for more information.
+
+<verbatim>
+<action name="mr-node">
+    <map-reduce>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>oozie.mapreduce.uber.jar</name>
+                <value>${nameNode}/user/${wf:user()}/my-uber-jar.jar</value>
+            </property>
+        </configuration>
+    </map-reduce>
+    <ok to="end"/>
+    <error to="fail"/>
+</action>
+</verbatim>
+
 The =config-default.xml= file defines, if any, default values for the workflow job parameters. This file must be in
 the Hadoop Configuration XML format. EL expressions are not supported and =user.name= property cannot be specified in
 this file.

Modified: incubator/oozie/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/pom.xml?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/pom.xml (original)
+++ incubator/oozie/trunk/pom.xml Wed Aug 29 23:28:04 2012
@@ -822,6 +822,11 @@
 
                         <!-- See 'testSqoop' profile in core/pom.xml and the Building doc-->
                         <exclude>**/TestSqoop*.java</exclude>
+
+                        <!-- Explicitly use -Dtest=TestMapReduceActionExecutorUberJar to test the uber jar functionality.
+                             Requires at least Hadoop 1.2.0 or 2.2.0.
+                        -->
+                        <exclude>**/TestMapReduceActionExecutorUberJar.java</exclude>
                     </excludes>
                     <!-- DO NOT CHANGE THIS VALUES, TESTCASES CANNOT RUN IN PARALLEL -->
                     <parallel>classes</parallel>

Modified: incubator/oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/release-log.txt?rev=1378769&r1=1378768&r2=1378769&view=diff
==============================================================================
--- incubator/oozie/trunk/release-log.txt (original)
+++ incubator/oozie/trunk/release-log.txt Wed Aug 29 23:28:04 2012
@@ -1,5 +1,6 @@
 -- Oozie 3.4.0 release (trunk - unreleased)
 
+OOZIE-654 Provide a way to use 'uber' jars with Oozie MR actions (rkanter via tucu)
 OOZIE-979 bump up trunk version to 3.4.0-SNAPSHOT (tucu)
 
 -- Oozie 3.3.0 release (unreleased)