You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/09/17 00:18:55 UTC

svn commit: r1625418 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/backend/hadoop/execut...

Author: daijy
Date: Tue Sep 16 22:18:54 2014
New Revision: 1625418

URL: http://svn.apache.org/r1625418
Log:
PIG-4141: Ship UDF/LoadFunc/StoreFunc dependent jar automatically

Added:
    pig/trunk/src/org/apache/pig/StoreResources.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/UdfCacheShipFilesVisitor.java
    pig/trunk/src/org/apache/pig/builtin/FuncUtils.java
    pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/ivy.xml
    pig/trunk/src/org/apache/pig/EvalFunc.java
    pig/trunk/src/org/apache/pig/LoadFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOUserFuncVisitor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
    pig/trunk/src/org/apache/pig/builtin/AvroStorage.java
    pig/trunk/src/org/apache/pig/builtin/JsonLoader.java
    pig/trunk/src/org/apache/pig/builtin/JsonStorage.java
    pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
    pig/trunk/src/org/apache/pig/impl/PigContext.java
    pig/trunk/src/org/apache/pig/impl/util/JarManager.java
    pig/trunk/src/org/apache/pig/impl/util/Utils.java
    pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/scripting/Pig.java
    pig/trunk/test/e2e/pig/tests/orc.conf
    pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java
    pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java
    pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
    pig/trunk/test/org/apache/pig/test/TestPigRunner.java
    pig/trunk/test/org/apache/pig/test/TestPredeployedJar.java
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Sep 16 22:18:54 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4141: Ship UDF/LoadFunc/StoreFunc dependent jar automatically (daijy)
+
 PIG-4146: Create a target to run mr and tez unit test in one shot (daijy)
 
 PIG-4144: Make pigunit.PigTest work in tez mode (daijy)

Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Tue Sep 16 22:18:54 2014
@@ -411,9 +411,9 @@
     <dependency org="org.apache.hive" name="hive-common" rev="${hive.version}" changing="true"
       conf="compile->master" />
     <dependency org="org.apache.hive.shims" name="hive-shims-common" rev="${hive.version}" changing="true"
-      conf="test->master" />
+      conf="compile->master" />
     <dependency org="org.apache.hive.shims" name="hive-shims-common-secure" rev="${hive.version}" changing="true"
-      conf="test->master" />
+      conf="compile->master" />
     <dependency org="org.apache.hive.shims" name="hive-shims-0.23" rev="${hive.version}" changing="true"
       conf="hadoop23->master" />
     <dependency org="org.apache.hive.shims" name="hive-shims-0.20S" rev="${hive.version}" changing="true"

Modified: pig/trunk/src/org/apache/pig/EvalFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/EvalFunc.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/EvalFunc.java (original)
+++ pig/trunk/src/org/apache/pig/EvalFunc.java Tue Sep 16 22:18:54 2014
@@ -276,7 +276,7 @@ public abstract class EvalFunc<T>  {
     }
 
     /**
-     * Allow a UDF to specify a list of files it would like placed in the distributed
+     * Allow a UDF to specify a list of hdfs files it would like placed in the distributed
      * cache.  These files will be put in the cache for every job the UDF is used in.
      * The default implementation returns null.
      * @return A list of files
@@ -285,6 +285,17 @@ public abstract class EvalFunc<T>  {
         return null;
     }
 
+    /**
+     * Allow a UDF to specify a list of local files it would like shipped to the backend
+     * (through the distributed cache) for every job the UDF is used in. See
+     * {@link org.apache.pig.builtin.FuncUtils} for utility functions that help with this.
+     * The default implementation returns null.
+     * @return A list of files
+     */
+    public List<String> getShipFiles() {
+        return null;
+    }
+
     public PigLogger getPigLogger() {
         return pigLogger;
     }

Modified: pig/trunk/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/LoadFunc.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/LoadFunc.java (original)
+++ pig/trunk/src/org/apache/pig/LoadFunc.java Tue Sep 16 22:18:54 2014
@@ -29,7 +29,6 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
-
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.LoadPushDown.RequiredFieldList;
@@ -305,4 +304,24 @@ public abstract class LoadFunc {
     public final void warn(String msg, Enum warningEnum) {
         PigHadoopLogger.getInstance().warn(this, msg, warningEnum);
     }
+
+    /**
+     * Allow a LoadFunc to specify a list of files already on the DFS that it would
+     * like placed in the distributed cache.
+     * The default implementation returns null.
+     * @return A list of files
+     */
+    public List<String> getCacheFiles() {
+        return null;
+    }
+
+    /**
+     * Allow a LoadFunc to specify a list of local files it would like shipped to the backend
+     * (through the distributed cache). See {@link org.apache.pig.builtin.FuncUtils} for helpers.
+     * The default implementation returns null.
+     * @return A list of files
+     */
+    public List<String> getShipFiles() {
+        return null;
+    }
 }

