Posted to commits@pig.apache.org by xu...@apache.org on 2016/08/24 16:04:01 UTC
svn commit: r1757545 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: ./ converter/ running/
Author: xuefu
Date: Wed Aug 24 16:04:01 2016
New Revision: 1757545
URL: http://svn.apache.org/viewvc?rev=1757545&view=rev
Log:
PIG-4970: Remove the serialization and deserialization of JobConf in code for spark mode (Liyun via Xuefu)
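
In short: before this change every converter shipped a Kryo-serialized copy of the JobConf (confBytes) inside its Spark closure and lazily rebuilt it on each executor; afterwards the driver builds a single JobConf in SparkUtil.newJobConf(), Spark broadcasts it through SparkContext.newAPIHadoopRDD, and PigInputFormatSpark initializes the executor-side state once. A minimal sketch of the removed per-converter pattern, reconstructed from the hunks below rather than copied from any one source file:

    // Old pattern (removed): each converter carried the serialized conf
    // and re-created a JobConf lazily inside every executor function.
    private byte[] confBytes;              // shipped with the Spark closure
    private transient JobConf jobConf;     // rebuilt on each executor

    void initializeJobConf() {
        if (this.jobConf != null) {
            return;                        // already set up on this executor
        }
        this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
        PigMapReduce.sJobConfInternal.set(jobConf);
    }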
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Aug 24 16:04:01 2016
@@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -41,7 +40,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.PigConfiguration;
-import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -70,7 +68,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.DistinctConverter;
@@ -110,7 +107,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
@@ -154,7 +150,7 @@ public class SparkLauncher extends Launc
if (LOG.isDebugEnabled())
LOG.debug(physicalPlan);
this.pigContext = pigContext;
- initialize();
+ initialize(physicalPlan);
SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
if (LOG.isDebugEnabled())
explain(sparkplan, System.out, "text", true);
@@ -177,44 +173,33 @@ public class SparkLauncher extends Launc
.normalize().toString()
+ "/";
-
- LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
- physicalPlan, POStore.class);
- POStore firstStore = stores.getFirst();
- if (firstStore != null) {
- MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext,
- jobConf);
- }
-
new ParallelismSetter(sparkplan, jobConf).visit();
- byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
-
SparkPigStatusReporter.getInstance().setCounters(new SparkCounters(sparkContext));
// Create conversion map, mapping between pig operator and spark convertor
Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap
= new HashMap<Class<? extends PhysicalOperator>, RDDConverter>();
convertMap.put(POLoad.class, new LoadConverter(pigContext,
- physicalPlan, sparkContext.sc()));
- convertMap.put(POStore.class, new StoreConverter(pigContext));
- convertMap.put(POForEach.class, new ForEachConverter(confBytes));
- convertMap.put(POFilter.class, new FilterConverter(confBytes));
- convertMap.put(POPackage.class, new PackageConverter(confBytes));
+ physicalPlan, sparkContext.sc(), jobConf));
+ convertMap.put(POStore.class, new StoreConverter(jobConf));
+ convertMap.put(POForEach.class, new ForEachConverter());
+ convertMap.put(POFilter.class, new FilterConverter());
+ convertMap.put(POPackage.class, new PackageConverter());
convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
convertMap.put(POGlobalRearrangeSpark.class, new GlobalRearrangeConverter());
- convertMap.put(POJoinGroupSpark.class, new JoinGroupSparkConverter(confBytes));
+ convertMap.put(POJoinGroupSpark.class, new JoinGroupSparkConverter());
convertMap.put(POLimit.class, new LimitConverter());
convertMap.put(PODistinct.class, new DistinctConverter());
convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
convertMap.put(POSort.class, new SortConverter());
convertMap.put(POSplit.class, new SplitConverter());
convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
- convertMap.put(POMergeJoin.class, new MergeJoinConverter(confBytes));
+ convertMap.put(POMergeJoin.class, new MergeJoinConverter());
convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
convertMap.put(POCounter.class, new CounterConverter());
convertMap.put(PORank.class, new RankConverter());
- convertMap.put(POStream.class, new StreamConverter(confBytes));
+ convertMap.put(POStream.class, new StreamConverter());
convertMap.put(POFRJoin.class, new FRJoinConverter());
convertMap.put(POMergeCogroup.class, new MergeCogroupConverter());
convertMap.put(POReduceBySpark.class, new ReduceByConverter());
@@ -378,13 +363,12 @@ public class SparkLauncher extends Launc
private void cacheFiles(String cacheFiles) throws IOException {
if (cacheFiles != null) {
- Configuration conf = SparkUtil.newJobConf(pigContext);
for (String file : cacheFiles.split(",")) {
String fileName = extractFileName(file.trim());
Path src = new Path(extractFileUrl(file.trim()));
File tmpFile = File.createTempFile(fileName, ".tmp");
Path tmpFilePath = new Path(tmpFile.getAbsolutePath());
- FileSystem fs = tmpFilePath.getFileSystem(conf);
+ FileSystem fs = tmpFilePath.getFileSystem(jobConf);
fs.copyToLocalFile(src, tmpFilePath);
tmpFile.deleteOnExit();
LOG.info(String.format("cacheFile:%s", fileName));
@@ -659,12 +643,9 @@ public class SparkLauncher extends Launc
pigContext.getProperties().setProperty("spark.udf.import.list", udfImportList);
}
- private void initialize() throws IOException {
+ private void initialize(PhysicalPlan physicalPlan) throws IOException {
saveUdfImportList();
- jobConf = SparkUtil.newJobConf(pigContext);
- jobConf.set(PigConstants.LOCAL_CODE_DIR,
- System.getProperty("java.io.tmpdir"));
-
+ jobConf = SparkUtil.newJobConf(pigContext, physicalPlan);
SchemaTupleBackend.initialize(jobConf, pigContext);
Utils.setDefaultTimeZone(jobConf);
PigMapReduce.sJobConfInternal.set(jobConf);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Wed Aug 24 16:04:01 2016
@@ -19,8 +19,10 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.Random;
+import java.util.UUID;
import scala.Product2;
import scala.Tuple2;
@@ -30,6 +32,7 @@ import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.PigConstants;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
@@ -37,7 +40,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -45,7 +51,6 @@ import org.apache.pig.impl.plan.NodeIdGe
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.UDFContext;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.rdd.RDD;
@@ -66,7 +71,7 @@ public class SparkUtil {
return (ClassTag<Product2<K, V>>) (Object) getManifest(Product2.class);
}
- public static JobConf newJobConf(PigContext pigContext) throws IOException {
+ public static JobConf newJobConf(PigContext pigContext, PhysicalPlan physicalPlan) throws IOException {
JobConf jobConf = new JobConf(
ConfigurationUtil.toConfiguration(pigContext.getProperties()));
// Serialize the PigContext so it's available in Executor thread.
@@ -74,9 +79,20 @@ public class SparkUtil {
// Serialize the thread local variable inside PigContext separately
jobConf.set("udf.import.list",
ObjectSerializer.serialize(PigContext.getPackageImportList()));
- UDFContext.getUDFContext().serialize(jobConf);
Random rand = new Random();
jobConf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, Integer.toString(rand.nextInt()));
+ jobConf.set(PigConstants.LOCAL_CODE_DIR,
+ System.getProperty("java.io.tmpdir"));
+ jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
+ jobConf.set(PigConstants.TASK_INDEX, "0");
+
+ LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
+ physicalPlan, POStore.class);
+ POStore firstStore = stores.getFirst();
+ if (firstStore != null) {
+ MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext,
+ jobConf);
+ }
return jobConf;
}
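
With newJobConf() consolidated as above, driver-side callers obtain a fully configured JobConf in one step. A hypothetical call site (the signature comes from the hunk above; the comment summarizes what the method now sets):

    // Build the single driver-side JobConf; besides the serialized PigContext
    // and udf.import.list, it now also carries PigConstants.LOCAL_CODE_DIR,
    // a synthetic JOB_ID and TASK_INDEX (the PIG-4827 workaround), and the
    // streaming dirs configured from the plan's first POStore.
    JobConf jobConf = SparkUtil.newJobConf(pigContext, physicalPlan);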
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java Wed Aug 24 16:04:01 2016
@@ -17,28 +17,17 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
-import java.io.IOException;
import java.io.Serializable;
import java.util.List;
-import java.util.UUID;
import scala.runtime.AbstractFunction1;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.pig.PigConstants;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
-import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.spark.rdd.RDD;
/**
@@ -47,18 +36,12 @@ import org.apache.spark.rdd.RDD;
@SuppressWarnings({ "serial" })
public class FilterConverter implements RDDConverter<Tuple, Tuple, POFilter> {
- private byte[] confBytes;
-
- public FilterConverter(byte[] confBytes) {
- this.confBytes = confBytes;
- }
-
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
POFilter physicalOperator) {
SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
RDD<Tuple> rdd = predecessors.get(0);
- FilterFunction filterFunction = new FilterFunction(physicalOperator, confBytes);
+ FilterFunction filterFunction = new FilterFunction(physicalOperator);
return rdd.filter(filterFunction);
}
@@ -66,17 +49,13 @@ public class FilterConverter implements
AbstractFunction1<Tuple, Object> implements Serializable {
private POFilter poFilter;
- private byte[] confBytes;
- private transient JobConf jobConf;
- private FilterFunction(POFilter poFilter, byte[] confBytes) {
+ private FilterFunction(POFilter poFilter) {
this.poFilter = poFilter;
- this.confBytes = confBytes;
}
@Override
public Boolean apply(Tuple v1) {
- initializeJobConf();
Result result;
try {
poFilter.setInputs(null);
@@ -101,24 +80,5 @@ public class FilterConverter implements
"Unexpected response code from filter: " + result);
}
}
-
- void initializeJobConf() {
- if (this.jobConf != null) {
- return;
- }
- this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
- PigMapReduce.sJobConfInternal.set(jobConf);
- try {
- MapRedUtil.setupUDFContext(jobConf);
- PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
- SchemaTupleBackend.initialize(jobConf, pc);
- // Although Job ID and task index are not really applicable for spark,
- // set them here to overcome PIG-4827
- jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
- jobConf.set(PigConstants.TASK_INDEX, "0");
- } catch (IOException ioe) {
- throw new RuntimeException("Problem while configuring UDFContext from FilterConverter.", ioe);
- }
- }
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Wed Aug 24 16:04:01 2016
@@ -17,29 +17,18 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
-import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
-import java.util.UUID;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.pig.PigConstants;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
@@ -49,18 +38,12 @@ import org.apache.spark.rdd.RDD;
@SuppressWarnings({"serial" })
public class ForEachConverter implements RDDConverter<Tuple, Tuple, POForEach> {
- private byte[] confBytes;
-
- public ForEachConverter(byte[] confBytes) {
- this.confBytes = confBytes;
- }
-
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
POForEach physicalOperator) {
SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
RDD<Tuple> rdd = predecessors.get(0);
- ForEachFunction forEachFunction = new ForEachFunction(physicalOperator, this.confBytes);
+ ForEachFunction forEachFunction = new ForEachFunction(physicalOperator);
return rdd.toJavaRDD().mapPartitions(forEachFunction, true).rdd();
}
@@ -68,34 +51,12 @@ public class ForEachConverter implements
FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
private POForEach poForEach;
- private byte[] confBytes;
- private transient JobConf jobConf;
- private ForEachFunction(POForEach poForEach, byte[] confBytes) {
+ private ForEachFunction(POForEach poForEach) {
this.poForEach = poForEach;
- this.confBytes = confBytes;
- }
-
- void initializeJobConf() {
- if (this.jobConf != null) {
- return;
- }
- this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
- PigMapReduce.sJobConfInternal.set(jobConf);
- try {
- MapRedUtil.setupUDFContext(jobConf);
- PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
- SchemaTupleBackend.initialize(jobConf, pc);
- jobConf.set(MRConfiguration.JOB_ID, UUID.randomUUID().toString());
- jobConf.set(PigConstants.TASK_INDEX, "0");
- } catch (IOException ioe) {
- String msg = "Problem while configuring UDFContext from ForEachConverter.";
- throw new RuntimeException(msg, ioe);
- }
}
public Iterable<Tuple> call(final Iterator<Tuple> input) {
- initializeJobConf();
// Initialize a reporter as the UDF might want to report progress.
PhysicalOperator.setReporter(new ProgressableReporter());
PhysicalOperator[] planLeafOps= poForEach.getPlanLeafOps();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Wed Aug 24 16:04:01 2016
@@ -33,14 +33,11 @@ import scala.runtime.AbstractFunction1;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
@@ -58,11 +55,6 @@ import org.apache.spark.rdd.RDD;
public class JoinGroupSparkConverter implements RDDConverter<Tuple, Tuple, POJoinGroupSpark> {
private static final Log LOG = LogFactory
.getLog(JoinGroupSparkConverter.class);
- private byte[] confBytes;
-
- public JoinGroupSparkConverter(byte[] confBytes) {
- this.confBytes = confBytes;
- }
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POJoinGroupSpark op) throws IOException {
@@ -92,7 +84,7 @@ public class JoinGroupSparkConverter imp
RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
(RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
- return rdd.toJavaRDD().map(new GroupPkgFunction(pkgOp, this.confBytes)).rdd();
+ return rdd.toJavaRDD().map(new GroupPkgFunction(pkgOp)).rdd();
}
}
@@ -317,26 +309,13 @@ public class JoinGroupSparkConverter imp
Function<Tuple2<IndexedKey, Seq<Seq<Tuple>>>, Tuple>, Serializable {
private final POPackage pkgOp;
- private byte[] confBytes;
- private JobConf jobConf = null;
- public GroupPkgFunction(POPackage pkgOp, byte[] confBytes) {
+ public GroupPkgFunction(POPackage pkgOp) {
this.pkgOp = pkgOp;
- this.confBytes = confBytes;
- }
-
- void initializeJobConf() {
- jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
- jobConf.set("pig.cachedbag.type", "default");
- PigMapReduce.sJobConfInternal.set(jobConf);
}
@Override
public Tuple call(final Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
- if( jobConf == null) {
- initializeJobConf();
- }
-
try {
if (LOG.isDebugEnabled()) {
LOG.debug("GroupPkgFunction in " + input);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Wed Aug 24 16:04:01 2016
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.spark.SparkCounters;
@@ -67,26 +68,20 @@ public class LoadConverter implements RD
private PigContext pigContext;
private PhysicalPlan physicalPlan;
private SparkContext sparkContext;
+ private JobConf jobConf;
public LoadConverter(PigContext pigContext, PhysicalPlan physicalPlan,
- SparkContext sparkContext) {
+ SparkContext sparkContext, JobConf jobConf) {
this.pigContext = pigContext;
this.physicalPlan = physicalPlan;
this.sparkContext = sparkContext;
+ this.jobConf = jobConf;
}
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessorRdds, POLoad op)
throws IOException {
-
- // This configuration will be "broadcasted" by Spark, one to every
- // node. Since we are changing the config here, the safe approach is
- // to create a new conf for a new RDD.
- JobConf jobConf = SparkUtil.newJobConf(pigContext);
configureLoader(physicalPlan, op, jobConf);
- // need to serialize the configuration loaded in jobConf
- // to make sure we can access the right config later
- UDFContext.getUDFContext().serialize(jobConf);
// Set the input directory for input formats that are backed by a
// filesystem. (Does not apply to HBase, for example).
@@ -99,6 +94,13 @@ public class LoadConverter implements RD
jobConf.set("pig.noSplitCombination", "true");
}
+
+ //Serialize the UDFContext#udfConfs into the jobConf.
+ UDFContext.getUDFContext().serialize(jobConf);
+
+ //SparkContext.newAPIHadoopRDD will broadcast the jobConf to the worker nodes.
+ //Later, in PigInputFormatSpark#createRecordReader, the jobConf is used to
+ //initialize PigContext, UDFContext and SchemaTupleBackend.
RDD<Tuple2<Text, Tuple>> hadoopRDD = sparkContext.newAPIHadoopRDD(
jobConf, PigInputFormatSpark.class, Text.class, Tuple.class);
@@ -216,11 +218,11 @@ public class LoadConverter implements RD
inpSignatures.add(poLoad.getSignature());
inpLimits.add(poLoad.getLimit());
- jobConf.set("pig.inputs", ObjectSerializer.serialize(pigInputs));
- jobConf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
- jobConf.set("pig.inpSignatures",
+ jobConf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(pigInputs));
+ jobConf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(inpTargets));
+ jobConf.set(PigInputFormat.PIG_INPUT_SIGNATURES,
ObjectSerializer.serialize(inpSignatures));
- jobConf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
+ jobConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(inpLimits));
return jobConf;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java Wed Aug 24 16:04:01 2016
@@ -22,13 +22,9 @@ import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -39,18 +35,13 @@ import org.apache.spark.rdd.RDD;
public class MergeJoinConverter implements
RDDConverter<Tuple, Tuple, POMergeJoin> {
- private byte[] confBytes;
- public MergeJoinConverter(byte[] confBytes) {
- this.confBytes = confBytes;
- }
-
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
POMergeJoin poMergeJoin) throws IOException {
SparkUtil.assertPredecessorSize(predecessors, poMergeJoin, 1);
RDD<Tuple> rdd = predecessors.get(0);
- MergeJoinFunction mergeJoinFunction = new MergeJoinFunction(poMergeJoin, confBytes);
+ MergeJoinFunction mergeJoinFunction = new MergeJoinFunction(poMergeJoin);
return rdd.toJavaRDD().mapPartitions(mergeJoinFunction, true).rdd();
}
@@ -59,24 +50,12 @@ public class MergeJoinConverter implemen
FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
private POMergeJoin poMergeJoin;
- private byte[] confBytes;
-
- private transient JobConf jobConf;
- private MergeJoinFunction(POMergeJoin poMergeJoin, byte[] confBytes) {
+ private MergeJoinFunction(POMergeJoin poMergeJoin) {
this.poMergeJoin = poMergeJoin;
- this.confBytes = confBytes;
- }
-
- void initializeJobConf() {
- if (this.jobConf == null) {
- this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
- PigMapReduce.sJobConfInternal.set(jobConf);
- }
}
public Iterable<Tuple> call(final Iterator<Tuple> input) {
- initializeJobConf();
return new Iterable<Tuple>() {
@Override
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java Wed Aug 24 16:04:01 2016
@@ -26,13 +26,10 @@ import scala.runtime.AbstractFunction1;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
@@ -43,13 +40,6 @@ import org.apache.spark.rdd.RDD;
public class PackageConverter implements RDDConverter<Tuple, Tuple, POPackage> {
private static final Log LOG = LogFactory.getLog(PackageConverter.class);
- private transient JobConf jobConf;
- private byte[] confBytes;
-
- public PackageConverter(byte[] confBytes) {
- this.confBytes = confBytes;
- }
-
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
POPackage physicalOperator) throws IOException {
@@ -57,7 +47,7 @@ public class PackageConverter implements
RDD<Tuple> rdd = predecessors.get(0);
// package will generate the group from the result of the local
// rearrange
- return rdd.map(new PackageFunction(physicalOperator, this.confBytes),
+ return rdd.map(new PackageFunction(physicalOperator),
SparkUtil.getManifest(Tuple.class));
}
@@ -65,23 +55,13 @@ public class PackageConverter implements
AbstractFunction1<Tuple, Tuple> implements Serializable {
private final POPackage physicalOperator;
- private byte[] confBytes;
- public PackageFunction(POPackage physicalOperator, byte[] confBytes) {
+ public PackageFunction(POPackage physicalOperator) {
this.physicalOperator = physicalOperator;
- this.confBytes = confBytes;
- }
-
- void initializeJobConf() {
- JobConf jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
- jobConf.set("pig.cachedbag.type", "default");
- PigMapReduce.sJobConfInternal.set(jobConf);
}
@Override
public Tuple apply(final Tuple t) {
- initializeJobConf();
-
// (key, Seq<Tuple>:{(index, key, value without key)})
if (LOG.isDebugEnabled())
LOG.debug("PackageFunction in " + t);
@@ -159,7 +139,6 @@ public class PackageConverter implements
LOG.debug("PackageFunction out " + out);
return out;
}
-
}
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java Wed Aug 24 16:04:01 2016
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.spark.SparkCounters;
import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
@@ -43,7 +42,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
@@ -59,12 +57,11 @@ import com.google.common.collect.Lists;
public class StoreConverter implements
RDDConverter<Tuple, Tuple2<Text, Tuple>, POStore> {
- private static final Log LOG = LogFactory.getLog(StoreConverter.class);
+ private static final Log LOG = LogFactory.getLog(StoreConverter.class);
- private PigContext pigContext;
-
- public StoreConverter(PigContext pigContext) {
- this.pigContext = pigContext;
+ private JobConf jobConf = null;
+ public StoreConverter(JobConf jobConf) {
+ this.jobConf = jobConf;
}
@Override
@@ -84,7 +81,6 @@ public class StoreConverter implements
rddPairs.rdd(), SparkUtil.getManifest(Text.class),
SparkUtil.getManifest(Tuple.class), null);
- JobConf jobConf = SparkUtil.newJobConf(pigContext);
POStore poStore = configureStorer(jobConf, op);
if ("true".equalsIgnoreCase(jobConf
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java Wed Aug 24 16:04:01 2016
@@ -22,69 +22,35 @@ import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
public class StreamConverter implements
RDDConverter<Tuple, Tuple, POStream> {
- private byte[] confBytes;
-
- public StreamConverter(byte[] confBytes) {
- this.confBytes = confBytes;
- }
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
POStream poStream) throws IOException {
SparkUtil.assertPredecessorSize(predecessors, poStream, 1);
RDD<Tuple> rdd = predecessors.get(0);
- StreamFunction streamFunction = new StreamFunction(poStream, confBytes);
+ StreamFunction streamFunction = new StreamFunction(poStream);
return rdd.toJavaRDD().mapPartitions(streamFunction, true).rdd();
}
private static class StreamFunction implements
FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
private POStream poStream;
- private transient JobConf jobConf;
- private byte[] confBytes;
- private StreamFunction(POStream poStream, byte[] confBytes) {
+ private StreamFunction(POStream poStream) {
this.poStream = poStream;
- this.confBytes = confBytes;
- }
-
- void initializeJobConf() {
- if (this.jobConf == null) {
- this.jobConf = KryoSerializer
- .deserializeJobConf(this.confBytes);
- PigMapReduce.sJobConfInternal.set(jobConf);
- try {
- MapRedUtil.setupUDFContext(jobConf);
- PigContext pc = (PigContext) ObjectSerializer
- .deserialize(jobConf.get("pig.pigContext"));
- SchemaTupleBackend.initialize(jobConf, pc);
-
- } catch (IOException ioe) {
- String msg = "Problem while configuring UDFContext from ForEachConverter.";
- throw new RuntimeException(msg, ioe);
- }
- }
}
public Iterable<Tuple> call(final Iterator<Tuple> input) {
- initializeJobConf();
return new Iterable<Tuple>() {
@Override
public Iterator<Tuple> iterator() {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1757545&r1=1757544&r2=1757545&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Wed Aug 24 16:04:01 2016
@@ -31,8 +31,12 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -42,7 +46,7 @@ public class PigInputFormatSpark extends
public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException {
- init();
+ initLogger();
resetUDFContext();
//PigSplit#conf is the default hadoop configuration, we need get the configuration
//from context.getConfigration() to retrieve pig properties
@@ -55,14 +59,24 @@ public class PigInputFormatSpark extends
PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID());
}
PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex());
+ // Here the JobConf first becomes available in the Spark executor thread, so we initialize PigContext,
+ // UDFContext and SchemaTupleBackend by reading properties from the JobConf.
+ initialize(conf);
return super.createRecordReader(split, context);
}
- private void resetUDFContext() {
+ private void initialize(Configuration jobConf) throws IOException {
+ MapRedUtil.setupUDFContext(jobConf);
+ PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
+ SchemaTupleBackend.initialize(jobConf, pc);
+ PigMapReduce.sJobConfInternal.set(jobConf);
+ }
+
+ private void resetUDFContext() {
UDFContext.getUDFContext().reset();
}
- private void init() {
+ private void initLogger() {
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
PhysicalOperator.setPigLogger(pigHadoopLogger);
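
Taken together, executor-side initialization now happens once per record reader instead of once per converter function. A sketch of the resulting call order inside PigInputFormatSpark#createRecordReader, condensed from the hunks above:

    initLogger();       // point PigHadoopLogger at the PigStatusReporter
    resetUDFContext();  // clear stale thread-local UDFContext state
    // ... PigSplit and JobContext bookkeeping elided ...
    initialize(conf);   // setupUDFContext, deserialize "pig.pigContext",
                        // SchemaTupleBackend.initialize, set sJobConfInternal
    return super.createRecordReader(split, context);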