Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC

svn commit: r1642132 [4/14] - in /pig/branches/spark: ./ bin/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/ contrib/piggybank/java/sr...

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java Thu Nov 27 12:49:54 2014
@@ -38,7 +38,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
@@ -57,6 +56,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.builtin.SampleLoader;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -80,7 +80,7 @@ public class FetchOptimizer {
      */
     public static boolean isFetchEnabled(PigContext pc) {
         return "true".equalsIgnoreCase(
-                pc.getProperties().getProperty(PigConfiguration.OPT_FETCH, "true"));
+                pc.getProperties().getProperty(PigConfiguration.PIG_OPT_FETCH, "true"));
     }
 
     /**
@@ -97,14 +97,20 @@ public class FetchOptimizer {
             FetchablePlanVisitor fpv = new FetchablePlanVisitor(pc, pp);
             fpv.visit();
             // Plan is fetchable only if FetchablePlanVisitor returns true AND
-            // limit is present in the plan. Limit is a safeguard. If the input
-            // is large, and there is no limit, fetch optimizer will fetch the
-            // entire input to the client. That can be dangerous.
-            boolean isFetchable = fpv.isPlanFetchable() && 
-                    PlanHelper.containsPhysicalOperator(pp, POLimit.class);
-            if (isFetchable)
-                init(pp);
-            return isFetchable;
+            // limit is present in the plan, i.e. the limit has been pushed up to the loader.
+            // Limit is a safeguard: if the input is large and there is no limit,
+            // the fetch optimizer would fetch the entire input to the client. That can be dangerous.
+            if (!fpv.isPlanFetchable()) {
+                return false;
+            }
+            for (POLoad load : PlanHelper.getPhysicalOperators(pp, POLoad.class)) {
+                if (load.getLimit() == -1) {
+                    return false;
+                }
+            }
+            pc.getProperties().setProperty(PigImplConstants.CONVERTED_TO_FETCH, "true");
+            init(pp);
+            return true;
         }
         return false;
     }

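A minimal sketch (not part of this commit) of how downstream code could consume the flag that isPlanFetchable() now records; the only assumption is that the PigImplConstants.CONVERTED_TO_FETCH property introduced above holds the string "true":

    // Sketch: detecting that the plan was converted to fetch-only execution.
    boolean convertedToFetch = "true".equalsIgnoreCase(
            pc.getProperties().getProperty(PigImplConstants.CONVERTED_TO_FETCH, "false"));
    if (convertedToFetch) {
        // skip MapReduce-specific setup, e.g. job submission and stats collection
    }
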
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Thu Nov 27 12:49:54 2014
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -26,8 +28,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.util.UriUtil;
 
-import java.io.IOException;
-
 /**
  * Class that computes the size of output for file-based systems.
  */
@@ -43,19 +43,23 @@ public class FileBasedOutputSizeReader i
      */
     @Override
     public boolean supports(POStore sto, Configuration conf) {
-        String storeFuncName = sto.getStoreFunc().getClass().getCanonicalName();
-        // Some store functions do not support file-based output reader (e.g.
-        // HCatStorer), so they should be excluded.
-        String unsupported = conf.get(
-                PigStatsOutputSizeReader.OUTPUT_SIZE_READER_UNSUPPORTED);
-        if (unsupported != null) {
-            for (String s : unsupported.split(",")) {
-                if (s.equalsIgnoreCase(storeFuncName)) {
-                    return false;
+        boolean nullOrSupportedScheme = UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto), conf);
+        if (nullOrSupportedScheme) {
+            // Some store functions that do not have a scheme
+            // do not support a file-based output reader (e.g. HCatStorer),
+            // so they should be excluded.
+            String unsupported = conf.get(
+                    PigStatsOutputSizeReader.OUTPUT_SIZE_READER_UNSUPPORTED);
+            if (unsupported != null) {
+                String storeFuncName = sto.getStoreFunc().getClass().getCanonicalName();
+                for (String s : unsupported.split(",")) {
+                    if (s.equalsIgnoreCase(storeFuncName)) {
+                        return false;
+                    }
                 }
             }
         }
-        return UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto), conf);
+        return nullOrSupportedScheme;
     }
 
     /**

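The rewritten supports() above checks the URI scheme first and consults the exclusion list only when the scheme is supported. A hedged sketch of how that list is populated; the HCatStorer class names are illustrative, and the real value depends on the HCatalog version in use:

    // Sketch: excluding store functions from file-based output size reading.
    // The value is a comma-separated list of store-func class names, compared
    // case-insensitively against getClass().getCanonicalName() as shown above.
    conf.set(PigStatsOutputSizeReader.OUTPUT_SIZE_READER_UNSUPPORTED,
            "org.apache.hive.hcatalog.pig.HCatStorer,org.apache.hcatalog.pig.HCatStorer");
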
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Thu Nov 27 12:49:54 2014
@@ -92,12 +92,27 @@ public class InputSizeReducerEstimator i
         return reducers;
     }
 
+    static long getTotalInputFileSize(Configuration conf,
+            List<POLoad> lds, Job job) throws IOException {
+        return getTotalInputFileSize(conf, lds, job, Long.MAX_VALUE);
+    }
+
     /**
      * Get the input size for as many inputs as possible. Inputs that do not report
      * their size and whose size Pig cannot look up itself are excluded from this total.
+     * 
+     * @param conf Configuration
+     * @param lds List of POLoads
+     * @param job Job
+     * @param max Maximum total input size beyond which the scan exits early. Often
+     * we are only interested in whether the total input size is greater than
+     * some threshold; in such cases the function can return as soon as the max
+     * is exceeded.
+     * @return the total input file size in bytes, or -1 if it cannot be determined
+     * @throws IOException
      */
     static long getTotalInputFileSize(Configuration conf,
-            List<POLoad> lds, Job job) throws IOException {
+            List<POLoad> lds, Job job, long max) throws IOException {
         long totalInputFileSize = 0;
         for (POLoad ld : lds) {
             long size = getInputSizeFromLoader(ld, job);
@@ -115,8 +130,14 @@ public class InputSizeReducerEstimator i
                         FileStatus[] status = fs.globStatus(path);
                         if (status != null) {
                             for (FileStatus s : status) {
-                                totalInputFileSize += MapRedUtil.getPathLength(fs, s);
+                                totalInputFileSize += MapRedUtil.getPathLength(fs, s, max);
+                                if (totalInputFileSize > max) {
+                                    break;
+                                }
                             }
+                        } else {
+                            // If the file is not found, we should report -1
+                            return -1;
                         }
                     } else {
                         // If we cannot estimate size of a location, we should report -1

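The new max parameter lets callers that only need a threshold comparison stop summing early. A minimal sketch of the calling pattern, mirroring the JobControlCompiler hunk later in this commit (conf, lds and job are assumed to be in scope):

    // Sketch: we only care whether the input is small enough to run locally, so
    // pass the threshold; anything above it (or -1 for "unknown") is too big.
    long inputByteMax = conf.getLong(PigConfiguration.PIG_AUTO_LOCAL_INPUT_MAXBYTES, 100 * 1000 * 1000L);
    long totalInputFileSize = InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job, inputByteMax);
    boolean smallJob = totalInputFileSize >= 0 && totalInputFileSize <= inputByteMax;
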
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Nov 27 12:49:54 2014
@@ -17,9 +17,10 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
+import static org.apache.pig.PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR;
+import static org.apache.pig.PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR_CONSTRUCTOR_ARG_KEY;
+
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -77,9 +78,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
@@ -163,9 +164,6 @@ public class JobControlCompiler{
 
     public static final String END_OF_INP_IN_MAP = "pig.invoke.close.in.map";
 
-    private static final String REDUCER_ESTIMATOR_KEY = "pig.exec.reducer.estimator";
-    private static final String REDUCER_ESTIMATOR_ARG_KEY =  "pig.exec.reducer.estimator.arg";
-
     public static final String PIG_MAP_COUNTER = "pig.counters.counter_";
     public static final String PIG_MAP_RANK_NAME = "pig.rank_";
     public static final String PIG_MAP_SEPARATOR = "_";
@@ -447,8 +445,8 @@ public class JobControlCompiler{
             return false;
         }
 
-        long totalInputFileSize = InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job);
         long inputByteMax = conf.getLong(PigConfiguration.PIG_AUTO_LOCAL_INPUT_MAXBYTES, 100*1000*1000l);
+        long totalInputFileSize = InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job, inputByteMax);
         log.info("Size of input: " + totalInputFileSize +" bytes. Small job threshold: " + inputByteMax );
         if (totalInputFileSize < 0 || totalInputFileSize > inputByteMax) {
             return false;
@@ -505,7 +503,7 @@ public class JobControlCompiler{
         Path tmpLocation = null;
 
         // add settings for pig statistics
-        String setScriptProp = conf.get(PigConfiguration.INSERT_ENABLED, "true");
+        String setScriptProp = conf.get(PigConfiguration.PIG_SCRIPT_INFO_ENABLED, "true");
         if (setScriptProp.equalsIgnoreCase("true")) {
             MRScriptState ss = MRScriptState.get();
             ss.addSettingsToConf(mro, conf);
@@ -546,42 +544,6 @@ public class JobControlCompiler{
                 nwJob.setNumReduceTasks(0);
             }
 
-            for (String udf : mro.UDFs) {
-                if (udf.contains("GFCross")) {
-                    Object func = pigContext.instantiateFuncFromSpec(new FuncSpec(udf));
-                    if (func instanceof GFCross) {
-                        String crossKey = ((GFCross)func).getCrossKey();
-                        // If non GFCross has been processed yet
-                        if (pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey)==null) {
-                            pigContext.getProperties().setProperty(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
-                                    Integer.toString(nwJob.getNumReduceTasks()));
-                        }
-                        conf.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey,
-                                (String)pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey));
-                    }
-                }
-            }
-
-            if(lds!=null && lds.size()>0){
-                for (POLoad ld : lds) {
-                    //Store the target operators for tuples read
-                    //from this input
-                    List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
-                    List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
-                    if(ldSucs!=null){
-                        for (PhysicalOperator operator2 : ldSucs) {
-                            ldSucKeys.add(operator2.getOperatorKey());
-                        }
-                    }
-                    inpTargets.add(ldSucKeys);
-                    inpSignatureLists.add(ld.getSignature());
-                    inpLimits.add(ld.getLimit());
-                    //Remove the POLoad from the plan
-                    if (!pigContext.inIllustrator)
-                        mro.mapPlan.remove(ld);
-                }
-            }
-
             if (!pigContext.inIllustrator && ! pigContext.getExecType().isLocal())
             {
                 if (okToRunLocal(nwJob, mro, lds)) {
@@ -610,6 +572,22 @@ public class JobControlCompiler{
                     conf.setBoolean(PigImplConstants.CONVERTED_TO_LOCAL, true);
                 } else {
                     log.info(BIG_JOB_LOG_MSG);
+                    // Search to see if we have any UDF/LoadFunc/StoreFunc that need to pack things into the
+                    // distributed cache.
+                    List<String> cacheFiles = new ArrayList<String>();
+                    List<String> shipFiles = new ArrayList<String>();
+                    UdfCacheShipFilesVisitor mapUdfCacheFileVisitor = new UdfCacheShipFilesVisitor(mro.mapPlan);
+                    mapUdfCacheFileVisitor.visit();
+                    cacheFiles.addAll(mapUdfCacheFileVisitor.getCacheFiles());
+                    shipFiles.addAll(mapUdfCacheFileVisitor.getShipFiles());
+
+                    UdfCacheShipFilesVisitor reduceUdfCacheFileVisitor = new UdfCacheShipFilesVisitor(mro.reducePlan);
+                    reduceUdfCacheFileVisitor.visit();
+                    cacheFiles.addAll(reduceUdfCacheFileVisitor.getCacheFiles());
+                    shipFiles.addAll(reduceUdfCacheFileVisitor.getShipFiles());
+
+                    setupDistributedCache(pigContext, conf, cacheFiles.toArray(new String[]{}), false);
+
                     // Setup the DistributedCache for this job
                     List<URL> allJars = new ArrayList<URL>();
 
@@ -619,6 +597,19 @@ public class JobControlCompiler{
                         }
                     }
 
+                    for (String udf : mro.UDFs) {
+                        Class clazz = pigContext.getClassForAlias(udf);
+                        if (clazz != null) {
+                            String jar = JarManager.findContainingJar(clazz);
+                            if (jar!=null) {
+                                URL jarURL = new File(jar).toURI().toURL();
+                                if (!allJars.contains(jarURL)) {
+                                    allJars.add(jarURL);
+                                }
+                            }
+                        }
+                    }
+
                     for (String scriptJar : pigContext.scriptJars) {
                         URL jar = new File(scriptJar).toURI().toURL();
                         if (!allJars.contains(jar)) {
@@ -626,6 +617,13 @@ public class JobControlCompiler{
                         }
                     }
 
+                    for (String shipFile : shipFiles) {
+                        URL jar = new File(shipFile).toURI().toURL();
+                        if (!allJars.contains(jar)) {
+                            allJars.add(jar);
+                        }
+                    }
+
                     for (String defaultJar : JarManager.getDefaultJars()) {
                         URL jar = new File(defaultJar).toURI().toURL();
                         if (!allJars.contains(jar)) {
@@ -641,7 +639,6 @@ public class JobControlCompiler{
                             }
                         }
                         if (!predeployed) {
-                            log.info("Adding jar to DistributedCache: " + jar);
                             putJarOnClassPathThroughDistributedCache(pigContext, conf, jar);
                         }
                     }
@@ -653,6 +650,37 @@ public class JobControlCompiler{
                 }
             }
 
+            for (String udf : mro.UDFs) {
+                if (udf.contains("GFCross")) {
+                    Object func = PigContext.instantiateFuncFromSpec(new FuncSpec(udf));
+                    if (func instanceof GFCross) {
+                        String crossKey = ((GFCross)func).getCrossKey();
+                        conf.set(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey,
+                                Integer.toString(mro.getRequestedParallelism()));
+                    }
+                }
+            }
+
+            if(lds!=null && lds.size()>0){
+                for (POLoad ld : lds) {
+                    //Store the target operators for tuples read
+                    //from this input
+                    List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
+                    List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
+                    if(ldSucs!=null){
+                        for (PhysicalOperator operator2 : ldSucs) {
+                            ldSucKeys.add(operator2.getOperatorKey());
+                        }
+                    }
+                    inpTargets.add(ldSucKeys);
+                    inpSignatureLists.add(ld.getSignature());
+                    inpLimits.add(ld.getLimit());
+                    //Remove the POLoad from the plan
+                    if (!pigContext.inIllustrator)
+                        mro.mapPlan.remove(ld);
+                }
+            }
+
             if(Utils.isLocal(pigContext, conf)) {
                 ConfigurationUtil.replaceConfigForLocalMode(conf);
             }
@@ -779,10 +807,6 @@ public class JobControlCompiler{
             // serialized
             setupDistributedCacheForJoin(mro, pigContext, conf);
 
-            // Search to see if we have any UDFs that need to pack things into the
-            // distributed cache.
-            setupDistributedCacheForUdfs(mro, pigContext, conf);
-
             SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);
 
             POPackage pack = null;
@@ -1022,9 +1046,9 @@ public class JobControlCompiler{
         Configuration conf = nwJob.getConfiguration();
 
         // set various parallelism into the job conf for later analysis, PIG-2779
-        conf.setInt("pig.info.reducers.default.parallel", pigContext.defaultParallel);
-        conf.setInt("pig.info.reducers.requested.parallel", mro.requestedParallelism);
-        conf.setInt("pig.info.reducers.estimated.parallel", mro.estimatedParallelism);
+        conf.setInt(PigImplConstants.REDUCER_DEFAULT_PARALLELISM, pigContext.defaultParallel);
+        conf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, mro.requestedParallelism);
+        conf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, mro.estimatedParallelism);
 
         // this is for backward compatibility, and we encourage to use runtimeParallelism at runtime
         mro.requestedParallelism = jobParallelism;
@@ -1080,10 +1104,10 @@ public class JobControlCompiler{
             MapReduceOper mapReducerOper) throws IOException {
         Configuration conf = job.getConfiguration();
 
-        PigReducerEstimator estimator = conf.get(REDUCER_ESTIMATOR_KEY) == null ?
+        PigReducerEstimator estimator = conf.get(PIG_EXEC_REDUCER_ESTIMATOR) == null ?
                 new InputSizeReducerEstimator() :
                     PigContext.instantiateObjectFromParams(conf,
-                            REDUCER_ESTIMATOR_KEY, REDUCER_ESTIMATOR_ARG_KEY, PigReducerEstimator.class);
+                            PIG_EXEC_REDUCER_ESTIMATOR, PIG_EXEC_REDUCER_ESTIMATOR_CONSTRUCTOR_ARG_KEY, PigReducerEstimator.class);
 
                 log.info("Using reducer estimator: " + estimator.getClass().getName());
                 int numberOfReducers = estimator.estimateNumberOfReducers(job, mapReducerOper);
@@ -1478,13 +1502,6 @@ public class JobControlCompiler{
         .visit();
     }
 
-    private void setupDistributedCacheForUdfs(MapReduceOper mro,
-            PigContext pigContext,
-            Configuration conf) throws IOException {
-        new UdfDistributedCacheVisitor(mro.mapPlan, pigContext, conf).visit();
-        new UdfDistributedCacheVisitor(mro.reducePlan, pigContext, conf).visit();
-    }
-
     private static void setupDistributedCache(PigContext pigContext,
             Configuration conf,
             Properties properties, String key,
@@ -1633,11 +1650,50 @@ public class JobControlCompiler{
         // Turn on the symlink feature
         DistributedCache.createSymlink(conf);
 
-        // REGISTER always copies locally the jar file. see PigServer.registerJar()
-        Path pathInHDFS = shipToHDFS(pigContext, conf, url);
-        // and add to the DistributedCache
-        DistributedCache.addFileToClassPath(pathInHDFS, conf);
-        pigContext.addSkipJar(url.getPath());
+        Path distCachePath = getExistingDistCacheFilePath(conf, url);
+        if (distCachePath != null) {
+            log.info("Jar file " + url + " already in DistributedCache as "
+                    + distCachePath + ". Not copying to hdfs and adding again");
+            // Path already in dist cache
+            if (!HadoopShims.isHadoopYARN()) {
+                // MapReduce on YARN includes $PWD/* on the classpath, which picks up all *.jar files,
+                // so we don't have to ensure the jar is separately added to mapreduce.job.classpath.files.
+                // But in Hadoop 1.x the path may only be in 'mapred.cache.files' and not in
+                // 'mapreduce.job.classpath.files', so add it there explicitly.
+                DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf));
+            }
+        }
+        else {
+            // REGISTER always copies locally the jar file. see PigServer.registerJar()
+            Path pathInHDFS = shipToHDFS(pigContext, conf, url);
+            DistributedCache.addFileToClassPath(pathInHDFS, conf, FileSystem.get(conf));
+            log.info("Added jar " + url + " to DistributedCache through " + pathInHDFS);
+        }
+
+    }
+
+    private static Path getExistingDistCacheFilePath(Configuration conf, URL url) throws IOException {
+        URI[] cacheFileUris = DistributedCache.getCacheFiles(conf);
+        if (cacheFileUris != null) {
+            String fileName = url.getRef() == null ? FilenameUtils.getName(url.getPath()) : url.getRef();
+            for (URI cacheFileUri : cacheFileUris) {
+                Path path = new Path(cacheFileUri);
+                String cacheFileName = cacheFileUri.getFragment() == null ? path.getName() : cacheFileUri.getFragment();
+                // Match when
+                //     - both filenames are the same and there are no symlinks, or
+                //     - both symlinks are the same, or
+                //     - the symlink of an existing cache file is the same as the name of the
+                //       new file to be added. That is the case when hbase-0.98.4.jar#hbase.jar
+                //       is configured via Oozie and "register hbase.jar" is done in the pig script.
+                // If two different files are symlinked to the same name, there is a conflict anyway,
+                // and hadoop itself does not guarantee which file will be symlinked to that name.
+                // So we are good.
+                if (fileName.equals(cacheFileName)) {
+                    return path;
+                }
+            }
+        }
+        return null;
     }
 
     private static Path getCacheStagingDir(Configuration conf) throws IOException {
@@ -1763,6 +1819,8 @@ public class JobControlCompiler{
             ArrayList<String> replicatedPath = new ArrayList<String>();
 
             FileSpec[] newReplFiles = new FileSpec[replFiles.length];
+            long maxSize = Long.valueOf(pigContext.getProperties().getProperty(
+                    PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES, "1000000000"));
 
             // the first input is not replicated
             long sizeOfReplicatedInputs = 0;
@@ -1782,7 +1840,7 @@ public class JobControlCompiler{
                         Path path = new Path(replFiles[i].getFileName());
                         FileSystem fs = path.getFileSystem(conf);
                         sizeOfReplicatedInputs +=
-                                MapRedUtil.getPathLength(fs, fs.getFileStatus(path));
+                                MapRedUtil.getPathLength(fs, fs.getFileStatus(path), maxSize);
                     }
                     newReplFiles[i] = new FileSpec(symlink,
                             (replFiles[i] == null ? null : replFiles[i].getFuncSpec()));
@@ -1790,9 +1848,7 @@ public class JobControlCompiler{
 
                 join.setReplFiles(newReplFiles);
 
-                String maxSize = pigContext.getProperties().getProperty(
-                        PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES, "1000000000");
-                if (sizeOfReplicatedInputs > Long.parseLong(maxSize)){
+                if (sizeOfReplicatedInputs > maxSize) {
                     throw new VisitorException("Replicated input files size: "
                             + sizeOfReplicatedInputs + " exceeds " +
                             PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES + ": " + maxSize);
@@ -1854,41 +1910,6 @@ public class JobControlCompiler{
         }
     }
 
-    private static class UdfDistributedCacheVisitor extends PhyPlanVisitor {
-
-        private PigContext pigContext = null;
-        private Configuration conf = null;
-
-        public UdfDistributedCacheVisitor(PhysicalPlan plan,
-                PigContext pigContext,
-                Configuration conf) {
-            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
-                    plan));
-            this.pigContext = pigContext;
-            this.conf = conf;
-        }
-
-        @Override
-        public void visitUserFunc(POUserFunc func) throws VisitorException {
-
-            // XXX Hadoop currently doesn't support distributed cache in local mode.
-            // This line will be removed after the support is added
-            if (Utils.isLocal(pigContext, conf)) return;
-
-            // set up distributed cache for files indicated by the UDF
-            String[] files = func.getCacheFiles();
-            if (files == null) return;
-
-            try {
-                setupDistributedCache(pigContext, conf, files, false);
-            } catch (IOException e) {
-                String msg = "Internal error. Distributed cache could not " +
-                        "be set up for the requested files";
-                throw new VisitorException(msg, e);
-            }
-        }
-    }
-
     private static class ParallelConstantVisitor extends PhyPlanVisitor {
 
         private int rp;

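The dedup logic in getExistingDistCacheFilePath() above keys on the effective symlink name of each cache entry. A self-contained sketch of the matching rule; the jar paths are hypothetical:

    import java.io.File;
    import java.net.URI;
    import java.net.URL;
    import org.apache.commons.io.FilenameUtils;
    import org.apache.hadoop.fs.Path;

    public class DistCacheMatchSketch {
        public static void main(String[] args) throws Exception {
            // Entry already in the cache, e.g. configured by Oozie with a symlink fragment.
            URI cached = URI.create("hdfs:///libs/hbase-0.98.4.jar#hbase.jar");
            String cacheName = cached.getFragment() == null
                    ? new Path(cached).getName() : cached.getFragment();       // "hbase.jar"

            // Jar being registered from the pig script.
            URL added = new File("/local/hbase.jar").toURI().toURL();
            String addedName = added.getRef() == null
                    ? FilenameUtils.getName(added.getPath()) : added.getRef(); // "hbase.jar"

            // Same effective name -> reuse the existing cache entry instead of re-shipping.
            System.out.println(addedName.equals(cacheName));                   // prints: true
        }
    }
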
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Nov 27 12:49:54 2014
@@ -1076,7 +1076,14 @@ public class MRCompiler extends PhyPlanV
     @Override
     public void visitPOForEach(POForEach op) throws VisitorException{
         try{
-            nonBlocking(op);
+            if (op.isMapSideOnly() && curMROp.isMapDone()) {
+                FileSpec fSpec = getTempFileSpec();
+                MapReduceOper prevMROper = endSingleInputPlanWithStr(fSpec);
+                curMROp = startNew(fSpec, prevMROper);
+                curMROp.mapPlan.addAsLeaf(op);
+            } else {
+                nonBlocking(op);
+            }
             List<PhysicalPlan> plans = op.getInputPlans();
             if(plans!=null)
                 for (PhysicalPlan plan : plans) {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Nov 27 12:49:54 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -637,10 +638,10 @@ public class MapReduceLauncher extends L
                 pc.getProperties().getProperty(
                         "last.input.chunksize", JoinPackager.DEFAULT_CHUNK_SIZE);
 
-        String prop = pc.getProperties().getProperty(PigConfiguration.PROP_NO_COMBINER);
+        String prop = pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_COMBINER);
         if (!pc.inIllustrator && !("true".equals(prop)))  {
             boolean doMapAgg =
-                    Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG,"false"));
+                    Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG,"false"));
             CombinerOptimizer co = new CombinerOptimizer(plan, doMapAgg);
             co.visit();
             //display the warning message(s) from the CombinerOptimizer
@@ -686,7 +687,7 @@ public class MapReduceLauncher extends L
         fRem.visit();
 
         boolean isMultiQuery =
-            Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
+            Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "true"));
 
         if (isMultiQuery) {
             // reduces the number of MROpers in the MR plan generated
@@ -797,13 +798,13 @@ public class MapReduceLauncher extends L
             throw new ExecException(backendException);
         }
         try {
-            TaskReport[] mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
+            Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
             if (mapRep != null) {
                 getErrorMessages(mapRep, "map", errNotDbg, pigContext);
                 totalHadoopTimeSpent += computeTimeSpent(mapRep);
                 mapRep = null;
             }
-            TaskReport[] redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+            Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
             if (redRep != null) {
                 getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
                 totalHadoopTimeSpent += computeTimeSpent(redRep);

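HadoopShims.getTaskReports() now returns an Iterator<TaskReport> instead of an array, so large job histories can be streamed rather than materialized. A minimal sketch of the consuming side; getDiagnostics() is the stock Hadoop TaskReport accessor:

    // Sketch: walking task reports lazily through the new shim signature.
    Iterator<TaskReport> reports = HadoopShims.getTaskReports(job, TaskType.MAP);
    if (reports != null) {
        while (reports.hasNext()) {
            TaskReport report = reports.next();
            for (String diagnostic : report.getDiagnostics()) {
                // surface per-task error messages, as getErrorMessages() does above
            }
        }
    }
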
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Nov 27 12:49:54 2014
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
@@ -28,6 +29,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.Operator;
@@ -523,23 +525,32 @@ public class MapReduceOper extends Opera
     }
 
     private POCounter getCounterOperation() {
-        PhysicalOperator operator;
-        Iterator<PhysicalOperator> it =  this.mapPlan.getLeaves().iterator();
-
-        while(it.hasNext()) {
-            operator = it.next();
-            if(operator instanceof POCounter)
-                return (POCounter) operator;
+        POCounter counter = getCounterOperation(this.mapPlan);
+        if (counter == null) {
+            counter = getCounterOperation(this.reducePlan);
         }
+        return counter;
+    }
 
-        it =  this.reducePlan.getLeaves().iterator();
+    private POCounter getCounterOperation(PhysicalPlan plan) {
+        PhysicalOperator operator;
+        Iterator<PhysicalOperator> it = plan.getLeaves().iterator();
 
-        while(it.hasNext()) {
+        while (it.hasNext()) {
             operator = it.next();
-            if(operator instanceof POCounter)
+            if (operator instanceof POCounter) {
                 return (POCounter) operator;
+            } else if (operator instanceof POStore) {
+                List<PhysicalOperator> preds = plan.getPredecessors(operator);
+                if (preds != null) {
+                    for (PhysicalOperator pred : preds) {
+                        if (pred instanceof POCounter) {
+                            return (POCounter) pred;
+                        }
+                    }
+                }
+            }
         }
-
         return null;
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Thu Nov 27 12:49:54 2014
@@ -121,6 +121,11 @@ class MultiQueryOptimizer extends MROpPl
                         + " uses customPartitioner, do not merge it");
                 continue;
             }
+            if (successor.isCounterOperation()) {
+                log.debug("Splittee " + successor.getOperatorKey().getId()
+                        + " has POCounter, do not merge it");
+                continue;
+            }
             if (isMapOnly(successor)) {
                 if (isSingleLoadMapperPlan(successor.mapPlan)
                         && isSinglePredecessor(successor)) {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Thu Nov 27 12:49:54 2014
@@ -349,6 +349,7 @@ public class PhyPlanSetter extends PhyPl
     @Override
     public void visitPreCombinerLocalRearrange(
             POPreCombinerLocalRearrange preCombinerLocalRearrange) throws VisitorException {
+        super.visitPreCombinerLocalRearrange(preCombinerLocalRearrange);
         preCombinerLocalRearrange.setParentPlan(parent);
     }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java Thu Nov 27 12:49:54 2014
@@ -122,8 +122,10 @@ public class PigBytesRawComparator exten
             if( dataByteArraysCompare ) {
               rc = WritableComparator.compareBytes(b1, offset1, length1, b2, offset2, length2);
             } else {
-              // Subtract 2, one for null byte and one for index byte
-              rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
+              // Subtract 2, one for null byte and one for index byte. Also, do not reverse the sign
+              // of rc when mAsc[0] is false because BinInterSedesTupleRawComparator.compare() already
+              // takes that into account.
+              return mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
             }
         } else {
             // For sorting purposes two nulls are equal.

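The comparator hunk above removes a double sign reversal for descending sorts. A minimal numeric illustration of the bug (plain Java, not Pig code):

    // The wrapped comparator already folds the descending order into its result.
    int ascending = Integer.compare(1, 2);  // -1: 1 sorts before 2
    int wrapped = -ascending;               // +1: wrapped comparator honors DESC itself
    int doubleReversed = -wrapped;          // -1: reversing again restores ASC (the bug)
    int fixed = wrapped;                    // +1: returning the wrapped result as-is (the fix)
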
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Thu Nov 27 12:49:54 2014
@@ -27,7 +27,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.log4j.PropertyConfigurator;
+import org.apache.pig.JVMReuseManager;
 import org.apache.pig.PigException;
+import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -73,6 +75,15 @@ public class PigCombiner {
         PigContext pigContext = null;
         private volatile boolean initialized = false;
 
+        static {
+            JVMReuseManager.getInstance().registerForStaticDataCleanup(Combine.class);
+        }
+
+        @StaticDataCleanup
+        public static void staticDataCleanup() {
+            firstTime = true;
+        }
+
         /**
          * Configures the Reduce plan, the POPackage operator
          * and the reporter thread

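The static block and @StaticDataCleanup method added above follow the JVM-reuse pattern this commit introduces: classes with mutable static state register themselves once, and their cleanup hook runs before a reused JVM starts the next task. A sketch of the same pattern on a hypothetical class:

    // Sketch: resetting task-scoped statics between tasks under JVM reuse.
    public class MyTaskScopedHelper {
        private static boolean firstTime = true;

        static {
            JVMReuseManager.getInstance().registerForStaticDataCleanup(MyTaskScopedHelper.class);
        }

        @StaticDataCleanup
        public static void staticDataCleanup() {
            firstTime = true;   // next task re-initializes from scratch
        }
    }
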
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Thu Nov 27 12:49:54 2014
@@ -22,8 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.joda.time.DateTimeZone;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +30,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.PropertyConfigurator;
+import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -54,6 +53,7 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 /**
@@ -162,6 +162,7 @@ public abstract class PigGenericMapBase 
 
         Configuration job = context.getConfiguration();
         SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
+        context.getConfiguration().set(PigConstants.TASK_INDEX, Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
         PigMapReduce.sJobContext = context;
         PigMapReduce.sJobConfInternal.set(context.getConfiguration());
         PigMapReduce.sJobConf = context.getConfiguration();
@@ -214,11 +215,7 @@ public abstract class PigGenericMapBase 
 
         log.info("Aliases being processed per job phase (AliasName[line,offset]): " + job.get("pig.alias.location"));
 
-        String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
-        if (dtzStr != null && dtzStr.length() > 0) {
-            // ensure that the internal timezone is uniformly in UTC offset style
-            DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
-        }
+        Utils.setDefaultTimeZone(PigMapReduce.sJobConfInternal.get());
     }
 
     /**

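Utils.setDefaultTimeZone() replaces the inlined Joda-Time logic deleted above; presumably the helper centralizes the same normalization. A sketch reconstructed from the removed lines (the helper's actual body is not shown in this diff):

    // Sketch: normalize the configured default timezone to a fixed UTC offset.
    String dtzStr = conf.get("pig.datetime.default.tz");
    if (dtzStr != null && dtzStr.length() > 0) {
        // ensure that the internal timezone is uniformly in UTC offset style
        DateTimeZone.setDefault(
                DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
    }
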
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Thu Nov 27 12:49:54 2014
@@ -30,7 +30,10 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
+import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -57,8 +60,8 @@ import org.apache.pig.impl.util.ObjectSe
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.SpillableMemoryManager;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
-import org.joda.time.DateTimeZone;
 
 /**
  * This class is the static Mapper &amp; Reducer classes that
@@ -100,6 +103,17 @@ public class PigGenericMapReduce {
 
     public static ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>();
 
+    static {
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(PigGenericMapReduce.class);
+    }
+
+    @StaticDataCleanup
+    public static void staticDataCleanup() {
+        sJobContext = null;
+        sJobConf = null;
+        sJobConfInternal = new ThreadLocal<Configuration>();
+    }
+
     public static class Map extends PigMapBase {
 
         @Override
@@ -306,6 +320,7 @@ public class PigGenericMapReduce {
                 pack = getPack(context);
             Configuration jConf = context.getConfiguration();
             SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
+            context.getConfiguration().set(PigConstants.TASK_INDEX, Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
             sJobContext = context;
             sJobConfInternal.set(context.getConfiguration());
             sJobConf = context.getConfiguration();
@@ -347,11 +362,7 @@ public class PigGenericMapReduce {
 
             log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
 
-            String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
-            if (dtzStr != null && dtzStr.length() > 0) {
-                // ensure that the internal timezone is uniformly in UTC offset style
-                DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
-            }
+            Utils.setDefaultTimeZone(PigMapReduce.sJobConfInternal.get());
         }
 
         /**

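Both setup() methods now publish the index of the current task under PigConstants.TASK_INDEX. A hedged sketch of reading it back later in the same task; it assumes the code runs after setup() and reads the same Configuration instance that setup() populated:

    // Sketch: recovering the task index, e.g. from a UDF or loader.
    Configuration conf = PigMapReduce.sJobConfInternal.get();
    int taskIndex = Integer.parseInt(conf.get(PigConstants.TASK_INDEX, "0"));
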
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Thu Nov 27 12:49:54 2014
@@ -56,17 +56,11 @@ public final class PigHadoopLogger imple
         return logger;
     }
 
-    public void destroy() {
-        if (reporter != null) {
-            reporter.destroy();
-        }
-        reporter = null;
-    }
-
     public void setReporter(PigStatusReporter reporter) {
         this.reporter = reporter;
     }
 
+    @Override
     @SuppressWarnings("rawtypes")
     public void warn(Object o, String msg, Enum warningEnum) {
         String className = o.getClass().getName();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Thu Nov 27 12:49:54 2014
@@ -93,6 +93,7 @@ public class PigInputFormat extends Inpu
         Configuration conf = context.getConfiguration();
         PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
                 .deserialize(conf.get("udf.import.list")));
+        MapRedUtil.setupUDFContext(conf);
         LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf);
         // Pass loader signature to LoadFunc and to InputFormat through
         // the conf

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java Thu Nov 27 12:49:54 2014
@@ -60,6 +60,13 @@ public class PigMapReduceCounter {
                     pOperator = mp.getPredecessors(pOperator).get(0);
                 }
             }
+
+            PigStatusReporter reporter = PigStatusReporter.getInstance();
+            if (reporter != null) {
+                reporter.incrCounter(
+                        JobControlCompiler.PIG_MAP_RANK_NAME
+                        + context.getJobID().toString(), taskID, 0);
+            }
         }
 
         /**
@@ -69,15 +76,11 @@ public class PigMapReduceCounter {
         public void collect(Context context, Tuple tuple)
         throws InterruptedException, IOException {
             context.write(null, tuple);
-            try {
-                PigStatusReporter reporter = PigStatusReporter.getInstance();
-                if (reporter != null) {
-                    reporter.incrCounter(
-                            JobControlCompiler.PIG_MAP_RANK_NAME
-                            + context.getJobID().toString(), taskID, 1);
-                }
-            } catch (Exception ex) {
-                log.error("Error on incrementer of PigMapCounter");
+            PigStatusReporter reporter = PigStatusReporter.getInstance();
+            if (reporter != null) {
+                reporter.incrCounter(
+                        JobControlCompiler.PIG_MAP_RANK_NAME
+                        + context.getJobID().toString(), taskID, 1);
             }
         }
     }
@@ -116,6 +119,7 @@ public class PigMapReduceCounter {
             }
 
             this.context = context;
+            incrementCounter(0L);
         }
 
         /**
@@ -127,21 +131,14 @@ public class PigMapReduceCounter {
          * @param increment is the value to add to the corresponding global counter.
          **/
         public static void incrementCounter(Long increment) {
-            try {
-                PigStatusReporter reporter = PigStatusReporter.getInstance();
-                if (reporter != null) {
-
-                    if(leaf instanceof POCounter){
-                        reporter.incrCounter(
-                                JobControlCompiler.PIG_MAP_RANK_NAME
-                                + context.getJobID().toString(), taskID, increment);
-                    }
-
+            PigStatusReporter reporter = PigStatusReporter.getInstance();
+            if (reporter != null) {
+                if(leaf instanceof POCounter){
+                    reporter.incrCounter(
+                            JobControlCompiler.PIG_MAP_RANK_NAME
+                            + context.getJobID().toString(), taskID, increment);
                 }
-            } catch (Exception ex) {
-                log.error("Error on incrementer of PigReduceCounter");
             }
-
         }
     }
 }

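The zero increments added to the setup() methods above pre-register the rank counters, so a task that emits no records still reports an explicit 0 instead of a missing counter. A minimal sketch of the trick; jobId and taskId stand in for context.getJobID().toString() and the task identifier used above:

    // Sketch: incrementing by 0 creates the counter in the job's counter group.
    PigStatusReporter reporter = PigStatusReporter.getInstance();
    if (reporter != null) {
        reporter.incrCounter(JobControlCompiler.PIG_MAP_RANK_NAME + jobId, taskId, 0);
    }
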
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Thu Nov 27 12:49:54 2014
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapreduce.Job;
@@ -37,7 +36,6 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -50,19 +48,19 @@ import org.apache.pig.impl.util.ObjectSe
  */
 @SuppressWarnings("unchecked")
 public class PigOutputFormat extends OutputFormat<WritableComparable, Tuple> {
-    
+
     private enum Mode { SINGLE_STORE, MULTI_STORE};
-    
+
     /** the temporary directory for the multi store */
     public static final String PIG_MAPRED_OUTPUT_DIR = "pig.mapred.output.dir";
     /** the relative path that can be used to build a temporary
      * place to store the output from a number of map-reduce tasks*/
     public static final String PIG_TMP_PATH =  "pig.tmp.path";
-    
-    List<POStore> reduceStores = null;
-    List<POStore> mapStores = null;
-    Configuration currentConf = null;
-    
+
+    protected List<POStore> reduceStores = null;
+    protected List<POStore> mapStores = null;
+    protected Configuration currentConf = null;
+
     @Override
     public RecordWriter<WritableComparable, Tuple> getRecordWriter(TaskAttemptContext taskattemptcontext)
                 throws IOException, InterruptedException {
@@ -97,27 +95,27 @@ public class PigOutputFormat extends Out
     @SuppressWarnings("unchecked")
     static public class PigRecordWriter
             extends RecordWriter<WritableComparable, Tuple> {
-        
+
         /**
          * the actual RecordWriter
          */
         private RecordWriter wrappedWriter;
-        
+
         /**
          * the StoreFunc for the single store
          */
         private StoreFuncInterface sFunc;
-        
+
         /**
          * Single Query or multi query
          */
         private Mode mode;
-        
-        public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface sFunc, 
+
+        public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface sFunc,
                 Mode mode)
-                throws IOException {            
+                throws IOException {
             this.mode = mode;
-            
+
             if(mode == Mode.SINGLE_STORE) {
                 this.wrappedWriter = wrappedWriter;
                 this.sFunc = sFunc;
@@ -128,7 +126,7 @@ public class PigOutputFormat extends Out
         /**
          * We only care about the values, so we are going to skip the keys when
          * we write.
-         * 
+         *
          * @see org.apache.hadoop.mapreduce.RecordWriter#write(Object, Object)
          */
         @Override
@@ -142,7 +140,7 @@ public class PigOutputFormat extends Out
         }
 
         @Override
-        public void close(TaskAttemptContext taskattemptcontext) throws 
+        public void close(TaskAttemptContext taskattemptcontext) throws
         IOException, InterruptedException {
             if(mode == Mode.SINGLE_STORE) {
                 wrappedWriter.close(taskattemptcontext);
@@ -150,24 +148,24 @@ public class PigOutputFormat extends Out
         }
 
     }
-    
+
     /**
      * Before delegating calls to underlying OutputFormat or OutputCommitter
      * Pig needs to ensure the Configuration in the JobContext contains
-     * the output location and StoreFunc 
-     * for the specific store - so set these up in the context for this specific 
+     * the output location and StoreFunc
+     * for the specific store - so set these up in the context for this specific
      * store
      * @param jobContext the {@link JobContext}
      * @param store the POStore
      * @throws IOException on failure
      */
-    public static void setLocation(JobContext jobContext, POStore store) throws 
+    public static void setLocation(JobContext jobContext, POStore store) throws
     IOException {
         Job storeJob = new Job(jobContext.getConfiguration());
         StoreFuncInterface storeFunc = store.getStoreFunc();
         String outputLocation = store.getSFile().getFileName();
         storeFunc.setStoreLocation(outputLocation, storeJob);
-        
+
         // the setStoreLocation() method would indicate to the StoreFunc
         // to set the output location for its underlying OutputFormat.
         // Typically OutputFormat's store the output location in the
@@ -176,7 +174,7 @@ public class PigOutputFormat extends Out
         // OutputFormat might have set) and merge it with the Configuration
         // we started with so that when this method returns the Configuration
         // supplied as input has the updates.
-        ConfigurationUtil.mergeConf(jobContext.getConfiguration(), 
+        ConfigurationUtil.mergeConf(jobContext.getConfiguration(),
                 storeJob.getConfiguration());
     }
 
@@ -187,20 +185,20 @@ public class PigOutputFormat extends Out
         checkOutputSpecsHelper(reduceStores, jobcontext);
     }
 
-    private void checkOutputSpecsHelper(List<POStore> stores, JobContext 
+    private void checkOutputSpecsHelper(List<POStore> stores, JobContext
             jobcontext) throws IOException, InterruptedException {
         for (POStore store : stores) {
             // make a copy of the original JobContext so that
-            // each OutputFormat get a different copy 
+            // each OutputFormat gets a different copy
             JobContext jobContextCopy = HadoopShims.createJobContext(
                     jobcontext.getConfiguration(), jobcontext.getJobID());
-            
+
             // set output location
             PigOutputFormat.setLocation(jobContextCopy, store);
-            
+
             StoreFuncInterface sFunc = store.getStoreFunc();
             OutputFormat of = sFunc.getOutputFormat();
-            
+
             // The above call should have updated the conf in the JobContext
             // to have the output location - now call checkOutputSpecs()
             try {
@@ -224,23 +222,22 @@ public class PigOutputFormat extends Out
      * @param conf
      * @param storeLookupKey
      * @return
-     * @throws IOException 
+     * @throws IOException
      */
-    private List<POStore> getStores(Configuration conf, String storeLookupKey) 
+    private List<POStore> getStores(Configuration conf, String storeLookupKey)
     throws IOException {
         return (List<POStore>)ObjectSerializer.deserialize(
                 conf.get(storeLookupKey));
     }
-    
-    
-    private void setupUdfEnvAndStores(JobContext jobcontext)
+
+    protected void setupUdfEnvAndStores(JobContext jobcontext)
     throws IOException{
         Configuration newConf = jobcontext.getConfiguration();
-        
-        // We setup UDFContext so in StoreFunc.getOutputFormat, which is called inside 
+
+        // We set up UDFContext so that StoreFunc.getOutputFormat, which is called inside
         // the constructor of PigOutputCommitter, can make use of it
         MapRedUtil.setupUDFContext(newConf);
-        
+
         // if there is a udf in the plan we would need to know the import
         // path so we can instantiate the udf. This is required because
         // we will be deserializing the POStores out of the plan in the next
@@ -261,13 +258,13 @@ public class PigOutputFormat extends Out
         // config properties have changed (eg. creating stores).
         currentConf = new Configuration(newConf);
     }
-    
+
     /**
      * Check if given property prop is same in configurations conf1, conf2
      * @param prop
      * @param conf1
      * @param conf2
-     * @return true if both are equal 
+     * @return true if both are equal
      */
     private boolean isConfPropEqual(String prop, Configuration conf1,
             Configuration conf2) {
@@ -283,10 +280,10 @@ public class PigOutputFormat extends Out
     }
 
     @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext 
+    public OutputCommitter getOutputCommitter(TaskAttemptContext
             taskattemptcontext) throws IOException, InterruptedException {
         setupUdfEnvAndStores(taskattemptcontext);
-        
+
         // we return an instance of PigOutputCommitter to Hadoop - this instance
         // will wrap the real OutputCommitter(s) belonging to the store(s)
         return new PigOutputCommitter(taskattemptcontext,

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Thu Nov 27 12:49:54 2014
@@ -17,8 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
-import static org.apache.pig.PigConfiguration.TIME_UDFS;
-import static org.apache.pig.PigConfiguration.TIME_UDFS_FREQUENCY;
+import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE;
+import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE_FREQUENCY;
 import static org.apache.pig.PigConstants.TIME_UDFS_ELAPSED_TIME_COUNTER;
 
 import java.io.IOException;
@@ -119,10 +119,10 @@ public class PigRecordReader extends Rec
         idx = 0;
         this.limit = limit;
         initNextRecordReader();
-        doTiming = inputSpecificConf.getBoolean(TIME_UDFS, false);
+        doTiming = inputSpecificConf.getBoolean(PIG_UDF_PROFILE, false);
         if (doTiming) {
             counterGroup = loadFunc.toString();
-            timingFrequency = inputSpecificConf.getLong(TIME_UDFS_FREQUENCY, 100L);
+            timingFrequency = inputSpecificConf.getLong(PIG_UDF_PROFILE_FREQUENCY, 100L);
         }
     }
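
The renamed constants above map to the pig.udf.profile and pig.udf.profile.frequency properties (assuming the values these constants carry in PigConfiguration), so profiling can be enabled per run; one possible invocation, assuming the standard bin/pig -D passthrough to the JVM:

    pig -Dpig.udf.profile=true -Dpig.udf.profile.frequency=500 myscript.pig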
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Thu Nov 27 12:49:54 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataBag;
@@ -109,7 +110,7 @@ public class WeightedRangePartitioner ex
             Map<String, Object> quantileMap = null;
             Configuration conf;
             if (!pigContext.getExecType().isLocal()) {
-                conf = new Configuration(true);
+                conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
             } else {
                 conf = new Configuration(false);
             }
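
The switch above makes the partitioner honor Pig's own properties instead of whatever *-site.xml defaults happen to be on the classpath. Conceptually the conversion is just a property copy; a hypothetical equivalent of ConfigurationUtil.toConfiguration (the real implementation may differ in details):

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;

    public class ConfSketch {
        // Sketch: build a Configuration from Pig's Properties without
        // loading default resources (hence the 'false').
        public static Configuration toConfiguration(Properties props) {
            Configuration conf = new Configuration(false);
            for (String name : props.stringPropertyNames()) {
                conf.set(name, props.getProperty(name));
            }
            return conf;
        }
    }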

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Thu Nov 27 12:49:54 2014
@@ -24,6 +24,8 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -100,7 +102,7 @@ public abstract class PhysicalOperator e
     // Will be used by operators to report status or transmit heartbeat
     // Should be set by the backends to appropriate implementations that
     // wrap their own version of a reporter.
-    public static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
+    protected static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
 
     // Will be used by operators to aggregate warning messages
     // Should be set by the backends to appropriate implementations that
@@ -120,6 +122,10 @@ public abstract class PhysicalOperator e
 
     private List<OriginalLocation> originalLocations =  new ArrayList<OriginalLocation>();
 
+    static {
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(PhysicalOperator.class);
+    }
+
     public PhysicalOperator(OperatorKey k) {
         this(k, -1, null);
     }
@@ -402,9 +408,9 @@ public abstract class PhysicalOperator e
     }
 
     public Result getNextDataBag() throws ExecException {
-        Result ret = null;
+        Result val = new Result();
         DataBag tmpBag = BagFactory.getInstance().newDefaultBag();
-        for (ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) {
+        for (Result ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) {
             if (ret.returnStatus == POStatus.STATUS_ERR) {
                 return ret;
             } else if (ret.returnStatus == POStatus.STATUS_NULL) {
@@ -413,9 +419,9 @@ public abstract class PhysicalOperator e
                 tmpBag.add((Tuple) ret.result);
             }
         }
-        ret.result = tmpBag;
-        ret.returnStatus = (tmpBag.size() == 0)? POStatus.STATUS_EOP : POStatus.STATUS_OK;
-        return ret;
+        val.result = tmpBag;
+        val.returnStatus = (tmpBag.size() == 0)? POStatus.STATUS_EOP : POStatus.STATUS_OK;
+        return val;
     }
 
     public Result getNextBigInteger() throws ExecException {
@@ -451,6 +457,11 @@ public abstract class PhysicalOperator e
         PhysicalOperator.reporter.set(reporter);
     }
 
+    @StaticDataCleanup
+    public static void staticDataCleanup() {
+        reporter = new ThreadLocal<PigProgressable>();
+    }
+
     /**
      * Make a deep copy of this operator. This function is blank, however,
      * we should leave a place holder so that the subclasses can clone
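
The JVMReuseManager registration added above is the general recipe for resetting mutable static state between reused task JVMs; a condensed sketch of the same pattern for a hypothetical class:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pig.JVMReuseManager;
    import org.apache.pig.StaticDataCleanup;

    public class SomeOperator {
        // Mutable static state that must not leak across tasks in a reused JVM.
        private static Map<String, String> cache = new HashMap<String, String>();

        static {
            // Register once, when the class is first loaded.
            JVMReuseManager.getInstance().registerForStaticDataCleanup(SomeOperator.class);
        }

        @StaticDataCleanup
        public static void staticDataCleanup() {
            // Invoked before the JVM is reused for another task.
            cache = new HashMap<String, String>();
        }
    }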

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Thu Nov 27 12:49:54 2014
@@ -18,10 +18,10 @@
 
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
-import static org.apache.pig.PigConfiguration.TIME_UDFS;
-import static org.apache.pig.PigConfiguration.TIME_UDFS_FREQUENCY;
-import static org.apache.pig.PigConstants.TIME_UDFS_INVOCATION_COUNTER;
+import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE;
+import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE_FREQUENCY;
 import static org.apache.pig.PigConstants.TIME_UDFS_ELAPSED_TIME_COUNTER;
+import static org.apache.pig.PigConstants.TIME_UDFS_INVOCATION_COUNTER;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -37,7 +37,6 @@ import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
-import org.apache.pig.PigWarning;
 import org.apache.pig.TerminatingAccumulator;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -67,7 +66,8 @@ public class POUserFunc extends Expressi
 
     private transient String counterGroup;
     private transient EvalFunc func;
-    private transient String[] cacheFiles = null;
+    private transient List<String> cacheFiles = null;
+    private transient List<String> shipFiles = null;
 
     FuncSpec funcSpec;
     FuncSpec origFSpec;
@@ -157,10 +157,10 @@ public class POUserFunc extends Expressi
             func.setPigLogger(pigLogger);
             Configuration jobConf = UDFContext.getUDFContext().getJobConf();
             if (jobConf != null) {
-                doTiming = jobConf.getBoolean(TIME_UDFS, false);
+                doTiming = jobConf.getBoolean(PIG_UDF_PROFILE, false);
                 if (doTiming) {
                     counterGroup = funcSpec.toString();
-                    timingFrequency = jobConf.getLong(TIME_UDFS_FREQUENCY, 100L);
+                    timingFrequency = jobConf.getLong(PIG_UDF_PROFILE_FREQUENCY, 100L);
                 }
             }
             // We initialize here instead of instantiateFunc because this is called
@@ -280,27 +280,6 @@ public class POUserFunc extends Expressi
         }
         try {
             if(result.returnStatus == POStatus.STATUS_OK) {
-                Tuple t = (Tuple) result.result;
-
-                // For backward compatibility, we short-circuit tuples whose
-                // size is 1 and field is null. (See PIG-3679)
-                if (t.size() == 1 && t.isNull(0)) {
-                    pigLogger.warn(this, "All the input values are null, skipping the invocation of UDF",
-                            PigWarning.SKIP_UDF_CALL_FOR_NULL);
-                    Schema outputSchema = func.outputSchema(func.getInputSchema());
-                    // If the output schema is tuple (i.e. multiple fields are
-                    // to be returned), we return a tuple where every field is
-                    // null.
-                    if (outputSchema != null && outputSchema.getField(0).type == DataType.TUPLE) {
-                        result.result = tf.newTuple(outputSchema.getField(0).schema.size());
-                    // Otherwise, we simply return null since it can be cast to
-                    // any data type.
-                    } else {
-                        result.result = null;
-                    }
-                    return result;
-                }
-
                 if (isAccumulative()) {
                     if (isAccumStarted()) {
                         if (!haveCheckedIfTerminatingAccumulator) {
@@ -554,20 +533,28 @@ public class POUserFunc extends Expressi
     public FuncSpec getFuncSpec() {
         return funcSpec;
     }
-    
+
     public void setFuncSpec(FuncSpec funcSpec) {
         this.funcSpec = funcSpec;
         instantiateFunc(funcSpec);
     }
 
-    public String[] getCacheFiles() {
+    public List<String> getCacheFiles() {
         return cacheFiles;
     }
 
-    public void setCacheFiles(String[] cf) {
+    public void setCacheFiles(List<String> cf) {
         cacheFiles = cf;
     }
 
+    public List<String> getShipFiles() {
+        return shipFiles;
+    }
+
+    public void setShipFiles(List<String> sf) {
+        shipFiles = sf;
+    }
+
     public boolean combinable() {
         return (func instanceof Algebraic);
     }
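
With the accessors above now List<String>-based (and shipFiles added alongside cacheFiles), a UDF can hand back its file lists directly; a hypothetical UDF sketch, with class name and path purely illustrative, using EvalFunc's getCacheFiles() hook (getShipFiles() is the analogous hook for shipped files):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;

    public class LookupUdf extends EvalFunc<String> {
        @Override
        public List<String> getCacheFiles() {
            // '#name' sets the symlink under which the file appears locally.
            return Arrays.asList("/data/lookup.dat#lookup.dat");
        }

        @Override
        public String exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0) {
                return null;
            }
            return String.valueOf(input.get(0));
        }
    }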

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Nov 27 12:49:54 2014
@@ -347,7 +347,12 @@ public class PhyPlanVisitor extends Plan
      */
     public void visitPreCombinerLocalRearrange(
             POPreCombinerLocalRearrange preCombinerLocalRearrange) throws VisitorException {
-        // TODO Auto-generated method stub
+        List<PhysicalPlan> inpPlans = preCombinerLocalRearrange.getPlans();
+        for (PhysicalPlan plan : inpPlans) {
+            pushWalker(mCurrentWalker.spawnChildWalker(plan));
+            visit();
+            popWalker();
+        }
     }
 
     public void visitPartialAgg(POPartialAgg poPartialAgg) throws VisitorException {
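
Since the inner plans of POPreCombinerLocalRearrange are now walked, visitor subclasses see the operators nested inside them too; a hypothetical subclass sketch:

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.impl.plan.DepthFirstWalker;

    // Hypothetical visitor counting POProject operators anywhere in a plan,
    // including inner plans that are now walked above.
    public class ProjectCounter extends PhyPlanVisitor {
        public int count = 0;

        public ProjectCounter(PhysicalPlan plan) {
            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
        }

        @Override
        public void visitProject(POProject proj) {
            count++;
        }
    }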

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Thu Nov 27 12:49:54 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 import java.util.Arrays;
 import java.util.Map;
 
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -48,6 +49,9 @@ public class CombinerPackager extends Pa
     private Map<Integer, Integer> keyLookup;
 
     private int numBags;
+    
+    private transient boolean initialized;
+    private transient boolean useDefaultBag;
 
     /**
      * A new POPostCombinePackage will be constructed as a near clone of the
@@ -91,15 +95,16 @@ public class CombinerPackager extends Pa
     }
 
     private DataBag createDataBag(int numBags) {
-        String bagType = null;
-        if (PigMapReduce.sJobConfInternal.get() != null) {
-            bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
-        }
-
-        if (bagType != null && bagType.equalsIgnoreCase("default")) {
-            return new NonSpillableDataBag();
+        if (!initialized) {
+            initialized = true;
+            if (PigMapReduce.sJobConfInternal.get() != null) {
+                String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_TYPE);
+                if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                    useDefaultBag = true;
+                }
+            }
         }
-        return new InternalCachedBag(numBags);
+        return useDefaultBag ? new NonSpillableDataBag() : new InternalCachedBag(numBags);
     }
 
     @Override
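
The PIG_CACHEDBAG_TYPE constant above replaces the literal "pig.cachedbag.type" seen in the removed lines; opting back into the non-spillable default bag remains a one-property change, e.g. (one possible invocation, assuming bin/pig -D passthrough):

    pig -Dpig.cachedbag.type=default myscript.pig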

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java Thu Nov 27 12:49:54 2014
@@ -242,9 +242,15 @@ public class MultiQueryPackager extends 
 
     @Override
     public Tuple getValueTuple(PigNullableWritable keyWritable,
-            NullableTuple ntup, int index) throws ExecException {
+            NullableTuple ntup, int origIndex) throws ExecException {
         this.keyWritable = keyWritable;
-        return packagers.get(((int) index) & idxPart).getValueTuple(
-                keyWritable, ntup, index);
+        int index = origIndex & idxPart;
+        PigNullableWritable newKey = keyWritable;
+        if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {
+            Tuple tup = (Tuple)this.keyWritable.getValueAsPigType();
+            newKey = HDataType.getWritableComparableTypes(tup.get(0), packagers.get(index).getKeyType());
+            newKey.setIndex((byte)origIndex);
+        }
+        return packagers.get(index).getValueTuple(newKey, ntup, origIndex);
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Thu Nov 27 12:49:54 2014
@@ -19,8 +19,8 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Iterator;
 
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -38,9 +38,6 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
-import org.apache.pig.impl.util.IdentityHashSet;
 
 /**
  * The collected group operator is a special operator used when users give
@@ -71,7 +68,7 @@ public class POCollectedGroup extends Ph
 
     private Object prevKey = null;
 
-    private boolean useDefaultBag = false;
+    private transient boolean useDefaultBag;
 
     public POCollectedGroup(OperatorKey k) {
         this(k, -1, null);
@@ -127,18 +124,14 @@ public class POCollectedGroup extends Ph
     @Override
     public Result getNextTuple() throws ExecException {
 
-        // Since the output is buffered, we need to flush the last
-        // set of records when the close method is called by mapper.
-        if (this.parentPlan.endOfAllInput) {
-            return getStreamCloseResult();
-        }
-
         Result inp = null;
         Result res = null;
 
         while (true) {
             inp = processInput();
             if (inp.returnStatus == POStatus.STATUS_EOP) {
+                // Since the output is buffered, we need to flush the last
+                // set of records when the close method is called by mapper.
                 if (this.parentPlan.endOfAllInput) {
                     return getStreamCloseResult();
                 } else {
@@ -172,7 +165,7 @@ public class POCollectedGroup extends Ph
             if (prevKey == null && outputBag == null) {
 
                 if (PigMapReduce.sJobConfInternal.get() != null) {
-                    String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+                    String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_TYPE);
                     if (bagType != null && bagType.equalsIgnoreCase("default")) {
                         useDefaultBag = true;
                     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Thu Nov 27 12:49:54 2014
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -42,15 +43,18 @@ import org.apache.pig.impl.plan.VisitorE
  * Find the distinct set of tuples in a bag.
  * This is a blocking operator. All the input is put in the hashset implemented
  * in DistinctDataBag which also provides the other DataBag interfaces.
- * 
- * 
+ *
+ *
  */
 public class PODistinct extends PhysicalOperator implements Cloneable {
     private static final Log log = LogFactory.getLog(PODistinct.class);
     private static final long serialVersionUID = 1L;
     private boolean inputsAccumulated = false;
     private DataBag distinctBag = null;
-    transient Iterator<Tuple> it;
+
+    private transient boolean initialized;
+    private transient boolean useDefaultBag;
+    private transient Iterator<Tuple> it;
 
     // PIG-3385: Since GlobalRearrange is not used by PODistinct, passing the
     // custom partioner through here
@@ -87,17 +91,19 @@ public class PODistinct extends Physical
     @Override
     public Result getNextTuple() throws ExecException {
          if (!inputsAccumulated) {
-            // by default, we create InternalSortedBag, unless user configures
+            // by default, we create InternalDistinctBag, unless the user configures
             // explicitly to use the old bag
-            String bagType = null;
-            if (PigMapReduce.sJobConfInternal.get() != null) {
-                bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.distinct.type");
-            }
-            if (bagType != null && bagType.equalsIgnoreCase("default")) {
-                distinctBag = BagFactory.getInstance().newDistinctBag();
-            } else {
-                distinctBag = new InternalDistinctBag(3);
-            }
+             if (!initialized) {
+                 initialized = true;
+                 if (PigMapReduce.sJobConfInternal.get() != null) {
+                     String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_DISTINCT_TYPE);
+                     if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                         useDefaultBag = true;
+                     }
+                 }
+             }
+             distinctBag = useDefaultBag ? BagFactory.getInstance().newDistinctBag()
+                     : new InternalDistinctBag(3);
 
             Result in = processInput();
             while (in.returnStatus != POStatus.STATUS_EOP) {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Thu Nov 27 12:49:54 2014
@@ -92,6 +92,10 @@ public class POForEach extends PhysicalO
 
     protected Tuple inpTuple;
 
+    // Indicates the foreach statement can only run on the map side.
+    // Currently only used in MR cross (see PIG-4175).
+    protected boolean mapSideOnly = false;
+
     private Schema schema;
 
     public POForEach(OperatorKey k) {
@@ -274,8 +278,9 @@ public class POForEach extends PhysicalO
                                 throw new ExecException(e);
                             }
                         }else{
-                            inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
-                            //                       buffer.clear();
+                            if (buffer instanceof POPackage.POPackageTupleBuffer) {
+                                inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
+                            }
                             setAccumEnd();
                         }
 
@@ -293,7 +298,7 @@ public class POForEach extends PhysicalO
                             break;
                         }
                     }
-
+                    buffer.clear();
                 } else {
                     res = processPlan();
                 }
@@ -786,4 +791,11 @@ public class POForEach extends PhysicalO
         return planLeafOps;
     }
 
+    public void setMapSideOnly(boolean mapSideOnly) {
+        this.mapSideOnly = mapSideOnly;
+    }
+
+    public boolean isMapSideOnly() {
+        return mapSideOnly;
+    }
 }
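
The new flag is a plain bean property consumed by the MR cross path (PIG-4175); a hypothetical call-site sketch (operator key values are illustrative):

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
    import org.apache.pig.impl.plan.OperatorKey;

    public class MapSideOnlyExample {
        public static void main(String[] args) {
            // Sketch: mark a foreach operator as map-side only so the planner
            // keeps it out of the reduce phase.
            POForEach fe = new POForEach(new OperatorKey("scope", 1));
            fe.setMapSideOnly(true);
            assert fe.isMapSideOnly();
        }
    }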