Added: pig/trunk/src/org/apache/pig/StoreResources.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreResources.java?rev=1625418&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreResources.java (added)
+++ pig/trunk/src/org/apache/pig/StoreResources.java Tue Sep 16 22:18:54 2014
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.util.List;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * This interface allows a StoreFunc to specify resources it needs in the
+ * distributed cache. The resources can be on the DFS ({@link #getCacheFiles()})
+ * or local files shipped to the backend ({@link #getShipFiles()}).
+ * @since Pig 0.14
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface StoreResources {
+    /**
+     * Allow a StoreFunc to specify a list of files already on the DFS that it would
+     * like placed in the distributed cache.
+     * Implementations may return null if no files are needed.
+     * @return A list of files, or null
+     */
+    public List<String> getCacheFiles();
+
+    /**
+     * Allow a StoreFunc to specify a list of local files it would like shipped to the backend
+     * (through the distributed cache). See {@link org.apache.pig.builtin.FuncUtils} for helpers.
+     * Implementations may return null if no files are needed.
+     * @return A list of files, or null
+     */
+    public List<String> getShipFiles();
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Sep 16 22:18:54 2014
@@ -18,8 +18,6 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -77,9 +75,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
@@ -546,42 +544,6 @@ public class JobControlCompiler{
                 nwJob.setNumReduceTasks(0);
             }
 
-            for (String udf : mro.UDFs) {
-                if (udf.contains("GFCross")) {
-                    Object func = pigContext.instantiateFuncFromSpec(new FuncSpec(udf));
-                    if (func instanceof GFCross) {
-                        String crossKey = ((GFCross)func).getCrossKey();
-                        // If non GFCross has been processed yet
-                        if (pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey)==null) {
-                            pigContext.getProperties().setProperty(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
-                                    Integer.toString(nwJob.getNumReduceTasks()));
-                        }
-                        conf.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
-                                (String)pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey));
-                    }
-                }
-            }
-
-            if(lds!=null && lds.size()>0){
-                for (POLoad ld : lds) {
-                    //Store the target operators for tuples read
-                    //from this input
-                    List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
-                    List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
-                    if(ldSucs!=null){
-                        for (PhysicalOperator operator2 : ldSucs) {
-                            ldSucKeys.add(operator2.getOperatorKey());
-                        }
-                    }
-                    inpTargets.add(ldSucKeys);
-                    inpSignatureLists.add(ld.getSignature());
-                    inpLimits.add(ld.getLimit());
-                    //Remove the POLoad from the plan
-                    if (!pigContext.inIllustrator)
-                        mro.mapPlan.remove(ld);
-                }
-            }
-
             if (!pigContext.inIllustrator && ! pigContext.getExecType().isLocal())
             {
                 if (okToRunLocal(nwJob, mro, lds)) {
@@ -610,6 +572,22 @@ public class JobControlCompiler{
                     conf.setBoolean(PigImplConstants.CONVERTED_TO_LOCAL, true);
                 } else {
                     log.info(BIG_JOB_LOG_MSG);
+                    // Search to see if we have any UDF/LoadFunc/StoreFunc that need to pack things into the
+                    // distributed cache.
+                    List<String> cacheFiles = new ArrayList<String>();
+                    List<String> shipFiles = new ArrayList<String>();
+                    UdfCacheShipFilesVisitor mapUdfCacheFileVisitor = new UdfCacheShipFilesVisitor(mro.mapPlan);
+                    mapUdfCacheFileVisitor.visit();
+                    cacheFiles.addAll(mapUdfCacheFileVisitor.getCacheFiles());
+                    shipFiles.addAll(mapUdfCacheFileVisitor.getShipFiles());
+
+                    UdfCacheShipFilesVisitor reduceUdfCacheFileVisitor = new UdfCacheShipFilesVisitor(mro.reducePlan);
+                    reduceUdfCacheFileVisitor.visit();
+                    cacheFiles.addAll(reduceUdfCacheFileVisitor.getCacheFiles());
+                    shipFiles.addAll(reduceUdfCacheFileVisitor.getShipFiles());
+
+                    setupDistributedCache(pigContext, conf, cacheFiles.toArray(new String[]{}), false);
+
                     // Setup the DistributedCache for this job
                     List<URL> allJars = new ArrayList<URL>();
 
@@ -626,6 +604,13 @@ public class JobControlCompiler{
                         }
                     }
 
+                    for (String shipFile : shipFiles) {
+                        URL jar = new File(shipFile).toURI().toURL();
+                        if (!allJars.contains(jar)) {
+                            allJars.add(jar);
+                        }
+                    }
+
                     for (String defaultJar : JarManager.getDefaultJars()) {
                         URL jar = new File(defaultJar).toURI().toURL();
                         if (!allJars.contains(jar)) {
@@ -653,6 +638,42 @@ public class JobControlCompiler{
                 }
             }
 
+            for (String udf : mro.UDFs) {
+                if (udf.contains("GFCross")) {
+                    Object func = pigContext.instantiateFuncFromSpec(new FuncSpec(udf));
+                    if (func instanceof GFCross) {
+                        String crossKey = ((GFCross)func).getCrossKey();
+                        // If non GFCross has been processed yet
+                        if (pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey)==null) {
+                            pigContext.getProperties().setProperty(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
+                                    Integer.toString(nwJob.getNumReduceTasks()));
+                        }
+                        conf.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
+                                (String)pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey));
+                    }
+                }
+            }
+
+            if(lds!=null && lds.size()>0){
+                for (POLoad ld : lds) {
+                    //Store the target operators for tuples read
+                    //from this input
+                    List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
+                    List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
+                    if(ldSucs!=null){
+                        for (PhysicalOperator operator2 : ldSucs) {
+                            ldSucKeys.add(operator2.getOperatorKey());
+                        }
+                    }
+                    inpTargets.add(ldSucKeys);
+                    inpSignatureLists.add(ld.getSignature());
+                    inpLimits.add(ld.getLimit());
+                    //Remove the POLoad from the plan
+                    if (!pigContext.inIllustrator)
+                        mro.mapPlan.remove(ld);
+                }
+            }
+
             if(Utils.isLocal(pigContext, conf)) {
                 ConfigurationUtil.replaceConfigForLocalMode(conf);
             }
@@ -779,10 +800,6 @@ public class JobControlCompiler{
             // serialized
             setupDistributedCacheForJoin(mro, pigContext, conf);
 
-            // Search to see if we have any UDFs that need to pack things into the
-            // distributed cache.
-            setupDistributedCacheForUdfs(mro, pigContext, conf);
-
             SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);
 
             POPackage pack = null;
@@ -1478,13 +1495,6 @@ public class JobControlCompiler{
         .visit();
     }
 
-    private void setupDistributedCacheForUdfs(MapReduceOper mro,
-            PigContext pigContext,
-            Configuration conf) throws IOException {
-        new UdfDistributedCacheVisitor(mro.mapPlan, pigContext, conf).visit();
-        new UdfDistributedCacheVisitor(mro.reducePlan, pigContext, conf).visit();
-    }
-
     private static void setupDistributedCache(PigContext pigContext,
             Configuration conf,
             Properties properties, String key,
@@ -1637,7 +1647,6 @@ public class JobControlCompiler{
         Path pathInHDFS = shipToHDFS(pigContext, conf, url);
         // and add to the DistributedCache
         DistributedCache.addFileToClassPath(pathInHDFS, conf);
-        pigContext.addSkipJar(url.getPath());
     }
 
     private static Path getCacheStagingDir(Configuration conf) throws IOException {
@@ -1854,41 +1863,6 @@ public class JobControlCompiler{
         }
     }
 
-    private static class UdfDistributedCacheVisitor extends PhyPlanVisitor {
-
-        private PigContext pigContext = null;
-        private Configuration conf = null;
-
-        public UdfDistributedCacheVisitor(PhysicalPlan plan,
-                PigContext pigContext,
-                Configuration conf) {
-            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
-                    plan));
-            this.pigContext = pigContext;
-            this.conf = conf;
-        }
-
-        @Override
-        public void visitUserFunc(POUserFunc func) throws VisitorException {
-
-            // XXX Hadoop currently doesn't support distributed cache in local mode.
-            // This line will be removed after the support is added
-            if (Utils.isLocal(pigContext, conf)) return;
-
-            // set up distributed cache for files indicated by the UDF
-            String[] files = func.getCacheFiles();
-            if (files == null) return;
-
-            try {
-                setupDistributedCache(pigContext, conf, files, false);
-            } catch (IOException e) {
-                String msg = "Internal error. Distributed cache could not " +
-                        "be set up for the requested files";
-                throw new VisitorException(msg, e);
-            }
-        }
-    }
-
     private static class ParallelConstantVisitor extends PhyPlanVisitor {
 
         private int rp;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Tue Sep 16 22:18:54 2014
@@ -67,7 +67,8 @@ public class POUserFunc extends Expressi
 
     private transient String counterGroup;
     private transient EvalFunc func;
-    private transient String[] cacheFiles = null;
+    private transient List<String> cacheFiles = null;
+    private transient List<String> shipFiles = null;
 
     FuncSpec funcSpec;
     FuncSpec origFSpec;
@@ -556,14 +557,22 @@ public class POUserFunc extends Expressi
         instantiateFunc(funcSpec);
     }
 
-    public String[] getCacheFiles() {
+    public List<String> getCacheFiles() {
         return cacheFiles;
     }
 
-    public void setCacheFiles(String[] cf) {
+    public void setCacheFiles(List<String> cf) {
         cacheFiles = cf;
     }
 
+    public List<String> getShipFiles() {
+        return shipFiles;
+    }
+
+    public void setShipFiles(List<String> sf) {
+        shipFiles = sf;
+    }
+
     public boolean combinable() {
         return (func instanceof Algebraic);
     }

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/UdfCacheShipFilesVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/UdfCacheShipFilesVisitor.java?rev=1625418&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/UdfCacheShipFilesVisitor.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/UdfCacheShipFilesVisitor.java Tue Sep 16 22:18:54 2014
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Walks a physical plan and collects the files requested by its loaders
+ * ({@link POLoad}), storers ({@link POStore}) and UDFs ({@link POUserFunc}).
+ * Cache files are files already on the DFS that should be placed in the
+ * distributed cache; ship files are local files that should be shipped to the
+ * backend. Duplicates across operators are collapsed.
+ */
+public class UdfCacheShipFilesVisitor extends PhyPlanVisitor {
+    /** DFS files to place in the distributed cache. */
+    private final Set<String> cacheFiles = new HashSet<String>();
+    /** Local files to ship to the backend. */
+    private final Set<String> shipFiles = new HashSet<String>();
+
+    public UdfCacheShipFilesVisitor(PhysicalPlan plan) {
+        super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+    }
+
+    @Override
+    public void visitLoad(POLoad ld) throws VisitorException {
+        collect(ld.getCacheFiles(), ld.getShipFiles());
+    }
+
+    @Override
+    public void visitStore(POStore st) throws VisitorException {
+        collect(st.getCacheFiles(), st.getShipFiles());
+    }
+
+    @Override
+    public void visitUserFunc(POUserFunc udf) throws VisitorException {
+        collect(udf.getCacheFiles(), udf.getShipFiles());
+    }
+
+    /** @return the DFS files collected so far (the live set, not a copy) */
+    public Set<String> getCacheFiles() {
+        return cacheFiles;
+    }
+
+    /** @return the local files collected so far (the live set, not a copy) */
+    public Set<String> getShipFiles() {
+        return shipFiles;
+    }
+
+    /** Adds both lists to the collected sets; either argument may be null. */
+    private void collect(List<String> cache, List<String> ship) {
+        if (cache != null) {
+            cacheFiles.addAll(cache);
+        }
+        if (ship != null) {
+            shipFiles.addAll(ship);
+        }
+    }
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Tue Sep 16 22:18:54 2014
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,6 +66,9 @@ public class POLoad extends PhysicalOper
     private boolean isTmpLoad;
     
     private long limit=-1;
+
+    private transient List<String> cacheFiles = null;
+    private transient List<String> shipFiles = null;
     
     public POLoad(OperatorKey k) {
         this(k,-1, null);
@@ -252,4 +256,20 @@ public class POLoad extends PhysicalOper
     public void setLimit(long limit) {
         this.limit = limit;
     }
+
+    public List<String> getCacheFiles() {
+        return cacheFiles;
+    }
+
+    public void setCacheFiles(List<String> cf) {
+        cacheFiles = cf;
+    }
+
+    public List<String> getShipFiles() {
+        return shipFiles;
+    }
+
+    public void setShipFiles(List<String> sf) {
+        shipFiles = sf;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Tue Sep 16 22:18:54 2014
@@ -81,6 +81,9 @@ public class POStore extends PhysicalOpe
 
     private String signature;
 
+    private transient List<String> cacheFiles = null;
+    private transient List<String> shipFiles = null;
+
     public POStore(OperatorKey k) {
         this(k, -1, null);
     }
@@ -313,4 +316,20 @@ public class POStore extends PhysicalOpe
     public void setStoreFunc(StoreFuncInterface storeFunc) {
         this.storer = storeFunc;
     }
+
+    public List<String> getCacheFiles() {
+        return cacheFiles;
+    }
+
+    public void setCacheFiles(List<String> cf) {
+        cacheFiles = cf;
+    }
+
+    public List<String> getShipFiles() {
+        return shipFiles;
+    }
+
+    public void setShipFiles(List<String> sf) {
+        shipFiles = sf;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POStoreTez.java Tue Sep 16 22:18:54 2014
@@ -54,6 +54,8 @@ public class POStoreTez extends POStore 
     public POStoreTez(POStore copy) {
         super(copy);
         this.outputKey = copy.getOperatorKey().toString();
+        this.setCacheFiles(copy.getCacheFiles());
+        this.setShipFiles(copy.getShipFiles());
     }
 
     public String getOutputKey() {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Tue Sep 16 22:18:54 2014
@@ -107,6 +107,7 @@ public class TezOperPlan extends Operato
         TezPOUserFuncVisitor udfVisitor = new TezPOUserFuncVisitor(this);
         udfVisitor.visit();
 
+        addShipResources(udfVisitor.getShipFiles());
         addCacheResources(udfVisitor.getCacheFiles());
 
         return TezResourceManager.getInstance().getTezResources(extraResources.keySet());

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOUserFuncVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOUserFuncVisitor.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOUserFuncVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOUserFuncVisitor.java Tue Sep 16 22:18:54 2014
@@ -17,19 +17,16 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class TezPOUserFuncVisitor extends TezOpPlanVisitor {
     private Set<String> cacheFiles = new HashSet<String>();
+    private Set<String> shipFiles = new HashSet<String>();
 
     public TezPOUserFuncVisitor(TezOperPlan plan) {
         super(plan, new DepthFirstWalker<TezOperator, TezOperPlan>(plan));
@@ -38,26 +35,18 @@ public class TezPOUserFuncVisitor extend
     @Override
     public void visitTezOp(TezOperator tezOp) throws VisitorException {
         if(!tezOp.plan.isEmpty()) {
-            UdfCacheFileVisitor udfCacheFileVisitor = new UdfCacheFileVisitor(tezOp.plan);
+            UdfCacheShipFilesVisitor udfCacheFileVisitor = new UdfCacheShipFilesVisitor(tezOp.plan);
             udfCacheFileVisitor.visit();
-        }
-    }
-
-    class UdfCacheFileVisitor extends PhyPlanVisitor {
-
-        public UdfCacheFileVisitor(PhysicalPlan plan) {
-            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
-        }
-
-        public void visitUserFunc(POUserFunc udf) throws VisitorException {
-            String[] files = udf.getCacheFiles();
-            if (files != null) {
-                cacheFiles.addAll(Arrays.asList(files));
-            }
+            cacheFiles.addAll(udfCacheFileVisitor.getCacheFiles());
+            shipFiles.addAll(udfCacheFileVisitor.getShipFiles());
         }
     }
 
     public Set<String> getCacheFiles() {
         return cacheFiles;
     }
+    /** Returns the ship files accumulated from the plans of all visited Tez operators (see visitTezOp). */
+    public Set<String> getShipFiles() {
+        return this.shipFiles;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java Tue Sep 16 22:18:54 2014
@@ -117,6 +117,8 @@ public class LoaderProcessor extends Tez
                 POSimpleTezLoad tezLoad = new POSimpleTezLoad(ld.getOperatorKey(), ld.getLFile());
                 tezLoad.setInputKey(ld.getOperatorKey().toString());
                 tezLoad.copyAliasFrom(ld);
+                tezLoad.setCacheFiles(ld.getCacheFiles());
+                tezLoad.setShipFiles(ld.getShipFiles());
                 tezOp.plan.add(tezLoad);
                 for (PhysicalOperator sucs : ldSucs) {
                     tezOp.plan.connect(tezLoad, sucs);

Modified: pig/trunk/src/org/apache/pig/builtin/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AvroStorage.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AvroStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/AvroStorage.java Tue Sep 16 22:18:54 2014
@@ -31,6 +31,7 @@ import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.mapred.AvroInputFormat;
 import org.apache.avro.mapred.AvroOutputFormat;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -63,9 +64,11 @@ import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreResources;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.impl.util.avro.AvroArrayReader;
@@ -82,7 +85,7 @@ import com.google.common.collect.Maps;
  *
  */
 public class AvroStorage extends LoadFunc
-    implements StoreFuncInterface, LoadMetadata, LoadPushDown {
+    implements StoreFuncInterface, LoadMetadata, LoadPushDown, StoreResources {
 
   /**
    *  Creates new instance of Pig Storage function, without specifying
@@ -674,4 +677,9 @@ public class AvroStorage extends LoadFun
 
   }
 
+  /** Ships the jars that contain Avro's {@link Schema} and {@link AvroInputFormat}. */
+  @Override
+  public List<String> getShipFiles() {
+      return FuncUtils.getShipFiles(new Class[] {Schema.class, AvroInputFormat.class});
+  }
 }

Added: pig/trunk/src/org/apache/pig/builtin/FuncUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/FuncUtils.java?rev=1625418&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/FuncUtils.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/FuncUtils.java Tue Sep 16 22:18:54 2014
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.util.JarManager;
+
+public class FuncUtils {
+    /**
+     * Returns the jars that contain the given classes, so they can be shipped with the job.
+     * Classes for which {@link JarManager#findContainingJar(Class)} returns {@code null}
+     * are skipped, so the returned list never contains {@code null}.
+     *
+     * @param classesIdentifyingJars classes used to identify containing jars
+     * @return list of containing jar paths, one per class that was located in a jar
+     */
+    public static List<String> getShipFiles(Class<?>[] classesIdentifyingJars) {
+        List<String> shipFiles = new ArrayList<String>(classesIdentifyingJars.length);
+        for (Class<?> clz : classesIdentifyingJars) {
+            String jar = JarManager.findContainingJar(clz);
+            // findContainingJar may return null; never put a null path on the ship list.
+            if (jar != null) {
+                shipFiles.add(jar);
+            }
+        }
+        return shipFiles;
+    }
+}

Modified: pig/trunk/src/org/apache/pig/builtin/JsonLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonLoader.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonLoader.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/JsonLoader.java Tue Sep 16 22:18:54 2014
@@ -19,28 +19,23 @@ package org.apache.pig.builtin;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
 import org.joda.time.format.ISODateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
-
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.JsonToken;
-
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
 import org.apache.pig.Expression;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
@@ -56,6 +51,7 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.parser.ParserException;
@@ -372,4 +368,11 @@ public class JsonLoader extends LoadFunc
     throws IOException {
         // We don't have partitions
     }
+
+    /** Ships the jar that contains Jackson's {@link JsonFactory}. */
+    @Override
+    public List<String> getShipFiles() {
+        Class<?>[] classList = new Class<?>[] {JsonFactory.class};
+        return FuncUtils.getShipFiles(classList);
+    }
 }

Modified: pig/trunk/src/org/apache/pig/builtin/JsonStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonStorage.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/JsonStorage.java Tue Sep 16 22:18:54 2014
@@ -19,17 +19,15 @@ package org.apache.pig.builtin;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 
-import org.joda.time.DateTime;
-
 import org.codehaus.jackson.JsonEncoding;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -38,25 +36,18 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreMetadata;
 import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreResources;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-
 /**
  * A JSON Pig store function.  Each Pig tuple is stored on one line (as one
  * value for TextOutputFormat) so that it can be read easily using
@@ -66,7 +57,7 @@ import java.util.TreeMap;
  * with mapping between JSON and Pig types. The schema file share the same format
  * as the one we use in PigStorage.
  */
-public class JsonStorage extends StoreFunc implements StoreMetadata {
+public class JsonStorage extends StoreFunc implements StoreMetadata, StoreResources {
 
     protected RecordWriter writer = null;
     protected ResourceSchema schema = null;
@@ -318,4 +309,14 @@ public class JsonStorage extends StoreFu
       return s;
     }
 
+    /** Ships the jar that contains Jackson's {@link JsonFactory}. */
+    @Override
+    public List<String> getShipFiles() {
+        return FuncUtils.getShipFiles(new Class[] {JsonFactory.class});
+    }
+
+    @Override
+    public List<String> getCacheFiles() {
+        return null; // this storer declares no cache files
+    }
 }

Modified: pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/OrcStorage.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/OrcStorage.java Tue Sep 16 22:18:54 2014
@@ -49,11 +49,13 @@ import org.apache.hadoop.hive.ql.io.orc.
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -80,6 +82,7 @@ import org.apache.pig.Expression.BinaryE
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreResources;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.DataType;
@@ -109,7 +112,7 @@ import com.google.common.annotations.Vis
  * <li><code>-v, --version</code> Sets the version of the file that will be written
  * </ul>
  **/
-public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata, LoadPushDown, LoadPredicatePushdown {
+public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata, LoadPushDown, LoadPredicatePushdown, StoreResources {
 
     //TODO Make OrcInputFormat.SARG_PUSHDOWN visible
     private static final String SARG_PUSHDOWN = "sarg.pushdown";
@@ -382,6 +385,25 @@ public class OrcStorage extends LoadFunc
         }
     }
 
+    @Override
+    public List<String> getShipFiles() {
+        List<String> cacheFiles = new ArrayList<String>();
+        String hadoopVersion = "20S";
+        if (Utils.isHadoop23() || Utils.isHadoop2()) {
+            hadoopVersion = "23";
+        }
+        Class hadoopVersionShimsClass;
+        try {
+            hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
+                    hadoopVersion + "Shims");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
+        }
+        Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class,
+                org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass};
+        return FuncUtils.getShipFiles(classList);
+    }
+
     private static Path getFirstFile(String location, FileSystem fs) throws IOException {
         String[] locations = getPathStrings(location);
         Path[] paths = new Path[locations.length];

Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Tue Sep 16 22:18:54 2014
@@ -105,7 +105,9 @@ public class PigContext implements Seria
      * Resources for the job (jars, scripting udf files, cached macro abstract syntax trees)
      */
 
-    // extra jar files that are needed to run a job
+    // Jar files that are global to the whole Pig script, including:
+    // 1. registered jars
+    // 2. jars listed in -Dpig.additional.jars
     transient public List<URL> extraJars = new LinkedList<URL>();
 
     // original paths each extra jar came from
@@ -115,10 +117,6 @@ public class PigContext implements Seria
     // jars needed for scripting udfs - jython.jar etc
     transient public List<String> scriptJars = new ArrayList<String>(2);
 
-    // jars that should not be merged in.
-    // (some functions may come from pig.jar and we don't want the whole jar file.)
-    transient public Vector<String> skipJars = new Vector<String>(2);
-
     // jars that are predeployed to the cluster and thus should not be merged in at all (even subsets).
     transient public Vector<String> predeployedJars = new Vector<String>(2);
 
@@ -260,13 +258,6 @@ public class PigContext implements Seria
         this.properties = properties;
 
         this.properties.setProperty("exectype", this.execType.name());
-        String pigJar = JarManager.findContainingJar(Main.class);
-        String hadoopJar = JarManager.findContainingJar(FileSystem.class);
-        if (pigJar != null) {
-            addSkipJar(pigJar);
-            if (!pigJar.equals(hadoopJar))
-                addSkipJar(hadoopJar);
-        }
 
         this.executionEngine = execType.getExecutionEngine(this);
 
@@ -345,12 +336,6 @@ public class PigContext implements Seria
         }
     }
 
-    public void addSkipJar(String path) {
-        if (path != null && !skipJars.contains(path)) {
-            skipJars.add(path);
-        }
-    }
-
     public void addJar(String path) throws MalformedURLException {
         if (path != null) {
             URL resource = (new File(path)).toURI().toURL();

Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Tue Sep 16 22:18:54 2014
@@ -68,8 +68,6 @@ public class JarManager {
         AUTOMATON(Automaton.class),
         ANTLR(CommonTokenStream.class),
         GUAVA(Multimaps.class),
-        JACKSON_CORE(JsonPropertyOrder.class),
-        JACKSON_MAPPER(JacksonStdImpl.class),
         JODATIME(DateTime.class);
 
         private final Class pkgClass;

Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Tue Sep 16 22:18:54 2014
@@ -94,8 +94,21 @@ public class Utils {
     public static boolean isVendorIBM() {    	
     	  return System.getProperty("java.vendor").contains("IBM");
     }
-    
-    
+
+    /**
+     * Returns {@code true} when the running Hadoop's version string matches {@code 0.23.x}.
+     */
+    public static boolean isHadoop23() {
+        return org.apache.hadoop.util.VersionInfo.getVersion().matches("\\b0\\.23\\..+\\b");
+    }
+
+    /**
+     * Returns {@code true} when the running Hadoop's version string matches {@code 2.x.y}.
+     */
+    public static boolean isHadoop2() {
+        return org.apache.hadoop.util.VersionInfo.getVersion().matches("\\b2\\.\\d+\\..+");
+    }
+
     /**
      * This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}
      * checks if two objects are equals - two levels of checks are

Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Tue Sep 16 22:18:54 2014
@@ -512,7 +512,7 @@ public class ExpToPhyTranslationVisitor 
             }
             List<String> cacheFiles = ((EvalFunc)f).getCacheFiles();
             if (cacheFiles != null) {
-                ((POUserFunc)p).setCacheFiles(cacheFiles.toArray(new String[cacheFiles.size()]));
+                ((POUserFunc)p).setCacheFiles(cacheFiles);
             }
         } else {
             p = new POUserComparisonFunc(new OperatorKey(DEFAULT_SCOPE, nodeGen

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Tue Sep 16 22:18:54 2014
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreResources;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -142,6 +143,8 @@ public class LogToPhyTranslationVisitor 
         load.setSignature(loLoad.getSignature());
         load.setLimit(loLoad.getLimit());
         load.setIsTmpLoad(loLoad.isTmpLoad());
+        load.setCacheFiles(loLoad.getLoadFunc().getCacheFiles());
+        load.setShipFiles(loLoad.getLoadFunc().getShipFiles());
 
         currentPlan.add(load);
         logToPhyMap.put(loLoad, load);
@@ -953,8 +956,11 @@ public class LogToPhyTranslationVisitor 
         store.setSortInfo(loStore.getSortInfo());
         store.setIsTmpStore(loStore.isTmpStore());
         store.setStoreFunc(loStore.getStoreFunc());
-
         store.setSchema(Util.translateSchema( loStore.getSchema() ));
+        if (loStore.getStoreFunc() instanceof StoreResources) {
+            store.setCacheFiles(((StoreResources)loStore.getStoreFunc()).getCacheFiles());
+            store.setShipFiles(((StoreResources)loStore.getStoreFunc()).getShipFiles());
+        }
 
         currentPlan.add(store);
 

Modified: pig/trunk/src/org/apache/pig/scripting/Pig.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/Pig.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/Pig.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/Pig.java Tue Sep 16 22:18:54 2014
@@ -120,7 +120,7 @@ public class Pig {
      */
     public static void registerUDF(String udffile, String namespace)
             throws IOException {
-        LOG.info("Register script UFD file: "+ udffile);
+        LOG.info("Register script UDF file: "+ udffile);
         ScriptPigContext ctx = getScriptContext();
         ScriptEngine engine = ctx.getScriptEngine();
         // script file contains only functions, no need to separate

Modified: pig/trunk/test/e2e/pig/tests/orc.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/orc.conf?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/orc.conf (original)
+++ pig/trunk/test/e2e/pig/tests/orc.conf Tue Sep 16 22:18:54 2014
@@ -14,13 +14,6 @@ $cfg = {
                         'num' => 1,
                         'notmq' => 1,
                         'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
 store a into ':OUTPATH:.intermediate' using OrcStorage();
 exec
@@ -36,13 +29,6 @@ store b into ':OUTPATH:';\,
                         'num' => 2,
                         'notmq' => 1,
                         'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
 a = load ':INPATH:/singlefile/studentcomplextab10k' as (nameagegpamap:map[], nameagegpatuple:tuple(tname:chararray, tage:int, tgpa:float), nameagegpabag:bag{t:tuple(bname:chararray, bage:int, bgpa:float)});
 store a into ':OUTPATH:.intermediate' using OrcStorage();
 exec
@@ -56,13 +42,7 @@ store a into ':OUTPATH:';\,
                         {
                         'num' => 3,
                         'notmq' => 1,
-                        'pig' => q\register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
+                        'pig' => q\
 a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa:float);
 store a into ':OUTPATH:.simple.intermediate' using OrcStorage();
 exec
@@ -93,13 +73,6 @@ store h into ':OUTPATH:';\,
                         'num' => 4,
                         'notmq' => 1,
                         'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
 store a into ':OUTPATH:.orc_params.intermediate' using OrcStorage('-c ZLIB -s 67108864 -r 100000 -b 1048576 -p true -v 0.12');
 exec
@@ -119,13 +92,6 @@ store a into ':OUTPATH:';\,
                         'num' => 1,
                         'notmq' => 1,
                         'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
 b = order a by name parallel 4;
 store b into ':OUTPATH:.intermediate' using OrcStorage();
@@ -141,13 +107,6 @@ store b into ':OUTPATH:';\,
                         'num' => 2,
                         'notmq' => 1,
                         'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
 a = load ':INPATH:/singlefile/studenttab20m' as (name:chararray, age:int, gpa:float);
 b = order a by age desc parallel 4;
 store b into ':OUTPATH:.intermediate' using OrcStorage('-s 10000000');
@@ -163,13 +122,6 @@ store b into ':OUTPATH:';\,
                         'num' => 3,
                         'notmq' => 1,
                         'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
 b = order a by gpa parallel 4;
 store b into ':OUTPATH:.intermediate' using OrcStorage();
@@ -185,13 +137,6 @@ store b into ':OUTPATH:';\,
                         'num' => 4,
                         'notmq' => 1,
                         'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:bigdecimal);
 b = order a by gpa parallel 4;
 store b into ':OUTPATH:.intermediate' using OrcStorage();
@@ -209,13 +154,6 @@ store b into ':OUTPATH:';\,
                         'num' => 5,
                         'notmq' => 1,
                         'pig' => q\
-register :HIVELIBDIR:/hive-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-serde-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-exec-:HIVEVERSION:-core.jar;
-register :HIVELIBDIR:/hive-shims-common-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-common-secure-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/hive-shims-:HIVESHIMSVERSION:-:HIVEVERSION:.jar;
-register :HIVELIBDIR:/kryo*.jar;
 a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
 b = foreach a generate name, age, gpa, (age>35 ? ToDate('20100101', 'yyyyMMdd', 'UTC') : ToDate('20100105', 'yyyyMMdd', 'UTC')) as d;
 c = order b by d parallel 4;

Modified: pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java (original)
+++ pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java Tue Sep 16 22:18:54 2014
@@ -351,7 +351,7 @@ public class TestLoadStoreFuncLifeCycle 
         // result, the number of StoreFunc instances is greater by 1 in
         // Hadoop-2.0.x.
         assertTrue("storer instanciation count increasing: " + Storer.count,
-                Storer.count <= (Util.isHadoop2_0() ? 5 : 4));
+                Storer.count <= (org.apache.pig.impl.util.Utils.isHadoop2() ? 5 : 4));
 
     }
 

Modified: pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java Tue Sep 16 22:18:54 2014
@@ -72,7 +72,7 @@ public class TestQueryParserUtils {
         QueryParserUtils.setHdfsServers("hello://nn1/tmp", pc);
         assertEquals(null, props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
 
-        if(Util.isHadoop23() || Util.isHadoop2_0()) {
+        if(org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) {
             // webhdfs
             props.remove(MRConfiguration.JOB_HDFS_SERVERS);
             QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc);

Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Tue Sep 16 22:18:54 2014
@@ -126,7 +126,7 @@ public class TestJobControlCompiler {
 
     // verifying the jar gets on distributed cache
     Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
-    Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 8, fileClassPaths.length);
+    Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 6, fileClassPaths.length);
     Path distributedCachePath = fileClassPaths[0];
     Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
     // hadoop bug requires path to not contain hdfs://hotname in front

Added: pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java?rev=1625418&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java Tue Sep 16 22:18:54 2014
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezPOUserFuncVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainer;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Utils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Verifies that the jars OrcStorage, AvroStorage and JsonStorage depend on are reported as
+ * ship files both by {@link UdfCacheShipFilesVisitor} on MapReduce plans and by
+ * {@link TezPOUserFuncVisitor} on Tez plans.
+ */
+public class TestLoaderStorerShipCacheFiles {
+    // Recreated in setUp() for every test, so it is an instance field rather than static state.
+    private PigServer pigServer;
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(ExecType.LOCAL);
+        pigServer.getPigContext().inExplain = true;
+    }
+
+    @Test
+    public void testShipOrcLoader() throws Exception {
+        String query = "a = load 'test/org/apache/pig/builtin/orc/orc-file-11-format.orc' using OrcStorage();" +
+                "store a into 'ooo';";
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
+        String[] expectedJars = getExpectedHiveJars();
+
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pigServer.getPigContext());
+        assertMRPlanContains(mrPlan.getRoots().get(0), expectedJars, 6);
+
+        TezLauncher launcher = new TezLauncher();
+        TezPlanContainer tezPlanContainer = launcher.compile(pp, pigServer.getPigContext());
+        assertTezPlanContains(tezPlanContainer.getRoots().get(0).getNode(), expectedJars, 6);
+    }
+
+    @Test
+    public void testShipOrcStorer() throws Exception {
+        String query = "a = load '1.txt' as (name:chararray, age:int, gpa:double);" +
+                "store a into 'ooo' using OrcStorage;";
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
+        String[] expectedJars = getExpectedHiveJars();
+
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pigServer.getPigContext());
+        assertMRPlanContains(mrPlan.getRoots().get(0), expectedJars, 6);
+
+        TezLauncher launcher = new TezLauncher();
+        TezPlanContainer tezPlanContainer = launcher.compile(pp, pigServer.getPigContext());
+        assertTezPlanContains(tezPlanContainer.getRoots().get(0).getNode(), expectedJars, 6);
+    }
+
+    // NOTE(review): despite the name, the query uses AvroStorage as a storer, not a loader.
+    @Test
+    public void testShipAvroLoader() throws Exception {
+        String query = "a = load '1.txt' as (name:chararray, age:int, gpa:double);" +
+                "store a into 'ooo' using AvroStorage();";
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
+
+        String[] expectedJars = new String[] {"avro-1", "avro-mapred-"};
+
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pigServer.getPigContext());
+        assertMRPlanContains(mrPlan.getRoots().get(0), expectedJars, 2);
+
+        TezLauncher launcher = new TezLauncher();
+        TezPlanContainer tezPlanContainer = launcher.compile(pp, pigServer.getPigContext());
+        assertTezPlanContains(tezPlanContainer.getRoots().get(0).getNode(), expectedJars, 2);
+    }
+
+    // NOTE(review): despite the name, the query uses JsonStorage as a storer. The MR check
+    // inspects the plan leaf, presumably because the ORDER BY splits the script into several
+    // MR jobs with the store in the last one.
+    @Test
+    public void testShipJsonLoader() throws Exception {
+        String query = "a = load '1.txt' as (name:chararray, age:int, gpa:double);" +
+                "b = order a by name;" +
+                "store b into 'ooo' using JsonStorage();";
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
+
+        String[] expectedJars = new String[] {"jackson-core-"};
+
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pigServer.getPigContext());
+        assertMRPlanContains(mrPlan.getLeaves().get(0), expectedJars, 1);
+
+        TezLauncher launcher = new TezLauncher();
+        TezPlanContainer tezPlanContainer = launcher.compile(pp, pigServer.getPigContext());
+        assertTezPlanContains(tezPlanContainer.getRoots().get(0).getNode(), expectedJars, 1);
+    }
+
+    /**
+     * Returns the hive jar name fragments OrcStorage is expected to ship. The shims jar is
+     * "hive-shims-0.23" on Hadoop 0.23/2.x and "hive-shims-0.20S" otherwise.
+     */
+    private static String[] getExpectedHiveJars() {
+        String hadoopVersion = (Utils.isHadoop23() || Utils.isHadoop2()) ? "23" : "20S";
+        return new String[] {"hive-common", "hive-exec", "hive-serde",
+                "hive-shims-0." + hadoopVersion, "hive-shims-common-0", "hive-shims-common-secure"};
+    }
+
+    /**
+     * Asserts that the map and reduce plans of {@code mro} together report exactly {@code size}
+     * ship files, and that every expected jar name fragment occurs in one of them.
+     */
+    private void assertMRPlanContains(MapReduceOper mro, String[] expectedFiles, int size) throws VisitorException {
+        List<String> shipFiles = new ArrayList<String>();
+        UdfCacheShipFilesVisitor mapUdfCacheFileVisitor = new UdfCacheShipFilesVisitor(mro.mapPlan);
+        mapUdfCacheFileVisitor.visit();
+        shipFiles.addAll(mapUdfCacheFileVisitor.getShipFiles());
+
+        UdfCacheShipFilesVisitor reduceUdfCacheFileVisitor = new UdfCacheShipFilesVisitor(mro.reducePlan);
+        reduceUdfCacheFileVisitor.visit();
+        shipFiles.addAll(reduceUdfCacheFileVisitor.getShipFiles());
+
+        Assert.assertEquals("ship files: " + shipFiles, size, shipFiles.size());
+        assertContains(shipFiles, expectedFiles);
+    }
+
+    /**
+     * Asserts that the Tez plan reports exactly {@code size} ship files, and that every
+     * expected jar name fragment occurs in one of them.
+     */
+    private void assertTezPlanContains(TezOperPlan plan, String[] expectedFiles, int size) throws VisitorException {
+        TezPOUserFuncVisitor udfVisitor = new TezPOUserFuncVisitor(plan);
+        udfVisitor.visit();
+
+        List<String> shipFiles = new ArrayList<String>(udfVisitor.getShipFiles());
+        Assert.assertEquals("ship files: " + shipFiles, size, shipFiles.size());
+        assertContains(shipFiles, expectedFiles);
+    }
+
+    /** Asserts that each expected fragment is a substring of at least one collected file name. */
+    private void assertContains(List<String> collectedFiles, String[] expectedFiles) {
+        for (String expectedFile : expectedFiles) {
+            boolean found = false;
+            for (String collectedFile : collectedFiles) {
+                if (collectedFile.contains(expectedFile)) {
+                    found = true;
+                    break;
+                }
+            }
+            Assert.assertTrue("no ship file contains '" + expectedFile + "': " + collectedFiles, found);
+        }
+    }
+}

Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Tue Sep 16 22:18:54 2014
@@ -633,7 +633,7 @@ public class TestPigRunner {
     @Test
     public void classLoaderTest() throws Exception {
         // Skip in hadoop 23 test, see PIG-2449
-        if (Util.isHadoop23() || Util.isHadoop2_0())
+        if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2())
             return;
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
         w.println("register test/org/apache/pig/test/data/pigtestloader.jar");

Modified: pig/trunk/test/org/apache/pig/test/TestPredeployedJar.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPredeployedJar.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPredeployedJar.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPredeployedJar.java Tue Sep 16 22:18:54 2014
@@ -34,7 +34,6 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.JarManager;
-import org.apache.pig.newplan.logical.rules.ColumnPruneVisitor;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -58,22 +57,22 @@ public class TestPredeployedJar {
         pigServer.getPigContext().getProperties().put(PigConfiguration.OPT_FETCH, "false");
         String[] inputData = new String[] { "hello", "world" };
         Util.createInputFile(cluster, "a.txt", inputData);
-        String jacksonJar = JarManager.findContainingJar(org.codehaus.jackson.JsonParser.class);
+        String guavaJar = JarManager.findContainingJar(com.google.common.collect.Multimaps.class);
 
         pigServer.registerQuery("a = load 'a.txt' as (line:chararray);");
         Iterator<Tuple> it = pigServer.openIterator("a");
 
         String content = FileUtils.readFileToString(logFile);
-        Assert.assertTrue(content.contains(jacksonJar));
+        Assert.assertTrue(content.contains(guavaJar));
         
         logFile = File.createTempFile("log", "");
         
-        // Now let's mark the jackson jar as predeployed.
-        pigServer.getPigContext().markJarAsPredeployed(jacksonJar);
+        // Now let's mark the guava jar as predeployed.
+        pigServer.getPigContext().markJarAsPredeployed(guavaJar);
         it = pigServer.openIterator("a");
 
         content = FileUtils.readFileToString(logFile);
-        Assert.assertFalse(content.contains(jacksonJar));
+        Assert.assertFalse(content.contains(guavaJar));
     }
     
     @Test

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1625418&r1=1625417&r2=1625418&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Tue Sep 16 22:18:54 2014
@@ -611,7 +611,7 @@ public class Util {
      }
 
      static private String getMkDirCommandForHadoop2_0(String fileName) {
-         if (Util.isHadoop23() || Util.isHadoop2_0()) {
+         if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) {
              Path parentDir = new Path(fileName).getParent();
              String mkdirCommand = parentDir.getName().isEmpty() ? "" : "fs -mkdir -p " + parentDir + "\n";
              return mkdirCommand;
@@ -1231,13 +1231,6 @@ public class Util {
         return plan.replaceAll("','','[^']*','scope','true'\\)\\)", "','','','scope','true'))");
     }
 
-    public static boolean isHadoop23() {
-        String version = org.apache.hadoop.util.VersionInfo.getVersion();
-        if (version.matches("\\b0\\.23\\..+\\b"))
-            return true;
-        return false;
-    }
-
     public static boolean isHadoop203plus() {
         String version = org.apache.hadoop.util.VersionInfo.getVersion();
         if (version.matches("\\b0\\.20\\.2\\b"))
@@ -1274,13 +1267,6 @@ public class Util {
         assertEquals("Unexpected value found in configs for " + param, expected, conf.getLong(param, -1));
     }
 
-    public static boolean isHadoop2_0() {
-        String version = org.apache.hadoop.util.VersionInfo.getVersion();
-        if (version.matches("\\b2\\.\\d\\..+"))
-            return true;
-        return false;
-    }
-
     /**
      * Returns a PathFilter that filters out filenames that start with _.
      * @return PathFilter