Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC

svn commit: r1783988 [8/24] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Wed Feb 22 09:43:41 2017
@@ -32,7 +32,8 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.TIPStatus;
+import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.FuncSpec;
@@ -40,7 +41,6 @@ import org.apache.pig.PigException;
 import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.PlanException;
@@ -76,7 +76,7 @@ public abstract class Launcher {
     protected Map<FileSpec, Exception> failureMap;
     protected JobControl jc = null;
 
-    class HangingJobKiller extends Thread {
+    protected class HangingJobKiller extends Thread {
         public HangingJobKiller() {}
 
         @Override
@@ -90,7 +90,6 @@ public abstract class Launcher {
     }
 
     protected Launcher() {
-        Runtime.getRuntime().addShutdownHook(new HangingJobKiller());
         // handle the windows portion of \r
         if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
             newLine = "\r\n";
@@ -104,7 +103,6 @@ public abstract class Launcher {
     public void reset() {
         failureMap = Maps.newHashMap();
         totalHadoopTimeSpent = 0;
-        jc = null;
     }
 
     /**
@@ -179,7 +177,7 @@ public abstract class Launcher {
             String exceptionCreateFailMsg = null;
             boolean jobFailed = false;
             if (msgs.length > 0) {
-                if (HadoopShims.isJobFailed(report)) {
+                if (report.getCurrentStatus()== TIPStatus.FAILED) {
                     jobFailed = true;
                 }
                 Set<String> errorMessageSet = new HashSet<String>();
@@ -261,11 +259,30 @@ public abstract class Launcher {
 
         List<Job> runnJobs = jc.getRunningJobs();
         for (Job j : runnJobs) {
-            prog += HadoopShims.progressOfRunningJob(j);
+            prog += progressOfRunningJob(j);
         }
         return prog;
     }
 
+    /**
+     * Returns the progress of a Job j which is part of a submitted JobControl
+     * object. The progress is for this Job alone, so it has to be scaled down
+     * by the number of jobs present in the JobControl.
+     *
+     * @param j The Job for which progress is required
+     * @return Returns the percentage progress of this Job
+     * @throws IOException
+     */
+    private static double progressOfRunningJob(Job j)
+            throws IOException {
+        org.apache.hadoop.mapreduce.Job mrJob = j.getJob();
+        try {
+            return (mrJob.mapProgress() + mrJob.reduceProgress()) / 2;
+        } catch (Exception ir) {
+            return 0;
+        }
+    }
+
     public long getTotalHadoopTimeSpent() {
         return totalHadoopTimeSpent;
     }
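
Inlining progressOfRunningJob() removes the HadoopShims indirection from this path: a running job's progress is the average of its map and reduce progress, and the caller divides the summed per-job fractions by the job count, as the javadoc above describes. A minimal sketch of that overall scaling (class and method names here are illustrative, not Pig's):

    import java.io.IOException;
    import org.apache.hadoop.mapred.jobcontrol.Job;
    import org.apache.hadoop.mapred.jobcontrol.JobControl;

    public class ProgressSketch {
        // Overall progress in [0, 1]: finished jobs count as 1.0 each, running
        // jobs contribute the average of their map and reduce progress.
        static double overallProgress(JobControl jc, int totalJobs) throws IOException {
            double prog = jc.getSuccessfulJobs().size();
            for (Job j : jc.getRunningJobs()) {
                org.apache.hadoop.mapreduce.Job mrJob = j.getJob();
                prog += (mrJob.mapProgress() + mrJob.reduceProgress()) / 2;
            }
            return prog / totalJobs; // scale down by the number of jobs in the JobControl
        }
    }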

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Wed Feb 22 09:43:41 2017
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
@@ -122,7 +123,8 @@ public class FetchLauncher {
         poStore.setUp();
 
         TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
-        HadoopShims.setTaskAttemptId(conf, taskAttemptID);
+        //Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
+        conf.setInt(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, taskAttemptID.getId());
 
         if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
             MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);
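
Fetch mode runs the plan in the client JVM, so nothing assigns a task attempt id the way the MR framework does for real tasks; the hunk above stamps one in by hand. A minimal sketch of that step in isolation, assuming the MRConfiguration constant named above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;

    public class FetchAttemptIdSketch {
        static void stampAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
            // Hadoop sets this for real tasks; fetch mode must do it itself.
            conf.setInt(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, taskAttemptID.getId());
        }
    }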

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java Wed Feb 22 09:43:41 2017
@@ -95,7 +95,7 @@ public class FetchPOStoreImpl extends PO
         }
         if (outputCommitter.needsTaskCommit(context))
             outputCommitter.commitTask(context);
-        HadoopShims.commitOrCleanup(outputCommitter, context);
+        outputCommitter.commitJob(context);
     }
 
     @Override
@@ -109,7 +109,7 @@ public class FetchPOStoreImpl extends PO
             }
             writer = null;
         }
-        HadoopShims.commitOrCleanup(outputCommitter, context);
+        outputCommitter.commitJob(context);
     }
 
 }
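
With the shim removed, the fetch store drives the stock OutputCommitter protocol directly (the removed shim picked between commitJob() and the deprecated cleanupJob() depending on Hadoop version). The sequence as a compact sketch; TaskAttemptContext extends JobContext, so a single context serves both calls:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class CommitSketch {
        static void commitAll(OutputCommitter committer, TaskAttemptContext context)
                throws IOException {
            if (committer.needsTaskCommit(context))
                committer.commitTask(context); // promote this task's output
            committer.commitJob(context);      // finalize the job (e.g. _SUCCESS marker)
        }
    }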

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java Wed Feb 22 09:43:41 2017
@@ -22,43 +22,48 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Reducer;
-
-import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
 
 /**
  * A special implementation of combiner used only for distinct.  This combiner
  * does not even parse out the records.  It just throws away duplicate values
- * in the key in order ot minimize the data being sent to the reduce.
+ * in the key in order to minimize the data being sent to the reduce.
  */
 public class DistinctCombiner {
 
-    public static class Combine 
+    public static class Combine
         extends Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-        
+
         private final Log log = LogFactory.getLog(getClass());
 
-        ProgressableReporter pigReporter;
-        
-        /**
-         * Configures the reporter 
-         */
+        private static boolean firstTime = true;
+
+        //@StaticDataCleanup
+        public static void staticDataCleanup() {
+            firstTime = true;
+        }
+
         @Override
         protected void setup(Context context) throws IOException, InterruptedException {
             super.setup(context);
-            pigReporter = new ProgressableReporter();
+            Configuration jConf = context.getConfiguration();
+            // Avoid log spamming
+            if (firstTime) {
+                log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
+                firstTime = false;
+            }
         }
-        
+
         /**
          * The reduce function which removes values.
          */
         @Override
-        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) 
+        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
                 throws IOException, InterruptedException {
-            
-            pigReporter.setRep(context);
 
             // Take the first value and the key and collect
             // just that.
@@ -66,6 +71,7 @@ public class DistinctCombiner {
             NullableTuple val = iter.next();
             context.write(key, val);
         }
+
     }
-    
+
 }
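
The new setup() drops the per-task ProgressableReporter wiring and instead logs the alias map once per JVM: with task JVM reuse, setup() can run many times, so a static flag suppresses the repeats. A minimal sketch of the pattern, including the reset hook (the @StaticDataCleanup marker is commented out in the hunk) that lets tests restore the initial state:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;

    public class LogOncePerJvmSketch {
        private static final Log log = LogFactory.getLog(LogOncePerJvmSketch.class);
        private static boolean firstTime = true;

        public static void staticDataCleanup() {
            firstTime = true; // re-arm the one-shot log between test runs
        }

        void setup(Configuration conf) {
            if (firstTime) { // only the first task in this JVM logs
                log.info("Aliases being processed per job phase (AliasName[line,offset]): "
                        + conf.get("pig.alias.location"));
                firstTime = false;
            }
        }
    }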

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Wed Feb 22 09:43:41 2017
@@ -75,16 +75,24 @@ public class FileBasedOutputSizeReader i
             return -1;
         }
 
-        long bytes = 0;
         Path p = new Path(getLocationUri(sto));
-        FileSystem fs = p.getFileSystem(conf);
-        FileStatus[] lst = fs.listStatus(p);
+        return getPathSize(p, p.getFileSystem(conf));
+    }
+
+    private long getPathSize(Path storePath, FileSystem fs) throws IOException {
+        long bytes = 0;
+        FileStatus[] lst = fs.listStatus(storePath);
         if (lst != null) {
             for (FileStatus status : lst) {
-                bytes += status.getLen();
+                if (status.isFile()) {
+                    if (status.getLen() > 0)
+                        bytes += status.getLen();
+                }
+                else { // recursively count nested leaves' (files) sizes
+                    bytes += getPathSize(status.getPath(), fs);
+                }
             }
         }
-
         return bytes;
     }
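
getPathSize() now recurses so that output written into nested directories (for example, stores that partition output into subdirectories) is counted, where the old top-level listStatus() missed it. For comparison, the filesystem can compute the same recursive total in one call; a sketch, assuming the store location resolves to a plain FileSystem path:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class OutputSizeSketch {
        static long outputSize(FileSystem fs, Path storePath) throws IOException {
            // ContentSummary aggregates the lengths of all files under the
            // path, equivalent to the manual recursion in getPathSize() above.
            return fs.getContentSummary(storePath).getLength();
        }
    }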
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Wed Feb 22 09:43:41 2017
@@ -92,7 +92,7 @@ public class InputSizeReducerEstimator i
         return reducers;
     }
 
-    static long getTotalInputFileSize(Configuration conf,
+    public static long getTotalInputFileSize(Configuration conf,
             List<POLoad> lds, Job job) throws IOException {
         return getTotalInputFileSize(conf, lds, job, Long.MAX_VALUE);
     }
@@ -100,7 +100,7 @@ public class InputSizeReducerEstimator i
     /**
      * Get the input size for as many inputs as possible. Inputs that do not report
      * their size nor can pig look that up itself are excluded from this size.
-     * 
+     *
      * @param conf Configuration
      * @param lds List of POLoads
      * @param job Job

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Feb 22 09:43:41 2017
@@ -24,7 +24,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.lang.reflect.Method;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -61,6 +60,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobPriority;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
@@ -71,6 +71,7 @@ import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.PigJobControl;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
@@ -89,7 +90,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataType;
@@ -122,6 +122,7 @@ import org.apache.pig.impl.util.ObjectSe
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
 
@@ -311,7 +312,7 @@ public class JobControlCompiler{
                     " should be a time in ms. default=" + defaultPigJobControlSleep, e);
         }
 
-        JobControl jobCtrl = HadoopShims.newJobControl(grpName, timeToSleep);
+        JobControl jobCtrl = new PigJobControl(grpName, timeToSleep);
 
         try {
             List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
@@ -384,7 +385,7 @@ public class JobControlCompiler{
         ArrayList<Pair<String,Long>> counterPairs;
 
         try {
-            counters = HadoopShims.getCounters(job);
+            counters = MRJobStats.getCounters(job);
 
             String groupName = getGroupName(counters.getGroupNames());
             // In case that the counter group was not find, we need to find
@@ -702,7 +703,8 @@ public class JobControlCompiler{
             // since this path would be invalid for the new job being created
             pigContext.getProperties().remove("mapreduce.job.credentials.binary");
 
-            conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
+            conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pigContext.getExecType().isLocal());
+            conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pigContext.getLog4jProperties()));
             conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList()));
             // this is for unit tests since some don't create PigServer
 
@@ -1671,14 +1673,6 @@ public class JobControlCompiler{
         if (distCachePath != null) {
             log.info("Jar file " + url + " already in DistributedCache as "
                     + distCachePath + ". Not copying to hdfs and adding again");
-            // Path already in dist cache
-            if (!HadoopShims.isHadoopYARN()) {
-                // Mapreduce in YARN includes $PWD/* which will add all *.jar files in classapth.
-                // So don't have to ensure that the jar is separately added to mapreduce.job.classpath.files
-                // But path may only be in 'mapred.cache.files' and not be in
-                // 'mapreduce.job.classpath.files' in Hadoop 1.x. So adding it there
-                DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf));
-            }
         }
         else {
             // REGISTER always copies locally the jar file. see PigServer.registerJar()
@@ -1964,20 +1958,9 @@ public class JobControlCompiler{
 
     public static void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
         // the OutputFormat we report to Hadoop is always PigOutputFormat which
-        // can be wrapped with LazyOutputFormat provided if it is supported by
-        // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+        // can be wrapped with LazyOutputFormat if PigConfiguration.PIG_OUTPUT_LAZY is set
         if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
-            try {
-                Class<?> clazz = PigContext
-                        .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
-                Method method = clazz.getMethod("setOutputFormatClass",
-                        org.apache.hadoop.mapreduce.Job.class, Class.class);
-                method.invoke(null, job, PigOutputFormat.class);
-            } catch (Exception e) {
-                job.setOutputFormatClass(PigOutputFormat.class);
-                log.warn(PigConfiguration.PIG_OUTPUT_LAZY
-                        + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
-            }
+            LazyOutputFormat.setOutputFormatClass(job,PigOutputFormat.class);
         } else {
             job.setOutputFormatClass(PigOutputFormat.class);
         }
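
With Hadoop 1 support gone, LazyOutputFormat no longer needs to be located via reflection and becomes an ordinary compile-time reference. The wrapper delays opening the real output until the first record is written, so tasks that emit nothing create no empty part files. A minimal standalone usage sketch (TextOutputFormat stands in for PigOutputFormat):

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class LazyOutputSketch {
        static void configure(Job job) {
            // The framework sees LazyOutputFormat; TextOutputFormat is only
            // instantiated when a record is actually written.
            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        }
    }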

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Feb 22 09:43:41 2017
@@ -1116,7 +1116,9 @@ public class MRCompiler extends PhyPlanV
         try{
             nonBlocking(op);
             phyToMROpMap.put(op, curMROp);
-            if (op.getPkgr().getPackageType() == PackageType.JOIN) {
+            if (op.getPkgr().getPackageType() == PackageType.JOIN
+                    || op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
+                // Bloom join is not implemented in mapreduce mode and falls back to regular join
                 curMROp.markRegularJoin();
             } else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
                 if (op.getNumInps() == 1) {
@@ -1278,7 +1280,7 @@ public class MRCompiler extends PhyPlanV
                             List<InputSplit> splits = inf.getSplits(HadoopShims.cloneJobContext(job));
                             List<List<InputSplit>> results = MapRedUtil
                             .getCombinePigSplits(splits,
-                                    HadoopShims.getDefaultBlockSize(fs, path),
+                                    fs.getDefaultBlockSize(path),
                                     conf);
                             numFiles += results.size();
                         } else {
@@ -2432,7 +2434,7 @@ public class MRCompiler extends PhyPlanV
         }else{
             for(int i=0; i<transformPlans.size(); i++) {
                 eps1.add(transformPlans.get(i));
-                flat1.add(true);
+                flat1.add(i == transformPlans.size() - 1 ? true : false);
             }
         }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Feb 22 09:43:41 2017
@@ -19,7 +19,9 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -40,7 +42,8 @@ import org.apache.hadoop.mapred.JobClien
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.pig.PigConfiguration;
@@ -65,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -78,15 +82,18 @@ import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
 
+import org.python.google.common.collect.Lists;
+
 
 /**
  * Main class that launches pig for Map Reduce
  *
  */
-public class MapReduceLauncher extends Launcher{
+public class MapReduceLauncher extends Launcher {
 
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
 
@@ -94,14 +101,30 @@ public class MapReduceLauncher extends L
 
     private boolean aggregateWarning = false;
 
+    public MapReduceLauncher() {
+        super();
+        Utils.addShutdownHookWithPriority(new HangingJobKiller(),
+                PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
+    }
+
     @Override
     public void kill() {
         try {
-            log.debug("Receive kill signal");
-            if (jc!=null) {
+            if (jc != null && jc.getRunningJobs().size() > 0) {
+                log.info("Received kill signal");
                 for (Job job : jc.getRunningJobs()) {
-                    HadoopShims.killJob(job);
+                    org.apache.hadoop.mapreduce.Job mrJob = job.getJob();
+                    try {
+                        if (mrJob != null) {
+                            mrJob.killJob();
+                        }
+                    } catch (Exception ir) {
+                        throw new IOException(ir);
+                    }
                     log.info("Job " + job.getAssignedJobID() + " killed");
+                    String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+                            .format(Calendar.getInstance().getTime());
+                    System.err.println(timeStamp + " Job " + job.getAssignedJobID() + " killed");
                 }
             }
         } catch (Exception e) {
@@ -301,8 +324,7 @@ public class MapReduceLauncher extends L
                 // Now wait, till we are finished.
                 while(!jc.allFinished()){
 
-                    try { jcThread.join(sleepTime); }
-                    catch (InterruptedException e) {}
+                    jcThread.join(sleepTime);
 
                     List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>();
 
@@ -321,11 +343,6 @@ public class MapReduceLauncher extends L
                                 log.info("detailed locations: " + aliasLocation);
                             }
 
-                            if (!HadoopShims.isHadoopYARN() && jobTrackerLoc != null) {
-                                log.info("More information at: http://" + jobTrackerLoc
-                                        + "/jobdetails.jsp?jobid=" + job.getAssignedJobID());
-                            }
-
                             // update statistics for this job so jobId is set
                             MRPigStatsUtil.addJobStats(job);
                             MRScriptState.get().emitJobStartedNotification(
@@ -475,10 +492,6 @@ public class MapReduceLauncher extends L
             for (Job job : succJobs) {
                 List<POStore> sts = jcc.getStores(job);
                 for (POStore st : sts) {
-                    if (Utils.isLocal(pc, job.getJobConf())) {
-                        HadoopShims.storeSchemaForLocal(job, st);
-                    }
-
                     if (!st.isTmpStore()) {
                         // create an "_SUCCESS" file in output location if
                         // output location is a filesystem dir
@@ -744,7 +757,7 @@ public class MapReduceLauncher extends L
     @SuppressWarnings("deprecation")
     void computeWarningAggregate(Job job, Map<Enum, Long> aggMap) {
         try {
-            Counters counters = HadoopShims.getCounters(job);
+            Counters counters = MRJobStats.getCounters(job);
             if (counters==null)
             {
                 long nullCounterCount =
@@ -798,13 +811,13 @@ public class MapReduceLauncher extends L
             throw new ExecException(backendException);
         }
         try {
-            Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
+            Iterator<TaskReport> mapRep = MRJobStats.getTaskReports(job, TaskType.MAP);
             if (mapRep != null) {
                 getErrorMessages(mapRep, "map", errNotDbg, pigContext);
                 totalHadoopTimeSpent += computeTimeSpent(mapRep);
                 mapRep = null;
             }
-            Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+            Iterator<TaskReport> redRep = MRJobStats.getTaskReports(job, TaskType.REDUCE);
             if (redRep != null) {
                 getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
                 totalHadoopTimeSpent += computeTimeSpent(redRep);
@@ -822,5 +835,6 @@ public class MapReduceLauncher extends L
             throw new ExecException(e);
         }
     }
+
 }
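
The kill path now goes straight to the mapreduce API: each mapred-level control Job exposes its underlying org.apache.hadoop.mapreduce.Job, whose killJob() asks the cluster to terminate it. The essential loop as a sketch, with the same wrap-and-rethrow the hunk uses:

    import java.io.IOException;
    import org.apache.hadoop.mapred.jobcontrol.Job;

    public class KillJobsSketch {
        static void killRunning(Iterable<Job> runningJobs) throws IOException {
            for (Job job : runningJobs) {
                org.apache.hadoop.mapreduce.Job mrJob = job.getJob();
                try {
                    // the job may not have launched yet, hence the null check
                    if (mrJob != null) {
                        mrJob.killJob();
                    }
                } catch (Exception e) {
                    throw new IOException(e); // surface backend failures uniformly
                }
            }
        }
    }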
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Wed Feb 22 09:43:41 2017
@@ -65,7 +65,10 @@ public class MapReduceOper extends Opera
     // this is needed when the key is null to create
     // an appropriate NullableXXXWritable object
     public byte mapKeyType;
-    
+
+    //record the map key types of all splittees
+    public byte[] mapKeyTypeOfSplittees;
+
     //Indicates that the map plan creation
     //is complete
     boolean mapDone = false;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Wed Feb 22 09:43:41 2017
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -580,18 +581,17 @@ class MultiQueryOptimizer extends MROpPl
     }
 
     private boolean hasSameMapKeyType(List<MapReduceOper> splittees) {
-        boolean sameKeyType = true;
-        for (MapReduceOper outer : splittees) {
-            for (MapReduceOper inner : splittees) {
-                if (inner.mapKeyType != outer.mapKeyType) {
-                    sameKeyType = false;
-                    break;
+        Set<Byte> keyTypes = new HashSet<Byte>();
+        for (MapReduceOper splittee : splittees) {
+            keyTypes.add(splittee.mapKeyType);
+            if (splittee.mapKeyTypeOfSplittees != null) {
+                for (int i = 0; i < splittee.mapKeyTypeOfSplittees.length; i++) {
+                    keyTypes.add(splittee.mapKeyTypeOfSplittees[i]);
                 }
             }
-            if (!sameKeyType) break;
-        }
 
-        return sameKeyType;
+        }
+        return keyTypes.size() == 1;
     }
 
     private int setIndexOnLRInSplit(int initial, POSplit splitOp, boolean sameKeyType)
@@ -1035,10 +1035,20 @@ class MultiQueryOptimizer extends MROpPl
         splitter.mapKeyType = sameKeyType ?
                 mergeList.get(0).mapKeyType : DataType.TUPLE;
 
+
+        setMapKeyTypeForSplitter(splitter,mergeList);
+
         log.info("Requested parallelism of splitter: "
                 + splitter.getRequestedParallelism());
     }
 
+    private void setMapKeyTypeForSplitter(MapReduceOper splitter, List<MapReduceOper> mergeList) {
+        splitter.mapKeyTypeOfSplittees = new byte[mergeList.size()];
+        for (int i = 0; i < mergeList.size(); i++) {
+            splitter.mapKeyTypeOfSplittees[i] = mergeList.get(i).mapKeyType;
+        }
+    }
+
     private void mergeSingleMapReduceSplittee(MapReduceOper mapReduce,
             MapReduceOper splitter, POSplit splitOp) throws VisitorException {
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Wed Feb 22 09:43:41 2017
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,9 +37,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -72,7 +75,6 @@ public class PigCombiner {
         PhysicalOperator[] roots;
         PhysicalOperator leaf;
 
-        PigContext pigContext = null;
         private volatile boolean initialized = false;
 
         //@StaticDataCleanup
@@ -91,9 +93,11 @@ public class PigCombiner {
             Configuration jConf = context.getConfiguration();
             try {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
-                pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
-                if (pigContext.getLog4jProperties()!=null)
-                    PropertyConfigurator.configure(pigContext.getLog4jProperties());
+                Properties log4jProperties = (Properties) ObjectSerializer
+                        .deserialize(jConf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+                if (log4jProperties != null) {
+                    PropertyConfigurator.configure(log4jProperties);
+                }
                 UDFContext.getUDFContext().reset();
                 MapRedUtil.setupUDFContext(context.getConfiguration());
 
@@ -143,7 +147,7 @@ public class PigCombiner {
                 pigReporter.setRep(context);
                 PhysicalOperator.setReporter(pigReporter);
 
-                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+                boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
                 pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -157,7 +161,7 @@ public class PigCombiner {
             // tuples out of the getnext() call of POJoinPackage
             // In this case, we process till we see EOP from
             // POJoinPacakage.getNext()
-            if (pack.getPkgr() instanceof JoinPackager)
+            if (pack.getPkgr() instanceof JoinPackager || pack.getPkgr() instanceof BloomPackager)
             {
                 pack.attachInput(key, tupIter.iterator());
                 while (true)
@@ -268,7 +272,6 @@ public class PigCombiner {
             pigReporter = null;
             // Avoid OOM in Tez.
             PhysicalOperator.setReporter(null);
-            pigContext = null;
             roots = null;
             cp = null;
         }
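
This is the consumer side of the JobControlCompiler change above: instead of shipping the whole serialized PigContext under "pig.pigContext", only the pieces tasks actually need travel in the conf. A sketch of the round trip using Pig's ObjectSerializer (base64-encoded Java serialization); method names here are illustrative:

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.impl.PigImplConstants;
    import org.apache.pig.impl.util.ObjectSerializer;

    public class Log4jShipSketch {
        // Front end (see the JobControlCompiler hunk above): ship only the
        // log4j Properties instead of the whole serialized PigContext.
        static void ship(Configuration conf, Properties log4jProps) throws IOException {
            conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES,
                    ObjectSerializer.serialize(log4jProps));
        }

        // Back end (task setup): restore them; null means nothing was shipped.
        static Properties restore(Configuration conf) throws IOException {
            return (Properties) ObjectSerializer.deserialize(
                    conf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
        }
    }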

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Wed Feb 22 09:43:41 2017
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +46,7 @@ import org.apache.pig.data.SchemaTupleBa
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -88,7 +90,6 @@ public abstract class PigGenericMapBase
 
     private PhysicalOperator leaf;
 
-    PigContext pigContext = null;
     private volatile boolean initialized = false;
 
     /**
@@ -168,13 +169,15 @@ public abstract class PigGenericMapBase
         inIllustrator = inIllustrator(context);
 
         PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
-        pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
 
         // This attempts to fetch all of the generated code from the distributed cache, and resolve it
-        SchemaTupleBackend.initialize(job, pigContext);
+        SchemaTupleBackend.initialize(job);
 
-        if (pigContext.getLog4jProperties()!=null)
-            PropertyConfigurator.configure(pigContext.getLog4jProperties());
+        Properties log4jProperties = (Properties) ObjectSerializer
+                .deserialize(job.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+        if (log4jProperties != null) {
+            PropertyConfigurator.configure(log4jProperties);
+        }
 
         if (mp == null)
             mp = (PhysicalPlan) ObjectSerializer.deserialize(
@@ -236,7 +239,7 @@ public abstract class PigGenericMapBase
             pigReporter.setRep(context);
             PhysicalOperator.setReporter(pigReporter);
 
-            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+            boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
             PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
             pigStatusReporter.setContext(new MRTaskContext(context));
             PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -249,8 +252,7 @@ public abstract class PigGenericMapBase
                     MapReducePOStoreImpl impl
                         = new MapReducePOStoreImpl(context);
                     store.setStoreImpl(impl);
-                    if (!pigContext.inIllustrator)
-                        store.setUp();
+                    store.setUp();
                 }
             }
         }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Wed Feb 22 09:43:41 2017
@@ -287,7 +287,6 @@ public class PigGenericMapReduce {
 
         private PhysicalOperator leaf;
 
-        PigContext pigContext = null;
         protected volatile boolean initialized = false;
 
         private boolean inIllustrator = false;
@@ -319,10 +318,9 @@ public class PigGenericMapReduce {
             sJobConf = context.getConfiguration();
             try {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
-                pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
 
                 // This attempts to fetch all of the generated code from the distributed cache, and resolve it
-                SchemaTupleBackend.initialize(jConf, pigContext);
+                SchemaTupleBackend.initialize(jConf);
 
                 if (rp == null)
                     rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
@@ -377,7 +375,7 @@ public class PigGenericMapReduce {
                 pigReporter.setRep(context);
                 PhysicalOperator.setReporter(pigReporter);
 
-                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+                boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
                 pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -608,7 +606,7 @@ public class PigGenericMapReduce {
                 pigReporter.setRep(context);
                 PhysicalOperator.setReporter(pigReporter);
 
-                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+                boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
                 pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Wed Feb 22 09:43:41 2017
@@ -17,9 +17,6 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
-import java.util.Map;
-import java.util.WeakHashMap;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.EvalFunc;
@@ -41,7 +38,6 @@ public final class PigHadoopLogger imple
 
     private PigStatusReporter reporter = null;
     private boolean aggregate = false;
-    private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
 
     private PigHadoopLogger() {
     }
@@ -68,11 +64,6 @@ public final class PigHadoopLogger imple
 
         if (getAggregate()) {
             if (reporter != null) {
-                // log at least once
-                if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) {
-                    log.warn(displayMessage);
-                    msgMap.put(o, displayMessage);
-                }
                 if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
                     reporter.incrCounter(className, warningEnum.name(), 1);
                 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Wed Feb 22 09:43:41 2017
@@ -197,14 +197,11 @@ public class PigInputFormat extends Inpu
 
         ArrayList<FileSpec> inputs;
         ArrayList<ArrayList<OperatorKey>> inpTargets;
-        PigContext pigContext;
         try {
             inputs = (ArrayList<FileSpec>) ObjectSerializer
                     .deserialize(conf.get(PIG_INPUTS));
             inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
                     .deserialize(conf.get(PIG_INPUT_TARGETS));
-            pigContext = (PigContext) ObjectSerializer.deserialize(conf
-                    .get("pig.pigContext"));
             PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));
             MapRedUtil.setupUDFContext(conf);
         } catch (Exception e) {
@@ -234,7 +231,7 @@ public class PigInputFormat extends Inpu
 
                 // if the execution is against Mapred DFS, set
                 // working dir to /user/<userid>
-                if(!Utils.isLocal(pigContext, conf)) {
+                if(!Utils.isLocal(conf)) {
                     fs.setWorkingDirectory(jobcontext.getWorkingDirectory());
                 }
 
@@ -270,7 +267,7 @@ public class PigInputFormat extends Inpu
                                 jobcontext.getJobID()));
                 List<InputSplit> oneInputPigSplits = getPigSplits(
                         oneInputSplits, i, inpTargets.get(i),
-                        HadoopShims.getDefaultBlockSize(fs, isFsPath? path: fs.getWorkingDirectory()),
+                        fs.getDefaultBlockSize(isFsPath? path: fs.getWorkingDirectory()),
                         combinable, confClone);
                 splits.addAll(oneInputPigSplits);
             } catch (ExecException ee) {

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+
+import java.io.IOException;
+
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+
+abstract public class PigMapBase extends PigGenericMapBase {
+    /**
+     * 
+     * Get mapper's illustrator context
+     * 
+     * @param conf  Configuration
+     * @param input Input bag to serve as data source
+     * @param output Map output buffer
+     * @param split the split
+     * @return Illustrator's context
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    public Context getIllustratorContext(Configuration conf, DataBag input,
+          List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
+          throws IOException, InterruptedException {
+    	org.apache.hadoop.mapreduce.Mapper.Context mapperContext = new WrappedMapper<Text, Tuple, PigNullableWritable, Writable>().getMapContext(new IllustratorContext(conf, input, output, split));
+        return mapperContext;
+    }
+    
+    public class IllustratorContext extends MapContextImpl<Text, Tuple, PigNullableWritable, Writable> {
+        private DataBag input;
+        List<Pair<PigNullableWritable, Writable>> output;
+        private Iterator<Tuple> it = null;
+        private Tuple value = null;
+        private boolean init  = false;
+
+        public IllustratorContext(Configuration conf, DataBag input,
+              List<Pair<PigNullableWritable, Writable>> output,
+              InputSplit split) throws IOException, InterruptedException {
+            super(conf, new TaskAttemptID(), null, null, null, new IllustrateDummyReporter(), split);
+            conf.set("inIllustrator", "true");
+            if (output == null)
+                throw new IOException("Null output can not be used");
+            this.input = input; this.output = output;
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            if (input == null) {
+                if (!init) {
+                    init = true;
+                    return true;
+                }
+                return false;
+            }
+            if (it == null)
+                it = input.iterator();
+            if (!it.hasNext())
+                return false;
+            value = it.next();
+            return true;
+        }
+        
+        @Override
+        public Text getCurrentKey() {
+          return null;
+        }
+        
+        @Override
+        public Tuple getCurrentValue() {
+          return value;
+        }
+        
+        @Override
+        public void write(PigNullableWritable key, Writable value) 
+            throws IOException, InterruptedException {
+            output.add(new Pair<PigNullableWritable, Writable>(key, value));
+        }
+        
+        @Override
+        public void progress() {
+          
+        }
+    }
+    
+    @Override
+    public boolean inIllustrator(Context context) {
+        return ((WrappedMapper.Context)context).getConfiguration().get("inIllustrator")!=null;
+    }
+}

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.IllustratorContext;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.FakeRawKeyValueIterator;
+
+public class PigMapReduce extends PigGenericMapReduce {
+
+    static class IllustrateReducerContext extends WrappedReducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
+        public IllustratorContext getReducerContext(
+                ReduceContext<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> reduceContext) {
+            return new IllustratorContext(reduceContext);
+        }
+
+        public class IllustratorContext extends WrappedReducer.Context {
+            public IllustratorContext(
+                    ReduceContext<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> reduceContext) {
+                super(reduceContext);
+            }
+
+            public POPackage getPack() {
+                return ((Reduce.IllustratorContextImpl) reduceContext).pack;
+            }
+        }
+    }
+
+    public static class Reduce extends PigGenericMapReduce.Reduce {
+        /**
+         * Get the reducer's illustrator context.
+         * @param job the job whose reduce phase is being simulated
+         * @param input input buffer holding the key/value pairs output by the maps
+         * @param pkg the POPackage operator for this reduce
+         * @return the reducer's illustrator context
+         * @throws IOException
+         * @throws InterruptedException
+         */
+        @Override
+        public Context getIllustratorContext(Job job,
+               List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException {
+            org.apache.hadoop.mapreduce.Reducer.Context reducerContext = new IllustrateReducerContext()
+                    .getReducerContext(new IllustratorContextImpl(job, input, pkg));
+            return reducerContext;
+        }
+
+        @SuppressWarnings("unchecked")
+        public class IllustratorContextImpl extends ReduceContextImpl<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
+            private PigNullableWritable currentKey = null, nextKey = null;
+            private NullableTuple nextValue = null;
+            private List<NullableTuple> currentValues = null;
+            private Iterator<Pair<PigNullableWritable, Writable>> it;
+            private final ByteArrayOutputStream bos;
+            private final DataOutputStream dos;
+            private final RawComparator sortComparator, groupingComparator;
+            public POPackage pack = null;
+            private IllustratorValueIterable iterable = new IllustratorValueIterable();
+
+            public IllustratorContextImpl(Job job,
+                  List<Pair<PigNullableWritable, Writable>> input,
+                  POPackage pkg
+                  ) throws IOException, InterruptedException {
+                super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()),
+                    null, null, null, null, new IllustrateDummyReporter(), null, PigNullableWritable.class, NullableTuple.class);
+                bos = new ByteArrayOutputStream();
+                dos = new DataOutputStream(bos);
+                org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf());
+                sortComparator = nwJob.getSortComparator();
+                groupingComparator = nwJob.getGroupingComparator();
+                
+                Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
+                        @Override
+                        public int compare(Pair<PigNullableWritable, Writable> o1,
+                                           Pair<PigNullableWritable, Writable> o2) {
+                            try {
+                                o1.first.write(dos);
+                                int l1 = bos.size();
+                                o2.first.write(dos);
+                                int l2 = bos.size();
+                                byte[] bytes = bos.toByteArray();
+                                bos.reset();
+                                return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1);
+                            } catch (IOException e) {
+                                throw new RuntimeException("Serialization exception in sort:"+e.getMessage());
+                            }
+                        }
+                    }
+                );
+                currentValues = new ArrayList<NullableTuple>();
+                it = input.iterator();
+                if (it.hasNext()) {
+                    Pair<PigNullableWritable, Writable> entry = it.next();
+                    nextKey = entry.first;
+                    nextValue = (NullableTuple) entry.second;
+                }
+                pack = pkg;
+            }
+            
+            public class IllustratorValueIterator implements ReduceContext.ValueIterator<NullableTuple> {
+                
+                private int pos = -1;
+                private int mark = -1;
+
+                @Override
+                public void mark() throws IOException {
+                    mark=pos-1;
+                    if (mark<-1)
+                        mark=-1;
+                }
+
+                @Override
+                public void reset() throws IOException {
+                    pos=mark;
+                }
+
+                @Override
+                public void clearMark() throws IOException {
+                    mark=-1;
+                }
+
+                @Override
+                public boolean hasNext() {
+                    return pos<currentValues.size()-1;
+                }
+
+                @Override
+                public NullableTuple next() {
+                    pos++;
+                    return currentValues.get(pos);
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException("remove not implemented");
+                }
+
+                @Override
+                public void resetBackupStore() throws IOException {
+                    pos=-1;
+                    mark=-1;
+                }
+                
+            }
+            
+            protected class IllustratorValueIterable implements Iterable<NullableTuple> {
+                private IllustratorValueIterator iterator = new IllustratorValueIterator();
+                @Override
+                public Iterator<NullableTuple> iterator() {
+                    return iterator;
+                } 
+            }
+            
+            @Override
+            public PigNullableWritable getCurrentKey() {
+                return currentKey;
+            }
+            
+            @Override
+            public boolean nextKey() {
+                if (nextKey == null)
+                    return false;
+                currentKey = nextKey;
+                currentValues.clear();
+                currentValues.add(nextValue);
+                nextKey = null;
+                while (it.hasNext()) {
+                    Pair<PigNullableWritable, Writable> entry = it.next();
+                    /* Why can't raw comparison be used?
+                    byte[] bytes;
+                    int l1, l2;
+                    try {
+                        currentKey.write(dos);
+                        l1 = bos.size();
+                        entry.first.write(dos);
+                        l2 = bos.size();
+                        bytes = bos.toByteArray();
+                    } catch (IOException e) {
+                        throw new RuntimeException("nextKey exception : "+e.getMessage());
+                    }
+                    bos.reset();
+                    if (groupingComparator.compare(bytes, 0, l1, bytes, l1, l2-l1) == 0)
+                    */
+                    if (groupingComparator.compare(currentKey, entry.first) == 0)
+                    {
+                        currentValues.add((NullableTuple)entry.second);
+                    } else {
+                        nextKey = entry.first;
+                        nextValue = (NullableTuple) entry.second;
+                        break;
+                    }
+                }
+                return true;
+            }
+            
+            @Override
+            public Iterable<NullableTuple> getValues() {
+                return iterable;
+            }
+            
+            @Override
+            public void write(PigNullableWritable k, Writable t) {
+            }
+            
+            @Override
+            public void progress() { 
+            }
+        }
+
+        @Override
+        public boolean inIllustrator(org.apache.hadoop.mapreduce.Reducer.Context context) {
+            return (context instanceof PigMapReduce.IllustrateReducerContext.IllustratorContext);
+        }
+
+        @Override
+        public POPackage getPack(org.apache.hadoop.mapreduce.Reducer.Context context) {
+            return ((PigMapReduce.IllustrateReducerContext.IllustratorContext) context).getPack();
+        }
+    }
+}

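The added IllustratorContextImpl simulates an entire reduce phase in memory: the
constructor sorts the buffered map output with the job's sort comparator (comparing
serialized key bytes), and nextKey()/getValues() then walk runs of keys that the
grouping comparator considers equal, reproducing the contract a real Hadoop reduce
provides. A minimal self-contained sketch of that sort-then-group pattern, using
plain strings in place of PigNullableWritable (names here are illustrative, not Pig
API):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public class SortThenGroupSketch {
        public static void main(String[] args) {
            List<String[]> input = new ArrayList<String[]>();
            input.add(new String[] { "b", "2" });
            input.add(new String[] { "a", "1" });
            input.add(new String[] { "a", "3" });

            // Stand-ins for nwJob.getSortComparator() / nwJob.getGroupingComparator().
            final Comparator<String> sortCmp = String.CASE_INSENSITIVE_ORDER;
            Comparator<String> groupCmp = sortCmp;

            // 1. Sort the whole buffer, as the shuffle would.
            Collections.sort(input, new Comparator<String[]>() {
                @Override
                public int compare(String[] o1, String[] o2) {
                    return sortCmp.compare(o1[0], o2[0]);
                }
            });

            // 2. Walk runs of keys the grouping comparator considers equal,
            //    mirroring nextKey()/getValues().
            int i = 0;
            while (i < input.size()) {
                String currentKey = input.get(i)[0];
                List<String> values = new ArrayList<String>();
                while (i < input.size() && groupCmp.compare(currentKey, input.get(i)[0]) == 0) {
                    values.add(input.get(i++)[1]);
                }
                System.out.println(currentKey + " -> " + values); // one "reduce call" per group
            }
        }
    }
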
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Wed Feb 22 09:43:41 2017
@@ -18,7 +18,6 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -156,12 +155,7 @@ public class PigOutputCommitter extends
         for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
             if (mapCommitter.first!=null) {
                 try {
-                    // Use reflection, Hadoop 1.x line does not have such method
-                    Method m = mapCommitter.first.getClass().getMethod("isRecoverySupported");
-                    allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
-                            && (Boolean)m.invoke(mapCommitter.first);
-                } catch (NoSuchMethodException e) {
-                    allOutputCommitterSupportRecovery = false;
+                    allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && mapCommitter.first.isRecoverySupported();
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
@@ -173,12 +167,7 @@ public class PigOutputCommitter extends
             reduceOutputCommitters) {
             if (reduceCommitter.first!=null) {
                 try {
-                    // Use reflection, Hadoop 1.x line does not have such method
-                    Method m = reduceCommitter.first.getClass().getMethod("isRecoverySupported");
-                    allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
-                            && (Boolean)m.invoke(reduceCommitter.first);
-                } catch (NoSuchMethodException e) {
-                    allOutputCommitterSupportRecovery = false;
+                    allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && reduceCommitter.first.isRecoverySupported();
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
@@ -197,10 +186,6 @@ public class PigOutputCommitter extends
                         mapCommitter.second);
                 try {
-                    // Use reflection, Hadoop 1.x line does not have such method
-                    Method m = mapCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
-                    m.invoke(mapCommitter.first, updatedContext);
-                } catch (NoSuchMethodException e) {
-                    // We are using Hadoop 1.x, ignore
+                    mapCommitter.first.recoverTask(updatedContext);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -212,11 +197,7 @@ public class PigOutputCommitter extends
                 TaskAttemptContext updatedContext = setUpContext(context,
                         reduceCommitter.second);
                 try {
-                    // Use reflection, Hadoop 1.x line does not have such method
-                    Method m = reduceCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
-                    m.invoke(reduceCommitter.first, updatedContext);
-                } catch (NoSuchMethodException e) {
-                    // We are using Hadoop 1.x, ignore
+                    reduceCommitter.first.recoverTask(updatedContext);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -256,10 +237,7 @@ public class PigOutputCommitter extends
                         mapCommitter.second);
                 // PIG-2642 promote files before calling storeCleanup/storeSchema 
                 try {
-                    // Use reflection, 20.2 does not have such method
-                    Method m = mapCommitter.first.getClass().getMethod("commitJob", JobContext.class);
-                    m.setAccessible(true);
-                    m.invoke(mapCommitter.first, updatedContext);
+                    mapCommitter.first.commitJob(updatedContext);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -273,10 +251,7 @@ public class PigOutputCommitter extends
                         reduceCommitter.second);
                 // PIG-2642 promote files before calling storeCleanup/storeSchema 
                 try {
-                    // Use reflection, 20.2 does not have such method
-                    Method m = reduceCommitter.first.getClass().getMethod("commitJob", JobContext.class);
-                    m.setAccessible(true);
-                    m.invoke(reduceCommitter.first, updatedContext);
+                    reduceCommitter.first.commitJob(updatedContext);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -293,10 +268,7 @@ public class PigOutputCommitter extends
                 JobContext updatedContext = setUpContext(context,
                         mapCommitter.second);
                 try {
-                    // Use reflection, 20.2 does not have such method
-                    Method m = mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
-                    m.setAccessible(true);
-                    m.invoke(mapCommitter.first, updatedContext, state);
+                    mapCommitter.first.abortJob(updatedContext, state);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -309,10 +281,7 @@ public class PigOutputCommitter extends
                 JobContext updatedContext = setUpContext(context,
                         reduceCommitter.second);
                 try {
-                    // Use reflection, 20.2 does not have such method
-                    Method m = reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
-                    m.setAccessible(true);
-                    m.invoke(reduceCommitter.first, updatedContext, state);
+                    reduceCommitter.first.abortJob(updatedContext, state);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }

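With the Hadoop 1.x line out of the picture, the reflective lookups above collapse
into direct calls: the Hadoop 2.x OutputCommitter API declares isRecoverySupported(),
recoverTask(), commitJob() and abortJob() outright. The AND-reduction over all store
committers has this shape (a simplified sketch, not PigOutputCommitter's actual
fields):

    import java.util.List;

    import org.apache.hadoop.mapreduce.OutputCommitter;

    final class RecoverySupportSketch {
        // Job recovery is only possible when every underlying store's
        // committer supports it, hence the AND over the whole list.
        static boolean allSupportRecovery(List<OutputCommitter> committers) {
            boolean all = true;
            for (OutputCommitter c : committers) {
                if (c != null) {
                    all = all && c.isRecoverySupported(); // direct call on Hadoop 2.x
                }
            }
            return all;
        }
    }
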
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Wed Feb 22 09:43:41 2017
@@ -515,9 +515,11 @@ public class PigSplit extends InputSplit
             for (int i = 0; i < wrappedSplits.length; i++) {
                 st.append("Input split["+i+"]:\n   Length = "+ wrappedSplits[i].getLength()+"\n   ClassName: " +
                     wrappedSplits[i].getClass().getName() + "\n   Locations:\n");
-                for (String location :  wrappedSplits[i].getLocations())
-                    st.append("    "+location+"\n");
-                st.append("\n-----------------------\n");
+                if (wrappedSplits[i]!=null && wrappedSplits[i].getLocations()!=null) {
+                    for (String location :  wrappedSplits[i].getLocations())
+                        st.append("    "+location+"\n");
+                    st.append("\n-----------------------\n");
+                }
           }
         } catch (IOException e) {
           return null;

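The hardened toString() only dereferences a wrapped split's locations when both the
split and the array it returns are non-null, so a missing location list no longer
aborts the debug output (note the preceding getLength() call still assumes the split
itself is non-null). The guard in isolation, with a stand-in interface rather than
Pig's types:

    import java.io.IOException;

    final class SplitLocationSketch {
        // 'Split' stands in for org.apache.hadoop.mapreduce.InputSplit here.
        interface Split { String[] getLocations() throws IOException; }

        static String describe(Split split) throws IOException {
            StringBuilder st = new StringBuilder();
            if (split != null && split.getLocations() != null) {
                for (String location : split.getLocations()) {
                    st.append("    ").append(location).append('\n');
                }
                st.append("\n-----------------------\n");
            }
            return st.toString();
        }
    }
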
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java Wed Feb 22 09:43:41 2017
@@ -26,21 +26,21 @@ public class DiscreteProbabilitySampleGe
     Random rGen;
     float[] probVec;
     float epsilon = 0.0001f;
-        
+
     private static final Log LOG = LogFactory.getLog(DiscreteProbabilitySampleGenerator.class);
-    
-    public DiscreteProbabilitySampleGenerator(float[] probVec) {
-        rGen = new Random();
+
+    public DiscreteProbabilitySampleGenerator(long seed, float[] probVec) {
+        rGen = new Random(seed);
         float sum = 0.0f;
         for (float f : probVec) {
             sum += f;
         }
         this.probVec = probVec;
-        if (1-epsilon > sum || sum > 1+epsilon) { 
+        if (1-epsilon > sum || sum > 1+epsilon) {
             LOG.info("Sum of probabilities should be near one: " + sum);
         }
     }
-    
+
     public int getNext(){
         double toss = rGen.nextDouble();
         // if the uniformly random number that I generated
@@ -57,13 +57,13 @@ public class DiscreteProbabilitySampleGe
             toss -= probVec[i];
             if(toss<=0.0)
                 return i;
-        }        
+        }
         return lastIdx;
     }
-    
+
     public static void main(String[] args) {
         float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f };
-        DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(vec);
+        DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(11317, vec);
         CountingMap<Integer> cm = new CountingMap<Integer>();
         for(int i=0;i<100;i++){
             cm.put(gen.getNext(), 1);
@@ -75,6 +75,6 @@ public class DiscreteProbabilitySampleGe
     public String toString() {
         return Arrays.toString(probVec);
     }
-    
-    
+
+
 }

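Taking the seed as a constructor argument makes the sampler deterministic: two
generators built with the same seed and probability vector produce the same sequence
of partition indexes, so a re-executed task sends identical keys to identical
reducers. A quick self-contained check (a sketch reproducing getNext()'s cumulative
walk, not the Pig class itself):

    import java.util.Random;

    public class SeedDeterminismCheck {
        static int getNext(Random rGen, float[] probVec) {
            double toss = rGen.nextDouble();
            // Walk the cumulative distribution until the toss is used up.
            for (int i = 0; i < probVec.length; i++) {
                toss -= probVec[i];
                if (toss <= 0.0) {
                    return i;
                }
            }
            return probVec.length - 1;
        }

        public static void main(String[] args) {
            float[] probVec = { 0, 0.3f, 0.2f, 0, 0, 0.5f };
            Random r1 = new Random(11317);
            Random r2 = new Random(11317);
            for (int i = 0; i < 5; i++) {
                // Identical seeds yield identical draws, hence identical partitions.
                System.out.println(getNext(r1, probVec) == getNext(r2, probVec)); // true
            }
        }
    }
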
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Wed Feb 22 09:43:41 2017
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,13 +30,13 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.NullableBigDecimalWritable;
 import org.apache.pig.impl.io.NullableBigIntegerWritable;
@@ -52,7 +51,6 @@ import org.apache.pig.impl.io.NullableTe
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.io.ReadToEndLoader;
-import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Utils;
 
 public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
@@ -62,7 +60,6 @@ public class WeightedRangePartitioner ex
             new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
     protected PigNullableWritable[] quantiles;
     protected RawComparator<PigNullableWritable> comparator;
-    private PigContext pigContext;
     protected Configuration job;
 
     protected boolean inited = false;
@@ -93,11 +90,6 @@ public class WeightedRangePartitioner ex
     @SuppressWarnings("unchecked")
     public void init() {
         weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
-        try {
-            pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to deserialize pig context: ", e);
-        }
 
         String quantilesFile = job.get("pig.quantilesFile", "");
         if (quantilesFile.length() == 0) {
@@ -109,10 +101,10 @@ public class WeightedRangePartitioner ex
             // use local file system to get the quantilesFile
             Map<String, Object> quantileMap = null;
             Configuration conf;
-            if (!pigContext.getExecType().isLocal()) {
-                conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
-            } else {
+            if (job.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false)) {
                 conf = new Configuration(false);
+            } else {
+                conf = new Configuration(job);
             }
             if (job.get("fs.file.impl") != null) {
                 conf.set("fs.file.impl", job.get("fs.file.impl"));
@@ -138,11 +130,13 @@ public class WeightedRangePartitioner ex
                 DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
                 InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
                 convertToArray(quantilesList);
+                long taskIdHashCode = job.get(MRConfiguration.TASK_ID).hashCode();
+                long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
                 for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
                     Tuple key = (Tuple)ent.getKey(); // sample item which repeats
                     float[] probVec = getProbVec((Tuple)ent.getValue());
                     weightedParts.put(getPigNullableWritable(key),
-                            new DiscreteProbabilitySampleGenerator(probVec));
+                            new DiscreteProbabilitySampleGenerator(randomSeed, probVec));
                 }
             }
             // else - the quantiles file is empty - unless we have a bug, the

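The seed itself is built by tiling the 32-bit hash of the task id into both halves
of a 64-bit long, giving every task a stable, task-specific seed that uses all 64
seed bits. The bit arithmetic, spelled out (the task id string is a made-up example):

    public class SeedFromTaskId {
        public static void main(String[] args) {
            int hash = "attempt_200707121733_0001_r_000000_0".hashCode();
            long high = (long) hash << 32;   // hash occupies bits 63..32
            long low  = hash & 0xffffffffL;  // unsigned hash occupies bits 31..0
            long seed = high | low;
            System.out.printf("hash=%08x seed=%016x%n", hash, seed);
        }
    }
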
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Wed Feb 22 09:43:41 2017
@@ -21,14 +21,16 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -105,7 +107,7 @@ public class EndOfAllInputSetter extends
         public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
             endOfAllInputFlag = true;
         }
-        
+
         @Override
         public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
             endOfAllInputFlag = true;
@@ -122,6 +124,13 @@ public class EndOfAllInputSetter extends
             }
         }
 
+        @Override
+        public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException {
+            if (lr instanceof POBuildBloomRearrangeTez) {
+                endOfAllInputFlag = true;
+            }
+            super.visitLocalRearrange(lr);
+        }
 
         /**
          * @return if end of all input is present

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java Wed Feb 22 09:43:41 2017
@@ -27,7 +27,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
-import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -43,7 +43,7 @@ public class MRPrinter extends MROpPlanV
      * @param plan MR plan to print
      */
     public MRPrinter(PrintStream ps, MROperPlan plan) {
-        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+        super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(plan, true));
         mStream = ps;
         mStream.println("#--------------------------------------------------");
         mStream.println("# Map Reduce Plan                                  ");

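Printing with a DependencyOrderWalker instead of a DepthFirstWalker means every
MapReduce operator is rendered only after all of its predecessors, i.e. the plan
comes out in topological order. The same ordering in miniature (a generic sketch,
not Pig's walker classes):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class DependencyOrderSketch {
        static Map<String, List<String>> deps = new HashMap<String, List<String>>();
        static Set<String> visited = new HashSet<String>();
        static List<String> order = new ArrayList<String>();

        public static void main(String[] args) {
            deps.put("Load1", new ArrayList<String>());
            deps.put("Load2", new ArrayList<String>());
            deps.put("Join", Arrays.asList("Load1", "Load2"));
            deps.put("Store", Arrays.asList("Join"));
            for (String node : deps.keySet()) {
                visit(node);
            }
            System.out.println(order); // every node appears after its dependencies
        }

        static void visit(String node) {
            if (!visited.add(node)) {
                return; // already emitted on another path through the DAG
            }
            for (String dep : deps.get(node)) {
                visit(dep);
            }
            order.add(node);
        }
    }
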
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Wed Feb 22 09:43:41 2017
@@ -441,6 +441,10 @@ public abstract class PhysicalOperator e
     public void reset() {
     }
 
+    public boolean isEndOfAllInput() {
+        return parentPlan.endOfAllInput;
+    }
+
     /**
      * @return PigProgressable stored in threadlocal
      */

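isEndOfAllInput() exposes the plan's endOfAllInput flag to individual operators, so
blocking operators (stream, merge join, partial aggregation, and the bloom rearrange
handled by EndOfAllInputSetter above) can tell when the final record has arrived and
flush whatever they have buffered. Roughly, the consuming pattern looks like this
(a simplified sketch with hypothetical types, not a real Pig operator):

    import java.util.ArrayDeque;
    import java.util.Deque;

    final class BufferingOperatorSketch {
        private final Deque<String> buffer = new ArrayDeque<String>();
        private boolean endOfAllInput = false; // stands in for parentPlan.endOfAllInput

        boolean isEndOfAllInput() { return endOfAllInput; }
        void signalEndOfAllInput() { endOfAllInput = true; }

        String getNext(String incoming) {
            if (incoming != null) {
                buffer.add(incoming); // normal path: keep accumulating
                return null;
            }
            if (isEndOfAllInput() && !buffer.isEmpty()) {
                return buffer.poll(); // flush only once the last record has passed
            }
            return null;
        }
    }
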
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java Wed Feb 22 09:43:41 2017
@@ -19,7 +19,10 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.math.RoundingMode;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -36,6 +39,8 @@ public class Divide extends BinaryExpres
      *
      */
     private static final long serialVersionUID = 1L;
+    public static final short BIGDECIMAL_MINIMAL_SCALE = 6;
+    private static final Log LOG = LogFactory.getLog(Divide.class);
 
     public Divide(OperatorKey k) {
         super(k);
@@ -72,12 +77,22 @@ public class Divide extends BinaryExpres
         case DataType.BIGINTEGER:
             return ((BigInteger) a).divide((BigInteger) b);
         case DataType.BIGDECIMAL:
-            return ((BigDecimal) a).divide((BigDecimal) b);
+            return bigDecimalDivideWithScale(a, b);
         default:
             throw new ExecException("called on unsupported Number class " + DataType.findTypeName(dataType));
         }
     }
 
+    private Number bigDecimalDivideWithScale(Number a, Number b) {
+        // Using same result scaling as Hive. See Arithmetic Rules:
+        //   https://cwiki.apache.org/confluence/download/attachments/27362075/Hive_Decimal_Precision_Scale_Support.pdf
+        int resultScale = Math.max(BIGDECIMAL_MINIMAL_SCALE, ((BigDecimal)a).scale() + ((BigDecimal)b).precision() + 1);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("For bigdecimal divide: using " + resultScale + " as result scale.");
+        }
+        return ((BigDecimal)a).divide((BigDecimal)b, resultScale, RoundingMode.HALF_UP);
+    }
+
     /*
      * This method is used to invoke the appropriate method, as Java does not provide generic
      * dispatch for it.