You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/08/24 16:04:01 UTC

svn commit: r1757545 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: ./ converter/ running/

Author: xuefu
Date: Wed Aug 24 16:04:01 2016
New Revision: 1757545

URL: http://svn.apache.org/viewvc?rev=1757545&view=rev
Log:
PIG-4970: Remove the deserialization and serialization of JobConf in code for spark mode (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Aug 24 16:04:01 2016
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -41,7 +40,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.PigConfiguration;
-import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -70,7 +68,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.DistinctConverter;
@@ -110,7 +107,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -154,7 +150,7 @@ public class SparkLauncher extends Launc
         if (LOG.isDebugEnabled())
             LOG.debug(physicalPlan);
         this.pigContext = pigContext;
-        initialize();
+        initialize(physicalPlan);
         SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
         if (LOG.isDebugEnabled())
             explain(sparkplan, System.out, "text", true);
@@ -177,44 +173,33 @@ public class SparkLauncher extends Launc
                 .normalize().toString()
                 + "/";
 
-
-        LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
-                physicalPlan, POStore.class);
-        POStore firstStore = stores.getFirst();
-        if (firstStore != null) {
-            MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext,
-                    jobConf);
-        }
-
         new ParallelismSetter(sparkplan, jobConf).visit();
 
-        byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
-
         SparkPigStatusReporter.getInstance().setCounters(new SparkCounters(sparkContext));
 
         // Create conversion map, mapping between pig operator and spark convertor
         Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap
                 = new HashMap<Class<? extends PhysicalOperator>, RDDConverter>();
         convertMap.put(POLoad.class, new LoadConverter(pigContext,
-                physicalPlan, sparkContext.sc()));
-        convertMap.put(POStore.class, new StoreConverter(pigContext));
-        convertMap.put(POForEach.class, new ForEachConverter(confBytes));
-        convertMap.put(POFilter.class, new FilterConverter(confBytes));
-        convertMap.put(POPackage.class, new PackageConverter(confBytes));
+                physicalPlan, sparkContext.sc(), jobConf));
+        convertMap.put(POStore.class, new StoreConverter(jobConf));
+        convertMap.put(POForEach.class, new ForEachConverter());
+        convertMap.put(POFilter.class, new FilterConverter());
+        convertMap.put(POPackage.class, new PackageConverter());
         convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
         convertMap.put(POGlobalRearrangeSpark.class, new GlobalRearrangeConverter());
-	    convertMap.put(POJoinGroupSpark.class, new JoinGroupSparkConverter(confBytes));
+	    convertMap.put(POJoinGroupSpark.class, new JoinGroupSparkConverter());
         convertMap.put(POLimit.class, new LimitConverter());
         convertMap.put(PODistinct.class, new DistinctConverter());
         convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
         convertMap.put(POSort.class, new SortConverter());
         convertMap.put(POSplit.class, new SplitConverter());
         convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
-        convertMap.put(POMergeJoin.class, new MergeJoinConverter(confBytes));
+        convertMap.put(POMergeJoin.class, new MergeJoinConverter());
         convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
         convertMap.put(POCounter.class, new CounterConverter());
         convertMap.put(PORank.class, new RankConverter());
-        convertMap.put(POStream.class, new StreamConverter(confBytes));
+        convertMap.put(POStream.class, new StreamConverter());
         convertMap.put(POFRJoin.class, new FRJoinConverter());
         convertMap.put(POMergeCogroup.class, new MergeCogroupConverter());
         convertMap.put(POReduceBySpark.class, new ReduceByConverter());
