Posted to commits@pig.apache.org by zl...@apache.org on 2017/03/12 22:18:41 UTC

svn commit: r1786618 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: SparkLauncher.java converter/ForEachConverter.java

Author: zly
Date: Sun Mar 12 22:18:40 2017
New Revision: 1786618

URL: http://svn.apache.org/viewvc?rev=1786618&view=rev
Log:
PIG-5054: Initialize SchemaTupleBackend correctly in backend in spark mode if spark job has more than 1 stage (Adam via Liyun)
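
(For context: as the diff below suggests, SchemaTupleBackend.initialize() was previously reached only on the task path of the stage that loads the data, so in a job with a shuffle boundary a POForEach running in a later stage could execute in a task where the backend had never been initialized. The fix threads the JobConf into ForEachConverter and performs a lazy, idempotent initialization at the start of each task.)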

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1786618&r1=1786617&r2=1786618&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Sun Mar 12 22:18:40 2017
@@ -194,7 +194,7 @@ public class SparkLauncher extends Launc
         convertMap.put(POLoad.class, new LoadConverter(pigContext,
                 physicalPlan, sparkContext.sc(), jobConf, sparkEngineConf));
         convertMap.put(POStore.class, new StoreConverter(jobConf));
-        convertMap.put(POForEach.class, new ForEachConverter());
+        convertMap.put(POForEach.class, new ForEachConverter(jobConf));
         convertMap.put(POFilter.class, new FilterConverter());
         convertMap.put(POPackage.class, new PackageConverter());
         convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
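
The one-line SparkLauncher change above hands the driver-side JobConf to ForEachConverter. The interesting work happens in the converter below: the conf is Kryo-serialized to a byte[] up front, so the Spark closure captures only bytes rather than a live Hadoop JobConf (which is not java.io.Serializable); see the sketch after the ForEachConverter diff.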

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1786618&r1=1786617&r2=1786618&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Sun Mar 12 22:18:40 2017
@@ -17,18 +17,24 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
@@ -38,12 +44,22 @@ import org.apache.spark.rdd.RDD;
 @SuppressWarnings({"serial" })
 public class ForEachConverter implements RDDConverter<Tuple, Tuple, POForEach> {
 
+    private JobConf jobConf;
+
+    public ForEachConverter(JobConf jobConf) {
+        this.jobConf = jobConf;
+    }
+
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
             POForEach physicalOperator) {
+
+        byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+
         SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
         RDD<Tuple> rdd = predecessors.get(0);
-        ForEachFunction forEachFunction = new ForEachFunction(physicalOperator);
+        ForEachFunction forEachFunction = new ForEachFunction(physicalOperator, confBytes);
+
         return rdd.toJavaRDD().mapPartitions(forEachFunction, true).rdd();
     }
 
@@ -51,12 +67,18 @@ public class ForEachConverter implements
             FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
 
         private POForEach poForEach;
+        private byte[] confBytes;
+        private transient JobConf jobConf;
 
-        private ForEachFunction(POForEach poForEach) {
+        private ForEachFunction(POForEach poForEach, byte[] confBytes) {
             this.poForEach = poForEach;
+            this.confBytes = confBytes;
         }
 
         public Iterable<Tuple> call(final Iterator<Tuple> input) {
+
+            initialize();
+
             // Initialize a reporter as the UDF might want to report progress.
             PhysicalOperator.setReporter(new ProgressableReporter());
             PhysicalOperator[] planLeafOps= poForEach.getPlanLeafOps();
@@ -94,5 +116,17 @@ public class ForEachConverter implements
                 }
             };
         }
+
+        private void initialize() {
+            if (this.jobConf == null) {
+                try {
+                    this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
+                    PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
+                    SchemaTupleBackend.initialize(jobConf, pc);
+                } catch (IOException e) {
+                    throw new RuntimeException("Couldn't initialize ForEachConverter", e);
+                }
+            }
+        }
     }
 }
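
The pattern above is worth noting: the serialized conf travels as a byte[] field, the rebuilt JobConf is marked transient so it is never shipped back through Java serialization, and the null check makes initialize() a cheap no-op after the first call within a deserialized function copy. Below is a minimal, hypothetical sketch of the same pattern, stripped of Pig specifics (LazyTaskInit, TaskFunction, and the placeholder "backend" state are illustrative names, not Pig or Spark API):

    import java.io.Serializable;
    import java.util.Collections;
    import java.util.Iterator;

    // Hypothetical sketch: ship config as bytes, rebuild heavyweight state
    // at most once per deserialized copy of the function.
    public class LazyTaskInit {

        static class TaskFunction implements Serializable {
            private final byte[] confBytes;   // serializable payload captured in the closure
            private transient Object backend; // rebuilt lazily on the executor, never shipped

            TaskFunction(byte[] confBytes) {
                this.confBytes = confBytes;
            }

            // Called at the top of call(); no-op after the first invocation.
            private void initialize() {
                if (backend == null) {
                    // The real code deserializes the JobConf here and calls
                    // SchemaTupleBackend.initialize(jobConf, pigContext).
                    backend = new String(confBytes);
                }
            }

            Iterator<String> call(Iterator<String> input) {
                initialize();
                return input; // the real code wraps input in a processing iterator
            }
        }

        public static void main(String[] args) {
            TaskFunction f = new TaskFunction("fake-conf".getBytes());
            f.call(Collections.singletonList("row").iterator())
                    .forEachRemaining(System.out::println);
        }
    }

One design consequence: because the guard lives on an instance field, initialization happens once per deserialized function copy (roughly once per task), which is cheap; a JVM-wide guard would need static state instead.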