Posted to commits@pig.apache.org by zl...@apache.org on 2017/03/12 22:18:41 UTC
svn commit: r1786618 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: SparkLauncher.java converter/ForEachConverter.java
Author: zly
Date: Sun Mar 12 22:18:40 2017
New Revision: 1786618
URL: http://svn.apache.org/viewvc?rev=1786618&view=rev
Log:
PIG-5054: Initialize SchemaTupleBackend correctly in backend in spark mode if spark job has more than 1 stage (Adam via Liyun)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
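
Context for the change: it follows the usual pattern for Spark closures that need non-serializable, per-JVM state. The JobConf is shipped with the function as Kryo-serialized bytes, the real JobConf field is transient, and both it and SchemaTupleBackend are rebuilt lazily on the executor before the first tuple of a partition is processed. The sketch below shows that pattern in isolation, using the same Pig classes the diff imports; the ExecutorSideInit class and initializeIfNeeded method names are illustrative only and are not part of the commit, which puts this logic inside ForEachConverter.ForEachFunction.

import java.io.IOException;
import java.io.Serializable;

import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;

public class ExecutorSideInit implements Serializable {

    // Serializable payload captured on the driver.
    private final byte[] confBytes;
    // Rebuilt lazily on each executor; never shipped with the closure.
    private transient JobConf jobConf;

    public ExecutorSideInit(JobConf jobConf) {
        // Driver side: snapshot the JobConf as Kryo-serialized bytes.
        this.confBytes = KryoSerializer.serializeJobConf(jobConf);
    }

    // Executor side: call before processing the first tuple of a partition.
    private void initializeIfNeeded() {
        if (jobConf == null) {
            try {
                jobConf = KryoSerializer.deserializeJobConf(confBytes);
                PigContext pc = (PigContext) ObjectSerializer.deserialize(
                        jobConf.get("pig.pigContext"));
                // Make SchemaTupleBackend usable in this task JVM; operators
                // that rely on generated schema tuples fail without it.
                SchemaTupleBackend.initialize(jobConf, pc);
            } catch (IOException e) {
                throw new RuntimeException(
                        "Could not initialize SchemaTupleBackend on executor", e);
            }
        }
    }
}
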
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1786618&r1=1786617&r2=1786618&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Sun Mar 12 22:18:40 2017
@@ -194,7 +194,7 @@ public class SparkLauncher extends Launc
convertMap.put(POLoad.class, new LoadConverter(pigContext,
physicalPlan, sparkContext.sc(), jobConf, sparkEngineConf));
convertMap.put(POStore.class, new StoreConverter(jobConf));
- convertMap.put(POForEach.class, new ForEachConverter());
+ convertMap.put(POForEach.class, new ForEachConverter(jobConf));
convertMap.put(POFilter.class, new FilterConverter());
convertMap.put(POPackage.class, new PackageConverter());
convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1786618&r1=1786617&r2=1786618&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Sun Mar 12 22:18:40 2017
@@ -17,18 +17,24 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
@@ -38,12 +44,22 @@ import org.apache.spark.rdd.RDD;
@SuppressWarnings({"serial" })
public class ForEachConverter implements RDDConverter<Tuple, Tuple, POForEach> {
+ private JobConf jobConf;
+
+ public ForEachConverter(JobConf jobConf) {
+ this.jobConf = jobConf;
+ }
+
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
POForEach physicalOperator) {
+
+ byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+
SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
RDD<Tuple> rdd = predecessors.get(0);
- ForEachFunction forEachFunction = new ForEachFunction(physicalOperator);
+ ForEachFunction forEachFunction = new ForEachFunction(physicalOperator, confBytes);
+
return rdd.toJavaRDD().mapPartitions(forEachFunction, true).rdd();
}
@@ -51,12 +67,18 @@ public class ForEachConverter implements
FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
private POForEach poForEach;
+ private byte[] confBytes;
+ private transient JobConf jobConf;
- private ForEachFunction(POForEach poForEach) {
+ private ForEachFunction(POForEach poForEach, byte[] confBytes) {
this.poForEach = poForEach;
+ this.confBytes = confBytes;
}
public Iterable<Tuple> call(final Iterator<Tuple> input) {
+
+ initialize();
+
// Initialize a reporter as the UDF might want to report progress.
PhysicalOperator.setReporter(new ProgressableReporter());
PhysicalOperator[] planLeafOps= poForEach.getPlanLeafOps();
@@ -94,5 +116,17 @@ public class ForEachConverter implements
}
};
}
+
+ private void initialize() {
+ if (this.jobConf == null) {
+ try {
+ this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
+ PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
+ SchemaTupleBackend.initialize(jobConf, pc);
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't initialize ForEachConverter");
+ }
+ }
+ }
}
}
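
Note on why the lazy check matters (commentary, not part of the commit): with a single-stage Spark job the foreach presumably runs in the same tasks as the load, where the backend is already set up as part of reading the input, so the problem does not surface. With more than one stage, for example after a shuffle introduced by a GROUP BY or JOIN, the POForEach can execute in a task JVM where nothing has initialized SchemaTupleBackend, which appears to be the failure PIG-5054 addresses. Keeping jobConf transient and reconstructing it from confBytes keeps the closure Serializable while letting each executor build its own initialized copy.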