Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/26 14:20:15 UTC
svn commit: r1641806 - in
/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine:
physicalLayer/expressionOperators/ physicalLayer/relationalOperators/
spark/ spark/converter/ spark/running/
Author: praveen
Date: Wed Nov 26 13:20:14 2014
New Revision: 1641806
URL: http://svn.apache.org/r1641806
Log:
PIG-4232: UDFContext not initialized in executors with spark cluster (liyunzhang via praveen)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
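
For context on the bug itself: a UDF running inside a Spark executor reads its
per-UDF properties from the UDFContext singleton, which stays empty unless the
serialized job configuration has been deserialized and applied on that executor
JVM. A minimal sketch of the kind of UDF that exposes the problem (the class
name and property key are illustrative, not part of this patch):

import java.io.IOException;
import java.util.Properties;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;

// Illustrative UDF only: the frontend stores a property in UDFContext, but on
// a Spark executor the same lookup returns nothing unless the JobConf has been
// deserialized and setupUDFContext() has run in that JVM, which is what this
// commit adds.
public class ContextReadingUdf extends EvalFunc<String> {
    @Override
    public String exec(Tuple input) throws IOException {
        Properties props = UDFContext.getUDFContext()
                .getUDFProperties(this.getClass());
        // Empty on executors before this fix, populated afterwards.
        return props.getProperty("some.key", "MISSING");
    }
}
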
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1641806&r1=1641805&r2=1641806&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Wed Nov 26 13:20:14 2014
@@ -120,6 +120,10 @@ public class POUserFunc extends Expressi
instantiateFunc(funcSpec);
}
+ public void setFuncInputSchema(){
+ setFuncInputSchema(signature);
+ }
+
private void instantiateFunc(FuncSpec fSpec) {
this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
this.setSignature(signature);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1641806&r1=1641805&r2=1641806&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Wed Nov 26 13:20:14 2014
@@ -781,4 +781,9 @@ public class POForEach extends PhysicalO
return (Tuple) out;
}
}
+
+ public PhysicalOperator[] getPlanLeafOps() {
+ return planLeafOps;
+ }
+
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1641806&r1=1641805&r2=1641806&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Nov 26 13:20:14 2014
@@ -128,7 +128,7 @@ public class SparkLauncher extends Launc
convertMap.put(POLoad.class, new LoadConverter(pigContext,
physicalPlan, sparkContext));
convertMap.put(POStore.class, new StoreConverter(pigContext));
- convertMap.put(POForEach.class, new ForEachConverter());
+ convertMap.put(POForEach.class, new ForEachConverter(confBytes));
convertMap.put(POFilter.class, new FilterConverter());
convertMap.put(POPackage.class, new PackageConverter(confBytes
));
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1641806&r1=1641805&r2=1641806&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Wed Nov 26 13:20:14 2014
@@ -1,29 +1,46 @@
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
/**
* Converter that is able to convert an RDD to another RDD using a POForEach
*/
-@SuppressWarnings({ "serial" })
+@SuppressWarnings({"serial" })
public class ForEachConverter implements POConverter<Tuple, Tuple, POForEach> {
+ private byte[] confBytes;
+
+ public ForEachConverter(byte[] confBytes) {
+ this.confBytes = confBytes;
+ }
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
POForEach physicalOperator) {
SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
RDD<Tuple> rdd = predecessors.get(0);
- ForEachFunction forEachFunction = new ForEachFunction(physicalOperator);
+ ForEachFunction forEachFunction = new ForEachFunction(physicalOperator, this.confBytes);
return rdd.toJavaRDD().mapPartitions(forEachFunction, true).rdd();
}
@@ -31,12 +48,42 @@ public class ForEachConverter implements
FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
private POForEach poForEach;
+ private byte[] confBytes;
+ private transient JobConf jobConf;
- private ForEachFunction(POForEach poForEach) {
+ private ForEachFunction(POForEach poForEach, byte[] confBytes) {
this.poForEach = poForEach;
+ this.confBytes = confBytes;
+ }
+
+ void initializeJobConf() {
+ if (this.jobConf == null) {
+ this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
+ PigMapReduce.sJobConfInternal.set(jobConf);
+ try {
+ MapRedUtil.setupUDFContext(jobConf);
+ PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
+ SchemaTupleBackend.initialize(jobConf, pc);
+
+ } catch (IOException ioe) {
+ String msg = "Problem while configuring UDFContext from ForEachConverter.";
+ throw new RuntimeException(msg, ioe);
+ }
+ }
}
public Iterable<Tuple> call(final Iterator<Tuple> input) {
+ initializeJobConf();
+ PhysicalOperator[] planLeafOps= poForEach.getPlanLeafOps();
+ if (planLeafOps != null) {
+ for (PhysicalOperator op : planLeafOps) {
+ if (op.getClass() == POUserFunc.class) {
+ POUserFunc udf = (POUserFunc) op;
+ udf.setFuncInputSchema();
+ }
+ }
+ }
+
return new Iterable<Tuple>() {
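
The lazy initializeJobConf() added above follows the usual pattern for carrying
a Hadoop JobConf inside a Spark closure: JobConf is not Java-serializable, so it
travels as a byte[] and is rebuilt once per executor JVM on first use. A generic
sketch of that pattern, using plain Writable serialization rather than Pig's
KryoSerializer (class and method names are illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;

import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.function.FlatMapFunction;

// Generic form of the ForEachFunction pattern: the JobConf is captured as bytes
// in the driver and rebuilt lazily on the executor; the transient field is
// never shipped with the closure.
abstract class ConfCarryingFunction<T, R>
        implements FlatMapFunction<Iterator<T>, R>, Serializable {

    private final byte[] confBytes;
    private transient JobConf jobConf;

    ConfCarryingFunction(JobConf conf) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        conf.write(new DataOutputStream(bos));
        this.confBytes = bos.toByteArray();
    }

    protected JobConf jobConf() throws IOException {
        if (jobConf == null) {
            jobConf = new JobConf();
            jobConf.readFields(new DataInputStream(new ByteArrayInputStream(confBytes)));
            // per-executor setup (UDFContext, SchemaTupleBackend, ...) would go here
        }
        return jobConf;
    }
}
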
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1641806&r1=1641805&r2=1641806&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Wed Nov 26 13:20:14 2014
@@ -1,33 +1,36 @@
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadFunc;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.ObjectSerializer;
-
+import com.google.common.collect.Lists;
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import org.apache.spark.rdd.RDD;
import org.apache.spark.SparkContext;
-
-import com.google.common.collect.Lists;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* Converter that loads data via POLoad and converts it to RDD<Tuple>. Abuses
* the interface a bit in that there is no input RDD to convert in this case.
@@ -38,7 +41,7 @@ import com.google.common.collect.Lists;
public class LoadConverter implements POConverter<Tuple, Tuple, POLoad> {
private static final ToTupleFunction TO_TUPLE_FUNCTION = new ToTupleFunction();
-
+ private static Log log = LogFactory.getLog(LoadConverter.class);
private PigContext pigContext;
private PhysicalPlan physicalPlan;
private SparkContext sparkContext;
@@ -63,14 +66,30 @@ public class LoadConverter implements PO
// don't know why but just doing this cast for now
RDD<Tuple2<Text, Tuple>> hadoopRDD = sparkContext.newAPIHadoopFile(
- poLoad.getLFile().getFileName(), PigInputFormat.class,
+ poLoad.getLFile().getFileName(), PigInputFormatSpark.class,
Text.class, Tuple.class, loadJobConf);
+ registerUdfFiles();
// map to get just RDD<Tuple>
return hadoopRDD.map(TO_TUPLE_FUNCTION,
SparkUtil.getManifest(Tuple.class));
}
+ private void registerUdfFiles() {
+ Map<String, File> scriptFiles = pigContext.getScriptFiles();
+ for (Map.Entry<String, File> scriptFile : scriptFiles.entrySet()) {
+ try {
+ File script = scriptFile.getValue();
+ if (script.exists()) {
+ sparkContext.addFile(script.toURI().toURL().toExternalForm());
+ }
+ } catch (MalformedURLException e) {
+ String msg = "Problem while registering UDF jars and files in LoadConverter.";
+ throw new RuntimeException(msg, e);
+ }
+ }
+ }
+
private static class ToTupleFunction extends
AbstractFunction1<Tuple2<Text, Tuple>, Tuple> implements
Function1<Tuple2<Text, Tuple>, Tuple>, Serializable {
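
registerUdfFiles() above distributes script files to the executors with
SparkContext.addFile(). For reference, executor-side code resolves such a file
by its base name through SparkFiles.get(), which returns the local path of the
shipped copy; a minimal sketch (class and file names are illustrative):

import java.io.File;

import org.apache.spark.SparkFiles;

// Illustrative only: look up a file that the driver registered via
// sparkContext.addFile(...); SparkFiles.get() returns its local path
// on the current executor.
public class ShippedFileLookup {
    public static File resolve(String fileName) {
        return new File(SparkFiles.get(fileName));
    }
}

addFile() only ships the file; nothing uses it on the executor until code asks
SparkFiles for the local copy.
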
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1641806&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Wed Nov 26 13:20:14 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.running;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+import java.io.IOException;
+
+public class PigInputFormatSpark extends PigInputFormat {
+ @Override
+ public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
+ TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ init();
+ resetUDFContext();
+ return super.createRecordReader(split, context);
+ }
+
+ private void resetUDFContext() {
+ UDFContext.getUDFContext().reset();
+ }
+
+ private void init() {
+ PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+ PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ pigHadoopLogger.setReporter(pigStatusReporter);
+ PhysicalOperator.setPigLogger(pigHadoopLogger);
+ }
+}