Posted to commits@pig.apache.org by pr...@apache.org on 2015/02/23 10:40:27 UTC
svn commit: r1661625 - in /pig/branches/spark/src/org/apache/pig:
backend/hadoop/executionengine/spark/
backend/hadoop/executionengine/spark/operator/
backend/hadoop/executionengine/spark/plan/ tools/pigstats/spark/
Author: praveen
Date: Mon Feb 23 09:40:27 2015
New Revision: 1661625
URL: http://svn.apache.org/r1661625
Log:
PIG-4374: Add SparkPlan in spark package (liyunzhang via praveen)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOper.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOper.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java?rev=1661625&r1=1661624&r2=1661625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java Mon Feb 23 09:40:27 2015
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.pig.backend.hadoop.executionengine.spark;
import java.util.UUID;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1661625&r1=1661624&r2=1661625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Mon Feb 23 09:40:27 2015
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.pig.backend.hadoop.executionengine.spark;
import java.io.File;
@@ -7,6 +24,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -18,10 +36,10 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.collect.Lists;
-
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -30,9 +48,6 @@ import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
@@ -51,7 +66,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
@@ -69,21 +83,29 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
-import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOper;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.spark.SparkPigStats;
import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
+
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.JobLogger;
import org.apache.spark.scheduler.StatsReportListener;
-import org.apache.spark.api.java.JavaSparkContext;
/**
* Main class that launches pig for Spark
@@ -117,13 +139,7 @@ public class SparkLauncher extends Launc
c.set(PigConstants.LOCAL_CODE_DIR, System.getProperty("java.io.tmpdir"));
SchemaTupleBackend.initialize(c, pigContext);
-
- // Code pulled from MapReduceLauncher
- MRCompiler mrCompiler = new MRCompiler(physicalPlan, pigContext);
- mrCompiler.compile();
- MROperPlan plan = mrCompiler.getMRPlan();
- POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan);
- pkgAnnotator.visit();
+ SparkOperPlan sparkPlan = compile(physicalPlan, pigContext);
if (System.getenv("BROADCAST_PORT") == null
&& System.getenv("BROADCAST_MASTER_IP") == null) {
@@ -182,16 +198,7 @@ public class SparkLauncher extends Launc
convertMap.put(PORank.class, new RankConverter());
convertMap.put(POStreamSpark.class,new StreamConverter(confBytes));
- Map<OperatorKey, RDD<Tuple>> rdds = new HashMap<OperatorKey, RDD<Tuple>>();
-
- Set<Integer> seenJobIDs = new HashSet<Integer>();
- for (POStore poStore : stores) {
- physicalToRDD(physicalPlan, poStore, rdds, convertMap);
- for (int jobID : getJobIDs(seenJobIDs)) {
- SparkStatsUtil.waitForJobAddStats(jobID, poStore,
- jobMetricsListener, sparkContext, sparkStats, c);
- }
- }
+ sparkPlanToRDD(sparkPlan, convertMap, sparkStats, c);
cleanUpSparkJob(pigContext,currentDirectoryPath);
sparkStats.finish();
@@ -339,6 +346,17 @@ public class SparkLauncher extends Launc
}
}
+ private SparkOperPlan compile(PhysicalPlan physicalPlan, PigContext pigContext) throws PlanException, IOException, VisitorException {
+ SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan, pigContext);
+ sparkCompiler.compile();
+ SparkOperPlan plan = sparkCompiler.getSparkPlan();
+
+ // optimize key - value handling in package
+ SparkPOPackageAnnotator pkgAnnotator = new SparkPOPackageAnnotator(plan);
+ pkgAnnotator.visit();
+ return plan;
+ }
+
private static void startSparkIfNeeded() throws PigException {
if (sparkContext == null) {
String master = System.getenv("SPARK_MASTER");
@@ -412,26 +430,76 @@ public class SparkLauncher extends Launc
}
}
+ private void sparkPlanToRDD(SparkOperPlan sparkPlan, Map<Class<? extends PhysicalOperator>, POConverter> convertMap, SparkPigStats sparkStats, JobConf c) throws IOException, InterruptedException {
+ Set<Integer> seenJobIDs = new HashSet<Integer>();
+ if (sparkPlan != null) {
+ List<SparkOper> leaves = sparkPlan.getLeaves();
+ Map<OperatorKey, RDD<Tuple>> sparkOpRdds = new HashMap<OperatorKey, RDD<Tuple>>();
+ for (SparkOper leaf : leaves) {
+ Map<OperatorKey, RDD<Tuple>> physicalOpRdds = new HashMap<OperatorKey, RDD<Tuple>>();
+ sparkOperToRDD(sparkPlan, leaf, sparkOpRdds, physicalOpRdds, convertMap, seenJobIDs, sparkStats, c);
+
+ }
+ }
+ }
+
+ private void sparkOperToRDD(SparkOperPlan sparkPlan,
+ SparkOper sparkOper, Map<OperatorKey, RDD<Tuple>> sparkOpRdds, Map<OperatorKey, RDD<Tuple>> physicalOpRdds,
+ Map<Class<? extends PhysicalOperator>, POConverter> convertMap,
+ Set<Integer> seenJobIDs, SparkPigStats sparkStats, JobConf c) throws IOException, InterruptedException {
+
+ List<SparkOper> predecessors = sparkPlan.getPredecessors(sparkOper);
+ List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
+ if (predecessors != null) {
+ for (SparkOper pred : predecessors) {
+ if (sparkOpRdds.get(pred.getOperatorKey()) == null) {
+ sparkOperToRDD(sparkPlan, pred, sparkOpRdds, physicalOpRdds, convertMap, seenJobIDs, sparkStats, c);
+ }
+ predecessorRDDs.add(sparkOpRdds.get(pred.getOperatorKey()));
+ }
+ }
+
+ List<PhysicalOperator> leafPOs = sparkOper.plan.getLeaves();
+ if (leafPOs == null || leafPOs.size() != 1) {
+ throw new IllegalArgumentException(String.format("sparkOper.plan should have exactly 1 leaf, but has %d; sparkOper: %s", leafPOs == null ? 0 : leafPOs.size(), sparkOper.name()));
+ } else {
+ PhysicalOperator leafPO = leafPOs.get(0);
+ physicalToRDD(sparkOper.plan, leafPO, physicalOpRdds, predecessorRDDs, convertMap);
+ sparkOpRdds.put(sparkOper.getOperatorKey(), physicalOpRdds.get(leafPO.getOperatorKey()));
+ }
+
+ List<POStore> poStores = PlanHelper.getPhysicalOperators(
+ sparkOper.plan, POStore.class);
+ if (poStores != null && poStores.size() == 1) {
+ POStore poStore = poStores.get(0);
+ for (int jobID : getJobIDs(seenJobIDs)) {
+ SparkStatsUtil.waitForJobAddStats(jobID, poStore,
+ jobMetricsListener, sparkContext, sparkStats, c);
+ }
+ } else {
+ LOG.info(String.format("sparkOper: %s has no POStore or more than 1 POStore", sparkOper.name()));
+ }
+
+ }
+
private void physicalToRDD(PhysicalPlan plan,
- PhysicalOperator physicalOperator,
- Map<OperatorKey, RDD<Tuple>> rdds,
- Map<Class<? extends PhysicalOperator>, POConverter> convertMap)
+ PhysicalOperator physicalOperator,
+ Map<OperatorKey, RDD<Tuple>> rdds, List<RDD<Tuple>> rddsFromPredecessors,
+ Map<Class<? extends PhysicalOperator>, POConverter> convertMap)
throws IOException {
-
RDD<Tuple> nextRDD = null;
List<PhysicalOperator> predecessors = plan
.getPredecessors(physicalOperator);
List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
if (predecessors != null) {
for (PhysicalOperator predecessor : predecessors) {
- physicalToRDD(plan, predecessor, rdds, convertMap);
+ physicalToRDD(plan, predecessor, rdds, rddsFromPredecessors, convertMap);
predecessorRdds.add(rdds.get(predecessor.getOperatorKey()));
}
- }
-
- if( physicalOperator instanceof POStream ){
- POStream poStream = (POStream)physicalOperator;
- physicalOperator = new POStreamSpark(poStream);
+ } else if (rddsFromPredecessors != null && !rddsFromPredecessors.isEmpty()) {
+ predecessorRdds.addAll(rddsFromPredecessors);
}
POConverter converter = convertMap.get(physicalOperator.getClass());
@@ -445,10 +513,6 @@ public class SparkLauncher extends Launc
+ physicalOperator);
nextRDD = converter.convert(predecessorRdds, physicalOperator);
- if (POStore.class.equals(physicalOperator.getClass())) {
- return;
- }
-
if (nextRDD == null) {
throw new IllegalArgumentException(
"RDD should not be null after PhysicalOperator: "
@@ -460,7 +524,33 @@ public class SparkLauncher extends Launc
@Override
public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
- String format, boolean verbose) throws IOException {
+ String format, boolean verbose) throws IOException {
+ SparkOperPlan sparkPlan = compile(pp, pc);
+ ps.println("#-----------------------------------------------------#");
+ ps.println("# Spark plan is A DAG, the Spark node relations are:");
+ ps.println("#-----------------------------------------------------#");
+ Map<OperatorKey, SparkOper> allOperKeys = sparkPlan.getKeys();
+ List<OperatorKey> operKeyList = new ArrayList<OperatorKey>(allOperKeys.keySet());
+ Collections.sort(operKeyList);
+ for (OperatorKey operatorKey : operKeyList) {
+ SparkOper op = sparkPlan.getOperator(operatorKey);
+ ps.print(op.getOperatorKey());
+ List<SparkOper> successors = sparkPlan.getSuccessors(op);
+ if (successors != null) {
+ ps.print("->");
+ for (SparkOper suc : successors) {
+ ps.print(suc.getOperatorKey() + " ");
+ }
+ }
+ ps.println();
+ }
+ if (format.equals("text")) {
+ SparkPrinter printer = new SparkPrinter(ps, sparkPlan);
+ printer.setVerbose(verbose);
+ printer.visit();
+ } else { // TODO: add support for other output formats
+ throw new IOException("Non-text output of explain is not supported.");
+ }
}
@Override
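The new sparkOperToRDD() above walks the SparkOper DAG depth-first, memoizing
each operator's RDD in sparkOpRdds so that a predecessor shared by several
successors is converted only once. A minimal standalone sketch of that
traversal pattern, with illustrative placeholder types only (not part of this
patch):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Memoized depth-first evaluation over a DAG, mirroring sparkOperToRDD. */
    class DagEval<N, R> {
        interface Graph<N2> { List<N2> predecessorsOf(N2 node); }
        interface Converter<N2, R2> { R2 convert(N2 node, List<R2> predResults); }

        private final Map<N, R> memo = new HashMap<N, R>();

        R eval(Graph<N> g, Converter<N, R> c, N node) {
            R cached = memo.get(node);
            if (cached != null) {
                return cached; // already converted while visiting another successor
            }
            List<R> predResults = new ArrayList<R>();
            List<N> preds = g.predecessorsOf(node);
            if (preds != null) {
                for (N p : preds) {
                    predResults.add(eval(g, c, p)); // convert predecessors first
                }
            }
            R result = c.convert(node, predResults);
            memo.put(node, result);
            return result;
        }
    }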
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1661625&r1=1661624&r2=1661625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Mon Feb 23 09:40:27 2015
@@ -1,27 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.pig.backend.hadoop.executionengine.spark;
+import java.io.IOException;
+import java.util.List;
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
import org.apache.hadoop.mapred.JobConf;
+
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
-import scala.Tuple2;
-import scala.Product2;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-//import scala.reflect.ClassManifest;
-//import scala.reflect.ClassManifest$;
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
import org.apache.spark.rdd.RDD;
-import java.io.IOException;
-import java.util.List;
-
public class SparkUtil {
public static <T> ClassTag<T> getManifest(Class<T> clazz) {
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOper.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOper.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOper.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOper;
+import org.apache.pig.impl.plan.OperatorKey;
+
+/**
+ * NativeSparkOper: a Spark operator that wraps a native Spark job,
+ * i.e. a user-supplied jar plus its parameters.
+ */
+public class NativeSparkOper extends SparkOper {
+ private static final long serialVersionUID = 1L;
+ private static int countJobs = 0;
+ private String nativeSparkJar;
+ private String[] params;
+ private String jobId;
+
+ public NativeSparkOper(OperatorKey k, String sparkJar, String[] parameters) {
+ super(k);
+ nativeSparkJar = sparkJar;
+ params = parameters;
+ jobId = sparkJar + "_" + getJobNumber();
+ }
+
+ private static int getJobNumber() {
+ countJobs++;
+ return countJobs;
+ }
+}
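The jobId above is the jar name plus a number from a static per-JVM counter,
so ids are unique within one JVM but restart when a new JVM is launched. A
hedged usage sketch (the jar name and parameters here are made up):

    import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOper;
    import org.apache.pig.impl.plan.OperatorKey;

    public class NativeSparkOperDemo {
        public static void main(String[] args) {
            // the first instance gets jobId "wordcount.jar_1", the second "wordcount.jar_2"
            NativeSparkOper first = new NativeSparkOper(
                    OperatorKey.genOpKey("scope"), "wordcount.jar", new String[] { "-in", "x" });
            NativeSparkOper second = new NativeSparkOper(
                    OperatorKey.genOpKey("scope"), "wordcount.jar", new String[] { "-in", "x" });
        }
    }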
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,669 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.CollectableLoadFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * The compiler that compiles a given physical plan
+ * into a DAG of Spark operators.
+ */
+public class SparkCompiler extends PhyPlanVisitor {
+ private PigContext pigContext;
+
+ // The physical plan that is being compiled
+ private PhysicalPlan physicalPlan;
+
+ // The plan of Spark operators
+ private SparkOperPlan sparkPlan;
+
+ private SparkOper curSparkOp;
+
+ private String scope;
+
+ private SparkOper[] compiledInputs = null;
+
+ private Map<OperatorKey, SparkOper> splitsSeen;
+
+ private NodeIdGenerator nig;
+
+ private Map<PhysicalOperator,SparkOper> phyToSparkOpMap;
+ private UDFFinder udfFinder;
+
+ public SparkCompiler(PhysicalPlan physicalPlan,
+ PigContext pigContext){
+ super(physicalPlan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(physicalPlan));
+ this.physicalPlan = physicalPlan;
+ this.pigContext = pigContext;
+ this.sparkPlan = new SparkOperPlan();
+ this.phyToSparkOpMap = new HashMap<PhysicalOperator,SparkOper>();
+ this.udfFinder = new UDFFinder();
+ this.nig = NodeIdGenerator.getGenerator();
+ this.splitsSeen = new HashMap<OperatorKey, SparkOper>();
+
+ }
+
+ public void compile() throws IOException, PlanException, VisitorException {
+ List<PhysicalOperator> roots = physicalPlan.getRoots();
+ if((roots == null) || (roots.size() <= 0)) {
+ int errCode = 2053;
+ String msg = "Internal error. Did not find roots in the physical physicalPlan.";
+ throw new SparkCompilerException(msg, errCode, PigException.BUG);
+ }
+ scope = roots.get(0).getOperatorKey().getScope();
+ List<PhysicalOperator> leaves = physicalPlan.getLeaves();
+
+ if (!pigContext.inIllustrator)
+ for (PhysicalOperator op : leaves) {
+ if (!(op instanceof POStore)) {
+ int errCode = 2025;
+ String msg = "Expected leaf of reduce physicalPlan to " +
+ "always be POStore. Found " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG);
+ }
+ }
+
+ // get all stores and native Spark operators, sort them in order (by operator id)
+ // and compile their plans
+ List<POStore> stores = PlanHelper.getPhysicalOperators(physicalPlan, POStore.class);
+ List<PONative> nativeSparks= PlanHelper.getPhysicalOperators(physicalPlan, PONative.class);
+ List<PhysicalOperator> ops;
+ if (!pigContext.inIllustrator) {
+ ops = new ArrayList<PhysicalOperator>(stores.size() + nativeSparks.size());
+ ops.addAll(stores);
+ } else {
+ ops = new ArrayList<PhysicalOperator>(leaves.size() + nativeSparks.size());
+ ops.addAll(leaves);
+ }
+ ops.addAll(nativeSparks);
+ Collections.sort(ops);
+
+ for (PhysicalOperator op : ops) {
+ compile(op);
+ }
+ }
+
+ /**
+ * Compiles the physicalPlan below op into a Spark Operator
+ * and stores it in curSparkOp.
+ * @param op
+ * @throws IOException
+ * @throws PlanException
+ * @throws VisitorException
+ */
+ private void compile(PhysicalOperator op) throws IOException, PlanException, VisitorException {
+ SparkOper[] prevCompInp = compiledInputs;
+
+ List<PhysicalOperator> predecessors = physicalPlan.getPredecessors(op);
+ if(op instanceof PONative){
+ // the predecessor (store) has already been processed
+ // don't process it again
+ }
+ else if (predecessors != null && predecessors.size() > 0) {
+ // When processing an entire script (multiquery), we can
+ // get into a situation where a load has
+ // predecessors. This means that it depends on some store
+ // earlier in the plan. We need to take that dependency
+ // and connect the respective Spark operators, while at the
+ // same time removing the connection between the Physical
+ // operators. That way the jobs will run in the right
+ // order.
+ if (op instanceof POLoad) {
+
+ if (predecessors.size() != 1) {
+ int errCode = 2125;
+ String msg = "Expected at most one predecessor of load. Got "+predecessors.size();
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+
+ PhysicalOperator p = predecessors.get(0);
+ SparkOper oper = null;
+ if(p instanceof POStore || p instanceof PONative){
+ oper = phyToSparkOpMap.get(p);
+ }else{
+ int errCode = 2126;
+ String msg = "Predecessor of load should be a store or spark operator. Got "+p.getClass();
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+
+ // Need new operator
+ curSparkOp = getSparkOp();
+ curSparkOp.add(op);
+ sparkPlan.add(curSparkOp);
+ physicalPlan.disconnect(op, p);
+ sparkPlan.connect(oper, curSparkOp);
+ phyToSparkOpMap.put(op, curSparkOp);
+ return;
+ }
+
+ Collections.sort(predecessors);
+ compiledInputs = new SparkOper[predecessors.size()];
+ int i = -1;
+ for (PhysicalOperator pred : predecessors) {
+ if(pred instanceof POSplit && splitsSeen.containsKey(pred.getOperatorKey())){
+ compiledInputs[++i] = startNew(((POSplit)pred).getSplitStore(), splitsSeen.get(pred.getOperatorKey()));
+ continue;
+ }
+ compile(pred);
+ compiledInputs[++i] = curSparkOp;
+ }
+ } else {
+ //No predecessors. Mostly a load. But this is where
+ //we start. We create a new sparkOp and add its first
+ //operator op. Also this should be added to the sparkPlan.
+ curSparkOp = getSparkOp();
+ curSparkOp.add(op);
+ if (op instanceof POLoad)
+ {
+ if (((POLoad)op).getLFile()!=null && ((POLoad)op).getLFile().getFuncSpec()!=null)
+ curSparkOp.UDFs.add(((POLoad)op).getLFile().getFuncSpec().toString());
+ }
+ sparkPlan.add(curSparkOp);
+ phyToSparkOpMap.put(op, curSparkOp);
+ return;
+ }
+ op.visit(this);
+ compiledInputs = prevCompInp;
+ }
+
+
+ private SparkOper getSparkOp() {
+ return new SparkOper(OperatorKey.genOpKey(scope));
+ }
+
+ public SparkOperPlan getSparkPlan() {
+ return sparkPlan;
+ }
+
+ public void connectSoftLink() throws PlanException, IOException {
+ for (PhysicalOperator op : physicalPlan) {
+ if (physicalPlan.getSoftLinkPredecessors(op)!=null) {
+ for (PhysicalOperator pred : physicalPlan.getSoftLinkPredecessors(op)) {
+ SparkOper from = phyToSparkOpMap.get(pred);
+ SparkOper to = phyToSparkOpMap.get(op);
+ if (from==to)
+ continue;
+ if (sparkPlan.getPredecessors(to)==null || !sparkPlan.getPredecessors(to).contains(from)) {
+ sparkPlan.connect(from, to);
+ }
+ }
+ }
+ }
+ }
+
+ private SparkOper startNew(FileSpec fSpec, SparkOper old) throws PlanException{
+ POLoad ld = getLoad();
+ ld.setLFile(fSpec);
+ SparkOper ret = getSparkOp();
+ ret.add(ld);
+ sparkPlan.add(ret);
+ sparkPlan.connect(old, ret);
+ return ret;
+ }
+
+
+ private POLoad getLoad(){
+ POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ ld.setPc(pigContext);
+ ld.setIsTmpLoad(true);
+ return ld;
+ }
+
+ @Override
+ public void visitSplit(POSplit op) throws VisitorException{
+ try{
+ FileSpec fSpec = op.getSplitStore();
+ SparkOper sparkOp = endSingleInputPlanWithStr(fSpec);
+ sparkOp.setSplitter(true);
+ splitsSeen.put(op.getOperatorKey(), sparkOp);
+ curSparkOp = startNew(fSpec, sparkOp);
+ phyToSparkOpMap.put(op, curSparkOp);
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ public void visitDistinct(PODistinct op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ private SparkOper endSingleInputPlanWithStr(FileSpec fSpec) throws PlanException{
+ if(compiledInputs.length>1) {
+ int errCode = 2023;
+ String msg = "Received a multi input physicalPlan when expecting only a single input one.";
+ throw new PlanException(msg, errCode, PigException.BUG);
+ }
+ SparkOper sparkOp = compiledInputs[0]; // Load
+ POStore str = getStore();
+ str.setSFile(fSpec);
+ sparkOp.plan.addAsLeaf(str);
+ return sparkOp;
+ }
+
+ private POStore getStore(){
+ POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ // mark store as tmp store. These could be removed by the
+ // optimizer, because the user did not request them.
+ st.setIsTmpStore(true);
+ return st;
+ }
+
+ @Override
+ public void visitLoad(POLoad op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ phyToSparkOpMap.put(op, curSparkOp);
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void visitNative(PONative op) throws VisitorException{
+ try{
+ SparkOper nativeSparkOper = getNativeSparkOp(op.getNativeMRjar(), op.getParams());
+ sparkPlan.add(nativeSparkOper);
+ sparkPlan.connect(curSparkOp, nativeSparkOper);
+ phyToSparkOpMap.put(op, nativeSparkOper);
+ curSparkOp = nativeSparkOper;
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ private NativeSparkOper getNativeSparkOp(String sparkJar, String[] parameters) {
+ return new NativeSparkOper(new OperatorKey(scope,nig.getNextNodeId(scope)), sparkJar, parameters);
+ }
+
+ @Override
+ public void visitStore(POStore op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ phyToSparkOpMap.put(op, curSparkOp);
+ if (op.getSFile()!=null && op.getSFile().getFuncSpec()!=null)
+ curSparkOp.UDFs.add(op.getSFile().getFuncSpec().toString());
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void visitFilter(POFilter op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ processUDFs(op.getPlan());
+ phyToSparkOpMap.put(op, curSparkOp);
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void visitCross(POCross op) throws VisitorException {
+ try {
+ nonBlocking(op);
+ phyToSparkOpMap.put(op, curSparkOp);
+ } catch (Exception e) {
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void visitStream(POStream op) throws VisitorException{
+ try{
+ POStreamSpark poStreamSpark = new POStreamSpark(op);
+ nonBlocking(poStreamSpark);
+ phyToSparkOpMap.put(op, curSparkOp);
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void visitSort(POSort op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ phyToSparkOpMap.put(op, curSparkOp);
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void visitLimit(POLimit op) throws VisitorException {
+ try {
+ nonBlocking(op);
+ } catch (Exception e) {
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void visitLocalRearrange(POLocalRearrange op) throws VisitorException {
+ try{
+ nonBlocking(op);
+ List<PhysicalPlan> plans = op.getPlans();
+ if(plans!=null)
+ for(PhysicalPlan ep : plans)
+ processUDFs(ep);
+ phyToSparkOpMap.put(op, curSparkOp);
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
+ List<PhysicalOperator> roots = curSparkOp.plan.getRoots();
+ if(roots.size() != 1){
+ int errCode = 2171;
+ String errMsg = "Expected one but found more then one root physical operator in physical physicalPlan.";
+ throw new SparkCompilerException(errMsg,errCode,PigException.BUG);
+ }
+
+ PhysicalOperator phyOp = roots.get(0);
+ if(! (phyOp instanceof POLoad)){
+ int errCode = 2172;
+ String errMsg = "Expected physical operator at root to be POLoad. Found : "+phyOp.getClass().getCanonicalName();
+ throw new SparkCompilerException(errMsg,errCode,PigException.BUG);
+ }
+
+ LoadFunc loadFunc = ((POLoad)phyOp).getLoadFunc();
+ try {
+ if(!(CollectableLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
+ int errCode = 2249;
+ throw new SparkCompilerException("While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.", errCode);
+ }
+ ((CollectableLoadFunc)loadFunc).ensureAllKeyInstancesInSameSplit();
+ } catch (SparkCompilerException e){
+ throw (e);
+ } catch (IOException e) {
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+
+ try{
+ nonBlocking(op);
+ phyToSparkOpMap.put(op, curSparkOp);
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void visitPOForEach(POForEach op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ List<PhysicalPlan> plans = op.getInputPlans();
+ if (plans != null) {
+ for (PhysicalPlan ep : plans) {
+ processUDFs(ep);
+ }
+ }
+ phyToSparkOpMap.put(op, curSparkOp);
+ } catch (Exception e) {
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{
+ try{
+ blocking(op);
+ curSparkOp.customPartitioner = op.getCustomPartitioner();
+ phyToSparkOpMap.put(op, curSparkOp);
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void visitPackage(POPackage op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ phyToSparkOpMap.put(op, curSparkOp);
+ if (op.getPkgr().getPackageType() == Packager.PackageType.JOIN) {
+ curSparkOp.markRegularJoin();
+ } else if (op.getPkgr().getPackageType() == Packager.PackageType.GROUP) {
+ if (op.getNumInps() == 1) {
+ curSparkOp.markGroupBy();
+ } else if (op.getNumInps() > 1) {
+ curSparkOp.markCogroup();
+ }
+ }
+
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ @Override
+ public void visitUnion(POUnion op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ phyToSparkOpMap.put(op, curSparkOp);
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+
+ @Override
+ public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
+ //TODO
+ }
+
+ @Override
+ public void visitFRJoin(POFRJoin op) throws VisitorException {
+ //TODO
+ }
+
+ @Override
+ public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
+ //TODO
+ }
+
+ private void processUDFs(PhysicalPlan plan) throws VisitorException{
+ if(plan!=null){
+ //Process Scalars (UDF with referencedOperators)
+ ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan);
+ scalarPhyFinder.visit();
+ curSparkOp.scalars.addAll(scalarPhyFinder.getScalars());
+
+ //Process UDFs
+ udfFinder.setPlan(plan);
+ udfFinder.visit();
+ curSparkOp.UDFs.addAll(udfFinder.getUDFs());
+ }
+ }
+
+ private void nonBlocking(PhysicalOperator op) throws PlanException, IOException{
+ SparkOper sparkOp=null;
+ if (compiledInputs.length == 1) {
+ sparkOp = compiledInputs[0];
+ } else {
+ sparkOp = merge(compiledInputs);
+ }
+ sparkOp.plan.addAsLeaf(op);
+ curSparkOp = sparkOp;
+ }
+
+ private void blocking(PhysicalOperator op) throws PlanException, IOException{
+ SparkOper sparkOp = getSparkOp();
+ sparkPlan.add(sparkOp);
+ for(SparkOper compileInput: compiledInputs){
+ sparkPlan.connect(compileInput, sparkOp);
+ }
+ sparkOp.plan.addAsLeaf(op);
+ curSparkOp = sparkOp;
+ }
+
+ private SparkOper merge(SparkOper[] compiledInputs)throws PlanException {
+ SparkOper ret = getSparkOp();
+ sparkPlan.add(ret);
+
+ Set<SparkOper> toBeConnected = new HashSet<SparkOper>();
+ List<SparkOper> toBeRemoved = new ArrayList<SparkOper>();
+
+ List<PhysicalPlan> toBeMerged = new ArrayList<PhysicalPlan>();
+
+ for (SparkOper sparkOp : compiledInputs) {
+ toBeRemoved.add(sparkOp);
+ toBeMerged.add(sparkOp.plan);
+ List<SparkOper> predecessors = sparkPlan.getPredecessors(sparkOp);
+ if( predecessors != null){
+ for( SparkOper predecessorSparkOp: predecessors){
+ toBeConnected.add(predecessorSparkOp);
+ }
+ }
+ }
+ merge(ret.plan, toBeMerged);
+
+ Iterator<SparkOper> it = toBeConnected.iterator();
+ while(it.hasNext())
+ sparkPlan.connect(it.next(), ret);
+ for(SparkOper removeSparkOp : toBeRemoved){
+ if(removeSparkOp.requestedParallelism > ret.requestedParallelism)
+ ret.requestedParallelism = removeSparkOp.requestedParallelism;
+ for (String udf:removeSparkOp.UDFs)
+ {
+ if (!ret.UDFs.contains(udf))
+ ret.UDFs.add(udf);
+ }
+ // We also need to change scalar marking
+ for(PhysicalOperator physOp: removeSparkOp.scalars) {
+ if(!ret.scalars.contains(physOp)) {
+ ret.scalars.add(physOp);
+ }
+ }
+ Set<PhysicalOperator> opsToChange = new HashSet<PhysicalOperator>();
+ for (Map.Entry<PhysicalOperator, SparkOper> entry : phyToSparkOpMap.entrySet()) {
+ if (entry.getValue()==removeSparkOp) {
+ opsToChange.add(entry.getKey());
+ }
+ }
+ for (PhysicalOperator op : opsToChange) {
+ phyToSparkOpMap.put(op, ret);
+ }
+
+ sparkPlan.remove(removeSparkOp);
+ }
+ return ret;
+ }
+
+ /**
+ * Merges a list of plans into a single plan.
+ * @param <O>
+ * @param <E>
+ * @param finPlan - Final Plan into which the list of plans is merged
+ * @param plans - list of plans to be merged
+ * @throws PlanException
+ */
+ private <O extends Operator<?>, E extends OperatorPlan<O>> void merge(
+ E finPlan, List<E> plans) throws PlanException {
+ for (E e : plans) {
+ finPlan.merge(e);
+ }
+ }
+}
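Putting the pieces together, the intended calling sequence (as used by
SparkLauncher.compile() and SparkLauncher.explain() above) is roughly the
following hedged sketch; physicalPlan and pigContext are assumed to be
supplied by the surrounding execution engine:

    import java.io.IOException;

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
    import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
    import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator;
    import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.plan.PlanException;
    import org.apache.pig.impl.plan.VisitorException;

    public class SparkCompileDemo {
        static SparkOperPlan compileAndPrint(PhysicalPlan physicalPlan, PigContext pigContext)
                throws PlanException, VisitorException, IOException {
            // compile the physical plan into a DAG of SparkOpers
            SparkCompiler compiler = new SparkCompiler(physicalPlan, pigContext);
            compiler.compile();
            SparkOperPlan sparkPlan = compiler.getSparkPlan();

            // annotate POPackage key handling, as SparkLauncher.compile() does
            new SparkPOPackageAnnotator(sparkPlan).visit();

            // dump the plan in text form, as SparkLauncher.explain() does
            SparkPrinter printer = new SparkPrinter(System.out, sparkPlan);
            printer.setVerbose(true);
            printer.visit();
            return sparkPlan;
        }
    }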
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Exception thrown when the SparkCompiler fails to compile a physical plan.
+ */
+public class SparkCompilerException extends VisitorException {
+ private static final long serialVersionUID = 2L;
+
+ /**
+ * Create a new SparkCompilerException with null as the error message.
+ */
+ public SparkCompilerException() {
+ super();
+ }
+
+ /**
+ * Create a new SparkCompilerException with the specified message.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ */
+ public SparkCompilerException(String message) {
+ super(message);
+ }
+
+ /**
+ * Create a new SparkCompilerException with the specified cause.
+ *
+ * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+ */
+ public SparkCompilerException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Create a new SparkCompilerException with the specified message and cause.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+ */
+ public SparkCompilerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Create a new SparkCompilerException with the specified message and error code.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ */
+ public SparkCompilerException(String message, int errCode) {
+ super(message, errCode);
+ }
+
+ /**
+ * Create a new SparkCompilerException with the specified message, error code, and cause.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+ */
+ public SparkCompilerException(String message, int errCode, Throwable cause) {
+ super(message, errCode, cause);
+ }
+
+ /**
+ * Create a new SparkCompilerException with the specified message, error code, and error source.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ * @param errSrc - The error source
+ */
+ public SparkCompilerException(String message, int errCode, byte errSrc) {
+ super(message, errCode, errSrc);
+ }
+
+ /**
+ * Create a new SparkCompilerException with the specified message, error code, error source, and cause.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ * @param errSrc - The error source
+ * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+ */
+ public SparkCompilerException(String message, int errCode, byte errSrc,
+ Throwable cause) {
+ super(message, errCode, errSrc, cause);
+ }
+
+ /**
+ * Create a new SparkCompilerException with the specified message, error code, and retry flag.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ * @param retry - If the exception is retriable or not
+ */
+ public SparkCompilerException(String message, int errCode, boolean retry) {
+ super(message, errCode, retry);
+ }
+
+ /**
+ * Create a new SparkCompilerException with the specified message, error code, error source, and retry flag.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ * @param errSrc - The error source
+ * @param retry - If the exception is retriable or not
+ */
+ public SparkCompilerException(String message, int errCode, byte errSrc,
+ boolean retry) {
+ super(message, errCode, errSrc, retry);
+ }
+
+ /**
+ * Create a new SparkCompilerException with the specified message, error code, error source, retry flag, and detailed message for the developer.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ * @param errSrc - The error source
+ * @param retry - If the exception is retriable or not
+ * @param detailedMsg - The detailed message shown to the developer
+ */
+ public SparkCompilerException(String message, int errCode, byte errSrc,
+ boolean retry, String detailedMsg) {
+ super(message, errCode, errSrc, retry, detailedMsg);
+ }
+
+ /**
+ * Create a new SparkCompilerException with the specified message, error code, error source, retry flag, detailed message for the developer, and cause.
+ *
+ * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user
+ * @param errCode - The error code shown to the user
+ * @param errSrc - The error source
+ * @param retry - If the exception is retriable or not
+ * @param detailedMsg - The detailed message shown to the developer
+ * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+ */
+ public SparkCompilerException(String message, int errCode, byte errSrc,
+ boolean retry, String detailedMsg, Throwable cause) {
+ super(message, errCode, errSrc, retry, detailedMsg, cause);
+ }
+}
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor for the SparkOperPlan class
+ */
+public class SparkOpPlanVisitor extends PlanVisitor<SparkOper, SparkOperPlan> {
+
+ public SparkOpPlanVisitor(SparkOperPlan plan, PlanWalker<SparkOper, SparkOperPlan> walker) {
+ super(plan, walker);
+ }
+
+ public void visitSparkOp(SparkOper sparkOper) throws VisitorException {
+ // default implementation is a no-op; subclasses override to handle each SparkOper
+ }
+
+}
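SparkPrinter (also added in this commit) is one concrete subclass of this
visitor. As an illustration, a hedged sketch of another subclass that counts
the SparkOpers in a plan, assuming the stock DependencyOrderWalker from
org.apache.pig.impl.plan and placement in the same package:

    package org.apache.pig.backend.hadoop.executionengine.spark.plan;

    import org.apache.pig.impl.plan.DependencyOrderWalker;
    import org.apache.pig.impl.plan.VisitorException;

    public class SparkOpCounter extends SparkOpPlanVisitor {
        private int count = 0;

        public SparkOpCounter(SparkOperPlan plan) {
            super(plan, new DependencyOrderWalker<SparkOper, SparkOperPlan>(plan));
        }

        @Override
        public void visitSparkOp(SparkOper sparkOper) throws VisitorException {
            count++; // each SparkOper corresponds to one Spark job in the DAG
        }

        public int getCount() {
            return count;
        }
    }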
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOper.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOper.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOper.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * An operator model for a Spark job.
+ * Acts as a host to the plans that will
+ * execute in spark.
+ */
+public class SparkOper extends Operator<SparkOpPlanVisitor> {
+ private static enum OPER_FEATURE {
+ NONE,
+ // Indicate if this job is a sampling job
+ SAMPLER,
+ // Indicate if this job is a merge indexer
+ INDEXER,
+ // Indicate if this job is a group by job
+ GROUPBY,
+ // Indicate if this job is a cogroup job
+ COGROUP,
+ // Indicate if this job is a regular join job
+ HASHJOIN;
+ };
+ public PhysicalPlan plan;
+
+ public Set<String> UDFs;
+
+ /* Name of the Custom Partitioner used */
+ public String customPartitioner = null;
+
+ public Set<PhysicalOperator> scalars;
+
+ public boolean isUDFComparatorUsed = false;
+
+ public int requestedParallelism = -1;
+
+ private OPER_FEATURE feature = OPER_FEATURE.NONE;
+
+ private boolean splitter = false;
+
+ // Name of the partition file generated by sampling process,
+ // Used by Skewed Join
+ private String skewedJoinPartitionFile;
+
+ private boolean usingTypedComparator = false;
+
+ private boolean combineSmallSplits = true;
+
+ public SparkOper(OperatorKey k) {
+ super(k);
+ plan = new PhysicalPlan();
+ UDFs = new HashSet<String>();
+ scalars = new HashSet<PhysicalOperator>();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return true;
+ }
+
+ @Override
+ public String name() {
+ String udfStr = getUDFsAsStr();
+ return "Spark(" + requestedParallelism
+ + (udfStr.equals("") ? "" : "," + udfStr) + ") - " + mKey.toString();
+ }
+
+ private String getUDFsAsStr() {
+ StringBuilder sb = new StringBuilder();
+ if (UDFs != null && UDFs.size() > 0) {
+ for (String str : UDFs) {
+ sb.append(str.substring(str.lastIndexOf('.') + 1));
+ sb.append(',');
+ }
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ return sb.toString();
+ }
+
+ public void add(PhysicalOperator physicalOper){
+ this.plan.add(physicalOper);
+ }
+
+ @Override
+ public void visit(SparkOpPlanVisitor v) throws VisitorException {
+ v.visitSparkOp(this);
+ }
+
+ public boolean isGroupBy() {
+ return (feature == OPER_FEATURE.GROUPBY);
+ }
+
+ public void markGroupBy() {
+ feature = OPER_FEATURE.GROUPBY;
+ }
+
+ public boolean isCogroup() {
+ return (feature == OPER_FEATURE.COGROUP);
+ }
+
+ public void markCogroup() {
+ feature = OPER_FEATURE.COGROUP;
+ }
+
+ public boolean isRegularJoin() {
+ return (feature == OPER_FEATURE.HASHJOIN);
+ }
+
+ public void markRegularJoin() {
+ feature = OPER_FEATURE.HASHJOIN;
+ }
+
+ public int getRequestedParallelism() {
+ return requestedParallelism;
+ }
+
+ public void setSplitter(boolean spl) {
+ splitter = spl;
+ }
+
+ public boolean isSplitter() {
+ return splitter;
+ }
+
+ public boolean isSampler() {
+ return (feature == OPER_FEATURE.SAMPLER);
+ }
+
+ public void markSampler() {
+ feature = OPER_FEATURE.SAMPLER;
+ }
+
+ public void setSkewedJoinPartitionFile(String file) {
+ skewedJoinPartitionFile = file;
+ }
+
+ public String getSkewedJoinPartitionFile() {
+ return skewedJoinPartitionFile;
+ }
+
+ protected boolean usingTypedComparator() {
+ return usingTypedComparator;
+ }
+
+ protected void useTypedComparator(boolean useTypedComparator) {
+ this.usingTypedComparator = useTypedComparator;
+ }
+
+ protected void noCombineSmallSplits() {
+ combineSmallSplits = false;
+ }
+
+ public boolean combineSmallSplits() {
+ return combineSmallSplits;
+ }
+
+ public boolean isIndexer() {
+ return (feature == OPER_FEATURE.INDEXER);
+ }
+
+ public void markIndexer() {
+ feature = OPER_FEATURE.INDEXER;
+ }
+}
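
Because the feature flags above share a single field, the mark*/is* pairs are mutually exclusive: marking a new feature clears the previous one. A hedged sketch of tagging and querying an operator (the class name and OperatorKey arguments are illustrative):

    import org.apache.pig.impl.plan.OperatorKey;

    public class SparkOperExample {
        public static void main(String[] args) {
            SparkOper op = new SparkOper(new OperatorKey("scope", 1L));
            op.markSampler();
            System.out.println(op.isSampler());  // true
            op.markGroupBy();                    // overwrites the single feature field
            System.out.println(op.isSampler());  // false: one feature at a time
            System.out.println(op.isGroupBy());  // true
            System.out.println(op.name());       // e.g. "Spark(-1) - scope-1"
        }
    }
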
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A plan representing the DAG of
+ * Spark operators.
+ */
+public class SparkOperPlan extends OperatorPlan<SparkOper> {
+
+ @Override
+ public String toString() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ SparkPrinter printer = new SparkPrinter(ps, this);
+ printer.setVerbose(true);
+ try {
+ printer.visit();
+ } catch (VisitorException e) {
+ throw new RuntimeException("Unable to get String representation of plan: " + e, e);
+ }
+ return baos.toString();
+ }
+}
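
With toString() wired to SparkPrinter, dumping a plan for debugging needs no extra plumbing. A minimal sketch, assuming a trivially populated plan in the same package:

    import org.apache.pig.impl.plan.OperatorKey;

    public class PlanDumpExample {
        public static void main(String[] args) {
            SparkOperPlan plan = new SparkOperPlan();
            plan.add(new SparkOper(new OperatorKey("scope", 1L)));
            // toString() renders the DAG through SparkPrinter in verbose mode
            System.out.println(plan);
        }
    }
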
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * This visitor visits the SparkPlan and, for each SparkOper,
+ * finds the POPackage in the plan along with the corresponding
+ * POLocalRearrange(s). It then annotates the POPackage
+ * with information about which columns in the "value" are present in the
+ * "key" and will need to be stitched back into the "value".
+ */
+public class SparkPOPackageAnnotator extends SparkOpPlanVisitor {
+ public SparkPOPackageAnnotator(SparkOperPlan plan) {
+ super(plan, new DepthFirstWalker<SparkOper, SparkOperPlan>(plan));
+ }
+
+ @Override
+ public void visitSparkOp(SparkOper sparkOp) throws VisitorException {
+ if(!sparkOp.plan.isEmpty()) {
+ PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(sparkOp.plan);
+ pkgDiscoverer.visit();
+ POPackage pkg = pkgDiscoverer.getPkg();
+ if(pkg != null) {
+ handlePackage(sparkOp, pkg);
+ }
+ }
+ }
+
+ private void handlePackage(SparkOper pkgSparkOp, POPackage pkg) throws VisitorException {
+ int lrFound = 0;
+ List<SparkOper> predecessors = this.mPlan.getPredecessors(pkgSparkOp);
+ if (predecessors != null && predecessors.size() > 0) {
+ for (SparkOper pred : predecessors) {
+ lrFound += patchPackage(pred, pkgSparkOp, pkg);
+ if(lrFound == pkg.getNumInps()) {
+ break;
+ }
+ }
+ }
+ if (lrFound != pkg.getNumInps()) {
+ int errCode = 2086;
+ String msg = "Unexpected problem during optimization. Could not find all LocalRearrange operators.";
+ throw new OptimizerException(msg, errCode, PigException.BUG);
+ }
+ }
+
+ private int patchPackage(SparkOper pred, SparkOper pkgSparkOp, POPackage pkg) throws VisitorException {
+ LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(pred.plan, pkg);
+ lrDiscoverer.visit();
+ // let our caller know if we managed to patch
+ // the package
+ return lrDiscoverer.getLoRearrangeFound();
+ }
+
+ static class PackageDiscoverer extends PhyPlanVisitor {
+
+ private POPackage pkg;
+
+ public PackageDiscoverer(PhysicalPlan plan) {
+ super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+ }
+
+ @Override
+ public void visitPackage(POPackage pkg) throws VisitorException {
+ this.pkg = pkg;
+ }
+
+ /**
+ * @return the pkg
+ */
+ public POPackage getPkg() {
+ return pkg;
+ }
+
+ }
+
+ static class LoRearrangeDiscoverer extends PhyPlanVisitor {
+
+ private int loRearrangeFound = 0;
+ private POPackage pkg;
+
+ public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
+ super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+ this.pkg = pkg;
+ }
+
+ @Override
+ public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
+ loRearrangeFound++;
+ Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+
+ if (pkg.getPkgr() instanceof LitePackager) {
+ if (lrearrange.getIndex() != 0) {
+ // a LitePackager handles a single input, so its rearrange must carry index 0
+ throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - " + lrearrange.getIndex());
+ }
+ }
+
+ // annotate the package with information from the POLocalRearrange,
+ // updating the keyInfo if it is already present in the POPackage
+ keyInfo = pkg.getPkgr().getKeyInfo();
+ if (keyInfo == null) {
+ keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+ }
+
+ if(keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
+ // something is wrong - we should not be getting key info
+ // for the same index from two different Local Rearranges
+ int errCode = 2087;
+ String msg = "Unexpected problem during optimization." +
+ " Found index:" + lrearrange.getIndex() +
+ " in multiple LocalRearrange operators.";
+ throw new OptimizerException(msg, errCode, PigException.BUG);
+
+ }
+ keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
+ new Pair<Boolean, Map<Integer, Integer>>(
+ lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+ pkg.getPkgr().setKeyInfo(keyInfo);
+ pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+ pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+ }
+
+ /**
+ * @return the loRearrangeFound
+ */
+ public int getLoRearrangeFound() {
+ return loRearrangeFound;
+ }
+
+ }
+}
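
To make the annotation concrete: keyInfo maps each POLocalRearrange index to a pair of (isProjectStar, value-column to key-column positions), which the packager later uses to reconstruct the value from the key. A hedged sketch of reading the annotation back (the KeyInfoInspector class is illustrative, not part of this commit):

    import java.util.Map;

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
    import org.apache.pig.impl.util.Pair;

    public class KeyInfoInspector {
        /** Prints, per input index, whether project-star is set and the column map. */
        public static void dumpKeyInfo(POPackage pkg) {
            Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo =
                    pkg.getPkgr().getKeyInfo();
            if (keyInfo == null) {
                return; // nothing annotated yet
            }
            for (Map.Entry<Integer, Pair<Boolean, Map<Integer, Integer>>> e : keyInfo.entrySet()) {
                // e.getValue().first  -> isProjectStar for this input
                // e.getValue().second -> value-column to key-column positions
                System.out.println("input " + e.getKey() + ": star=" + e.getValue().first
                        + ", cols=" + e.getValue().second);
            }
        }
    }
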
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.io.PrintStream;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOper;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor mechanism for printing out the Spark plan.
+ */
+public class SparkPrinter extends SparkOpPlanVisitor {
+
+ private PrintStream mStream = null;
+ private boolean isVerbose = true;
+
+ public SparkPrinter(PrintStream ps, SparkOperPlan plan) {
+ super(plan, new DepthFirstWalker<SparkOper, SparkOperPlan>(plan));
+ mStream = ps;
+ mStream.println("#--------------------------------------------------");
+ mStream.println("# Spark Plan ");
+ mStream.println("#--------------------------------------------------");
+ }
+
+ public void setVerbose(boolean verbose) {
+ isVerbose = verbose;
+ }
+
+ @Override
+ public void visitSparkOp(SparkOper sparkOp) throws VisitorException {
+ mStream.println("");
+ mStream.println("Spark node " + sparkOp.getOperatorKey().toString());
+ if(sparkOp instanceof NativeSparkOper) {
+ mStream.println("--------");
+ mStream.println();
+ return;
+ }
+ if (sparkOp.plan != null && sparkOp.plan.size() > 0) {
+ PlanPrinter<PhysicalOperator, PhysicalPlan> printer = new PlanPrinter<PhysicalOperator, PhysicalPlan>(sparkOp.plan, mStream);
+ printer.setVerbose(isVerbose);
+ printer.visit();
+ mStream.println("--------");
+ }
+ }
+}
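
SparkPrinter can also be driven directly when the dump should go to a stream other than the one toString() uses. A minimal sketch, assuming a populated plan in the same package:

    import java.io.PrintStream;

    import org.apache.pig.impl.plan.OperatorKey;
    import org.apache.pig.impl.plan.VisitorException;

    public class PrinterExample {
        public static void main(String[] args) throws VisitorException {
            SparkOperPlan plan = new SparkOperPlan();
            plan.add(new SparkOper(new OperatorKey("scope", 1L)));
            SparkPrinter printer = new SparkPrinter(new PrintStream(System.err), plan);
            printer.setVerbose(false); // terse output; verbose is the default
            printer.visit();
        }
    }
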
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1661625&r1=1661624&r2=1661625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Mon Feb 23 09:40:27 2015
@@ -18,7 +18,12 @@
package org.apache.pig.tools.pigstats.spark;
+import java.util.List;
+import java.util.Map;
+import scala.Option;
+
import com.google.common.collect.Maps;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
@@ -28,13 +33,10 @@ import org.apache.pig.newplan.PlanVisito
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
+
import org.apache.spark.executor.ShuffleReadMetrics;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.executor.TaskMetrics;
-import scala.Option;
-
-import java.util.List;
-import java.util.Map;
public class SparkJobStats extends JobStats {
@@ -59,7 +61,9 @@ public class SparkJobStats extends JobSt
bytes, 1, success);
outputStats.setPOStore(poStore);
outputStats.setConf(conf);
- outputs.add(outputStats);
+ if( !poStore.isTmpStore()) {
+ outputs.add(outputStats);
+ }
}
public void collectStats(JobMetricsListener jobMetricsListener) {