Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 03:34:40 UTC

svn commit: r1784224 [4/17] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Feb 24 03:34:37 2017
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.Method;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -60,7 +61,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobPriority;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
@@ -71,7 +71,6 @@ import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.PigJobControl;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
@@ -90,6 +89,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataType;
@@ -122,7 +122,6 @@ import org.apache.pig.impl.util.ObjectSe
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
 
@@ -312,7 +311,7 @@ public class JobControlCompiler{
                     " should be a time in ms. default=" + defaultPigJobControlSleep, e);
         }
 
-        JobControl jobCtrl = new PigJobControl(grpName, timeToSleep);
+        JobControl jobCtrl = HadoopShims.newJobControl(grpName, timeToSleep);
 
         try {
             List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
@@ -385,7 +384,7 @@ public class JobControlCompiler{
         ArrayList<Pair<String,Long>> counterPairs;
 
         try {
-            counters = MRJobStats.getCounters(job);
+            counters = HadoopShims.getCounters(job);
 
             String groupName = getGroupName(counters.getGroupNames());
             // In case the counter group was not found, we need to find
@@ -703,8 +702,7 @@ public class JobControlCompiler{
             // since this path would be invalid for the new job being created
             pigContext.getProperties().remove("mapreduce.job.credentials.binary");
 
-            conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pigContext.getExecType().isLocal());
-            conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pigContext.getLog4jProperties()));
+            conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
             conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList()));
             // this is for unit tests since some don't create PigServer
 
@@ -1673,6 +1671,14 @@ public class JobControlCompiler{
         if (distCachePath != null) {
             log.info("Jar file " + url + " already in DistributedCache as "
                     + distCachePath + ". Not copying to hdfs and adding again");
+            // Path already in dist cache
+            if (!HadoopShims.isHadoopYARN()) {
+            // MapReduce in YARN includes $PWD/* on the classpath, which picks up all *.jar files,
+            // so the jar does not have to be added to mapreduce.job.classpath.files separately.
+            // In Hadoop 1.x, however, the path may only be in 'mapred.cache.files' and not in
+            // 'mapreduce.job.classpath.files', so add it there.
+                DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf));
+            }
         }
         else {
             // REGISTER always copies locally the jar file. see PigServer.registerJar()
@@ -1958,9 +1964,20 @@ public class JobControlCompiler{
 
     public static void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
         // the OutputFormat we report to Hadoop is always PigOutputFormat which
-        // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set
+        // can be wrapped with LazyOutputFormat, provided it is supported by
+        // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
         if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
-            LazyOutputFormat.setOutputFormatClass(job,PigOutputFormat.class);
+            try {
+                Class<?> clazz = PigContext
+                        .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+                Method method = clazz.getMethod("setOutputFormatClass",
+                        org.apache.hadoop.mapreduce.Job.class, Class.class);
+                method.invoke(null, job, PigOutputFormat.class);
+            } catch (Exception e) {
+                job.setOutputFormatClass(PigOutputFormat.class);
+                log.warn(PigConfiguration.PIG_OUTPUT_LAZY
+                        + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
+            }
         } else {
             job.setOutputFormatClass(PigOutputFormat.class);
         }
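
The try/catch above is a load-or-fall-back idiom: LazyOutputFormat is resolved
reflectively so the class literal never appears in Pig's bytecode, and Hadoop
versions that lack the class silently fall back to plain PigOutputFormat. A
minimal self-contained sketch of the same idiom (the helper class below is
illustrative, not Pig code):

    import java.lang.reflect.Method;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.OutputFormat;

    public class LazyOutputShim {
        // Wrap the output format lazily when LazyOutputFormat is on the
        // classpath; otherwise fall back to setting the format eagerly.
        public static void setOutputFormat(Job job,
                Class<? extends OutputFormat> formatClass) {
            try {
                Class<?> lazy = Class.forName(
                        "org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
                Method m = lazy.getMethod("setOutputFormatClass",
                        Job.class, Class.class);
                m.invoke(null, job, formatClass); // static, so target is null
            } catch (Exception e) {
                job.setOutputFormatClass(formatClass); // older Hadoop
            }
        }
    }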

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Feb 24 03:34:37 2017
@@ -1116,9 +1116,7 @@ public class MRCompiler extends PhyPlanV
         try{
             nonBlocking(op);
             phyToMROpMap.put(op, curMROp);
-            if (op.getPkgr().getPackageType() == PackageType.JOIN
-                    || op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
-                // Bloom join is not implemented in mapreduce mode and falls back to regular join
+            if (op.getPkgr().getPackageType() == PackageType.JOIN) {
                 curMROp.markRegularJoin();
             } else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
                 if (op.getNumInps() == 1) {
@@ -1280,7 +1278,7 @@ public class MRCompiler extends PhyPlanV
                             List<InputSplit> splits = inf.getSplits(HadoopShims.cloneJobContext(job));
                             List<List<InputSplit>> results = MapRedUtil
                             .getCombinePigSplits(splits,
-                                    fs.getDefaultBlockSize(path),
+                                    HadoopShims.getDefaultBlockSize(fs, path),
                                     conf);
                             numFiles += results.size();
                         } else {
@@ -2434,7 +2432,7 @@ public class MRCompiler extends PhyPlanV
         }else{
             for(int i=0; i<transformPlans.size(); i++) {
                 eps1.add(transformPlans.get(i));
-                flat1.add(i == transformPlans.size() - 1 ? true : false);
+                flat1.add(true);
             }
         }
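
fs.getDefaultBlockSize(path) becomes HadoopShims.getDefaultBlockSize(fs, path)
because the per-path overload of FileSystem.getDefaultBlockSize only exists on
newer Hadoop; Pig compiles a separate HadoopShims class per Hadoop line, so
each can call the right overload directly. A single-source reflective
equivalent might look like this (a sketch under that assumption, not the
actual shim):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BlockSizeShim {
        // Prefer the per-path overload (Hadoop 2.x, where block size can
        // vary by path); fall back to the global default on Hadoop 1.x.
        public static long getDefaultBlockSize(FileSystem fs, Path path) {
            try {
                return (Long) FileSystem.class
                        .getMethod("getDefaultBlockSize", Path.class)
                        .invoke(fs, path);
            } catch (Exception e) {
                return fs.getDefaultBlockSize();
            }
        }
    }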
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Feb 24 03:34:37 2017
@@ -19,9 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.io.PrintStream;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -42,8 +40,7 @@ import org.apache.hadoop.mapred.JobClien
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.pig.PigConfiguration;
@@ -68,7 +65,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -82,18 +78,15 @@ import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
 
-import org.python.google.common.collect.Lists;
-
 
 /**
  * Main class that launches pig for Map Reduce
  *
  */
-public class MapReduceLauncher extends Launcher {
+public class MapReduceLauncher extends Launcher{
 
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
 
@@ -101,30 +94,14 @@ public class MapReduceLauncher extends L
 
     private boolean aggregateWarning = false;
 
-    public MapReduceLauncher() {
-        super();
-        Utils.addShutdownHookWithPriority(new HangingJobKiller(),
-                PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
-    }
-
     @Override
     public void kill() {
         try {
-            if (jc != null && jc.getRunningJobs().size() > 0) {
-                log.info("Received kill signal");
+            log.debug("Receive kill signal");
+            if (jc!=null) {
                 for (Job job : jc.getRunningJobs()) {
-                    org.apache.hadoop.mapreduce.Job mrJob = job.getJob();
-                    try {
-                        if (mrJob != null) {
-                            mrJob.killJob();
-                        }
-                    } catch (Exception ir) {
-                        throw new IOException(ir);
-                    }
+                    HadoopShims.killJob(job);
                     log.info("Job " + job.getAssignedJobID() + " killed");
-                    String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-                            .format(Calendar.getInstance().getTime());
-                    System.err.println(timeStamp + " Job " + job.getAssignedJobID() + " killed");
                 }
             }
         } catch (Exception e) {
@@ -324,7 +301,8 @@ public class MapReduceLauncher extends L
                 // Now wait, till we are finished.
                 while(!jc.allFinished()){
 
-                    jcThread.join(sleepTime);
+                    try { jcThread.join(sleepTime); }
+                    catch (InterruptedException e) {}
 
                     List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>();
 
@@ -343,6 +321,11 @@ public class MapReduceLauncher extends L
                                 log.info("detailed locations: " + aliasLocation);
                             }
 
+                            if (!HadoopShims.isHadoopYARN() && jobTrackerLoc != null) {
+                                log.info("More information at: http://" + jobTrackerLoc
+                                        + "/jobdetails.jsp?jobid=" + job.getAssignedJobID());
+                            }
+
                             // update statistics for this job so jobId is set
                             MRPigStatsUtil.addJobStats(job);
                             MRScriptState.get().emitJobStartedNotification(
@@ -492,6 +475,10 @@ public class MapReduceLauncher extends L
             for (Job job : succJobs) {
                 List<POStore> sts = jcc.getStores(job);
                 for (POStore st : sts) {
+                    if (Utils.isLocal(pc, job.getJobConf())) {
+                        HadoopShims.storeSchemaForLocal(job, st);
+                    }
+
                     if (!st.isTmpStore()) {
                         // create an "_SUCCESS" file in output location if
                         // output location is a filesystem dir
@@ -757,7 +744,7 @@ public class MapReduceLauncher extends L
     @SuppressWarnings("deprecation")
     void computeWarningAggregate(Job job, Map<Enum, Long> aggMap) {
         try {
-            Counters counters = MRJobStats.getCounters(job);
+            Counters counters = HadoopShims.getCounters(job);
             if (counters==null)
             {
                 long nullCounterCount =
@@ -811,13 +798,13 @@ public class MapReduceLauncher extends L
             throw new ExecException(backendException);
         }
         try {
-            Iterator<TaskReport> mapRep = MRJobStats.getTaskReports(job, TaskType.MAP);
+            Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
             if (mapRep != null) {
                 getErrorMessages(mapRep, "map", errNotDbg, pigContext);
                 totalHadoopTimeSpent += computeTimeSpent(mapRep);
                 mapRep = null;
             }
-            Iterator<TaskReport> redRep = MRJobStats.getTaskReports(job, TaskType.REDUCE);
+            Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
             if (redRep != null) {
                 getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
                 totalHadoopTimeSpent += computeTimeSpent(redRep);
@@ -835,6 +822,5 @@ public class MapReduceLauncher extends L
             throw new ExecException(e);
         }
     }
-
 }
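
The kill() rewrite routes through HadoopShims.killJob(job) so the launcher no
longer cares where the kill method lives on each Hadoop line. Judging from the
inline code it replaces, the Hadoop 2 shim body is presumably close to:

    // Sketch of a killJob shim mirroring the code removed above: on
    // Hadoop 2 the mapreduce Job handle exposes killJob() directly.
    public static void killJob(org.apache.hadoop.mapred.jobcontrol.Job job)
            throws java.io.IOException {
        org.apache.hadoop.mapreduce.Job mrJob = job.getJob();
        try {
            if (mrJob != null) {
                mrJob.killJob();
            }
        } catch (Exception e) {
            throw new java.io.IOException(e);
        }
    }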
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Feb 24 03:34:37 2017
@@ -65,10 +65,7 @@ public class MapReduceOper extends Opera
     // this is needed when the key is null to create
     // an appropriate NullableXXXWritable object
     public byte mapKeyType;
-
-    //record the map key types of all splittees
-    public byte[] mapKeyTypeOfSplittees;
-
+    
     //Indicates that the map plan creation
     //is complete
     boolean mapDone = false;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Fri Feb 24 03:34:37 2017
@@ -18,7 +18,6 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -581,17 +580,18 @@ class MultiQueryOptimizer extends MROpPl
     }
 
     private boolean hasSameMapKeyType(List<MapReduceOper> splittees) {
-        Set<Byte> keyTypes = new HashSet<Byte>();
-        for (MapReduceOper splittee : splittees) {
-            keyTypes.add(splittee.mapKeyType);
-            if (splittee.mapKeyTypeOfSplittees != null) {
-                for (int i = 0; i < splittee.mapKeyTypeOfSplittees.length; i++) {
-                    keyTypes.add(splittee.mapKeyTypeOfSplittees[i]);
+        boolean sameKeyType = true;
+        for (MapReduceOper outer : splittees) {
+            for (MapReduceOper inner : splittees) {
+                if (inner.mapKeyType != outer.mapKeyType) {
+                    sameKeyType = false;
+                    break;
                 }
             }
-
+            if (!sameKeyType) break;
         }
-        return keyTypes.size() == 1;
+
+        return sameKeyType;
     }
 
     private int setIndexOnLRInSplit(int initial, POSplit splitOp, boolean sameKeyType)
@@ -1035,20 +1035,10 @@ class MultiQueryOptimizer extends MROpPl
         splitter.mapKeyType = sameKeyType ?
                 mergeList.get(0).mapKeyType : DataType.TUPLE;
 
-
-        setMapKeyTypeForSplitter(splitter,mergeList);
-
         log.info("Requested parallelism of splitter: "
                 + splitter.getRequestedParallelism());
     }
 
-    private void setMapKeyTypeForSplitter(MapReduceOper splitter, List<MapReduceOper> mergeList) {
-        splitter.mapKeyTypeOfSplittees = new byte[mergeList.size()];
-        for (int i = 0; i < mergeList.size(); i++) {
-            splitter.mapKeyTypeOfSplittees[i] = mergeList.get(i).mapKeyType;
-        }
-    }
-
     private void mergeSingleMapReduceSplittee(MapReduceOper mapReduce,
             MapReduceOper splitter, POSplit splitOp) throws VisitorException {
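
The restored hasSameMapKeyType compares every pair of splittees, which is
quadratic in the number of splittees. Since equality of key types is
transitive, a single pass against the first splittee's key type returns the
same answer; a sketch of that equivalent check (hypothetical, not in this
commit, and assuming a non-empty list as the caller guarantees):

    private boolean hasSameMapKeyType(List<MapReduceOper> splittees) {
        // If every key type matches the first one, all pairs match.
        byte first = splittees.get(0).mapKeyType;
        for (MapReduceOper splittee : splittees) {
            if (splittee.mapKeyType != first) {
                return false;
            }
        }
        return true;
    }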
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Fri Feb 24 03:34:37 2017
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,11 +36,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -75,6 +72,7 @@ public class PigCombiner {
         PhysicalOperator[] roots;
         PhysicalOperator leaf;
 
+        PigContext pigContext = null;
         private volatile boolean initialized = false;
 
         //@StaticDataCleanup
@@ -93,11 +91,9 @@ public class PigCombiner {
             Configuration jConf = context.getConfiguration();
             try {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
-                Properties log4jProperties = (Properties) ObjectSerializer
-                        .deserialize(jConf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
-                if (log4jProperties != null) {
-                    PropertyConfigurator.configure(log4jProperties);
-                }
+                pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
+                if (pigContext.getLog4jProperties()!=null)
+                    PropertyConfigurator.configure(pigContext.getLog4jProperties());
                 UDFContext.getUDFContext().reset();
                 MapRedUtil.setupUDFContext(context.getConfiguration());
 
@@ -147,7 +143,7 @@ public class PigCombiner {
                 pigReporter.setRep(context);
                 PhysicalOperator.setReporter(pigReporter);
 
-                boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
+                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
                 pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -161,7 +157,7 @@ public class PigCombiner {
             // tuples out of the getnext() call of POJoinPackage
             // In this case, we process till we see EOP from
             // POJoinPackage.getNext()
-            if (pack.getPkgr() instanceof JoinPackager || pack.getPkgr() instanceof BloomPackager)
+            if (pack.getPkgr() instanceof JoinPackager)
             {
                 pack.attachInput(key, tupIter.iterator());
                 while (true)
@@ -272,6 +268,7 @@ public class PigCombiner {
             pigReporter = null;
             // Avoid OOM in Tez.
             PhysicalOperator.setReporter(null);
+            pigContext = null;
             roots = null;
             cp = null;
         }
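
This file, PigGenericMapBase, PigGenericMapReduce, PigInputFormat, and
WeightedRangePartitioner all restore the same pattern: the front end
serializes the whole PigContext into the job configuration under the key
"pig.pigContext", and each task deserializes it in setup() before reading
settings such as aggregate.warning. Condensed from the calls shown in these
diffs, the two halves of the round trip are:

    // Front end (JobControlCompiler): stash the context in the job conf.
    conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));

    // Task side (setup()): restore it before any configuration lookups.
    PigContext pigContext = (PigContext) ObjectSerializer
            .deserialize(jConf.get("pig.pigContext"));
    boolean aggregateWarning = "true".equalsIgnoreCase(
            pigContext.getProperties().getProperty("aggregate.warning"));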

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Fri Feb 24 03:34:37 2017
@@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,7 +45,6 @@ import org.apache.pig.data.SchemaTupleBa
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -90,6 +88,7 @@ public abstract class PigGenericMapBase
 
     private PhysicalOperator leaf;
 
+    PigContext pigContext = null;
     private volatile boolean initialized = false;
 
     /**
@@ -169,15 +168,13 @@ public abstract class PigGenericMapBase
         inIllustrator = inIllustrator(context);
 
         PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
+        pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
 
         // This attempts to fetch all of the generated code from the distributed cache, and resolve it
-        SchemaTupleBackend.initialize(job);
+        SchemaTupleBackend.initialize(job, pigContext);
 
-        Properties log4jProperties = (Properties) ObjectSerializer
-                .deserialize(job.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
-        if (log4jProperties != null) {
-            PropertyConfigurator.configure(log4jProperties);
-        }
+        if (pigContext.getLog4jProperties()!=null)
+            PropertyConfigurator.configure(pigContext.getLog4jProperties());
 
         if (mp == null)
             mp = (PhysicalPlan) ObjectSerializer.deserialize(
@@ -239,7 +236,7 @@ public abstract class PigGenericMapBase
             pigReporter.setRep(context);
             PhysicalOperator.setReporter(pigReporter);
 
-            boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
+            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
             PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
             pigStatusReporter.setContext(new MRTaskContext(context));
             PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -252,7 +249,8 @@ public abstract class PigGenericMapBase
                     MapReducePOStoreImpl impl
                         = new MapReducePOStoreImpl(context);
                     store.setStoreImpl(impl);
-                    store.setUp();
+                    if (!pigContext.inIllustrator)
+                        store.setUp();
                 }
             }
         }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Fri Feb 24 03:34:37 2017
@@ -287,6 +287,7 @@ public class PigGenericMapReduce {
 
         private PhysicalOperator leaf;
 
+        PigContext pigContext = null;
         protected volatile boolean initialized = false;
 
         private boolean inIllustrator = false;
@@ -318,9 +319,10 @@ public class PigGenericMapReduce {
             sJobConf = context.getConfiguration();
             try {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
+                pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
 
                 // This attempts to fetch all of the generated code from the distributed cache, and resolve it
-                SchemaTupleBackend.initialize(jConf);
+                SchemaTupleBackend.initialize(jConf, pigContext);
 
                 if (rp == null)
                     rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
@@ -375,7 +377,7 @@ public class PigGenericMapReduce {
                 pigReporter.setRep(context);
                 PhysicalOperator.setReporter(pigReporter);
 
-                boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
+                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
                 pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -606,7 +608,7 @@ public class PigGenericMapReduce {
                 pigReporter.setRep(context);
                 PhysicalOperator.setReporter(pigReporter);
 
-                boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
+                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
                 pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Fri Feb 24 03:34:37 2017
@@ -17,6 +17,9 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
+import java.util.Map;
+import java.util.WeakHashMap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.EvalFunc;
@@ -38,6 +41,7 @@ public final class PigHadoopLogger imple
 
     private PigStatusReporter reporter = null;
     private boolean aggregate = false;
+    private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
 
     private PigHadoopLogger() {
     }
@@ -64,6 +68,11 @@ public final class PigHadoopLogger imple
 
         if (getAggregate()) {
             if (reporter != null) {
+                // log at least once
+                if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) {
+                    log.warn(displayMessage);
+                    msgMap.put(o, displayMessage);
+                }
                 if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
                     reporter.incrCounter(className, warningEnum.name(), 1);
                 }
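
With aggregation on, individual warnings normally vanish into counters; the
new msgMap keeps the last message logged per source operator, so each distinct
message still reaches the task log once while repeats only bump the counter.
Keying on a WeakHashMap means the map never pins an operator in memory. The
idiom in isolation (a sketch, with warnOnce as a hypothetical name):

    import java.util.Map;
    import java.util.WeakHashMap;

    // Log each distinct (source, message) pair once; repeats only count.
    private final Map<Object, String> msgMap =
            new WeakHashMap<Object, String>();

    private void warnOnce(Object source, String message) {
        String previous = msgMap.get(source);
        if (previous == null || !previous.equals(message)) {
            log.warn(message);          // first sighting of this message
            msgMap.put(source, message);
        }
        // the aggregation counter is still incremented by the caller
    }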

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Fri Feb 24 03:34:37 2017
@@ -197,11 +197,14 @@ public class PigInputFormat extends Inpu
 
         ArrayList<FileSpec> inputs;
         ArrayList<ArrayList<OperatorKey>> inpTargets;
+        PigContext pigContext;
         try {
             inputs = (ArrayList<FileSpec>) ObjectSerializer
                     .deserialize(conf.get(PIG_INPUTS));
             inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
                     .deserialize(conf.get(PIG_INPUT_TARGETS));
+            pigContext = (PigContext) ObjectSerializer.deserialize(conf
+                    .get("pig.pigContext"));
             PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));
             MapRedUtil.setupUDFContext(conf);
         } catch (Exception e) {
@@ -231,7 +234,7 @@ public class PigInputFormat extends Inpu
 
                 // if the execution is against Mapred DFS, set
                 // working dir to /user/<userid>
-                if(!Utils.isLocal(conf)) {
+                if(!Utils.isLocal(pigContext, conf)) {
                     fs.setWorkingDirectory(jobcontext.getWorkingDirectory());
                 }
 
@@ -267,7 +270,7 @@ public class PigInputFormat extends Inpu
                                 jobcontext.getJobID()));
                 List<InputSplit> oneInputPigSplits = getPigSplits(
                         oneInputSplits, i, inpTargets.get(i),
-                        fs.getDefaultBlockSize(isFsPath? path: fs.getWorkingDirectory()),
+                        HadoopShims.getDefaultBlockSize(fs, isFsPath? path: fs.getWorkingDirectory()),
                         combinable, confClone);
                 splits.addAll(oneInputPigSplits);
             } catch (ExecException ee) {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Fri Feb 24 03:34:37 2017
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -155,7 +156,12 @@ public class PigOutputCommitter extends
         for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
             if (mapCommitter.first!=null) {
                 try {
-                    allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && mapCommitter.first.isRecoverySupported();
+                    // Use reflection, Hadoop 1.x line does not have such method
+                    Method m = mapCommitter.first.getClass().getMethod("isRecoverySupported");
+                    allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
+                            && (Boolean)m.invoke(mapCommitter.first);
+                } catch (NoSuchMethodException e) {
+                    allOutputCommitterSupportRecovery = false;
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
@@ -167,7 +173,12 @@ public class PigOutputCommitter extends
             reduceOutputCommitters) {
             if (reduceCommitter.first!=null) {
                 try {
-                    allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && reduceCommitter.first.isRecoverySupported();
+                    // Use reflection, Hadoop 1.x line does not have such method
+                    Method m = reduceCommitter.first.getClass().getMethod("isRecoverySupported");
+                    allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
+                            && (Boolean)m.invoke(reduceCommitter.first);
+                } catch (NoSuchMethodException e) {
+                    allOutputCommitterSupportRecovery = false;
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
@@ -186,7 +197,10 @@ public class PigOutputCommitter extends
                         mapCommitter.second);
                 try {
                     // Use reflection, Hadoop 1.x line does not have such method
-                    mapCommitter.first.recoverTask(updatedContext);
+                    Method m = mapCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
+                    m.invoke(mapCommitter.first, updatedContext);
+                } catch (NoSuchMethodException e) {
+                    // We are using Hadoop 1.x, ignore
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -198,7 +212,11 @@ public class PigOutputCommitter extends
                 TaskAttemptContext updatedContext = setUpContext(context,
                         reduceCommitter.second);
                 try {
-                    reduceCommitter.first.recoverTask(updatedContext);
+                    // Use reflection, Hadoop 1.x line does not have such method
+                    Method m = reduceCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
+                    m.invoke(reduceCommitter.first, updatedContext);
+                } catch (NoSuchMethodException e) {
+                    // We are using Hadoop 1.x, ignore
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -238,7 +256,10 @@ public class PigOutputCommitter extends
                         mapCommitter.second);
                 // PIG-2642 promote files before calling storeCleanup/storeSchema 
                 try {
-                    mapCommitter.first.commitJob(updatedContext);
+                    // Use reflection, 20.2 does not have such method
+                    Method m = mapCommitter.first.getClass().getMethod("commitJob", JobContext.class);
+                    m.setAccessible(true);
+                    m.invoke(mapCommitter.first, updatedContext);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -252,7 +273,10 @@ public class PigOutputCommitter extends
                         reduceCommitter.second);
                 // PIG-2642 promote files before calling storeCleanup/storeSchema 
                 try {
-                    reduceCommitter.first.commitJob(updatedContext);
+                    // Use reflection, 20.2 does not have such method
+                    Method m = reduceCommitter.first.getClass().getMethod("commitJob", JobContext.class);
+                    m.setAccessible(true);
+                    m.invoke(reduceCommitter.first, updatedContext);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -269,7 +293,10 @@ public class PigOutputCommitter extends
                 JobContext updatedContext = setUpContext(context,
                         mapCommitter.second);
                 try {
-                    mapCommitter.first.abortJob(updatedContext, state);
+                    // Use reflection, 20.2 does not have such method
+                    Method m = mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
+                    m.setAccessible(true);
+                    m.invoke(mapCommitter.first, updatedContext, state);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -282,7 +309,10 @@ public class PigOutputCommitter extends
                 JobContext updatedContext = setUpContext(context,
                         reduceCommitter.second);
                 try {
-                    reduceCommitter.first.abortJob(updatedContext, state);
+                    // Use reflection, 20.2 does not have such method
+                    Method m = reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
+                    m.setAccessible(true);
+                    m.invoke(reduceCommitter.first, updatedContext, state);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
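
isRecoverySupported, recoverTask, commitJob, and abortJob are all invoked
reflectively above because they were added to OutputCommitter at different
points in Hadoop's history; NoSuchMethodException is the signal that the
running Hadoop predates the method. The four call sites repeat the same dance,
which could be folded into one hypothetical helper (illustrative, not part of
this commit):

    import java.lang.reflect.Method;
    import org.apache.hadoop.mapreduce.OutputCommitter;

    // Invoke a committer method if this Hadoop version defines it; return
    // null and let the caller pick a fallback when it does not.
    static Object invokeIfPresent(OutputCommitter committer, String name,
            Class<?>[] parameterTypes, Object... args) throws Exception {
        try {
            Method m = committer.getClass().getMethod(name, parameterTypes);
            m.setAccessible(true); // committer class may be package-private
            return m.invoke(committer, args);
        } catch (NoSuchMethodException e) {
            return null;
        }
    }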

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Fri Feb 24 03:34:37 2017
@@ -515,11 +515,9 @@ public class PigSplit extends InputSplit
             for (int i = 0; i < wrappedSplits.length; i++) {
                 st.append("Input split["+i+"]:\n   Length = "+ wrappedSplits[i].getLength()+"\n   ClassName: " +
                     wrappedSplits[i].getClass().getName() + "\n   Locations:\n");
-                if (wrappedSplits[i]!=null && wrappedSplits[i].getLocations()!=null) {
-                    for (String location :  wrappedSplits[i].getLocations())
-                        st.append("    "+location+"\n");
-                    st.append("\n-----------------------\n");
-                }
+                for (String location :  wrappedSplits[i].getLocations())
+                    st.append("    "+location+"\n");
+                st.append("\n-----------------------\n");
           }
         } catch (IOException e) {
           return null;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java Fri Feb 24 03:34:37 2017
@@ -26,21 +26,21 @@ public class DiscreteProbabilitySampleGe
     Random rGen;
     float[] probVec;
     float epsilon = 0.0001f;
-
+        
     private static final Log LOG = LogFactory.getLog(DiscreteProbabilitySampleGenerator.class);
-
-    public DiscreteProbabilitySampleGenerator(long seed, float[] probVec) {
-        rGen = new Random(seed);
+    
+    public DiscreteProbabilitySampleGenerator(float[] probVec) {
+        rGen = new Random();
         float sum = 0.0f;
         for (float f : probVec) {
             sum += f;
         }
         this.probVec = probVec;
-        if (1-epsilon > sum || sum > 1+epsilon) {
+        if (1-epsilon > sum || sum > 1+epsilon) { 
             LOG.info("Sum of probabilities should be near one: " + sum);
         }
     }
-
+    
     public int getNext(){
         double toss = rGen.nextDouble();
         // if the uniformly random number that I generated
@@ -57,13 +57,13 @@ public class DiscreteProbabilitySampleGe
             toss -= probVec[i];
             if(toss<=0.0)
                 return i;
-        }
+        }        
         return lastIdx;
     }
-
+    
     public static void main(String[] args) {
         float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f };
-        DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(11317, vec);
+        DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(vec);
         CountingMap<Integer> cm = new CountingMap<Integer>();
         for(int i=0;i<100;i++){
             cm.put(gen.getNext(), 1);
@@ -75,6 +75,6 @@ public class DiscreteProbabilitySampleGe
     public String toString() {
         return Arrays.toString(probVec);
     }
-
-
+    
+    
 }
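
getNext() is inverse-transform sampling: draw a uniform number in [0,1),
subtract the weights in order, and return the first index where the remainder
reaches zero or below. With probVec = { 0, 0.3f, 0.2f, 0, 0, 0.5f }, a toss of
0.45 yields index 2 (0.45 - 0 - 0.3 - 0.2 <= 0). Restated compactly:

    import java.util.Random;

    // Inverse-transform sampling over a discrete probability vector.
    static int sample(Random rGen, float[] probVec) {
        double toss = rGen.nextDouble();
        for (int i = 0; i < probVec.length; i++) {
            toss -= probVec[i];
            if (toss <= 0.0) {
                return i;
            }
        }
        return probVec.length - 1; // guard against rounding drift
    }

Note that dropping the seed argument from the constructor also makes the
sequence non-deterministic, so a retried task no longer reproduces the sample
choices of its first attempt.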

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Feb 24 03:34:37 2017
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -30,13 +31,13 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.NullableBigDecimalWritable;
 import org.apache.pig.impl.io.NullableBigIntegerWritable;
@@ -51,6 +52,7 @@ import org.apache.pig.impl.io.NullableTe
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.io.ReadToEndLoader;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Utils;
 
 public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
@@ -60,6 +62,7 @@ public class WeightedRangePartitioner ex
             new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
     protected PigNullableWritable[] quantiles;
     protected RawComparator<PigNullableWritable> comparator;
+    private PigContext pigContext;
     protected Configuration job;
 
     protected boolean inited = false;
@@ -90,6 +93,11 @@ public class WeightedRangePartitioner ex
     @SuppressWarnings("unchecked")
     public void init() {
         weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
+        try {
+            pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to deserialize pig context: ", e);
+        }
 
         String quantilesFile = job.get("pig.quantilesFile", "");
         if (quantilesFile.length() == 0) {
@@ -101,10 +109,10 @@ public class WeightedRangePartitioner ex
             // use local file system to get the quantilesFile
             Map<String, Object> quantileMap = null;
             Configuration conf;
-            if (job.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false)) {
-                conf = new Configuration(false);
+            if (!pigContext.getExecType().isLocal()) {
+                conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
             } else {
-                conf = new Configuration(job);
+                conf = new Configuration(false);
             }
             if (job.get("fs.file.impl") != null) {
                 conf.set("fs.file.impl", job.get("fs.file.impl"));
@@ -130,13 +138,11 @@ public class WeightedRangePartitioner ex
                 DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
                 InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
                 convertToArray(quantilesList);
-                long taskIdHashCode = job.get(MRConfiguration.TASK_ID).hashCode();
-                long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
                 for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
                     Tuple key = (Tuple)ent.getKey(); // sample item which repeats
                     float[] probVec = getProbVec((Tuple)ent.getValue());
                     weightedParts.put(getPigNullableWritable(key),
-                            new DiscreteProbabilitySampleGenerator(randomSeed, probVec));
+                            new DiscreteProbabilitySampleGenerator(probVec));
                 }
             }
             // else - the quantiles file is empty - unless we have a bug, the
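
The removed seeding lines tied the Random seed to the task attempt id so a
retried reducer would partition identically to its first attempt. The
shift-and-mask widens the 32-bit hashCode into a 64-bit seed carrying the hash
in both halves; the mask prevents sign extension of the low word. Worked out
(the task id string here is illustrative):

    // Duplicate a 32-bit hash into both halves of a 64-bit seed.
    int taskIdHashCode = "attempt_201702240334_0001_r_000003_0".hashCode();
    long randomSeed = ((long) taskIdHashCode << 32)
            | (taskIdHashCode & 0xffffffffL);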

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Fri Feb 24 03:34:37 2017
@@ -21,16 +21,14 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -107,7 +105,7 @@ public class EndOfAllInputSetter extends
         public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
             endOfAllInputFlag = true;
         }
-
+        
         @Override
         public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
             endOfAllInputFlag = true;
@@ -124,13 +122,6 @@ public class EndOfAllInputSetter extends
             }
         }
 
-        @Override
-        public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
-            if (lr instanceof POBuildBloomRearrangeTez) {
-                endOfAllInputFlag = true;
-            }
-            super.visitLocalRearrange(lr);
-        }
 
         /**
          * @return if end of all input is present

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java Fri Feb 24 03:34:37 2017
@@ -27,7 +27,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -43,7 +43,7 @@ public class MRPrinter extends MROpPlanV
      * @param plan MR plan to print
      */
     public MRPrinter(PrintStream ps, MROperPlan plan) {
-        super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(plan, true));
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
         mStream = ps;
         mStream.println("#--------------------------------------------------");
         mStream.println("# Map Reduce Plan                                  ");

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Feb 24 03:34:37 2017
@@ -441,10 +441,6 @@ public abstract class PhysicalOperator e
     public void reset() {
     }
 
-    public boolean isEndOfAllInput() {
-        return parentPlan.endOfAllInput;
-    }
-
     /**
      * @return PigProgressable stored in threadlocal
      */

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java Fri Feb 24 03:34:37 2017
@@ -19,10 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.math.RoundingMode;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -39,8 +36,6 @@ public class Divide extends BinaryExpres
      *
      */
     private static final long serialVersionUID = 1L;
-    public static final short BIGDECIMAL_MINIMAL_SCALE = 6;
-    private static final Log LOG = LogFactory.getLog(Divide.class);
 
     public Divide(OperatorKey k) {
         super(k);
@@ -77,22 +72,12 @@ public class Divide extends BinaryExpres
         case DataType.BIGINTEGER:
             return ((BigInteger) a).divide((BigInteger) b);
         case DataType.BIGDECIMAL:
-            return bigDecimalDivideWithScale(a, b);
+            return ((BigDecimal) a).divide((BigDecimal) b);
         default:
             throw new ExecException("called on unsupported Number class " + DataType.findTypeName(dataType));
         }
     }
 
-    private Number bigDecimalDivideWithScale(Number a, Number b) {
-        // Using same result scaling as Hive. See Arithmetic Rules:
-        //   https://cwiki.apache.org/confluence/download/attachments/27362075/Hive_Decimal_Precision_Scale_Support.pdf
-        int resultScale = Math.max(BIGDECIMAL_MINIMAL_SCALE, ((BigDecimal)a).scale() + ((BigDecimal)b).precision() + 1);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("For bigdecimal divide: using " + resultScale + " as result scale.");
-        }
-        return ((BigDecimal)a).divide((BigDecimal)b, resultScale, RoundingMode.HALF_UP);
-    }
-
     /*
      * This method is used to invoke the appropriate method, as Java does not provide generic
      * dispatch for it.

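The Divide change is behavioral, not cosmetic: BigDecimal.divide(BigDecimal) computes the exact quotient and throws ArithmeticException when none exists, while the removed helper chose an explicit scale and rounded. A runnable sketch, reusing the removed scale rule:

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class BigDecimalDivideDemo {
        // Mirrors the removed BIGDECIMAL_MINIMAL_SCALE constant.
        static final int MINIMAL_SCALE = 6;

        public static void main(String[] args) {
            BigDecimal a = BigDecimal.ONE;
            BigDecimal b = new BigDecimal(3);

            // Removed behavior: explicit result scale, rounded HALF_UP.
            int scale = Math.max(MINIMAL_SCALE, a.scale() + b.precision() + 1);
            System.out.println(a.divide(b, scale, RoundingMode.HALF_UP)); // 0.333333

            // Restored behavior: exact division. 1/3 has no terminating
            // decimal expansion, so this throws ArithmeticException.
            try {
                System.out.println(a.divide(b));
            } catch (ArithmeticException e) {
                System.out.println("exact divide failed: " + e.getMessage());
            }
        }
    }
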
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Fri Feb 24 03:34:37 2017
@@ -28,7 +28,6 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
@@ -90,8 +89,6 @@ public class POCast extends ExpressionOp
                 caster = ((LoadFunc)obj).getLoadCaster();
             } else if (obj instanceof StreamToPig) {
                 caster = ((StreamToPig)obj).getLoadCaster();
-            } else if (obj instanceof EvalFunc) {
-                caster = ((EvalFunc)obj).getLoadCaster();
             } else {
                 throw new IOException("Invalid class type "
                         + funcSpec.getClassName());
@@ -168,7 +165,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToBigInteger(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations();
+                        String msg = unknownByteArrayErrorMessage + "BigInteger.";
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -284,7 +281,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToBigDecimal(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations();
+                        String msg = unknownByteArrayErrorMessage + "BigDecimal.";
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -399,7 +396,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToBoolean(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "boolean for " + this.getOriginalLocations();
+                        String msg = unknownByteArrayErrorMessage + "boolean.";
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -513,7 +510,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToInteger(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations();
+                        String msg = unknownByteArrayErrorMessage + "int.";
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -639,7 +636,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToLong(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations();
+                        String msg = unknownByteArrayErrorMessage + "long.";
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -762,7 +759,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToDouble(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations();
+                        String msg = unknownByteArrayErrorMessage + "double.";
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -884,7 +881,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToFloat(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations();
+                        String msg = unknownByteArrayErrorMessage + "float.";
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1010,7 +1007,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToDateTime(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations();
+                        String msg = unknownByteArrayErrorMessage + "datetime.";
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1121,7 +1118,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToCharArray(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "string for " + this.getOriginalLocations();
+                        String msg = unknownByteArrayErrorMessage + "string.";
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1273,7 +1270,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToTuple(dba.get(), fieldSchema);
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations();
+                        String msg = unknownByteArrayErrorMessage + "tuple.";
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1335,7 +1332,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToBag(((DataByteArray)obj).get(), fs);
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations();
+                    String msg = unknownByteArrayErrorMessage + "bag.";
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
             } else {
@@ -1366,7 +1363,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToTuple(((DataByteArray)obj).get(), fs);
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations();
+                    String msg = unknownByteArrayErrorMessage + "tuple.";
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
             } else {
@@ -1391,7 +1388,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToMap(((DataByteArray)obj).get(), fs);
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations();
+                    String msg = unknownByteArrayErrorMessage + "map.";
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
             } else {
@@ -1405,7 +1402,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToBoolean(((DataByteArray) obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations();
+                    String msg = unknownByteArrayErrorMessage + "boolean.";
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1444,7 +1441,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToInteger(((DataByteArray) obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations();
+                    String msg = unknownByteArrayErrorMessage + "int.";
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1490,7 +1487,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToDouble(((DataByteArray) obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations();
+                    String msg = unknownByteArrayErrorMessage + "double.";
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1536,7 +1533,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToLong(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations();
+                    String msg = unknownByteArrayErrorMessage + "long.";
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1582,7 +1579,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToFloat(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations();
+                    String msg = unknownByteArrayErrorMessage + "float.";
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1628,7 +1625,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToDateTime(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations();
+                    String msg = unknownByteArrayErrorMessage + "datetime.";
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1667,7 +1664,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToCharArray(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations();
+                    String msg = unknownByteArrayErrorMessage + "string.";
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1715,7 +1712,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToBigInteger(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations();
+                    String msg = unknownByteArrayErrorMessage + "BigInteger.";
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1760,7 +1757,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToBigDecimal(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations();
+                    String msg = unknownByteArrayErrorMessage + "BigDecimal.";
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1798,10 +1795,6 @@ public class POCast extends ExpressionOp
             default:
                 throw new ExecException("Cannot convert "+ obj + " to " + fs, 1120, PigException.INPUT);
             }
-        case DataType.BYTEARRAY:
-            //no-op (PIG-4933)
-            result = obj;
-            break;
         default:
             throw new ExecException("Don't know how to convert "+ obj + " to " + fs, 1120, PigException.INPUT);
         }
@@ -1868,7 +1861,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToBag(dba.get(), fieldSchema);
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations();
+                        String msg = unknownByteArrayErrorMessage + "bag.";
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1959,7 +1952,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToMap(dba.get(), fieldSchema);
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "map for " + this.getOriginalLocations();
+                        String msg = unknownByteArrayErrorMessage + "map.";
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {

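Most of the POCast hunks just shorten error messages by dropping the getOriginalLocations() suffix, but two are behavioral: the first removes EvalFunc from the types that may supply a LoadCaster, and a later one drops the BYTEARRAY no-op case (PIG-4933), so that conversion falls through to the error path again. A sketch of the caster dispatch that remains, with simplified interfaces (Pig's LoadFunc is really an abstract class):

    import java.io.IOException;

    // Simplified interfaces; only the dispatch shape matters here.
    interface LoadCaster { }
    interface LoadFunc { LoadCaster getLoadCaster() throws IOException; }
    interface StreamToPig { LoadCaster getLoadCaster() throws IOException; }

    class CasterResolver {
        // After this change, only these two types may supply a caster;
        // the EvalFunc branch that the hunk deletes is gone.
        static LoadCaster resolve(Object obj, String className) throws IOException {
            if (obj instanceof LoadFunc) {
                return ((LoadFunc) obj).getLoadCaster();
            } else if (obj instanceof StreamToPig) {
                return ((StreamToPig) obj).getLoadCaster();
            }
            throw new IOException("Invalid class type " + className);
        }
    }
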
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Fri Feb 24 03:34:37 2017
@@ -158,19 +158,23 @@ public class POProject extends Expressio
             illustratorMarkup(inpValue, res.result, -1);
             return res;
         } else if(columns.size() == 1) {
-            if ( inpValue == null ) {
-                // the tuple is null, so a dereference should also produce a null
-                res.returnStatus = POStatus.STATUS_OK;
-                ret = null;
-            } else if( inpValue.size() > columns.get(0) ) {
+            try {
                 ret = inpValue.get(columns.get(0));
-            } else {
+            } catch (IndexOutOfBoundsException ie) {
                 if(pigLogger != null) {
                     pigLogger.warn(this,"Attempt to access field " +
                             "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
                 }
                 res.returnStatus = POStatus.STATUS_OK;
                 ret = null;
+            } catch (NullPointerException npe) {
+                // the tuple is null, so a dereference should also produce a null
+                // there is a slight danger here that the Tuple implementation
+                // may have given the exception for a different reason but if we
+                // don't catch it, we will die and the most common case for the
+                // exception would be because the tuple is null
+                res.returnStatus = POStatus.STATUS_OK;
+                ret = null;
             }
         } else if(isProjectToEnd){
             ret = getRangeTuple(inpValue);
@@ -211,18 +215,23 @@ public class POProject extends Expressio
      */
     private void addColumn(ArrayList<Object> objList, Tuple inpValue, int i)
     throws ExecException {
-        if( inpValue == null ) {
-            // the tuple is null, so a dereference should also produce a null
-            objList.add(null);
-        } else if( inpValue.size() > i ) {
+        try {
             objList.add(inpValue.get(i));
-        } else {
+        } catch (IndexOutOfBoundsException ie) {
             if(pigLogger != null) {
                 pigLogger.warn(this,"Attempt to access field " + i +
                         " which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
             }
             objList.add(null);
         }
+        catch (NullPointerException npe) {
+            // the tuple is null, so a dereference should also produce a null
+            // there is a slight danger here that the Tuple implementation
+            // may have given the exception for a different reason but if we
+            // don't catch it, we will die and the most common case for the
+            // exception would be because the tuple is null
+            objList.add(null);
+        }
     }
 
     @Override
@@ -397,17 +406,21 @@ public class POProject extends Expressio
             Object ret;
 
             if(columns.size() == 1) {
-                if( inpValue == null ) {
-                    // the tuple is null, so a dereference should also produce a null
-                    ret = null;
-                } else if( inpValue.size() > columns.get(0) ) {
+                try{
                     ret = inpValue.get(columns.get(0));
-                } else {
+                } catch (IndexOutOfBoundsException ie) {
                     if(pigLogger != null) {
                         pigLogger.warn(this,"Attempt to access field " +
                                 "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
                     }
                     ret = null;
+                } catch (NullPointerException npe) {
+                    // the tuple is null, so a dereference should also produce a null
+                    // there is a slight danger here that the Tuple implementation
+                    // may have given the exception for a different reason but if we
+                    // don't catch it, we will die and the most common case for the
+                    // exception would be because the tuple is null
+                    ret = null;
                 }
             } else if(isProjectToEnd) {
                 ret = getRangeTuple(inpValue);
@@ -415,17 +428,21 @@ public class POProject extends Expressio
                 ArrayList<Object> objList = new ArrayList<Object>(columns.size());
 
                 for(int col: columns) {
-                    if( inpValue == null ) {
-                        // the tuple is null, so a dereference should also produce a null
-                        objList.add(null);
-                    } else if( inpValue.size() > col ) {
+                    try {
                         objList.add(inpValue.get(col));
-                    } else {
+                    } catch (IndexOutOfBoundsException ie) {
                         if(pigLogger != null) {
                             pigLogger.warn(this,"Attempt to access field " +
                                     "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD);
                         }
                         objList.add(null);
+                    } catch (NullPointerException npe) {
+                        // the tuple is null, so a dereference should also produce a null
+                        // there is a slight danger here that the Tuple implementation
+                        // may have given the exception for a different reason but if we
+                        // don't catch it, we will die and the most common case for the
+                        // exception would be because the tuple is null
+                        objList.add(null);
                     }
                 }
                 ret = mTupleFactory.newTuple(objList);

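The POProject hunks swap explicit null/size guards for a try/catch around the field access, mapping both failure modes back to a null field. A standalone sketch of the two styles, with a plain List standing in for Pig's Tuple:

    import java.util.ArrayList;
    import java.util.List;

    public class ProjectStyles {
        // Removed style: guard against a null tuple and a short tuple
        // explicitly before accessing the field.
        static Object projectChecked(List<Object> tuple, int col) {
            if (tuple == null) return null;        // null tuple -> null field
            if (tuple.size() > col) return tuple.get(col);
            return null;                           // missing field -> warn, null
        }

        // Restored style: attempt the access and translate the two
        // failure modes back to a null field.
        static Object projectCaught(List<Object> tuple, int col) {
            try {
                return tuple.get(col);
            } catch (IndexOutOfBoundsException ie) {
                return null;                       // field not in the input
            } catch (NullPointerException npe) {
                return null;                       // the tuple itself was null
            }
        }

        public static void main(String[] args) {
            List<Object> t = new ArrayList<>(List.of("a", "b"));
            System.out.println(projectChecked(t, 5));    // null
            System.out.println(projectCaught(null, 0));  // null
        }
    }
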
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Fri Feb 24 03:34:37 2017
@@ -49,7 +49,7 @@ public class CombinerPackager extends Pa
     private Map<Integer, Integer> keyLookup;
 
     private int numBags;
-
+    
     private transient boolean initialized;
     private transient boolean useDefaultBag;
 
@@ -77,15 +77,6 @@ public class CombinerPackager extends Pa
         }
     }
 
-    @Override
-    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
-            throws ExecException {
-        this.key = key;
-        this.bags = bags;
-        this.readOnce = readOnce;
-        // Bag can be read directly and need not be materialized again
-    }
-
     /**
      * @param keyInfo the keyInfo to set
      */

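The attachInput override removed here (and again from LitePackager below) kept the incoming key and bag references as handed in instead of re-materializing the bags, which is safe because these packagers read each bag only once. A sketch of its shape with simplified types:

    // Simplified types; DataBag is replaced by Iterable<Object> so the
    // sketch stands alone.
    class PackagerSketch {
        Object key;
        Iterable<Object>[] bags;
        boolean[] readOnce;

        // Shape of the removed override: keep the references as handed
        // in, since a read-once bag can be consumed directly and need
        // not be materialized again.
        public void attachInput(Object key, Iterable<Object>[] bags, boolean[] readOnce) {
            this.key = key;
            this.bags = bags;
            this.readOnce = readOnce;
        }
    }
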
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Fri Feb 24 03:34:37 2017
@@ -17,7 +17,7 @@
  */
 
 /**
- *
+ * 
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
@@ -28,7 +28,6 @@ import java.util.Map;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -49,15 +48,6 @@ public class LitePackager extends Packag
     private PigNullableWritable keyWritable;
 
     @Override
-    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
-            throws ExecException {
-        this.key = key;
-        this.bags = bags;
-        this.readOnce = readOnce;
-        // Bag can be read directly and need not be materialized again
-    }
-
-    @Override
     public boolean[] getInner() {
         return null;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java Fri Feb 24 03:34:37 2017
@@ -256,9 +256,4 @@ public class POCross extends PhysicalOpe
         data = null;
     }
 
-    @Override
-    public void reset() {
-        clearMemory();
-    }
-
 }
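
Because PhysicalOperator.reset() is a no-op (see the PhysicalOperator hunk earlier in this diff), deleting this override means POCross keeps its accumulated cross-product buffers across a reset. A sketch of what the removed override did, with stand-in fields:

    // Stand-in fields; clearMemory() is assumed from the context lines
    // in the hunk above, and reset() is the override this commit deletes.
    class POCrossSketch {
        private Object[] data;       // buffered input tuples

        void clearMemory() {
            data = null;             // release the accumulated inputs
        }

        public void reset() {
            clearMemory();
        }
    }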