@@ -378,13 +363,12 @@ public class SparkLauncher extends Launc
 
     private void cacheFiles(String cacheFiles) throws IOException {
         if (cacheFiles != null) {
-            Configuration conf = SparkUtil.newJobConf(pigContext);
             for (String file : cacheFiles.split(",")) {
                 String fileName = extractFileName(file.trim());
                 Path src = new Path(extractFileUrl(file.trim()));
                 File tmpFile = File.createTempFile(fileName, ".tmp");
                 Path tmpFilePath = new Path(tmpFile.getAbsolutePath());
-                FileSystem fs = tmpFilePath.getFileSystem(conf);
+                FileSystem fs = tmpFilePath.getFileSystem(jobConf);
                 fs.copyToLocalFile(src, tmpFilePath);
                 tmpFile.deleteOnExit();
                 LOG.info(String.format("cacheFile:%s", fileName));
@@ -659,12 +643,9 @@ public class SparkLauncher extends Launc
         pigContext.getProperties().setProperty("spark.udf.import.list", udfImportList);
     }
 
-    private void initialize() throws IOException {
+    private void initialize(PhysicalPlan physicalPlan) throws IOException {
         saveUdfImportList();
-        jobConf = SparkUtil.newJobConf(pigContext);
-        jobConf.set(PigConstants.LOCAL_CODE_DIR,
-                System.getProperty("java.io.tmpdir"));
-
+        jobConf = SparkUtil.newJobConf(pigContext, physicalPlan);
         SchemaTupleBackend.initialize(jobConf, pigContext);
         Utils.setDefaultTimeZone(jobConf);
         PigMapReduce.sJobConfInternal.set(jobConf);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Wed Aug 24 16:04:01 2016
@@ -19,8 +19,10 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
 
 import scala.Product2;
 import scala.Tuple2;
@@ -30,6 +32,7 @@ import scala.reflect.ClassTag;
 import scala.reflect.ClassTag$;
 
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.PigConstants;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
@@ -37,7 +40,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -45,7 +51,6 @@ import org.apache.pig.impl.plan.NodeIdGe
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.UDFContext;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
 import org.apache.spark.rdd.RDD;
@@ -66,7 +71,7 @@ public class SparkUtil {
         return (ClassTag<Product2<K, V>>) (Object) getManifest(Product2.class);
     }
 
-    public static JobConf newJobConf(PigContext pigContext) throws IOException {
+    public static JobConf newJobConf(PigContext pigContext, PhysicalPlan physicalPlan) throws IOException {
         JobConf jobConf = new JobConf(
                 ConfigurationUtil.toConfiguration(pigContext.getProperties()));
         // Serialize the PigContext so it's available in Executor thread.
@@ -74,9 +79,20 @@ public class SparkUtil {
         // Serialize the thread local variable inside PigContext separately
         jobConf.set("udf.import.list",
                 ObjectSerializer.serialize(PigContext.getPackageImportList()));
-        UDFContext.getUDFContext().serialize(jobConf);
         Random rand = new Random();
         jobConf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, Integer.toString(rand.nextInt()));
+        jobConf.set(PigConstants.LOCAL_CODE_DIR,
+                System.getProperty("java.io.tmpdir"));
+        jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
+        jobConf.set(PigConstants.TASK_INDEX, "0");
+
+        LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
+                physicalPlan, POStore.class);
+        POStore firstStore = stores.getFirst();
+        if (firstStore != null) {
+            MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext,
+                    jobConf);
+        }
         return jobConf;
     }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java Wed Aug 24 16:04:01 2016
@@ -17,28 +17,17 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
-import java.util.UUID;
 
 import scala.runtime.AbstractFunction1;
 
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.pig.PigConstants;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
-import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.spark.rdd.RDD;
 
 /**
@@ -47,18 +36,12 @@ import org.apache.spark.rdd.RDD;
 @SuppressWarnings({ "serial" })
 public class FilterConverter implements RDDConverter<Tuple, Tuple, POFilter> {
 
-    private byte[] confBytes;
-
-    public FilterConverter(byte[] confBytes) {
-        this.confBytes = confBytes;
-    }
-
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
             POFilter physicalOperator) {
         SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
         RDD<Tuple> rdd = predecessors.get(0);
-        FilterFunction filterFunction = new FilterFunction(physicalOperator, confBytes);
+        FilterFunction filterFunction = new FilterFunction(physicalOperator);
         return rdd.filter(filterFunction);
     }
 
@@ -66,17 +49,13 @@ public class FilterConverter implements
             AbstractFunction1<Tuple, Object> implements Serializable {
 
         private POFilter poFilter;
-        private byte[] confBytes;
-        private transient JobConf jobConf;
 
-        private FilterFunction(POFilter poFilter, byte[] confBytes) {
+        private FilterFunction(POFilter poFilter) {
             this.poFilter = poFilter;
-            this.confBytes = confBytes;
         }
 
         @Override
         public Boolean apply(Tuple v1) {
-            initializeJobConf();
             Result result;
             try {
                 poFilter.setInputs(null);
@@ -101,24 +80,5 @@ public class FilterConverter implements
                         "Unexpected response code from filter: " + result);
             }
         }
-
-        void initializeJobConf() {
-            if (this.jobConf != null) {
-                return;
-            }
-            this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
-            PigMapReduce.sJobConfInternal.set(jobConf);
-            try {
-                MapRedUtil.setupUDFContext(jobConf);
-                PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
-                SchemaTupleBackend.initialize(jobConf, pc);
-                // Although Job ID and task index are not really applicable for spark,
-                // set them here to overcome PIG-4827
-                jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
-                jobConf.set(PigConstants.TASK_INDEX, "0");
-            } catch (IOException ioe) {
-                throw new RuntimeException("Problem while configuring UDFContext from FilterConverter.", ioe);
-            }
-        }
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Wed Aug 24 16:04:01 2016
@@ -17,29 +17,18 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
-import java.util.UUID;
 
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.pig.PigConstants;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
@@ -49,18 +38,12 @@ import org.apache.spark.rdd.RDD;
 @SuppressWarnings({"serial" })
 public class ForEachConverter implements RDDConverter<Tuple, Tuple, POForEach> {
 
-    private byte[] confBytes;
-
-    public ForEachConverter(byte[] confBytes) {
-        this.confBytes = confBytes;
-    }
-
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
             POForEach physicalOperator) {
         SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
         RDD<Tuple> rdd = predecessors.get(0);
-        ForEachFunction forEachFunction = new ForEachFunction(physicalOperator, this.confBytes);
+        ForEachFunction forEachFunction = new ForEachFunction(physicalOperator);
         return rdd.toJavaRDD().mapPartitions(forEachFunction, true).rdd();
     }
 
@@ -68,34 +51,12 @@ public class ForEachConverter implements
             FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
 
         private POForEach poForEach;
-        private byte[] confBytes;
-        private transient JobConf jobConf;
 
-        private ForEachFunction(POForEach poForEach, byte[] confBytes) {
+        private ForEachFunction(POForEach poForEach) {
             this.poForEach = poForEach;
-            this.confBytes = confBytes;
-        }
-
-        void initializeJobConf() {
-            if (this.jobConf != null) {
-                return;
-            }
-            this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
-            PigMapReduce.sJobConfInternal.set(jobConf);
-            try {
-                MapRedUtil.setupUDFContext(jobConf);
-                PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
-                SchemaTupleBackend.initialize(jobConf, pc);
-                jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
-                jobConf.set(PigConstants.TASK_INDEX, "0");
-            } catch (IOException ioe) {
-                String msg = "Problem while configuring UDFContext from ForEachConverter.";
-                throw new RuntimeException(msg, ioe);
-            }
         }
 
         public Iterable<Tuple> call(final Iterator<Tuple> input) {
-            initializeJobConf();
             // Initialize a reporter as the UDF might want to report progress.
             PhysicalOperator.setReporter(new ProgressableReporter());
             PhysicalOperator[] planLeafOps= poForEach.getPlanLeafOps();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Wed Aug 24 16:04:01 2016
@@ -33,14 +33,11 @@ import scala.runtime.AbstractFunction1;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
@@ -58,11 +55,6 @@ import org.apache.spark.rdd.RDD;
 public class JoinGroupSparkConverter implements RDDConverter<Tuple, Tuple, POJoinGroupSpark> {
     private static final Log LOG = LogFactory
             .getLog(JoinGroupSparkConverter.class);
-    private byte[] confBytes;
-
-    public JoinGroupSparkConverter(byte[] confBytes) {
-        this.confBytes = confBytes;
-    }
 
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POJoinGroupSpark op) throws IOException {
@@ -92,7 +84,7 @@ public class JoinGroupSparkConverter imp
 
             RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
                     (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
-            return rdd.toJavaRDD().map(new GroupPkgFunction(pkgOp, this.confBytes)).rdd();
+            return rdd.toJavaRDD().map(new GroupPkgFunction(pkgOp)).rdd();
         }
     }
 
@@ -317,26 +309,13 @@ public class JoinGroupSparkConverter imp
             Function<Tuple2<IndexedKey, Seq<Seq<Tuple>>>, Tuple>, Serializable {
 
         private final POPackage pkgOp;
-        private byte[] confBytes;
-        private JobConf jobConf = null;
 
-        public GroupPkgFunction(POPackage pkgOp, byte[] confBytes) {
+        public GroupPkgFunction(POPackage pkgOp) {
             this.pkgOp = pkgOp;
-            this.confBytes = confBytes;
-        }
-
-        void initializeJobConf() {
-            jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
-            jobConf.set("pig.cachedbag.type", "default");
-            PigMapReduce.sJobConfInternal.set(jobConf);
         }
 
         @Override
         public Tuple call(final Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
-            if( jobConf == null) {
-                initializeJobConf();
-            }
-
             try {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("GroupPkgFunction in " + input);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Wed Aug 24 16:04:01 2016
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.spark.SparkCounters;
@@ -67,26 +68,20 @@ public class LoadConverter implements RD
     private PigContext pigContext;
     private PhysicalPlan physicalPlan;
     private SparkContext sparkContext;
+    private JobConf jobConf;
 
     public LoadConverter(PigContext pigContext, PhysicalPlan physicalPlan,
-            SparkContext sparkContext) {
+            SparkContext sparkContext, JobConf jobConf) {
         this.pigContext = pigContext;
         this.physicalPlan = physicalPlan;
         this.sparkContext = sparkContext;
+        this.jobConf = jobConf;
     }
 
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessorRdds, POLoad op)
             throws IOException {
-
-        // This configuration will be "broadcasted" by Spark, one to every
-        // node. Since we are changing the config here, the safe approach is
-        // to create a new conf for a new RDD.
-        JobConf jobConf = SparkUtil.newJobConf(pigContext);
         configureLoader(physicalPlan, op, jobConf);
-        // need to serialize the configuration loaded in jobConf
-        // to make sure we can access the right config later
-        UDFContext.getUDFContext().serialize(jobConf);
 
         // Set the input directory for input formats that are backed by a
         // filesystem. (Does not apply to HBase, for example).
@@ -99,6 +94,13 @@ public class LoadConverter implements RD
             jobConf.set("pig.noSplitCombination", "true");
         }
 
+
+        //serialize the UDFContext#udfConfs in jobConf
+        UDFContext.getUDFContext().serialize(jobConf);
+
+        //SparkContext#newAPIHadoopRDD will broadcast the jobConf to the worker nodes.
+        //Later, in PigInputFormatSpark#createRecordReader, the jobConf is used to
+        //initialize PigContext, UDFContext and SchemaTupleBackend.
         RDD<Tuple2<Text, Tuple>> hadoopRDD = sparkContext.newAPIHadoopRDD(
                 jobConf, PigInputFormatSpark.class, Text.class, Tuple.class);
 
@@ -216,11 +218,11 @@ public class LoadConverter implements RD
         inpSignatures.add(poLoad.getSignature());
         inpLimits.add(poLoad.getLimit());
 
-        jobConf.set("pig.inputs", ObjectSerializer.serialize(pigInputs));
-        jobConf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
-        jobConf.set("pig.inpSignatures",
+        jobConf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(pigInputs));
+        jobConf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(inpTargets));
+        jobConf.set(PigInputFormat.PIG_INPUT_SIGNATURES,
                 ObjectSerializer.serialize(inpSignatures));
-        jobConf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
+        jobConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(inpLimits));
 
         return jobConf;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java Wed Aug 24 16:04:01 2016
@@ -22,13 +22,9 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -39,18 +35,13 @@ import org.apache.spark.rdd.RDD;
 public class MergeJoinConverter implements
         RDDConverter<Tuple, Tuple, POMergeJoin> {
 
-    private byte[] confBytes;
-    public MergeJoinConverter(byte[] confBytes) {
-        this.confBytes = confBytes;
-    }
-
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
                               POMergeJoin poMergeJoin) throws IOException {
 
         SparkUtil.assertPredecessorSize(predecessors, poMergeJoin, 1);
         RDD<Tuple> rdd = predecessors.get(0);
-        MergeJoinFunction mergeJoinFunction = new MergeJoinFunction(poMergeJoin, confBytes);
+        MergeJoinFunction mergeJoinFunction = new MergeJoinFunction(poMergeJoin);
 
         return rdd.toJavaRDD().mapPartitions(mergeJoinFunction, true).rdd();
     }
@@ -59,24 +50,12 @@ public class MergeJoinConverter implemen
             FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
 
         private POMergeJoin poMergeJoin;
-        private byte[] confBytes;
-
-        private transient JobConf jobConf;
 
-        private MergeJoinFunction(POMergeJoin poMergeJoin, byte[] confBytes) {
+        private MergeJoinFunction(POMergeJoin poMergeJoin) {
             this.poMergeJoin = poMergeJoin;
-            this.confBytes = confBytes;
-        }
-
-        void initializeJobConf() {
-            if (this.jobConf == null) {
-                this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
-                PigMapReduce.sJobConfInternal.set(jobConf);
-            }
         }
 
         public Iterable<Tuple> call(final Iterator<Tuple> input) {
-            initializeJobConf();
 
             return new Iterable<Tuple>() {
                 @Override

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java Wed Aug 24 16:04:01 2016
@@ -26,13 +26,10 @@ import scala.runtime.AbstractFunction1;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
@@ -43,13 +40,6 @@ import org.apache.spark.rdd.RDD;
 public class PackageConverter implements RDDConverter<Tuple, Tuple, POPackage> {
     private static final Log LOG = LogFactory.getLog(PackageConverter.class);
 
-    private transient JobConf jobConf;
-    private byte[] confBytes;
-
-    public PackageConverter(byte[] confBytes) {
-        this.confBytes = confBytes;
-    }
-
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
             POPackage physicalOperator) throws IOException {
@@ -57,7 +47,7 @@ public class PackageConverter implements
         RDD<Tuple> rdd = predecessors.get(0);
         // package will generate the group from the result of the local
         // rearrange
-        return rdd.map(new PackageFunction(physicalOperator, this.confBytes),
+        return rdd.map(new PackageFunction(physicalOperator),
                 SparkUtil.getManifest(Tuple.class));
     }
 
@@ -65,23 +55,13 @@ public class PackageConverter implements
             AbstractFunction1<Tuple, Tuple> implements Serializable {
 
         private final POPackage physicalOperator;
-        private byte[] confBytes;
 
-        public PackageFunction(POPackage physicalOperator, byte[] confBytes) {
+        public PackageFunction(POPackage physicalOperator) {
             this.physicalOperator = physicalOperator;
-            this.confBytes = confBytes;
-        }
-
-        void initializeJobConf() {
-            JobConf jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
-            jobConf.set("pig.cachedbag.type", "default");
-            PigMapReduce.sJobConfInternal.set(jobConf);
         }
 
         @Override
         public Tuple apply(final Tuple t) {
-            initializeJobConf();
-
             // (key, Seq<Tuple>:{(index, key, value without key)})
             if (LOG.isDebugEnabled())
                 LOG.debug("PackageFunction in " + t);
@@ -159,7 +139,6 @@ public class PackageConverter implements
                 LOG.debug("PackageFunction out " + out);
             return out;
         }
-
     }
 
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java Wed Aug 24 16:04:01 2016
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.spark.SparkCounters;
 import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
 import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
@@ -43,7 +42,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
@@ -59,12 +57,11 @@ import com.google.common.collect.Lists;
 public class StoreConverter implements
         RDDConverter<Tuple, Tuple2<Text, Tuple>, POStore> {
 
-  private static final Log LOG = LogFactory.getLog(StoreConverter.class);
+    private static final Log LOG = LogFactory.getLog(StoreConverter.class);
 
-    private PigContext pigContext;
-
-    public StoreConverter(PigContext pigContext) {
-        this.pigContext = pigContext;
+    private JobConf jobConf = null;
+    public StoreConverter(JobConf jobConf) {
+        this.jobConf = jobConf;
     }
 
     @Override
@@ -84,7 +81,6 @@ public class StoreConverter implements
                 rddPairs.rdd(), SparkUtil.getManifest(Text.class),
                 SparkUtil.getManifest(Tuple.class), null);
 
-        JobConf jobConf = SparkUtil.newJobConf(pigContext);
         POStore poStore = configureStorer(jobConf, op);
 
         if ("true".equalsIgnoreCase(jobConf

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java Wed Aug 24 16:04:01 2016
@@ -22,69 +22,35 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
 public class StreamConverter implements
 		RDDConverter<Tuple, Tuple, POStream> {
-	private byte[] confBytes;
-
-	public StreamConverter(byte[] confBytes) {
-		this.confBytes = confBytes;
-	}
 
 	@Override
 	public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
 			POStream poStream) throws IOException {
 		SparkUtil.assertPredecessorSize(predecessors, poStream, 1);
 		RDD<Tuple> rdd = predecessors.get(0);
-		StreamFunction streamFunction = new StreamFunction(poStream, confBytes);
+		StreamFunction streamFunction = new StreamFunction(poStream);
 		return rdd.toJavaRDD().mapPartitions(streamFunction, true).rdd();
 	}
 
 	private static class StreamFunction implements
 			FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
 		private POStream poStream;
-		private transient JobConf jobConf;
-		private byte[] confBytes;
 
-		private StreamFunction(POStream poStream, byte[] confBytes) {
+		private StreamFunction(POStream poStream) {
 			this.poStream = poStream;
-			this.confBytes = confBytes;
-		}
-
-		void initializeJobConf() {
-			if (this.jobConf == null) {
-				this.jobConf = KryoSerializer
-						.deserializeJobConf(this.confBytes);
-				PigMapReduce.sJobConfInternal.set(jobConf);
-				try {
-					MapRedUtil.setupUDFContext(jobConf);
-					PigContext pc = (PigContext) ObjectSerializer
-							.deserialize(jobConf.get("pig.pigContext"));
-					SchemaTupleBackend.initialize(jobConf, pc);
-
-				} catch (IOException ioe) {
-					String msg = "Problem while configuring UDFContext from ForEachConverter.";
-					throw new RuntimeException(msg, ioe);
-				}
-			}
 		}
 
 		public Iterable<Tuple> call(final Iterator<Tuple> input) {
-			initializeJobConf();
 			return new Iterable<Tuple>() {
 				@Override
 				public Iterator<Tuple> iterator() {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Wed Aug 24 16:04:01 2016
@@ -31,8 +31,12 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
@@ -42,7 +46,7 @@ public class PigInputFormatSpark extends
 	public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
 			TaskAttemptContext context) throws IOException,
 			InterruptedException {
-        init();
+        initLogger();
         resetUDFContext();
         //PigSplit#conf is the default hadoop configuration, we need get the configuration
         //from context.getConfigration() to retrieve pig properties
@@ -55,14 +59,24 @@ public class PigInputFormatSpark extends
             PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID());
         }
         PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex());
+        // This is the first point at which the JobConf is available in the Spark executor thread, so we
+        // initialize PigContext, UDFContext and SchemaTupleBackend here by reading properties from the JobConf.
+        initialize(conf);
         return super.createRecordReader(split, context);
     }
 
-	private void resetUDFContext() {
+    private void initialize(Configuration jobConf) throws IOException {
+        MapRedUtil.setupUDFContext(jobConf);
+        PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
+        SchemaTupleBackend.initialize(jobConf, pc);
+        PigMapReduce.sJobConfInternal.set(jobConf);
+    }
+
+    private void resetUDFContext() {
 		UDFContext.getUDFContext().reset();
 	}
 
-	private void init() {
+	private void initLogger() {
 		PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
 		pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
 		PhysicalOperator.setPigLogger(pigHadoopLogger);