Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/26 14:20:15 UTC

svn commit: r1641806 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine: physicalLayer/expressionOperators/ physicalLayer/relationalOperators/ spark/ spark/converter/ spark/running/

Author: praveen
Date: Wed Nov 26 13:20:14 2014
New Revision: 1641806

URL: http://svn.apache.org/r1641806
Log:
PIG-4232: UDFContext not initialized in executors with spark cluster (liyunzhang via praveen)

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java

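For orientation: on a Spark cluster the UDF code runs inside executor JVMs, where nothing had populated Pig's static UDFContext, so a UDF that reads configuration or frontend-stored properties saw an empty context. A hedged sketch of the failure mode, using a hypothetical UDF and property name (neither is from this patch):

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.util.UDFContext;

    // Hypothetical UDF illustrating the bug: before this patch, a Spark
    // executor never initialized UDFContext, so properties stored by the
    // frontend are missing and the lookup below returns null.
    public class ContextReadingFunc extends EvalFunc<String> {
        @Override
        public String exec(Tuple input) throws IOException {
            Properties props = UDFContext.getUDFContext()
                    .getUDFProperties(ContextReadingFunc.class);
            return props.getProperty("my.udf.setting");
        }
    }
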
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1641806&r1=1641805&r2=1641806&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Wed Nov 26 13:20:14 2014
@@ -120,6 +120,10 @@ public class POUserFunc extends Expressi
         instantiateFunc(funcSpec);
     }
 
+    public void setFuncInputSchema(){
+        setFuncInputSchema(signature);
+    }
+
     private void instantiateFunc(FuncSpec fSpec) {
         this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
         this.setSignature(signature);

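The new zero-argument overload forwards the operator's stored signature to the existing setFuncInputSchema(String), so executor-side code that holds only the POUserFunc can re-apply the EvalFunc's input schema once UDFContext is usable. A minimal sketch of a caller driving it over a plan's leaf operators, mirroring the loop added to ForEachConverter below (the helper class is hypothetical):

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;

    final class SchemaReattach {
        // Re-apply input schemas to any UDF leaves of a plan; only useful
        // after UDFContext has been populated on this JVM.
        static void reattach(PhysicalOperator[] planLeafOps) {
            if (planLeafOps == null) {
                return;
            }
            for (PhysicalOperator op : planLeafOps) {
                if (op.getClass() == POUserFunc.class) {
                    ((POUserFunc) op).setFuncInputSchema();
                }
            }
        }
    }
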
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1641806&r1=1641805&r2=1641806&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Wed Nov 26 13:20:14 2014
@@ -781,4 +781,9 @@ public class POForEach extends PhysicalO
             return (Tuple) out;
         }
     }
+
+    public PhysicalOperator[] getPlanLeafOps() {
+        return planLeafOps;
+    }
+
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1641806&r1=1641805&r2=1641806&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Nov 26 13:20:14 2014
@@ -128,7 +128,7 @@ public class SparkLauncher extends Launc
         convertMap.put(POLoad.class, new LoadConverter(pigContext,
                 physicalPlan, sparkContext));
         convertMap.put(POStore.class, new StoreConverter(pigContext));
-        convertMap.put(POForEach.class, new ForEachConverter());
+        convertMap.put(POForEach.class, new ForEachConverter(confBytes));
         convertMap.put(POFilter.class, new FilterConverter());
         convertMap.put(POPackage.class, new PackageConverter(confBytes));

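Passing confBytes rather than the JobConf itself is deliberate: JobConf is a Hadoop Writable, not java.io.Serializable, so Spark cannot capture it in a closure as-is. A hedged sketch of the round trip, assuming serializeJobConf is the driver-side counterpart of the deserializeJobConf call used in ForEachConverter below:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;

    final class ConfShipping {
        static byte[] pack(JobConf conf) {
            // Driver side: flatten the conf into bytes a closure can carry.
            // (serializeJobConf is assumed here; only deserializeJobConf
            // appears in this commit.)
            return KryoSerializer.serializeJobConf(conf);
        }

        static JobConf unpack(byte[] confBytes) {
            // Executor side: rebuild the conf, typically lazily, once per JVM.
            return KryoSerializer.deserializeJobConf(confBytes);
        }
    }
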
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1641806&r1=1641805&r2=1641806&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Wed Nov 26 13:20:14 2014
@@ -1,29 +1,46 @@
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.FuncSpec;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.RDD;
 
 /**
  * Converter that is able to convert an RDD to another RDD using a POForEach
  */
-@SuppressWarnings({ "serial" })
+@SuppressWarnings({"serial" })
 public class ForEachConverter implements POConverter<Tuple, Tuple, POForEach> {
+    private byte[] confBytes;
+
+    public ForEachConverter(byte[] confBytes) {
+        this.confBytes = confBytes;
+    }
 
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
             POForEach physicalOperator) {
         SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
         RDD<Tuple> rdd = predecessors.get(0);
-        ForEachFunction forEachFunction = new ForEachFunction(physicalOperator);
+        ForEachFunction forEachFunction = new ForEachFunction(physicalOperator, this.confBytes);
         return rdd.toJavaRDD().mapPartitions(forEachFunction, true).rdd();
     }
 
@@ -31,12 +48,42 @@ public class ForEachConverter implements
             FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
 
         private POForEach poForEach;
+        private byte[] confBytes;
+        private transient JobConf jobConf;
 
-        private ForEachFunction(POForEach poForEach) {
+        private ForEachFunction(POForEach poForEach, byte[] confBytes) {
             this.poForEach = poForEach;
+            this.confBytes = confBytes;
+        }
+
+        void initializeJobConf() {
+            if (this.jobConf == null) {
+                this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
+                PigMapReduce.sJobConfInternal.set(jobConf);
+                try {
+                    MapRedUtil.setupUDFContext(jobConf);
+                    PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
+                    SchemaTupleBackend.initialize(jobConf, pc);
+
+                } catch (IOException ioe) {
+                    String msg = "Problem while configuring UDFContext from ForEachConverter.";
+                    throw new RuntimeException(msg, ioe);
+                }
+            }
         }
 
         public Iterable<Tuple> call(final Iterator<Tuple> input) {
+            initializeJobConf();
+            PhysicalOperator[] planLeafOps = poForEach.getPlanLeafOps();
+            if (planLeafOps != null) {
+                for (PhysicalOperator op : planLeafOps) {
+                    if (op.getClass() == POUserFunc.class) {
+                        POUserFunc udf = (POUserFunc) op;
+                        udf.setFuncInputSchema();
+                    }
+                }
+            }
+
 
             return new Iterable<Tuple>() {
 

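The shape of this fix is a general Spark idiom: keep only serializable state in the function object, mark JVM-local state transient, and rebuild it lazily at the top of call() so initialization happens once per executor rather than on the driver. A Pig-free sketch against the Spark 1.x Java API (class, field, and payload are illustrative; the real code rebuilds a JobConf, UDFContext, and SchemaTupleBackend instead of a String):

    import java.util.Collections;
    import java.util.Iterator;
    import org.apache.spark.api.java.function.FlatMapFunction;

    public class LazyInitFunction implements FlatMapFunction<Iterator<String>, String> {
        private final byte[] confBytes;  // serialized with the closure
        private transient String conf;   // never shipped; rebuilt per executor JVM

        public LazyInitFunction(byte[] confBytes) {
            this.confBytes = confBytes;
        }

        private void ensureInitialized() {
            if (conf == null) {          // first partition on this executor
                conf = new String(confBytes);
            }
        }

        @Override
        public Iterable<String> call(Iterator<String> input) {
            ensureInitialized();         // cheap no-op after the first call
            StringBuilder joined = new StringBuilder(conf);
            while (input.hasNext()) {
                joined.append(':').append(input.next());
            }
            return Collections.singletonList(joined.toString());
        }
    }
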
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1641806&r1=1641805&r2=1641806&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Wed Nov 26 13:20:14 2014
@@ -1,33 +1,36 @@
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.MalformedURLException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.LoadFunc;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.running.PigInputFormatSpark;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.ObjectSerializer;
-
+import com.google.common.collect.Lists;
 import scala.Function1;
 import scala.Tuple2;
 import scala.runtime.AbstractFunction1;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.SparkContext;
-
-import com.google.common.collect.Lists;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 /**
  * Converter that loads data via POLoad and converts it to RDD<Tuple>. Abuses
  * the interface a bit in that there is no input RDD to convert in this case.
@@ -38,7 +41,7 @@ import com.google.common.collect.Lists;
 public class LoadConverter implements POConverter<Tuple, Tuple, POLoad> {
 
     private static final ToTupleFunction TO_TUPLE_FUNCTION = new ToTupleFunction();
-
+    private static Log log = LogFactory.getLog(LoadConverter.class);
     private PigContext pigContext;
     private PhysicalPlan physicalPlan;
     private SparkContext sparkContext;
@@ -63,14 +66,30 @@ public class LoadConverter implements PO
 
         // don't know why but just doing this cast for now
         RDD<Tuple2<Text, Tuple>> hadoopRDD = sparkContext.newAPIHadoopFile(
-                poLoad.getLFile().getFileName(), PigInputFormat.class,
+                poLoad.getLFile().getFileName(), PigInputFormatSpark.class,
                 Text.class, Tuple.class, loadJobConf);
 
+        registerUdfFiles();
         // map to get just RDD<Tuple>
         return hadoopRDD.map(TO_TUPLE_FUNCTION,
                 SparkUtil.getManifest(Tuple.class));
     }
 
+    private void registerUdfFiles() {
+        Map<String, File> scriptFiles = pigContext.getScriptFiles();
+        for (Map.Entry<String, File> scriptFile : scriptFiles.entrySet()) {
+            try {
+                File script = scriptFile.getValue();
+                if (script.exists()) {
+                    sparkContext.addFile(script.toURI().toURL().toExternalForm());
+                }
+            } catch (MalformedURLException e) {
+                String msg = "Problem while registering UDF jars and files in LoadConverter.";
+                throw new RuntimeException(msg, e);
+            }
+        }
+    }
+
     private static class ToTupleFunction extends
             AbstractFunction1<Tuple2<Text, Tuple>, Tuple> implements
             Function1<Tuple2<Text, Tuple>, Tuple>, Serializable {

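registerUdfFiles leans on Spark's file distribution: any path handed to SparkContext.addFile is fetched by every executor before tasks run, and the local copy is resolved with SparkFiles.get. A hedged sketch of both sides (the helper class and method names are illustrative):

    import java.io.File;
    import java.net.MalformedURLException;
    import org.apache.spark.SparkContext;
    import org.apache.spark.SparkFiles;

    final class FileShipping {
        // Driver side: register a local script so every executor fetches it.
        static void ship(SparkContext sc, File script) throws MalformedURLException {
            sc.addFile(script.toURI().toURL().toExternalForm());
        }

        // Executor side: resolve the shipped copy by its base name.
        static File fetch(String name) {
            return new File(SparkFiles.get(name));
        }
    }
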
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1641806&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Wed Nov 26 13:20:14 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.running;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+import java.io.IOException;
+
+public class PigInputFormatSpark extends PigInputFormat {
+	@Override
+	public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
+			TaskAttemptContext context) throws IOException,
+			InterruptedException {
+		init();
+		resetUDFContext();
+		return super.createRecordReader(split, context);
+	}
+
+	private void resetUDFContext() {
+		UDFContext.getUDFContext().reset();
+	}
+
+	private void init() {
+		PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
+		PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+		pigHadoopLogger.setReporter(pigStatusReporter);
+		PhysicalOperator.setPigLogger(pigHadoopLogger);
+	}
+}
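
The reset in createRecordReader matters because Spark reuses executor JVMs across tasks, so the static UDFContext can still hold a previous task's state when a new task starts. A minimal sketch of the intended per-task sequence, composed only from calls that appear in this commit (the helper class is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
    import org.apache.pig.impl.util.UDFContext;

    final class UdfContextLifecycle {
        // Per-task refresh on an executor: clear stale state left by a
        // previous task, then repopulate the context from this job's conf.
        static void refresh(JobConf jobConf) throws IOException {
            UDFContext.getUDFContext().reset();   // as in PigInputFormatSpark
            MapRedUtil.setupUDFContext(jobConf);  // as in ForEachConverter
        }
    }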