Posted to commits@pig.apache.org by dv...@apache.org on 2012/02/16 21:36:02 UTC

svn commit: r1245165 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/impl/ src/org/apache/pig/impl/io/ src/org/apache/pig/impl/util/ test/org/apache/pig/test/

Author: dvryaboy
Date: Thu Feb 16 20:36:02 2012
New Revision: 1245165

URL: http://svn.apache.org/viewvc?rev=1245165&view=rev
Log:
PIG-2010: registered jars on distributed cache
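
In short: jars registered with REGISTER are now shipped to the backend through
Hadoop's DistributedCache and put on the job classpath there, instead of being
merged into the submitted job jar by JarManager. A minimal sketch of the new
code path, condensed from the JobControlCompiler changes below (error handling
omitted; conf, pigContext and jarUrl stand in for the real arguments):

    // enable symlinks in the task working directories
    DistributedCache.createSymlink(conf);
    // REGISTER always copies the jar locally first; ship that copy to HDFS
    Path pathInHDFS = shipToHDFS(pigContext, conf, jarUrl);
    // tasks get the jar on their classpath via the distributed cache
    DistributedCache.addFileToClassPath(pathInHDFS, conf);
    // tell JarManager not to merge this jar into the job jar as well
    pigContext.skipJars.add(jarUrl.getPath());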

Added:
    pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/impl/PigContext.java
    pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
    pig/trunk/src/org/apache/pig/impl/util/JarManager.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1245165&r1=1245164&r2=1245165&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Feb 16 20:36:02 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2010: registered jars on distributed cache (traviscrawford and julienledem via dvryaboy)
+
 PIG-2533: Pig MR job exceptions masked on frontend (traviscrawford via dvryaboy)
 
 PIG-2525: Support pluggable PigProcessNotifcationListeners on the command line (dvryaboy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1245165&r1=1245164&r2=1245165&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Feb 16 20:36:02 2012
@@ -20,8 +20,10 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,6 +39,7 @@ import org.apache.hadoop.filecache.Distr
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.JobConf;
@@ -55,9 +58,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
@@ -404,6 +407,13 @@ public class JobControlCompiler{
 
             if (!pigContext.inIllustrator && pigContext.getExecType() != ExecType.LOCAL) 
             {
+
+                // Setup the DistributedCache for this job
+                for (URL extraJar : pigContext.extraJars) {
+                    log.debug("Adding jar to DistributedCache: " + extraJar.toString());
+                    putJarOnClassPathThroughDistributedCache(pigContext, conf, extraJar);
+                }
+
                 //Create the jar of all functions and classes required
                 File submitJarFile = File.createTempFile("Job", ".jar");
                 log.info("creating jar file "+submitJarFile.getName());
@@ -448,7 +458,6 @@ public class JobControlCompiler{
                 }
             }
 
-            // Setup the DistributedCache for this job
             setupDistributedCache(pigContext, nwJob.getConfiguration(), pigContext.getProperties(), 
                                   "pig.streaming.ship.files", true);
             setupDistributedCache(pigContext, nwJob.getConfiguration(), pigContext.getProperties(), 
@@ -1158,28 +1167,20 @@ public class JobControlCompiler{
             setupDistributedCache(pigContext, conf, paths, shipToCluster);
         }
     }
-        
+
     private static void setupDistributedCache(PigContext pigContext,
             Configuration conf, String[] paths, boolean shipToCluster) throws IOException {
         // Turn on the symlink feature
         DistributedCache.createSymlink(conf);
-            
+
         for (String path : paths) {
             path = path.trim();
             if (path.length() != 0) {
                 Path src = new Path(path);
-                
+
                 // Ensure that 'src' is a valid URI
-                URI srcURI = null;
-                try {
-                    srcURI = new URI(src.toString());
-                } catch (URISyntaxException ue) {
-                    int errCode = 6003;
-                    String msg = "Invalid cache specification. " +
-                    "File doesn't exist: " + src;
-                    throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT);
-                }
-                
+                URI srcURI = toURI(src);
+
                 // Ship it to the cluster if necessary and add to the
                 // DistributedCache
                 if (shipToCluster) {
@@ -1187,7 +1188,7 @@ public class JobControlCompiler{
                         new Path(FileLocalizer.getTemporaryPath(pigContext).toString());
                     FileSystem fs = dst.getFileSystem(conf);
                     fs.copyFromLocalFile(src, dst);
-                    
+
                     // Construct the dst#srcName uri for DistributedCache
                     URI dstURI = null;
                     try {
@@ -1244,74 +1245,146 @@ public class JobControlCompiler{
         return symlink;
     }
     
+
+    /**
+     * Converts 'src' to a URI, ensuring it is valid.
+     * @param src the source Path
+     * @return a URI for this path
+     * @throws ExecException if the path is not a valid URI
+     */
+    private static URI toURI(Path src) throws ExecException {
+        try {
+            return new URI(src.toString());
+        } catch (URISyntaxException ue) {
+            int errCode = 6003;
+            String msg = "Invalid cache specification. " +
+                    "File doesn't exist: " + src;
+            throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT);
+        }
+    }
+
+    /**
+     * If the url is not already in HDFS, copies it from the local filesystem
+     * to HDFS before adding it to the distributed cache and the job classpath.
+     * @param pigContext the pigContext
+     * @param conf the job conf
+     * @param url the url of the jar to be added to the distributed cache
+     * @throws IOException
+     */
+    private static void putJarOnClassPathThroughDistributedCache(
+            PigContext pigContext,
+            Configuration conf,
+            URL url) throws IOException {
+
+        // Turn on the symlink feature
+        DistributedCache.createSymlink(conf);
+
+        // REGISTER always copies the jar file locally; see PigServer.registerJar()
+        Path pathInHDFS = shipToHDFS(pigContext, conf, url);
+        // and add to the DistributedCache
+        DistributedCache.addFileToClassPath(pathInHDFS, conf);
+        pigContext.skipJars.add(url.getPath());
+    }
+
+    /**
+     * Copies the file to a temporary path in HDFS.
+     * @param pigContext the pig context
+     * @param conf the job conf
+     * @param url the url of the file to ship to HDFS
+     * @return the path where it was shipped
+     * @throws IOException
+     */
+    private static Path shipToHDFS(
+            PigContext pigContext,
+            Configuration conf,
+            URL url) throws IOException {
+
+        String path = url.getPath();
+        int slash = path.lastIndexOf("/");
+        String suffix = slash == -1 ? path : path.substring(slash+1);
+
+        Path dst = new Path(FileLocalizer.getTemporaryPath(pigContext).toUri().getPath(), suffix);
+        FileSystem fs = dst.getFileSystem(conf);
+        OutputStream os = fs.create(dst);
+        try {
+            IOUtils.copyBytes(url.openStream(), os, 4096, true);
+        } finally {
+            // IOUtils.copyBytes() closes the streams only once it actually runs;
+            // url.openStream() may throw before that, so ensure the output is closed here
+            os.close();
+        }
+        return dst;
+    }
+
+
     private static class JoinDistributedCacheVisitor extends PhyPlanVisitor {
-                 
+
         private PigContext pigContext = null;
-                
-         private Configuration conf = null;
-         
-         public JoinDistributedCacheVisitor(PhysicalPlan plan, 
-                 PigContext pigContext, Configuration conf) {
-             super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
-                     plan));
-             this.pigContext = pigContext;
-             this.conf = conf;
-         }
-         
-         @Override
+
+        private Configuration conf = null;
+
+        public JoinDistributedCacheVisitor(PhysicalPlan plan, 
+                PigContext pigContext, Configuration conf) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+            this.pigContext = pigContext;
+            this.conf = conf;
+        }
+
+        @Override
         public void visitFRJoin(POFRJoin join) throws VisitorException {
-             
-             // XXX Hadoop currently doesn't support distributed cache in local mode.
-             // This line will be removed after the support is added
-             if (pigContext.getExecType() == ExecType.LOCAL) return;
-             
-             // set up distributed cache for the replicated files
-             FileSpec[] replFiles = join.getReplFiles();
-             ArrayList<String> replicatedPath = new ArrayList<String>();
-             
-             FileSpec[] newReplFiles = new FileSpec[replFiles.length];
-             
-             // the first input is not replicated
-             for (int i = 0; i < replFiles.length; i++) {
-                 // ignore fragmented file
-                 String symlink = "";
-                 if (i != join.getFragment()) {
-                     symlink = "pigrepl_" + join.getOperatorKey().toString() + "_"
-                         + Integer.toString(System.identityHashCode(replFiles[i].getFileName()))
-                         + "_" + Long.toString(System.currentTimeMillis()) 
-                         + "_" + i;
-                     replicatedPath.add(replFiles[i].getFileName() + "#"
-                             + symlink);
-                 }
-                 newReplFiles[i] = new FileSpec(symlink, 
-                         (replFiles[i] == null ? null : replFiles[i].getFuncSpec()));               
-             }
-             
-             join.setReplFiles(newReplFiles);
-             
-             try {
-                 setupDistributedCache(pigContext, conf, replicatedPath
-                         .toArray(new String[0]), false);
-             } catch (IOException e) {
-                 String msg = "Internal error. Distributed cache could not " +
-                               "be set up for the replicated files";
-                 throw new VisitorException(msg, e);
-             }
-         }
-         
-         @Override
-         public void visitMergeJoin(POMergeJoin join) throws VisitorException {
-             
-        	 // XXX Hadoop currently doesn't support distributed cache in local mode.
-             // This line will be removed after the support is added
-             if (pigContext.getExecType() == ExecType.LOCAL) return;
-             
-             String indexFile = join.getIndexFile();
-             
-             // merge join may not use an index file
-             if (indexFile == null) return;
-             
-             try {
+
+            // XXX Hadoop currently doesn't support distributed cache in local mode.
+            // This line will be removed after the support is added
+            if (pigContext.getExecType() == ExecType.LOCAL) return;
+
+            // set up distributed cache for the replicated files
+            FileSpec[] replFiles = join.getReplFiles();
+            ArrayList<String> replicatedPath = new ArrayList<String>();
+
+            FileSpec[] newReplFiles = new FileSpec[replFiles.length];
+
+            // the first input is not replicated
+            for (int i = 0; i < replFiles.length; i++) {
+                // ignore fragmented file
+                String symlink = "";
+                if (i != join.getFragment()) {
+                    symlink = "pigrepl_" + join.getOperatorKey().toString() + "_"
+                            + Integer.toString(System.identityHashCode(replFiles[i].getFileName()))
+                            + "_" + Long.toString(System.currentTimeMillis()) 
+                            + "_" + i;
+                    replicatedPath.add(replFiles[i].getFileName() + "#"
+                            + symlink);
+                }
+                newReplFiles[i] = new FileSpec(symlink, 
+                        (replFiles[i] == null ? null : replFiles[i].getFuncSpec()));               
+            }
+
+            join.setReplFiles(newReplFiles);
+
+            try {
+                setupDistributedCache(pigContext, conf, replicatedPath
+                        .toArray(new String[0]), false);
+            } catch (IOException e) {
+                String msg = "Internal error. Distributed cache could not " +
+                        "be set up for the replicated files";
+                throw new VisitorException(msg, e);
+            }
+        }
+
+        @Override
+        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+
+            // XXX Hadoop currently doesn't support distributed cache in local mode.
+            // This line will be removed after the support is added
+            if (pigContext.getExecType() == ExecType.LOCAL) return;
+
+            String indexFile = join.getIndexFile();
+
+            // merge join may not use an index file
+            if (indexFile == null) return;
+
+            try {
                 String symlink = addSingleFileToDistributedCache(pigContext,
                         conf, indexFile, "indexfile_");
                 join.setIndexFile(symlink);
@@ -1320,21 +1393,21 @@ public class JobControlCompiler{
                         "be set up for merge join index file";
                 throw new VisitorException(msg, e);
             }
-         }
-         
-         @Override
+        }
+
+        @Override
         public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
                 throws VisitorException {
-          
-             // XXX Hadoop currently doesn't support distributed cache in local mode.
-             // This line will be removed after the support is added
-             if (pigContext.getExecType() == ExecType.LOCAL) return;
-             
-             String indexFile = mergeCoGrp.getIndexFileName();
-             
-             if (indexFile == null) throw new VisitorException("No index file");
-             
-             try {
+
+            // XXX Hadoop currently doesn't support distributed cache in local mode.
+            // This line will be removed after the support is added
+            if (pigContext.getExecType() == ExecType.LOCAL) return;
+
+            String indexFile = mergeCoGrp.getIndexFileName();
+
+            if (indexFile == null) throw new VisitorException("No index file");
+
+            try {
                 String symlink = addSingleFileToDistributedCache(pigContext,
                         conf, indexFile, "indexfile_mergecogrp_");
                 mergeCoGrp.setIndexFileName(symlink);
@@ -1344,40 +1417,40 @@ public class JobControlCompiler{
                 throw new VisitorException(msg, e);
             }
         }
-     }
+    }
 
-     private static class UdfDistributedCacheVisitor extends PhyPlanVisitor {
-                 
-         private PigContext pigContext = null;
-         private Configuration conf = null;
-         
-         public UdfDistributedCacheVisitor(PhysicalPlan plan, 
-                                           PigContext pigContext,
-                                           Configuration conf) {
-             super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
-                     plan));
-             this.pigContext = pigContext;
-             this.conf = conf;
-         }
-         
-         @Override
-         public void visitUserFunc(POUserFunc func) throws VisitorException {
-             
-             // XXX Hadoop currently doesn't support distributed cache in local mode.
-             // This line will be removed after the support is added
-             if (pigContext.getExecType() == ExecType.LOCAL) return;
-             
-             // set up distributed cache for files indicated by the UDF
-             String[] files = func.getCacheFiles();
-             if (files == null) return;
-
-             try {
-                 setupDistributedCache(pigContext, conf, files, false);
-             } catch (IOException e) {
+    private static class UdfDistributedCacheVisitor extends PhyPlanVisitor {
+
+        private PigContext pigContext = null;
+        private Configuration conf = null;
+
+        public UdfDistributedCacheVisitor(PhysicalPlan plan, 
+                PigContext pigContext,
+                Configuration conf) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+            this.pigContext = pigContext;
+            this.conf = conf;
+        }
+
+        @Override
+        public void visitUserFunc(POUserFunc func) throws VisitorException {
+
+            // XXX Hadoop currently doesn't support distributed cache in local mode.
+            // This line will be removed after the support is added
+            if (pigContext.getExecType() == ExecType.LOCAL) return;
+
+            // set up distributed cache for files indicated by the UDF
+            String[] files = func.getCacheFiles();
+            if (files == null) return;
+
+            try {
+                setupDistributedCache(pigContext, conf, files, false);
+            } catch (IOException e) {
                 String msg = "Internal error. Distributed cache could not " +
-                            "be set up for the requested files";
+                        "be set up for the requested files";
                 throw new VisitorException(msg, e);
-             }
-         }
-     }   
+            }
+        }
+    }   
 }
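
The new test (added below) verifies the result through the same Hadoop API the
job itself sees; a short sketch of that check, with jobConf standing in for the
JobConf of the compiled job:

    // the shipped jar shows up on the distributed-cache classpath ...
    Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
    // ... while the submitted job jar no longer contains the UDF class
    File submitJarFile = new File(jobConf.get("mapred.jar"));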

Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1245165&r1=1245164&r2=1245165&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Thu Feb 16 20:36:02 2012
@@ -58,7 +58,7 @@ import org.apache.pig.impl.util.JarManag
 public class PigContext implements Serializable {
     private static final long serialVersionUID = 1L;
     
-    private transient final Log log = LogFactory.getLog(getClass());
+    private static final Log log = LogFactory.getLog(PigContext.class);
     
     public static final String JOB_NAME = "jobName";
     public static final String JOB_NAME_PREFIX= "PigLatin";
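
The logger change above is not just cosmetic: PigContext is Serializable, and a
transient final field is skipped on serialization and left at its default after
deserialization (field initializers do not rerun), so the old instance logger
could come back null on the backend. A static logger never travels with the
object. A self-contained sketch of the hazard, using a hypothetical Holder
class:

    import java.io.*;

    class Holder implements Serializable {
        // same shape as the old PigContext field (non-constant initializer)
        transient final StringBuilder tag = new StringBuilder("set here");
    }

    public class TransientFinalDemo {
        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(new Holder());
            oos.close();
            Holder copy = (Holder) new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray())).readObject();
            System.out.println(copy.tag); // prints "null"
        }
    }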

Modified: pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1245165&r1=1245164&r2=1245165&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Thu Feb 16 20:36:02 2012
@@ -515,17 +515,21 @@ public class FileLocalizer {
     }
 
     public static Path getTemporaryPath(PigContext pigContext) throws IOException {
-        ElementDescriptor relative = relativeRoot(pigContext);
-    
-        if (!relativeRoot(pigContext).exists()) {
-            relativeRoot(pigContext).create();
-        }
-        ElementDescriptor elem= 
-            pigContext.getDfs().asElement(relative.toString(), "tmp" + r.nextInt());
-        toDelete().push(elem);
-        return ((HPath)elem).getPath();
+        return getTemporaryPath(pigContext, "");
     }
-    
+
+    public static Path getTemporaryPath(PigContext pigContext, String suffix) throws IOException {
+        ElementDescriptor relative = relativeRoot(pigContext);
+
+        if (!relativeRoot(pigContext).exists()) {
+            relativeRoot(pigContext).create();
+        }
+        ElementDescriptor elem =
+            pigContext.getDfs().asElement(relative.toString(), "tmp" + r.nextInt() + suffix);
+        toDelete().push(elem);
+        return ((HPath)elem).getPath();
+    }
+
     public static String hadoopify(String filename, PigContext pigContext) throws IOException {
         if (filename.startsWith(LOCAL_PREFIX)) {
             filename = filename.substring(LOCAL_PREFIX.length());
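
The new getTemporaryPath(pigContext, suffix) overload above simply appends the
suffix to the generated temp name, so callers can create temp paths that keep a
meaningful extension. A hypothetical use:

    // yields a path like <pig-temp-root>/tmp1456742965.jar
    Path tmpJar = FileLocalizer.getTemporaryPath(pigContext, ".jar");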

Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1245165&r1=1245164&r2=1245165&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Thu Feb 16 20:36:02 2012
@@ -121,17 +121,16 @@ public class JarManager {
         for (String scriptJar: pigContext.scriptJars) {
             mergeJar(jarFile, scriptJar, null, contents);
         }
-        for (URL extraJar: pigContext.extraJars) {
-            // log.error("Adding extra " + pigContext.extraJars.get(i));
-            mergeJar(jarFile, extraJar, null, contents);
-        }
         for (String path: pigContext.scriptFiles) {
+            log.debug("Adding entry " + path + " to job jar" );
         	addStream(jarFile, path, new FileInputStream(new File(path)),contents);
         }
         for (Map.Entry<String, File> entry : pigContext.getScriptFiles().entrySet()) {
+            log.debug("Adding entry " + entry.getKey() + " to job jar" );
         	addStream(jarFile, entry.getKey(), new FileInputStream(entry.getValue()),contents);
         }
-        
+
+        log.debug("Adding entry pigContext to job jar" );
         jarFile.putNextEntry(new ZipEntry("pigContext"));
         new ObjectOutputStream(jarFile).writeObject(pigContext);
         jarFile.close();
@@ -177,7 +176,7 @@ public class JarManager {
     private static void mergeJar(JarOutputStream jarFile, String jar, String prefix, Map<String, String> contents)
             throws FileNotFoundException, IOException {
         JarInputStream jarInput = new JarInputStream(new FileInputStream(jar));
-        
+        log.debug("Adding jar " + jar + (prefix != null ? " for prefix "+prefix : "" ) + " to job jar" );
         mergeJar(jarFile, jarInput, prefix, contents);
     }
     

Added: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1245165&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Thu Feb 16 20:36:02 2012
@@ -0,0 +1,191 @@
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import javax.tools.JavaCompiler;
+import javax.tools.JavaCompiler.CompilationTask;
+import javax.tools.JavaFileObject;
+import javax.tools.StandardJavaFileManager;
+import javax.tools.ToolProvider;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestJobControlCompiler {
+
+  /**
+   * Specifically tests that REGISTERed jars are added to the distributed cache
+   * instead of being merged into the job jar.
+   * @throws Exception
+   */
+  @Test
+  public void testJarAddedToDistributedCache() throws Exception {
+
+    // creating a jar with a UDF *not* in the current classloader
+    File tmpFile = File.createTempFile("Some_", ".jar");
+    tmpFile.deleteOnExit();
+    String className = createTestJar(tmpFile);
+    final String testUDFFileName = className+".class";
+
+    // creating a hadoop-site.xml and making it visible to Pig,
+    // using the same location as the other tests so that a conf
+    // left over from a previous test is not picked up
+    File conf_dir = new File(System.getProperty("user.home"), "pigtest/conf/");
+    conf_dir.mkdirs();
+    File hadoopSite = new File(conf_dir, "hadoop-site.xml");
+    hadoopSite.deleteOnExit();
+    FileWriter fw = new FileWriter(hadoopSite);
+    try {
+      fw.write("<?xml version=\"1.0\"?>\n");
+      fw.write("<?xml-stylesheet type=\"text/xsl\" href=\"nutch-conf.xsl\"?>\n");
+      fw.write("<configuration>\n");
+      fw.write("</configuration>\n");
+    } finally {
+      fw.close();
+    }
+    // making hadoop-site.xml visible to Pig, which requires one when running in mapred mode
+    Thread.currentThread().setContextClassLoader(
+        new URLClassLoader(new URL[] {conf_dir.toURI().toURL()}));
+
+    // JobControlCompiler setup
+    PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
+    pigContext.connect();
+    pigContext.addJar(tmpFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    JobControlCompiler jobControlCompiler = new JobControlCompiler(pigContext, conf);
+    MROperPlan plan = new MROperPlan();
+    MapReduceOper mro = new MapReduceOper(new OperatorKey());
+    mro.UDFs = new HashSet<String>();
+    mro.UDFs.add(className+"()");
+    plan.add(mro);
+
+    // compiling the job
+    JobControl jobControl = jobControlCompiler.compile(plan , "test");
+    JobConf jobConf = jobControl.getWaitingJobs().get(0).getJobConf();
+
+    // verifying the jar gets on distributed cache
+    Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
+    Assert.assertEquals("size 1 for "+Arrays.toString(fileClassPaths), 1, fileClassPaths.length);
+    Path distributedCachePath = fileClassPaths[0];
+    Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
+    // hadoop bug requires path to not contain hdfs://hostname in front
+    Assert.assertTrue("starts with /: "+distributedCachePath, 
+        distributedCachePath.toString().startsWith("/"));
+    Assert.assertTrue("jar pushed to distributed cache should contain testUDF", 
+        jarContainsFileNamed(new File(fileClassPaths[0].toUri().getPath()), testUDFFileName));
+
+    // verifying the job jar does not contain the UDF
+//    jobConf.writeXml(System.out);
+    File submitJarFile = new File(jobConf.get("mapred.jar"));
+    Assert.assertFalse("the mapred.jar should *not* contain the testUDF", jarContainsFileNamed(submitJarFile, testUDFFileName));
+
+  }
+
+  /**
+   * checks if the given file name is in the jar 
+   * @param jarFile the jar to check
+   * @param name the name to find (full path in the jar)
+   * @return true if the name was found
+   * @throws IOException
+   */
+  private boolean jarContainsFileNamed(File jarFile, String name) throws IOException {
+    Enumeration<JarEntry> entries = new JarFile(jarFile).entries();
+    while (entries.hasMoreElements()) {
+      JarEntry entry = entries.nextElement();
+      if (entry.getName().equals(name)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * creates a jar containing a UDF not in the current classloader
+   * @param jarFile the jar to create
+   * @return the name of the class created (in the default package)
+   * @throws IOException
+   * @throws FileNotFoundException
+   */
+  private String createTestJar(File jarFile) throws IOException, FileNotFoundException {
+
+    // creating the source .java file
+    File javaFile = File.createTempFile("TestUDF", ".java");
+    javaFile.deleteOnExit();
+    String className = javaFile.getName().substring(0, javaFile.getName().lastIndexOf('.'));
+    FileWriter fw = new FileWriter(javaFile);
+    try {
+      fw.write("import org.apache.pig.EvalFunc;\n");
+      fw.write("import org.apache.pig.data.Tuple;\n");
+      fw.write("import java.io.IOException;\n");
+      fw.write("public class "+className+" extends EvalFunc<String> {\n");
+      fw.write("  public String exec(Tuple input) throws IOException {\n");
+      fw.write("    return \"test\";\n");
+      fw.write("  }\n");
+      fw.write("}\n");
+    } finally {
+      fw.close();
+    }
+
+    // compiling it
+    JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+    StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null);
+    Iterable<? extends JavaFileObject> compilationUnits1 = fileManager.getJavaFileObjects(javaFile);
+    CompilationTask task = compiler.getTask(null, fileManager, null, null, null, compilationUnits1);
+    task.call();
+
+    // here is the compiled file
+    File classFile = new File(javaFile.getParentFile(), className+".class");
+    Assert.assertTrue(classFile.exists());
+
+    // putting it in the jar
+    JarOutputStream jos = new JarOutputStream(new FileOutputStream(jarFile));
+    try {
+      jos.putNextEntry(new ZipEntry(classFile.getName()));
+      try {
+        InputStream testClassContentIS = new FileInputStream(classFile);
+        try {
+          byte[] buffer = new byte[64000];
+          int n;
+          while ((n = testClassContentIS.read(buffer)) != -1) {
+            jos.write(buffer, 0, n);
+          }
+        } finally {
+          testClassContentIS.close();
+        }
+      } finally {
+        jos.closeEntry();
+      }
+    } finally {
+      jos.close();
+    }
+
+    return className;
+  }
+}
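
Assuming the standard Pig ant build, the new test can be run on its own with
something like:

    ant test -Dtestcase=TestJobControlCompiler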