Posted to commits@pig.apache.org by ro...@apache.org on 2016/04/15 01:09:45 UTC

svn commit: r1739205 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/ src/org/apache/...

Author: rohini
Date: Thu Apr 14 23:09:45 2016
New Revision: 1739205

URL: http://svn.apache.org/viewvc?rev=1739205&view=rev
Log:
PIG-4866: Do not serialize PigContext in configuration to the backend (rohini)
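
In effect, the single large "pig.pigContext" conf entry (a Java-serialized
PigContext object) is replaced by two small, purpose-specific keys: a boolean
recording whether the job runs with a local exec type, and the client's log4j
Properties. A minimal sketch of the write side, assuming the constants
introduced in PigImplConstants below and Pig's ObjectSerializer helper
(serialize(Serializable) to String, deserialize(String) back to Object);
shipBackendKeys is a hypothetical name used only for illustration:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.PigImplConstants;
    import org.apache.pig.impl.util.ObjectSerializer;

    // Front end: ship only the two values backend tasks actually need.
    static void shipBackendKeys(Configuration conf, PigContext pigContext)
            throws IOException {
        conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL,
                pigContext.getExecType().isLocal());
        conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES,
                ObjectSerializer.serialize(pigContext.getLog4jProperties()));
    }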

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
    pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
    pig/trunk/src/org/apache/pig/impl/PigImplConstants.java
    pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
    pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
    pig/trunk/src/org/apache/pig/impl/util/Utils.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Apr 14 23:09:45 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4866: Do not serialize PigContext in configuration to the backend (rohini)
+
 PIG-4547: Update Jython version to 2.7.0 (erwaman via daijy)
 
 PIG-4862: POProject slow by creating StackTrace repeatedly (knoguchi)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Thu Apr 14 23:09:45 2016
@@ -40,7 +40,7 @@ import org.apache.pig.tools.pigstats.Pig
 public class HJob implements ExecJob {
 
     private final Log log = LogFactory.getLog(getClass());
-    
+
     protected JOB_STATUS status;
     protected PigContext pigContext;
     protected FileSpec outFileSpec;
@@ -48,7 +48,7 @@ public class HJob implements ExecJob {
     protected String alias;
     protected POStore poStore;
     private PigStats stats;
-    
+
     public HJob(JOB_STATUS status,
                 PigContext pigContext,
                 POStore store,
@@ -59,7 +59,7 @@ public class HJob implements ExecJob {
         this.outFileSpec = poStore.getSFile();
         this.alias = alias;
     }
-    
+
     public HJob(JOB_STATUS status,
             PigContext pigContext,
             POStore store,
@@ -72,37 +72,41 @@ public class HJob implements ExecJob {
         this.alias = alias;
         this.stats = stats;
     }
-    
+
+    @Override
     public JOB_STATUS getStatus() {
         return status;
     }
-    
+
+    @Override
     public boolean hasCompleted() throws ExecException {
         return true;
     }
-    
+
+    @Override
     public Iterator<Tuple> getResults() throws ExecException {
         final LoadFunc p;
-        
+
         try{
-             LoadFunc originalLoadFunc = 
+             LoadFunc originalLoadFunc =
                  (LoadFunc)PigContext.instantiateFuncFromSpec(
                          outFileSpec.getFuncSpec());
-             
-             p = (LoadFunc) new ReadToEndLoader(originalLoadFunc, 
+
+             p = (LoadFunc) new ReadToEndLoader(originalLoadFunc,
                      ConfigurationUtil.toConfiguration(
-                     pigContext.getProperties()), outFileSpec.getFileName(), 0, pigContext);
+                     pigContext.getProperties()), outFileSpec.getFileName(), 0);
 
         }catch (Exception e){
             int errCode = 2088;
             String msg = "Unable to get results for: " + outFileSpec;
             throw new ExecException(msg, errCode, PigException.BUG, e);
         }
-        
+
         return new Iterator<Tuple>() {
             Tuple   t;
             boolean atEnd;
 
+            @Override
             public boolean hasNext() {
                 if (atEnd)
                     return false;
@@ -120,6 +124,7 @@ public class HJob implements ExecJob {
                 return !atEnd;
             }
 
+            @Override
             public Tuple next() {
                 Tuple next = t;
                 if (next != null) {
@@ -136,6 +141,7 @@ public class HJob implements ExecJob {
                 return next;
             }
 
+            @Override
             public void remove() {
                 throw new RuntimeException("Removal not supported");
             }
@@ -143,31 +149,38 @@ public class HJob implements ExecJob {
         };
     }
 
+    @Override
     public Properties getConfiguration() {
         return pigContext.getProperties();
     }
 
+    @Override
     public PigStats getStatistics() {
         //throw new UnsupportedOperationException();
         return stats;
     }
 
+    @Override
     public void completionNotification(Object cookie) {
         throw new UnsupportedOperationException();
     }
-    
+
+    @Override
     public void kill() throws ExecException {
         throw new UnsupportedOperationException();
     }
-    
+
+    @Override
     public void getLogs(OutputStream log) throws ExecException {
         throw new UnsupportedOperationException();
     }
-    
+
+    @Override
     public void getSTDOut(OutputStream out) throws ExecException {
         throw new UnsupportedOperationException();
     }
-    
+
+    @Override
     public void getSTDError(OutputStream error) throws ExecException {
         throw new UnsupportedOperationException();
     }
@@ -176,6 +189,7 @@ public class HJob implements ExecJob {
         backendException = e;
     }
 
+    @Override
     public Exception getException() {
         return backendException;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Apr 14 23:09:45 2016
@@ -702,7 +702,8 @@ public class JobControlCompiler{
             // since this path would be invalid for the new job being created
             pigContext.getProperties().remove("mapreduce.job.credentials.binary");
 
-            conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
+            conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pigContext.getExecType().isLocal());
+            conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pigContext.getLog4jProperties()));
             conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList()));
             // this is for unit tests since some don't create PigServer
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Thu Apr 14 23:09:45 2016
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,6 +40,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -72,7 +74,6 @@ public class PigCombiner {
         PhysicalOperator[] roots;
         PhysicalOperator leaf;
 
-        PigContext pigContext = null;
         private volatile boolean initialized = false;
 
         //@StaticDataCleanup
@@ -91,9 +92,11 @@ public class PigCombiner {
             Configuration jConf = context.getConfiguration();
             try {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
-                pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
-                if (pigContext.getLog4jProperties()!=null)
-                    PropertyConfigurator.configure(pigContext.getLog4jProperties());
+                Properties log4jProperties = (Properties) ObjectSerializer
+                        .deserialize(jConf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+                if (log4jProperties != null) {
+                    PropertyConfigurator.configure(log4jProperties);
+                }
                 UDFContext.getUDFContext().reset();
                 MapRedUtil.setupUDFContext(context.getConfiguration());
 
@@ -143,7 +146,7 @@ public class PigCombiner {
                 pigReporter.setRep(context);
                 PhysicalOperator.setReporter(pigReporter);
 
-                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+                boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
                 pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -268,7 +271,6 @@ public class PigCombiner {
             pigReporter = null;
             // Avoid OOM in Tez.
             PhysicalOperator.setReporter(null);
-            pigContext = null;
             roots = null;
             cp = null;
         }
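
The combiner's setup() now reads everything it needs directly from the job
Configuration rather than reviving a PigContext. A hedged sketch of the read
pattern (restoreTaskSettings is a hypothetical helper name; the real logic
lives inline in setup() as shown in the hunk above):

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.log4j.PropertyConfigurator;
    import org.apache.pig.impl.PigImplConstants;
    import org.apache.pig.impl.util.ObjectSerializer;

    static boolean restoreTaskSettings(Configuration jConf) throws IOException {
        // Re-apply the client's log4j settings, if any were shipped.
        Properties log4jProperties = (Properties) ObjectSerializer
                .deserialize(jConf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
        if (log4jProperties != null) {
            PropertyConfigurator.configure(log4jProperties);
        }
        // A plain conf lookup replaces pigContext.getProperties().
        return "true".equalsIgnoreCase(jConf.get("aggregate.warning"));
    }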

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Thu Apr 14 23:09:45 2016
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +46,7 @@ import org.apache.pig.data.SchemaTupleBa
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -88,7 +90,6 @@ public abstract class PigGenericMapBase
 
     private PhysicalOperator leaf;
 
-    PigContext pigContext = null;
     private volatile boolean initialized = false;
 
     /**
@@ -168,13 +169,15 @@ public abstract class PigGenericMapBase
         inIllustrator = inIllustrator(context);
 
         PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
-        pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
 
         // This attempts to fetch all of the generated code from the distributed cache, and resolve it
-        SchemaTupleBackend.initialize(job, pigContext);
+        SchemaTupleBackend.initialize(job);
 
-        if (pigContext.getLog4jProperties()!=null)
-            PropertyConfigurator.configure(pigContext.getLog4jProperties());
+        Properties log4jProperties = (Properties) ObjectSerializer
+                .deserialize(job.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+        if (log4jProperties != null) {
+            PropertyConfigurator.configure(log4jProperties);
+        }
 
         if (mp == null)
             mp = (PhysicalPlan) ObjectSerializer.deserialize(
@@ -236,7 +239,7 @@ public abstract class PigGenericMapBase
             pigReporter.setRep(context);
             PhysicalOperator.setReporter(pigReporter);
 
-            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+            boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
             PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
             pigStatusReporter.setContext(new MRTaskContext(context));
             PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -249,8 +252,7 @@ public abstract class PigGenericMapBase
                     MapReducePOStoreImpl impl
                         = new MapReducePOStoreImpl(context);
                     store.setStoreImpl(impl);
-                    if (!pigContext.inIllustrator)
-                        store.setUp();
+                    store.setUp();
                 }
             }
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Thu Apr 14 23:09:45 2016
@@ -287,7 +287,6 @@ public class PigGenericMapReduce {
 
         private PhysicalOperator leaf;
 
-        PigContext pigContext = null;
         protected volatile boolean initialized = false;
 
         private boolean inIllustrator = false;
@@ -319,10 +318,9 @@ public class PigGenericMapReduce {
             sJobConf = context.getConfiguration();
             try {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
-                pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
 
                 // This attempts to fetch all of the generated code from the distributed cache, and resolve it
-                SchemaTupleBackend.initialize(jConf, pigContext);
+                SchemaTupleBackend.initialize(jConf);
 
                 if (rp == null)
                     rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
@@ -377,7 +375,7 @@ public class PigGenericMapReduce {
                 pigReporter.setRep(context);
                 PhysicalOperator.setReporter(pigReporter);
 
-                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+                boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
                 pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -608,7 +606,7 @@ public class PigGenericMapReduce {
                 pigReporter.setRep(context);
                 PhysicalOperator.setReporter(pigReporter);
 
-                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+                boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
                 pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Thu Apr 14 23:09:45 2016
@@ -197,14 +197,11 @@ public class PigInputFormat extends Inpu
 
         ArrayList<FileSpec> inputs;
         ArrayList<ArrayList<OperatorKey>> inpTargets;
-        PigContext pigContext;
         try {
             inputs = (ArrayList<FileSpec>) ObjectSerializer
                     .deserialize(conf.get(PIG_INPUTS));
             inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
                     .deserialize(conf.get(PIG_INPUT_TARGETS));
-            pigContext = (PigContext) ObjectSerializer.deserialize(conf
-                    .get("pig.pigContext"));
             PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));
             MapRedUtil.setupUDFContext(conf);
         } catch (Exception e) {
@@ -234,7 +231,7 @@ public class PigInputFormat extends Inpu
 
                 // if the execution is against Mapred DFS, set
                 // working dir to /user/<userid>
-                if(!Utils.isLocal(pigContext, conf)) {
+                if(!Utils.isLocal(conf)) {
                     fs.setWorkingDirectory(jobcontext.getWorkingDirectory());
                 }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Thu Apr 14 23:09:45 2016
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,13 +30,12 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.NullableBigDecimalWritable;
 import org.apache.pig.impl.io.NullableBigIntegerWritable;
@@ -52,7 +50,6 @@ import org.apache.pig.impl.io.NullableTe
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.io.ReadToEndLoader;
-import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Utils;
 
 public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable>
@@ -62,7 +59,6 @@ public class WeightedRangePartitioner ex
             new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
     protected PigNullableWritable[] quantiles;
     protected RawComparator<PigNullableWritable> comparator;
-    private PigContext pigContext;
     protected Configuration job;
 
     protected boolean inited = false;
@@ -93,11 +89,6 @@ public class WeightedRangePartitioner ex
     @SuppressWarnings("unchecked")
     public void init() {
         weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
-        try {
-            pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to deserialize pig context: ", e);
-        }
 
         String quantilesFile = job.get("pig.quantilesFile", "");
         if (quantilesFile.length() == 0) {
@@ -109,10 +100,10 @@ public class WeightedRangePartitioner ex
             // use local file system to get the quantilesFile
             Map<String, Object> quantileMap = null;
             Configuration conf;
-            if (!pigContext.getExecType().isLocal()) {
-                conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
-            } else {
+            if (job.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false)) {
                 conf = new Configuration(false);
+            } else {
+                conf = new Configuration(job);
             }
             if (job.get("fs.file.impl") != null) {
                 conf.set("fs.file.impl", job.get("fs.file.impl"));
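
Previously the partitioner deserialized the whole PigContext just to ask
pigContext.getExecType().isLocal(); that decision now rides on one boolean in
the job conf. The same branch as in the hunk above, with the intent spelled
out:

    Configuration conf;
    if (job.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false)) {
        // Local mode: start from an empty conf so the local file system
        // is used to read the quantiles file.
        conf = new Configuration(false);
    } else {
        // Cluster mode: inherit the job's settings.
        conf = new Configuration(job);
    }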

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Apr 14 23:09:45 2016
@@ -108,7 +108,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.PigImplConstants;
-import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
@@ -479,7 +478,8 @@ public class TezDagBuilder extends TezOp
 
         conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
         conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
-        conf.set("pig.pigContext", serializedPigContext);
+        conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal());
+        conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties()));
         conf.set("udf.import.list", serializedUDFImportList);
 
         if(to.isGlobalSort() || to.isLimitAfterSort()){
@@ -593,6 +593,8 @@ public class TezDagBuilder extends TezOp
         setOutputFormat(job);
         payloadConf.set("udf.import.list", serializedUDFImportList);
         payloadConf.set("exectype", "TEZ");
+        payloadConf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal());
+        payloadConf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties()));
 
         // Process stores
         LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
@@ -611,11 +613,7 @@ public class TezDagBuilder extends TezOp
             payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
             payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
             inputPayLoad = new Configuration(payloadConf);
-            if (tezOp.getLoaderInfo().getLoads().get(0).getLoadFunc() instanceof DefaultIndexableLoader) {
-                inputPayLoad.set("pig.pigContext", serializedPigContext);
-            }
         }
-        payloadConf.set("pig.pigContext", serializedPigContext);
 
         if (tezOp.getSampleOperator() != null) {
             payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString());
@@ -805,7 +803,7 @@ public class TezDagBuilder extends TezOp
                         vmPluginName = PigGraceShuffleVertexManager.class.getName();
                         tezOp.setUseGraceParallelism(true);
                         vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
-                        vmPluginConf.set("pig.pigContext", serializedPigContext);
+                        vmPluginConf.set(PigImplConstants.PIG_CONTEXT, serializedPigContext);
                     } else {
                         vmPluginName = ShuffleVertexManager.class.getName();
                     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Thu Apr 14 23:09:45 2016
@@ -66,11 +66,6 @@ public class LoaderProcessor extends Tez
         this.jobConf.setBoolean("mapred.mapper.new-api", true);
         this.jobConf.setClass("mapreduce.inputformat.class",
                 PigInputFormat.class, InputFormat.class);
-        try {
-            this.jobConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
-        } catch (IOException e) {
-            throw new VisitorException(e);
-        }
     }
 
     /**

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java Thu Apr 14 23:09:45 2016
@@ -33,15 +33,15 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezEstimatedParallelismClearer;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
@@ -72,7 +72,7 @@ public class PigGraceShuffleVertexManage
             conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
             bytesPerTask = conf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                     InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
-            pc = (PigContext)ObjectSerializer.deserialize(conf.get("pig.pigContext"));
+            pc = (PigContext)ObjectSerializer.deserialize(conf.get(PigImplConstants.PIG_CONTEXT));
             tezPlan = (TezOperPlan)ObjectSerializer.deserialize(conf.get("pig.tez.plan"));
             TezEstimatedParallelismClearer clearer = new TezEstimatedParallelismClearer(tezPlan);
             try {
@@ -81,9 +81,10 @@ public class PigGraceShuffleVertexManage
                 throw new TezUncheckedException(e);
             }
             TezOperator op = tezPlan.getOperator(OperatorKey.fromString(getContext().getVertexName()));
-    
+
             // Collect grandparents of the vertex
-            Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() { 
+            Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() {
+                @Override
                 public String apply(TezOperator op) { return op.getOperatorKey().toString(); }
             };
             grandParents = Lists.transform(TezOperPlan.getGrandParentsForGraceParallelism(tezPlan, op), tezOpToString);
@@ -135,7 +136,7 @@ public class PigGraceShuffleVertexManage
         // Now one of the predecessor is about to start, we need to make a decision now
         if (anyPredAboutToStart) {
             // All grandparents finished, start parents with right parallelism
-            
+
             for (TezOperator pred : preds) {
                 if (pred.getRequestedParallelism()==-1) {
                     List<TezOperator> predPreds = tezPlan.getPredecessors(pred);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Thu Apr 14 23:09:45 2016
@@ -25,6 +25,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -32,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.JVMReuseImpl;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
@@ -53,6 +55,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -132,7 +135,11 @@ public class PigProcessor extends Abstra
         SpillableMemoryManager.getInstance().configure(conf);
         PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
                 .deserialize(conf.get("udf.import.list")));
-        PigContext pc = (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext"));
+        Properties log4jProperties = (Properties) ObjectSerializer
+                .deserialize(conf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+        if (log4jProperties != null) {
+            PropertyConfigurator.configure(log4jProperties);
+        }
 
         // To determine front-end in UDFContext
         conf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, getContext().getUniqueIdentifier());
@@ -158,7 +165,7 @@ public class PigProcessor extends Abstra
 
         String execPlanString = conf.get(PLAN);
         execPlan = (PhysicalPlan) ObjectSerializer.deserialize(execPlanString);
-        SchemaTupleBackend.initialize(conf, pc);
+        SchemaTupleBackend.initialize(conf);
         PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new org.apache.hadoop.mapreduce.JobID());
 
         // Set the job conf as a thread-local member of PigMapReduce
@@ -167,7 +174,7 @@ public class PigProcessor extends Abstra
 
         Utils.setDefaultTimeZone(conf);
 
-        boolean aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
+        boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning"));
         PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
         pigStatusReporter.setContext(new TezTaskContext(getContext()));
         pigHadoopLogger = PigHadoopLogger.getInstance();

Modified: pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java (original)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java Thu Apr 14 23:09:45 2016
@@ -39,6 +39,7 @@ import org.apache.pig.data.utils.Structu
 import org.apache.pig.data.utils.StructuresHelper.Triple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -272,14 +273,20 @@ public class SchemaTupleBackend {
     private static SchemaTupleBackend stb;
 
     public static void initialize(Configuration jConf, PigContext pigContext) throws IOException {
-        initialize(jConf, pigContext, pigContext.getExecType().isLocal());
+        if (stb != null) {
+            SchemaTupleFrontend.lazyReset(pigContext);
+        }
+        initialize(jConf, pigContext.getExecType().isLocal());
     }
 
-    public static void initialize(Configuration jConf, PigContext pigContext, boolean isLocal) throws IOException {
+    public static void initialize(Configuration jConf) throws IOException {
+        initialize(jConf, Utils.isLocal(jConf));
+    }
+
+    public static void initialize(Configuration jConf, boolean isLocal) throws IOException {
         if (stb != null) {
             LOG.warn("SchemaTupleBackend has already been initialized");
         } else {
-            SchemaTupleFrontend.lazyReset(pigContext);
             SchemaTupleFrontend.reset();
             SchemaTupleBackend stbInstance = new SchemaTupleBackend(jConf, isLocal);
             stbInstance.copyAndResolve();
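
SchemaTupleBackend.initialize() now has three entry points: the original
(Configuration, PigContext) form for front-end callers, a new backend form
taking only the Configuration (locality derived via Utils.isLocal(conf)), and
the explicit (Configuration, boolean) form both delegate to. A usage sketch
from a backend task's perspective:

    // Backend: no PigContext in scope; locality comes from the conf itself.
    SchemaTupleBackend.initialize(jConf);

    // Equivalent explicit form:
    SchemaTupleBackend.initialize(jConf, Utils.isLocal(jConf));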

Modified: pig/trunk/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigImplConstants.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigImplConstants.java Thu Apr 14 23:09:45 2016
@@ -38,6 +38,12 @@ public class PigImplConstants {
     public static final String PIG_OPTIMIZER_RULES_KEY = "pig.optimizer.rules";
 
     /**
+     * Used by pig to indicate that current job is running in local mode (local/tez_local)
+     * ie. ExecType.isLocal() is true
+     */
+    public static final String PIG_EXECTYPE_MODE_LOCAL = "pig.exectype.mode.local";
+
+    /**
      * Used by pig to indicate that current job has been converted to run in local mode
      */
     public static final String CONVERTED_TO_LOCAL = "pig.job.converted.local";
@@ -63,4 +69,14 @@ public class PigImplConstants {
      * Parallelism to be used for CROSS operation by GFCross UDF
      */
     public static final String PIG_CROSS_PARALLELISM = "pig.cross.parallelism";
+
+    /**
+     * Pig context
+     */
+    public static final String PIG_CONTEXT = "pig.pigContext";
+
+    /**
+     * Pig log4j properties
+     */
+    public static final String PIG_LOG4J_PROPERTIES = "pig.log4j.properties";
 }

Modified: pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Thu Apr 14 23:09:45 2016
@@ -64,13 +64,13 @@ import org.apache.pig.impl.util.ObjectSe
 public class DefaultIndexableLoader extends LoadFunc implements IndexableLoadFunc{
 
     private static final Log LOG = LogFactory.getLog(DefaultIndexableLoader.class);
-    
+
     // FileSpec of index file which will be read from HDFS.
     private String indexFile;
     private String indexFileLoadFuncSpec;
-    
+
     private LoadFunc loader;
-    // Index is modeled as FIFO queue and LinkedList implements java Queue interface.  
+    // Index is modeled as FIFO queue and LinkedList implements java Queue interface.
     private LinkedList<Tuple> index;
     private FuncSpec rightLoaderFuncSpec;
 
@@ -79,9 +79,9 @@ public class DefaultIndexableLoader exte
     private transient TupleFactory mTupleFactory;
 
     private String inpLocation;
-    
+
     public DefaultIndexableLoader(
-            String loaderFuncSpec, 
+            String loaderFuncSpec,
             String indexFile,
             String indexFileLoadFuncSpec,
             String scope,
@@ -93,39 +93,39 @@ public class DefaultIndexableLoader exte
         this.scope = scope;
         this.inpLocation = inputLocation;
     }
-    
+
     @SuppressWarnings("unchecked")
     @Override
     public void seekNear(Tuple keys) throws IOException{
         // some setup
         mTupleFactory = TupleFactory.getInstance();
 
-        /* Currently whole of index is read into memory. Typically, index is small. Usually 
+        /* Currently whole of index is read into memory. Typically, index is small. Usually
            few KBs in size. So, this should not be an issue.
            However, reading whole index at startup time is not required. So, this can be improved upon.
            Assumption: Index being read is sorted on keys followed by filename, followed by offset.
          */
 
         // Index is modeled as FIFO Queue, that frees us from keeping track of which index entry should be read next.
-        
+
         // the keys are sent in a tuple. If there is really only
         // 1 join key, it would be the first field of the tuple. If
         // there are multiple Join keys, the tuple itself represents
         // the join key
         Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys);
         POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec)));
-                
+
         Properties props = ConfigurationUtil.getLocalFSProperties();
         PigContext pc = new PigContext(ExecType.LOCAL, props);
         ld.setPc(pc);
         index = new LinkedList<Tuple>();
         for(Result res=ld.getNextTuple();res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNextTuple())
-            index.offer((Tuple) res.result);   
+            index.offer((Tuple) res.result);
+
 
-        
         Tuple prevIdxEntry = null;
         Tuple matchedEntry;
-     
+
         // When the first call is made, we need to seek into right input at correct offset.
         while(true){
             // Keep looping till we find first entry in index >= left key
@@ -148,15 +148,15 @@ public class DefaultIndexableLoader exte
                 prevIdxEntry = curIdxEntry;
                 continue;
             }
-            
+
             if(((Comparable)extractedKey).compareTo(firstLeftKey) >= 0){
                 index.addFirst(curIdxEntry);  // We need to add back the current index Entry because we are reading ahead.
                 if(null == prevIdxEntry)   // very first entry in index.
                     matchedEntry = curIdxEntry;
                 else{
-                    matchedEntry = prevIdxEntry; 
+                    matchedEntry = prevIdxEntry;
                     // start join from previous idx entry, it might have tuples
-                    // with this key                    
+                    // with this key
                     index.addFirst(prevIdxEntry);
                 }
                 break;
@@ -168,43 +168,43 @@ public class DefaultIndexableLoader exte
         if (matchedEntry == null) {
             LOG.warn("Empty index file: input directory is empty");
         } else {
-        
+
             Object extractedKey = extractKeysFromIdxTuple(matchedEntry);
-            
+
             if (extractedKey != null) {
                 Class idxKeyClass = extractedKey.getClass();
                 if( ! firstLeftKey.getClass().equals(idxKeyClass)){
-    
+
                     // This check should indeed be done on compile time. But to be on safe side, we do it on runtime also.
                     int errCode = 2166;
                     String errMsg = "Key type mismatch. Found key of type "+firstLeftKey.getClass().getCanonicalName()+" on left side. But, found key of type "+ idxKeyClass.getCanonicalName()+" in index built for right side.";
                     throw new ExecException(errMsg,errCode,PigException.BUG);
                 }
-            } 
+            }
         }
-        
+
         //add remaining split indexes to splitsAhead array
         int [] splitsAhead = new int[index.size()];
         int splitsAheadIdx = 0;
         for(Tuple t : index){
             splitsAhead[splitsAheadIdx++] = (Integer) t.get( t.size()-1 );
         }
-        
+
         initRightLoader(splitsAhead);
     }
-    
+
     private void initRightLoader(int [] splitsToBeRead) throws IOException{
-        PigContext pc = (PigContext) ObjectSerializer
-                .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.pigContext"));
-        
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        
+        Properties properties = (Properties) ObjectSerializer
+                .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.client.sys.props"));
+
+        Configuration conf = ConfigurationUtil.toConfiguration(properties);
+
         // Hadoop security need this property to be set
         if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
-            conf.set(MRConfiguration.JOB_CREDENTIALS_BINARY, 
+            conf.set(MRConfiguration.JOB_CREDENTIALS_BINARY,
                     System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
         }
-        
+
         //create ReadToEndLoader that will read the given splits in order
         loader = new ReadToEndLoader((LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec),
                 conf, inpLocation, splitsToBeRead);
@@ -216,7 +216,7 @@ public class DefaultIndexableLoader exte
 
         if(idxTupSize == 3)
             return idxTuple.get(0);
-        
+
         int numColsInKey = (idxTupSize - 2);
         List<Object> list = new ArrayList<Object>(numColsInKey);
         for(int i=0; i < numColsInKey; i++)
@@ -228,13 +228,13 @@ public class DefaultIndexableLoader exte
     private OperatorKey genKey(){
         return new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope));
     }
-    
+
     @Override
     public Tuple getNext() throws IOException {
         Tuple t = loader.getNext();
         return t;
     }
-    
+
     @Override
     public void close() throws IOException {
     }
@@ -242,14 +242,14 @@ public class DefaultIndexableLoader exte
     @Override
     public void initialize(Configuration conf) throws IOException {
         // nothing to do
-        
+
     }
 
     @Override
     public InputFormat getInputFormat() throws IOException {
         throw new UnsupportedOperationException();
     }
-    
+
     @Override
     public LoadCaster getLoadCaster() throws IOException {
         throw new UnsupportedOperationException();
@@ -264,7 +264,7 @@ public class DefaultIndexableLoader exte
     public void setLocation(String location, Job job) throws IOException {
         // nothing to do
     }
-    
+
     public void setIndexFile(String indexFile) {
         this.indexFile = indexFile;
     }
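
initRightLoader() is the one backend spot that still needs the client-side
Properties, to build a Configuration for reading the right-hand input. It now
pulls them from the already-shipped "pig.client.sys.props" conf entry instead
of the serialized PigContext. A condensed sketch of that lookup, mirroring
the hunk above:

    Properties properties = (Properties) ObjectSerializer.deserialize(
            PigMapReduce.sJobConfInternal.get().get("pig.client.sys.props"));
    Configuration conf = ConfigurationUtil.toConfiguration(properties);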

Modified: pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java Thu Apr 14 23:09:45 2016
@@ -40,17 +40,16 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 
 /**
  * This is wrapper Loader which wraps a real LoadFunc underneath and allows
- * to read a file completely starting a given split (indicated by a split index 
+ * to read a file completely starting a given split (indicated by a split index
  * which is used to look in the List<InputSplit> returned by the underlying
  * InputFormat's getSplits() method). So if the supplied split index is 0, this
  * loader will read the entire file. If it is non zero it will read the partial
  * file beginning from that split to the last split.
- * 
+ *
  * The call sequence to use this is:
  * 1) construct an object using the constructor
  * 2) Call getNext() in a loop till it returns null
@@ -61,52 +60,50 @@ public class ReadToEndLoader extends Loa
      * the wrapped LoadFunc which will do the actual reading
      */
     private LoadFunc wrappedLoadFunc;
-    
+
     /**
      * the Configuration object used to locate the input location - this will
      * be used to call {@link LoadFunc#setLocation(String, Job)} on
      * the wrappedLoadFunc
      */
     private Configuration conf;
-    
+
     /**
      * the input location string (typically input file/dir name )
      */
     private String inputLocation;
-      
+
     /**
      * If the splits to be read are not in increasing sequence of integers
      * this array can be used
      */
     private int[] toReadSplits = null;
-    
+
     /**
      * index into toReadSplits
      */
     private int toReadSplitsIdx = 0;
-    
+
     /**
      * the index of the split the loader is currently reading from
      */
     private int curSplitIndex;
-    
+
     /**
      * the input splits returned by underlying {@link InputFormat#getSplits(JobContext)}
      */
     private List<InputSplit> inpSplits = null;
-    
+
     /**
      * underlying RecordReader
      */
     private RecordReader reader = null;
-    
+
     /**
      * underlying InputFormat
      */
     private InputFormat inputFormat = null;
-    
-    private PigContext pigContext;
-    
+
     private String udfContextSignature = null;
 
     /**
@@ -114,8 +111,8 @@ public class ReadToEndLoader extends Loa
      * @param conf
      * @param inputLocation
      * @param splitIndex
-     * @throws IOException 
-     * @throws InterruptedException 
+     * @throws IOException
+     * @throws InterruptedException
      */
     public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
             String inputLocation, int splitIndex) throws IOException {
@@ -125,17 +122,7 @@ public class ReadToEndLoader extends Loa
         this.curSplitIndex = splitIndex;
         init();
     }
-    
-    public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
-            String inputLocation, int splitIndex, PigContext pigContext) throws IOException {
-        this.wrappedLoadFunc = wrappedLoadFunc;
-        this.inputLocation = inputLocation;
-        this.conf = conf;
-        this.curSplitIndex = splitIndex;
-        this.pigContext = pigContext;
-        init();
-    }
-    
+
     public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
             String inputLocation, int splitIndex, String signature) throws IOException {
         this.udfContextSignature = signature;
@@ -147,14 +134,14 @@ public class ReadToEndLoader extends Loa
     }
 
     /**
-     * This constructor takes an array of split indexes (toReadSplitIdxs) of the 
+     * This constructor takes an array of split indexes (toReadSplitIdxs) of the
      * splits to be read.
      * @param wrappedLoadFunc
      * @param conf
      * @param inputLocation
      * @param toReadSplitIdxs
-     * @throws IOException 
-     * @throws InterruptedException 
+     * @throws IOException
+     * @throws InterruptedException
      */
     public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
             String inputLocation, int[] toReadSplitIdxs) throws IOException {
@@ -166,21 +153,21 @@ public class ReadToEndLoader extends Loa
             toReadSplitIdxs.length > 0 ? toReadSplitIdxs[0] : Integer.MAX_VALUE;
         init();
     }
-    
+
     @SuppressWarnings("unchecked")
     private void init() throws IOException {
-        if (conf != null && pigContext != null) {
-            SchemaTupleBackend.initialize(conf, pigContext, true);
+        if (conf != null) {
+            SchemaTupleBackend.initialize(conf, true);
         }
 
         // make a copy so that if the underlying InputFormat writes to the
         // conf, we don't affect the caller's copy
         conf = new Configuration(conf);
 
-        // let's initialize the wrappedLoadFunc 
+        // let's initialize the wrappedLoadFunc
         Job job = new Job(conf);
         wrappedLoadFunc.setUDFContextSignature(this.udfContextSignature);
-        wrappedLoadFunc.setLocation(inputLocation, 
+        wrappedLoadFunc.setLocation(inputLocation,
                 job);
         // The above setLocation call could write to the conf within
         // the job - get a hold of the modified conf
@@ -191,10 +178,10 @@ public class ReadToEndLoader extends Loa
                     new JobID()));
         } catch (InterruptedException e) {
             throw new IOException(e);
-        }        
+        }
     }
 
-    private boolean initializeReader() throws IOException, 
+    private boolean initializeReader() throws IOException,
     InterruptedException {
         // Close the previous reader first
         if(reader != null) {
@@ -206,14 +193,14 @@ public class ReadToEndLoader extends Loa
             return false;
         }
         InputSplit curSplit = inpSplits.get(curSplitIndex);
-        TaskAttemptContext tAContext = HadoopShims.createTaskAttemptContext(conf, 
+        TaskAttemptContext tAContext = HadoopShims.createTaskAttemptContext(conf,
                 new TaskAttemptID());
         reader = inputFormat.createRecordReader(curSplit, tAContext);
         reader.initialize(curSplit, tAContext);
         // create a dummy pigsplit - other than the actual split, the other
         // params are really not needed here where we are just reading the
         // input completely
-        PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1, 
+        PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1,
                 new ArrayList<OperatorKey>(), -1);
         // Set the conf object so that if the wrappedLoadFunc uses it,
         // it won't be null
@@ -244,7 +231,7 @@ public class ReadToEndLoader extends Loa
             throw new IOException(e);
         }
     }
-    
+
     private Tuple getNextHelper() throws IOException, InterruptedException {
         Tuple t = null;
         while(initializeReader()) {
@@ -258,8 +245,8 @@ public class ReadToEndLoader extends Loa
         }
         return null;
     }
-    
-    
+
+
     /**
      * Updates curSplitIndex , just increment if splitIndexes is null,
      * else get next split in splitIndexes
@@ -331,7 +318,7 @@ public class ReadToEndLoader extends Loa
              ((LoadMetadata) wrappedLoadFunc).setPartitionFilter(partitionFilter);
         }
     }
-    
+
     @Override
     public void setUDFContextSignature(String signature) {
         this.udfContextSignature = signature;
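
With the PigContext-taking constructor removed, callers such as
HJob.getResults() (first hunk in this commit) simply drop the trailing
argument. A usage sketch following the call sequence described in the class
javadoc (the input path is a placeholder):

    ReadToEndLoader loader = new ReadToEndLoader(
            wrappedLoadFunc, conf, "/tmp/pig-output", 0);
    for (Tuple t = loader.getNext(); t != null; t = loader.getNext()) {
        // process each tuple until the loader is exhausted
    }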

Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1739205&r1=1739204&r2=1739205&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Thu Apr 14 23:09:45 2016
@@ -580,6 +580,11 @@ public class Utils {
         return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
     }
 
+    public static boolean isLocal(Configuration conf) {
+        return conf.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false)
+                || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
+    }
+
     // PIG-3929 use parameter substitution for pig properties similar to Hadoop Configuration
     // Following code has been borrowed from Hadoop's Configuration#substituteVars
     private static Pattern varPat = Pattern.compile("\\$\\{[^\\}\\$\u0020]+\\}");
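
The new Utils.isLocal(Configuration) overload is what lets backend code such
as PigInputFormat (above) answer "is this a local run?" without a PigContext:
it ORs the PIG_EXECTYPE_MODE_LOCAL flag set at submission time with
CONVERTED_TO_LOCAL, which marks jobs auto-converted to local mode. For
example, as in the PigInputFormat hunk:

    if (!Utils.isLocal(conf)) {
        // Cluster execution: working dir defaults to /user/<userid>.
        fs.setWorkingDirectory(jobcontext.getWorkingDirectory());
    }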