Posted to commits@pig.apache.org by pr...@apache.org on 2015/02/23 10:40:27 UTC

svn commit: r1661625 - in /pig/branches/spark/src/org/apache/pig: backend/hadoop/executionengine/spark/ backend/hadoop/executionengine/spark/operator/ backend/hadoop/executionengine/spark/plan/ tools/pigstats/spark/

Author: praveen
Date: Mon Feb 23 09:40:27 2015
New Revision: 1661625

URL: http://svn.apache.org/r1661625
Log:
PIG-4374: Add SparkPlan in spark package (liyunzhang via praveen)

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOper.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOper.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java?rev=1661625&r1=1661624&r2=1661625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java Mon Feb 23 09:40:27 2015
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.pig.backend.hadoop.executionengine.spark;
 
 import java.util.UUID;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1661625&r1=1661624&r2=1661625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Mon Feb 23 09:40:27 2015
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.pig.backend.hadoop.executionengine.spark;
 
 import java.io.File;
@@ -7,6 +24,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -18,10 +36,10 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.google.common.collect.Lists;
-
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,9 +48,6 @@ import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.hadoop.executionengine.Launcher;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
@@ -51,7 +66,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
@@ -69,21 +83,29 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
-import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOper;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.spark.SparkPigStats;
 import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
+
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.scheduler.JobLogger;
 import org.apache.spark.scheduler.StatsReportListener;
-import org.apache.spark.api.java.JavaSparkContext;
 
 /**
  * Main class that launches pig for Spark
@@ -117,13 +139,7 @@ public class SparkLauncher extends Launc
         c.set(PigConstants.LOCAL_CODE_DIR, System.getProperty("java.io.tmpdir"));
 
         SchemaTupleBackend.initialize(c, pigContext);
-
-        // Code pulled from MapReduceLauncher
-        MRCompiler mrCompiler = new MRCompiler(physicalPlan, pigContext);
-        mrCompiler.compile();
-        MROperPlan plan = mrCompiler.getMRPlan();
-        POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan);
-        pkgAnnotator.visit();
+        SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
 
         if (System.getenv("BROADCAST_PORT") == null
                 && System.getenv("BROADCAST_MASTER_IP") == null) {
@@ -182,16 +198,7 @@ public class SparkLauncher extends Launc
         convertMap.put(PORank.class, new RankConverter());
         convertMap.put(POStreamSpark.class,new StreamConverter(confBytes));
 
-        Map<OperatorKey, RDD<Tuple>> rdds = new HashMap<OperatorKey, RDD<Tuple>>();
-
-        Set<Integer> seenJobIDs = new HashSet<Integer>();
-        for (POStore poStore : stores) {
-            physicalToRDD(physicalPlan, poStore, rdds, convertMap);
-            for (int jobID : getJobIDs(seenJobIDs)) {
-                SparkStatsUtil.waitForJobAddStats(jobID, poStore,
-                    jobMetricsListener, sparkContext, sparkStats, c);
-            }
-        }
+        sparkPlanToRDD(sparkplan, convertMap, sparkStats, c);
 
         cleanUpSparkJob(pigContext,currentDirectoryPath);
         sparkStats.finish();
@@ -339,6 +346,17 @@ public class SparkLauncher extends Launc
         }
     }
 
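+    /**
+     * Compiles the physical plan into a SparkOperPlan and annotates its
+     * POPackage operators to optimize key/value handling.
+     */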
+    private SparkOperPlan compile(PhysicalPlan physicalPlan, PigContext pigContext) throws PlanException, IOException, VisitorException {
+        SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan, pigContext);
+        sparkCompiler.compile();
+        SparkOperPlan plan = sparkCompiler.getSparkPlan();
+
+        // optimize key - value handling in package
+        SparkPOPackageAnnotator pkgAnnotator = new SparkPOPackageAnnotator(plan);
+        pkgAnnotator.visit();
+        return plan;
+    }
+
     private static void startSparkIfNeeded() throws PigException {
         if (sparkContext == null) {
             String master = System.getenv("SPARK_MASTER");
@@ -412,26 +430,76 @@ public class SparkLauncher extends Launc
         }
     }
 
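+    /**
+     * Walks the Spark operator plan from its leaves and converts each
+     * SparkOper (and, recursively, its predecessors) into Spark RDDs.
+     */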
+    private void sparkPlanToRDD(SparkOperPlan sparkPlan, Map<Class<? extends PhysicalOperator>, POConverter> convertMap, SparkPigStats sparkStats, JobConf c) throws IOException, InterruptedException {
+        Set<Integer> seenJobIDs = new HashSet<Integer>();
+        if (sparkPlan != null) {
+            List<SparkOper> leaves = sparkPlan.getLeaves();
+            Map<OperatorKey, RDD<Tuple>> sparkOpRdds = new HashMap<OperatorKey, RDD<Tuple>>();
+            for (SparkOper leaf : leaves) {
+                Map<OperatorKey, RDD<Tuple>> physicalOpRdds = new HashMap<OperatorKey, RDD<Tuple>>();
+                sparkOperToRDD(sparkPlan, leaf, sparkOpRdds, physicalOpRdds, convertMap, seenJobIDs, sparkStats, c);
+
+            }
+        }
+    }
+
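+    /**
+     * Converts a single SparkOper into RDDs: first converts all predecessor
+     * SparkOpers, then translates this operator's inner physical plan, and
+     * finally waits for the corresponding Spark jobs to add their stats.
+     */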
+    private void sparkOperToRDD(SparkOperPlan sparkPlan,
+                                SparkOper sparkOper, Map<OperatorKey, RDD<Tuple>> sparkOpRdds, Map<OperatorKey, RDD<Tuple>> physicalOpRdds,
+                                Map<Class<? extends PhysicalOperator>, POConverter> convertMap,
+                                Set<Integer> seenJobIDs, SparkPigStats sparkStats, JobConf c) throws IOException, InterruptedException {
+
+        List<SparkOper> predecessors = sparkPlan.getPredecessors(sparkOper);
+        List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
+        if (predecessors != null) {
+            for (SparkOper prede : predecessors) {
+                if (sparkOpRdds.get(prede.getOperatorKey()) == null) {
+                    sparkOperToRDD(sparkPlan, prede, sparkOpRdds, physicalOpRdds, convertMap, seenJobIDs, sparkStats, c);
+                }
+                predecessorRDDs.add(sparkOpRdds.get(prede.getOperatorKey()));
+            }
+        }
+
+        List<PhysicalOperator> leafPOs = sparkOper.plan.getLeaves();
+        if (leafPOs == null || leafPOs.size() != 1) {
+            throw new IllegalArgumentException(String.format("sparkOper.plan should have exactly one leaf, sparkOper: %s", sparkOper.name()));
+        } else {
+            PhysicalOperator leafPO = leafPOs.get(0);
+            physicalToRDD(sparkOper.plan, leafPO, physicalOpRdds, predecessorRDDs, convertMap);
+            sparkOpRdds.put(sparkOper.getOperatorKey(),physicalOpRdds.get(leafPO.getOperatorKey()));
+        }
+
+        List<POStore> poStores = PlanHelper.getPhysicalOperators(
+                sparkOper.plan, POStore.class);
+        if (poStores != null && poStores.size() == 1) {
+            POStore poStore = poStores.get(0);
+            for (int jobID : getJobIDs(seenJobIDs)) {
+                SparkStatsUtil.waitForJobAddStats(jobID, poStore,
+                        jobMetricsListener, sparkContext, sparkStats, c);
+            }
+        } else {
+            LOG.info(String.format("sparkOper: %s does not have a POStore or has more than one POStore", sparkOper.name()));
+        }
+
+    }
+
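+    /**
+     * Recursively converts a physical operator and its predecessors into RDDs;
+     * when the operator has no predecessors inside this plan, the RDDs handed
+     * over from predecessor SparkOpers are used as its inputs.
+     */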
     private void physicalToRDD(PhysicalPlan plan,
-            PhysicalOperator physicalOperator,
-            Map<OperatorKey, RDD<Tuple>> rdds,
-            Map<Class<? extends PhysicalOperator>, POConverter> convertMap)
+                               PhysicalOperator physicalOperator,
+                               Map<OperatorKey, RDD<Tuple>> rdds,  List<RDD<Tuple>> rddsFromPredeSparkOper,
+                               Map<Class<? extends PhysicalOperator>, POConverter> convertMap)
             throws IOException {
-
         RDD<Tuple> nextRDD = null;
         List<PhysicalOperator> predecessors = plan
                 .getPredecessors(physicalOperator);
         List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
         if (predecessors != null) {
             for (PhysicalOperator predecessor : predecessors) {
-                physicalToRDD(plan, predecessor, rdds, convertMap);
+                physicalToRDD(plan, predecessor, rdds,rddsFromPredeSparkOper, convertMap);
                 predecessorRdds.add(rdds.get(predecessor.getOperatorKey()));
             }
-        }
-
-        if( physicalOperator instanceof  POStream ){
-            POStream poStream = (POStream)physicalOperator;
-            physicalOperator = new POStreamSpark(poStream);
+        } else {
+            if (rddsFromPredeSparkOper != null && rddsFromPredeSparkOper.size() > 0) {
+                predecessorRdds.addAll(rddsFromPredeSparkOper);
+            }
         }
 
         POConverter converter = convertMap.get(physicalOperator.getClass());
@@ -445,10 +513,6 @@ public class SparkLauncher extends Launc
                 + physicalOperator);
         nextRDD = converter.convert(predecessorRdds, physicalOperator);
 
-        if (POStore.class.equals(physicalOperator.getClass())) {
-            return;
-        }
-
         if (nextRDD == null) {
             throw new IllegalArgumentException(
                     "RDD should not be null after PhysicalOperator: "
@@ -460,7 +524,33 @@ public class SparkLauncher extends Launc
 
     @Override
     public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
-            String format, boolean verbose) throws IOException {
+                        String format, boolean verbose) throws IOException {
+        SparkOperPlan sparkPlan = compile(pp, pc);
+        ps.println("#-----------------------------------------------------#");
+        ps.println("# The Spark plan is a DAG; the Spark node relations are:");
+        ps.println("#-----------------------------------------------------#");
+        Map<OperatorKey, SparkOper> allOperKeys = sparkPlan.getKeys();
+        List<OperatorKey> operKeyList = new ArrayList<OperatorKey>(allOperKeys.keySet());
+        Collections.sort(operKeyList);
+        for(OperatorKey operatorKey: operKeyList){
+            SparkOper op = sparkPlan.getOperator(operatorKey);
+            ps.print(op.getOperatorKey());
+            List<SparkOper> successors = sparkPlan.getSuccessors(op);
+            if( successors!=null) {
+                ps.print("->");
+                for (SparkOper suc : successors) {
+                    ps.print(suc.getOperatorKey() + " ");
+                }
+            }
+            ps.println();
+        }
+        if (format.equals("text")) {
+            SparkPrinter printer = new SparkPrinter(ps, sparkPlan);
+            printer.setVerbose(verbose);
+            printer.visit();
+        } else { // TODO: add support for other output formats
+            throw new IOException("Non-text output of explain is not supported.");
+        }
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1661625&r1=1661624&r2=1661625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Mon Feb 23 09:40:27 2015
@@ -1,27 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.pig.backend.hadoop.executionengine.spark;
 
+import java.io.IOException;
+import java.util.List;
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
 import org.apache.hadoop.mapred.JobConf;
+
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 
-import scala.Tuple2;
-import scala.Product2;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-//import scala.reflect.ClassManifest;
-//import scala.reflect.ClassManifest$;
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
 import org.apache.spark.rdd.RDD;
 
-import java.io.IOException;
-import java.util.List;
-
 public class SparkUtil {
 
     public static <T> ClassTag<T> getManifest(Class<T> clazz) {

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOper.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOper.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOper.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOper;
+import org.apache.pig.impl.plan.OperatorKey;
+
+/**
+ * NativeSparkOper: a Spark operator that represents a native Spark job, specified by a jar and its parameters.
+ */
+public class NativeSparkOper extends SparkOper {
+    private static final long serialVersionUID = 1L;
+    private static int countJobs = 0;
+    private String nativeSparkJar;
+    private String[] params;
+    private String jobId;
+
+    public NativeSparkOper(OperatorKey k, String sparkJar, String[] parameters) {
+        super(k);
+        nativeSparkJar = sparkJar;
+        params = parameters;
+        jobId = sparkJar + "_" + getJobNumber();
+    }
+
+    private static int getJobNumber() {
+        countJobs++;
+        return countJobs;
+    }
+}

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,669 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.CollectableLoadFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * The compiler that compiles a given physical plan
+ * into a DAG of Spark operators
+ */
+public class SparkCompiler extends PhyPlanVisitor {
+    private PigContext pigContext;
+
+    //The physicalPlan that is being compiled
+    private PhysicalPlan physicalPlan;
+
+    // The plan of Spark operators being built
+    private SparkOperPlan sparkPlan;
+
+    private SparkOper curSparkOp;
+
+    private String scope;
+
+    private SparkOper[] compiledInputs  = null;
+
+    private Map<OperatorKey, SparkOper> splitsSeen;
+
+    private NodeIdGenerator nig;
+
+    private Map<PhysicalOperator,SparkOper> phyToSparkOpMap;
+    private UDFFinder udfFinder;
+
+    public SparkCompiler(PhysicalPlan physicalPlan,
+                      PigContext pigContext){
+        super(physicalPlan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(physicalPlan));
+        this.physicalPlan = physicalPlan;
+        this.pigContext = pigContext;
+        this.sparkPlan = new SparkOperPlan();
+        this.phyToSparkOpMap = new HashMap<PhysicalOperator,SparkOper>();
+        this.udfFinder = new UDFFinder();
+        this.nig = NodeIdGenerator.getGenerator();
+        this.splitsSeen = new HashMap<OperatorKey, SparkOper>();
+
+    }
+
+    public void compile() throws IOException, PlanException, VisitorException {
+        List<PhysicalOperator> roots = physicalPlan.getRoots();
+        if((roots == null) || (roots.size() <= 0)) {
+            int errCode = 2053;
+            String msg = "Internal error. Did not find roots in the physical plan.";
+            throw new SparkCompilerException(msg, errCode, PigException.BUG);
+        }
+        scope = roots.get(0).getOperatorKey().getScope();
+        List<PhysicalOperator> leaves = physicalPlan.getLeaves();
+
+        if (!pigContext.inIllustrator)
+            for (PhysicalOperator op : leaves) {
+                if (!(op instanceof POStore)) {
+                    int errCode = 2025;
+                    String msg = "Expected leaf of the physical plan to " +
+                            "always be POStore. Found " + op.getClass().getSimpleName();
+                    throw new SparkCompilerException(msg, errCode, PigException.BUG);
+                }
+            }
+
+        // get all stores and nativeSpark operators, sort them in order(operator id)
+        // and compile their plans
+        List<POStore> stores = PlanHelper.getPhysicalOperators(physicalPlan, POStore.class);
+        List<PONative> nativeSparks= PlanHelper.getPhysicalOperators(physicalPlan, PONative.class);
+        List<PhysicalOperator> ops;
+        if (!pigContext.inIllustrator) {
+            ops = new ArrayList<PhysicalOperator>(stores.size() + nativeSparks.size());
+            ops.addAll(stores);
+        } else {
+            ops = new ArrayList<PhysicalOperator>(leaves.size() + nativeSparks.size());
+            ops.addAll(leaves);
+        }
+        ops.addAll(nativeSparks);
+        Collections.sort(ops);
+
+        for (PhysicalOperator op : ops) {
+            compile(op);
+        }
+    }
+
+    /**
+     * Compiles the physicalPlan below op into a Spark Operator
+     * and stores it in curSparkOp.
+     * @param op
+     * @throws IOException
+     * @throws PlanException
+     * @throws VisitorException
+     */
+    private void compile(PhysicalOperator op) throws IOException, PlanException, VisitorException {
+        SparkOper[] prevCompInp = compiledInputs;
+
+        List<PhysicalOperator> predecessors = physicalPlan.getPredecessors(op);
+        if(op instanceof PONative){
+            // the predecessor (store) has already been processed
+            // don't process it again
+        }
+        else if (predecessors != null && predecessors.size() > 0) {
+            // When processing an entire script (multiquery), we can
+            // get into a situation where a load has
+            // predecessors. This means that it depends on some store
+            // earlier in the physicalPlan. We need to take that dependency
+            // and connect the respective Spark operators, while at the
+            // same time removing the connection between the Physical
+            // operators. That way the jobs will run in the right
+            // order.
+            if (op instanceof POLoad) {
+
+                if (predecessors.size() != 1) {
+                    int errCode = 2125;
+                    String msg = "Expected exactly one predecessor of load. Got " + predecessors.size();
+                    throw new PlanException(msg, errCode, PigException.BUG);
+                }
+
+                PhysicalOperator p = predecessors.get(0);
+                SparkOper oper = null;
+                if(p instanceof POStore || p instanceof PONative){
+                    oper = phyToSparkOpMap.get(p);
+                }else{
+                    int errCode = 2126;
+                    String msg = "Predecessor of load should be a store or native spark operator. Got " + p.getClass();
+                    throw new PlanException(msg, errCode, PigException.BUG);
+                }
+
+                // Need new operator
+                curSparkOp = getSparkOp();
+                curSparkOp.add(op);
+                sparkPlan.add(curSparkOp);
+                physicalPlan.disconnect(op, p);
+                sparkPlan.connect(oper, curSparkOp);
+                phyToSparkOpMap.put(op, curSparkOp);
+                return;
+            }
+
+            Collections.sort(predecessors);
+            compiledInputs = new SparkOper[predecessors.size()];
+            int i = -1;
+            for (PhysicalOperator pred : predecessors) {
+                if(pred instanceof POSplit && splitsSeen.containsKey(pred.getOperatorKey())){
+                    compiledInputs[++i] = startNew(((POSplit)pred).getSplitStore(), splitsSeen.get(pred.getOperatorKey()));
+                    continue;
+                }
+                compile(pred);
+                compiledInputs[++i] = curSparkOp;
+            }
+        } else {
+            //No predecessors. Mostly a load. But this is where
+            //we start. We create a new sparkOp and add its first
+            //operator op. Also this should be added to the sparkPlan.
+            curSparkOp = getSparkOp();
+            curSparkOp.add(op);
+            if (op !=null && op instanceof POLoad)
+            {
+                if (((POLoad)op).getLFile()!=null && ((POLoad)op).getLFile().getFuncSpec()!=null)
+                    curSparkOp.UDFs.add(((POLoad)op).getLFile().getFuncSpec().toString());
+            }
+            sparkPlan.add(curSparkOp);
+            phyToSparkOpMap.put(op, curSparkOp);
+            return;
+        }
+        op.visit(this);
+        compiledInputs = prevCompInp;
+    }
+
+
+    private SparkOper getSparkOp() {
+        return new SparkOper(OperatorKey.genOpKey(scope));
+    }
+
+    public SparkOperPlan getSparkPlan() {
+        return sparkPlan;
+    }
+
+    public void connectSoftLink() throws PlanException, IOException {
+        for (PhysicalOperator op : physicalPlan) {
+            if (physicalPlan.getSoftLinkPredecessors(op)!=null) {
+                for (PhysicalOperator pred : physicalPlan.getSoftLinkPredecessors(op)) {
+                    SparkOper from = phyToSparkOpMap.get(pred);
+                    SparkOper to = phyToSparkOpMap.get(op);
+                    if (from==to)
+                        continue;
+                    if (sparkPlan.getPredecessors(to)==null || !sparkPlan.getPredecessors(to).contains(from)) {
+                        sparkPlan.connect(from, to);
+                    }
+                }
+            }
+        }
+    }
+
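+    /**
+     * Starts a new SparkOper rooted at a temporary load of the given file
+     * spec and connects it after the given (old) operator.
+     */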
+    private SparkOper startNew(FileSpec fSpec, SparkOper old) throws PlanException{
+        POLoad ld = getLoad();
+        ld.setLFile(fSpec);
+        SparkOper ret = getSparkOp();
+        ret.add(ld);
+        sparkPlan.add(ret);
+        sparkPlan.connect(old, ret);
+        return ret;
+    }
+
+
+    private POLoad getLoad(){
+        POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        ld.setPc(pigContext);
+        ld.setIsTmpLoad(true);
+        return ld;
+    }
+
+    @Override
+    public void visitSplit(POSplit op) throws VisitorException{
+        try{
+            FileSpec fSpec = op.getSplitStore();
+            SparkOper sparkOp = endSingleInputPlanWithStr(fSpec);
+            sparkOp.setSplitter(true);
+            splitsSeen.put(op.getOperatorKey(), sparkOp);
+            curSparkOp = startNew(fSpec, sparkOp);
+            phyToSparkOpMap.put(op, curSparkOp);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    public void visitDistinct(PODistinct op) throws VisitorException{
+        try{
+            nonBlocking(op);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
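+    /**
+     * Ends the current single-input plan by adding a temporary store to the
+     * given file spec as its leaf and returns that SparkOper.
+     */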
+    private SparkOper endSingleInputPlanWithStr(FileSpec fSpec) throws PlanException{
+        if(compiledInputs.length>1) {
+            int errCode = 2023;
+            String msg = "Received a multi-input plan when expecting only a single-input one.";
+            throw new PlanException(msg, errCode, PigException.BUG);
+        }
+        SparkOper sparkOp = compiledInputs[0];  //  Load
+        POStore str = getStore();
+        str.setSFile(fSpec);
+        sparkOp.plan.addAsLeaf(str);
+        return sparkOp;
+    }
+
+    private POStore getStore(){
+        POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        // mark store as tmp store. These could be removed by the
+        // optimizer, because it wasn't the user requesting it.
+        st.setIsTmpStore(true);
+        return st;
+    }
+
+    @Override
+    public void visitLoad(POLoad op) throws VisitorException{
+        try{
+            nonBlocking(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitNative(PONative op) throws VisitorException{
+        try{
+            SparkOper nativeSparkOper = getNativeSparkOp(op.getNativeMRjar(), op.getParams());
+            sparkPlan.add(nativeSparkOper);
+            sparkPlan.connect(curSparkOp, nativeSparkOper);
+            phyToSparkOpMap.put(op, nativeSparkOper);
+            curSparkOp = nativeSparkOper;
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    private NativeSparkOper getNativeSparkOp(String sparkJar, String[] parameters) {
+        return new NativeSparkOper(new OperatorKey(scope,nig.getNextNodeId(scope)), sparkJar, parameters);
+    }
+
+    @Override
+    public void visitStore(POStore op) throws VisitorException{
+        try{
+            nonBlocking(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+            if (op.getSFile()!=null && op.getSFile().getFuncSpec()!=null)
+                curSparkOp.UDFs.add(op.getSFile().getFuncSpec().toString());
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitFilter(POFilter op) throws VisitorException{
+        try{
+            nonBlocking(op);
+            processUDFs(op.getPlan());
+            phyToSparkOpMap.put(op, curSparkOp);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitCross(POCross op) throws VisitorException {
+        try {
+            nonBlocking(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitStream(POStream op) throws VisitorException{
+        try{
+            POStreamSpark poStreamSpark = new POStreamSpark(op);
+            nonBlocking(poStreamSpark);
+            phyToSparkOpMap.put(op, curSparkOp);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitSort(POSort op) throws  VisitorException{
+        try{
+            nonBlocking(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitLimit(POLimit op) throws VisitorException {
+        try {
+            nonBlocking(op);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitLocalRearrange(POLocalRearrange op) throws VisitorException {
+        try{
+            nonBlocking(op);
+            List<PhysicalPlan> plans = op.getPlans();
+            if(plans!=null)
+                for(PhysicalPlan ep : plans)
+                    processUDFs(ep);
+            phyToSparkOpMap.put(op, curSparkOp);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
+        List<PhysicalOperator> roots = curSparkOp.plan.getRoots();
+        if(roots.size() != 1){
+            int errCode = 2171;
+            String errMsg = "Expected one but found more than one root physical operator in the physical plan.";
+            throw new SparkCompilerException(errMsg,errCode,PigException.BUG);
+        }
+
+        PhysicalOperator phyOp = roots.get(0);
+        if(! (phyOp instanceof POLoad)){
+            int errCode = 2172;
+            String errMsg = "Expected physical operator at root to be POLoad. Found : "+phyOp.getClass().getCanonicalName();
+            throw new SparkCompilerException(errMsg,errCode,PigException.BUG);
+        }
+
+        LoadFunc loadFunc = ((POLoad)phyOp).getLoadFunc();
+        try {
+            if(!(CollectableLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
+                int errCode = 2249;
+                throw new SparkCompilerException("While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.", errCode);
+            }
+            ((CollectableLoadFunc)loadFunc).ensureAllKeyInstancesInSameSplit();
+        } catch (SparkCompilerException e){
+            throw (e);
+        } catch (IOException e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+
+        try{
+            nonBlocking(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitPOForEach(POForEach op) throws VisitorException{
+        try{
+            nonBlocking(op);
+            List<PhysicalPlan> plans = op.getInputPlans();
+            if (plans != null) {
+                for (PhysicalPlan ep : plans) {
+                    processUDFs(ep);
+                }
+            }
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{
+        try{
+            blocking(op);
+            curSparkOp.customPartitioner = op.getCustomPartitioner();
+            phyToSparkOpMap.put(op, curSparkOp);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitPackage(POPackage op) throws VisitorException{
+        try{
+            nonBlocking(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+            if (op.getPkgr().getPackageType() == Packager.PackageType.JOIN) {
+                curSparkOp.markRegularJoin();
+            } else if (op.getPkgr().getPackageType() == Packager.PackageType.GROUP) {
+                if (op.getNumInps() == 1) {
+                    curSparkOp.markGroupBy();
+                } else if (op.getNumInps() > 1) {
+                    curSparkOp.markCogroup();
+                }
+            }
+
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visitUnion(POUnion op) throws VisitorException{
+        try{
+            nonBlocking(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+
+    @Override
+    public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
+     //TODO
+    }
+
+    @Override
+    public void visitFRJoin(POFRJoin op) throws VisitorException {
+      //TODO
+    }
+
+    @Override
+    public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
+      //TODO
+    }
+
+    private void processUDFs(PhysicalPlan plan) throws VisitorException{
+        if(plan!=null){
+            //Process Scalars (UDF with referencedOperators)
+            ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan);
+            scalarPhyFinder.visit();
+            curSparkOp.scalars.addAll(scalarPhyFinder.getScalars());
+
+            //Process UDFs
+            udfFinder.setPlan(plan);
+            udfFinder.visit();
+            curSparkOp.UDFs.addAll(udfFinder.getUDFs());
+        }
+    }
+
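+    /**
+     * Appends op as the new leaf of the current SparkOper's plan, merging the
+     * compiled inputs into one SparkOper first if there is more than one.
+     */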
+    private void nonBlocking(PhysicalOperator op) throws PlanException, IOException{
+        SparkOper sparkOp=null;
+        if (compiledInputs.length == 1) {
+            sparkOp = compiledInputs[0];
+        } else {
+           sparkOp = merge(compiledInputs);
+        }
+        sparkOp.plan.addAsLeaf(op);
+        curSparkOp = sparkOp;
+    }
+
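+    /**
+     * Creates a new SparkOper for a blocking operator, connects all compiled
+     * inputs to it and adds op as its leaf.
+     */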
+    private void blocking(PhysicalOperator op) throws PlanException, IOException{
+        SparkOper sparkOp = getSparkOp();
+        sparkPlan.add(sparkOp);
+        for(SparkOper compileInput: compiledInputs){
+            sparkPlan.connect(compileInput, sparkOp);
+        }
+        sparkOp.plan.addAsLeaf(op);
+        curSparkOp = sparkOp;
+    }
+
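+    /**
+     * Merges the plans of the given SparkOpers into a single new SparkOper,
+     * reconnecting their predecessors and carrying over UDFs, scalars and
+     * requested parallelism before removing the merged operators.
+     */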
+    private SparkOper merge(SparkOper[] compiledInputs)throws PlanException  {
+        SparkOper ret = getSparkOp();
+        sparkPlan.add(ret);
+
+        Set<SparkOper> toBeConnected = new HashSet<SparkOper>();
+        List<SparkOper> toBeRemoved = new ArrayList<SparkOper>();
+
+        List<PhysicalPlan> toBeMerged = new ArrayList<PhysicalPlan>();
+
+        for (SparkOper sparkOp : compiledInputs) {
+             toBeRemoved.add(sparkOp);
+             toBeMerged.add(sparkOp.plan);
+             List<SparkOper> predecessors = sparkPlan.getPredecessors(sparkOp);
+             if( predecessors != null){
+                  for( SparkOper predecessorSparkOp: predecessors){
+                      toBeConnected.add(predecessorSparkOp);
+                  }
+             }
+        }
+        merge(ret.plan, toBeMerged);
+
+        Iterator<SparkOper> it = toBeConnected.iterator();
+        while(it.hasNext())
+            sparkPlan.connect(it.next(), ret);
+        for(SparkOper removeSparkOp : toBeRemoved){
+            if(removeSparkOp.requestedParallelism > ret.requestedParallelism)
+                ret.requestedParallelism = removeSparkOp.requestedParallelism;
+            for (String udf:removeSparkOp.UDFs)
+            {
+                if (!ret.UDFs.contains(udf))
+                    ret.UDFs.add(udf);
+            }
+            // We also need to change scalar marking
+            for(PhysicalOperator physOp: removeSparkOp.scalars) {
+                if(!ret.scalars.contains(physOp)) {
+                    ret.scalars.add(physOp);
+                }
+            }
+            Set<PhysicalOperator> opsToChange = new HashSet<PhysicalOperator>();
+            for (Map.Entry<PhysicalOperator, SparkOper> entry : phyToSparkOpMap.entrySet()) {
+                if (entry.getValue()==removeSparkOp) {
+                    opsToChange.add(entry.getKey());
+                }
+            }
+            for (PhysicalOperator op : opsToChange) {
+                phyToSparkOpMap.put(op, ret);
+            }
+
+            sparkPlan.remove(removeSparkOp);
+        }
+        return ret;
+    }
+
+    /**
+     * Merges a list of plans into a single plan.
+     * @param <O>
+     * @param <E>
+     * @param finPlan - Final Plan into which the list of plans is merged
+     * @param plans - list of plans to be merged
+     * @throws PlanException
+     */
+    private <O extends Operator<?>, E extends OperatorPlan<O>> void merge(
+            E finPlan, List<E> plans) throws PlanException {
+        for (E e : plans) {
+            finPlan.merge(e);
+        }
+    }
+}

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Exception thrown when the SparkCompiler fails to compile a physical plan into a Spark operator plan.
+ */
+public class SparkCompilerException extends VisitorException  {
+    private static final long serialVersionUID = 2L;
+
+    /**
+     * Create a new SparkCompilerException with null as the error message.
+     */
+    public SparkCompilerException() {
+        super();
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     */
+    public SparkCompilerException(String message) {
+        super(message);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified cause.
+     *
+     * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+     */
+    public SparkCompilerException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message and cause.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+     */
+    public SparkCompilerException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message and error code.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     */
+    public SparkCompilerException(String message, int errCode) {
+        super(message, errCode);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message, error code and cause.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown. 
+     */
+    public SparkCompilerException(String message, int errCode, Throwable cause) {
+        super(message, errCode, cause);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message, error code and error source.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param errSrc - The error source 
+     */
+    public SparkCompilerException(String message, int errCode, byte errSrc) {
+        super(message, errCode, errSrc);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message, error code, error source and cause.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param errSrc - The error source
+     * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown. 
+     */
+    public SparkCompilerException(String message, int errCode, byte errSrc,
+                               Throwable cause) {
+        super(message, errCode, errSrc, cause);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message, error code and retry flag.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param retry - If the exception is retriable or not
+     */
+    public SparkCompilerException(String message, int errCode, boolean retry) {
+        super(message, errCode, retry);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message, error code, error source and retry flag.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param errSrc - The error source 
+     * @param retry - If the exception is retriable or not
+     */
+    public SparkCompilerException(String message, int errCode, byte errSrc,
+                               boolean retry) {
+        super(message, errCode, errSrc, retry);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message, error code, error source, retry flag and detailed message for the developer.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param errSrc - The error source 
+     * @param retry - If the exception is retriable or not
+     * @param detailedMsg - The detailed message shown to the developer 
+     */
+    public SparkCompilerException(String message, int errCode, byte errSrc,
+                               boolean retry, String detailedMsg) {
+        super(message, errCode, errSrc, retry, detailedMsg);
+    }
+
+    /**
+     * Create a new SparkCompilerException with the specified message, error code, error source, retry flag, detailed message for the developer and cause.
+     *
+     * @param message - The error message (which is saved for later retrieval by the <link>Throwable.getMessage()</link> method) shown to the user 
+     * @param errCode - The error code shown to the user 
+     * @param errSrc - The error source 
+     * @param retry - If the exception is retriable or not
+     * @param detailedMsg - The detailed message shown to the developer 
+     * @param cause - The cause (which is saved for later retrieval by the <link>Throwable.getCause()</link> method) indicating the source of this exception. A null value is permitted, and indicates that the cause is nonexistent or unknown.
+     */
+    public SparkCompilerException(String message, int errCode, byte errSrc,
+                               boolean retry, String detailedMsg, Throwable cause) {
+        super(message, errCode, errSrc, retry, detailedMsg, cause);
+    }
+}

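For illustration only (not part of this commit), a minimal sketch of how compiler code would raise this exception with a Pig error code and error source; the error code 2200, the message and the class name below are placeholders:

    import org.apache.pig.PigException;
    import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompilerException;

    public class SparkCompilerExceptionExample {
        public static void failOnUnsupportedOperator(String operName) throws SparkCompilerException {
            // 2200 is an illustrative error code; PigException.BUG identifies the error source.
            int errCode = 2200;
            String msg = "Spark compiler cannot yet handle operator: " + operName;
            throw new SparkCompilerException(msg, errCode, PigException.BUG);
        }
    }
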
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor for the SparkOperPlan class
+ */
+public class SparkOpPlanVisitor extends PlanVisitor<SparkOper, SparkOperPlan> {
+
+    public SparkOpPlanVisitor(SparkOperPlan plan, PlanWalker<SparkOper, SparkOperPlan> walker) {
+        super(plan, walker);
+        // Nothing to initialize beyond the base PlanVisitor state.
+    }
+
+    public void visitSparkOp(SparkOper sparkOper) throws VisitorException {
+        // Default is a no-op; concrete visitors override this to handle each SparkOper.
+    }
+
+}

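As a rough illustration of how this base class is meant to be used, a concrete visitor overrides visitSparkOp(); the SparkOperCounter class below is a hypothetical example, not part of this commit:

    package org.apache.pig.backend.hadoop.executionengine.spark.plan;

    import org.apache.pig.impl.plan.DepthFirstWalker;
    import org.apache.pig.impl.plan.VisitorException;

    // Hypothetical visitor that counts the Spark operators in a SparkOperPlan.
    public class SparkOperCounter extends SparkOpPlanVisitor {
        private int count = 0;

        public SparkOperCounter(SparkOperPlan plan) {
            // Depth-first traversal, the same walker used by SparkPrinter in this commit.
            super(plan, new DepthFirstWalker<SparkOper, SparkOperPlan>(plan));
        }

        @Override
        public void visitSparkOp(SparkOper sparkOper) throws VisitorException {
            count++;
        }

        public int getCount() {
            return count;
        }
    }
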
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOper.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOper.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOper.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * An operator model for a Spark job.
+ * Acts as a host to the plans that will
+ * execute in Spark.
+ */
+public class SparkOper extends Operator<SparkOpPlanVisitor> {
+    private static enum OPER_FEATURE {
+        NONE,
+        // Indicate if this job is a sampling job
+        SAMPLER,
+        // Indicate if this job is a merge indexer
+        INDEXER,
+        // Indicate if this job is a group by job
+        GROUPBY,
+        // Indicate if this job is a cogroup job
+        COGROUP,
+        // Indicate if this job is a regular join job
+        HASHJOIN;
+    };
+    public PhysicalPlan plan;
+
+    public Set<String> UDFs;
+
+    /* Name of the Custom Partitioner used */
+    public String customPartitioner = null;
+
+    public Set<PhysicalOperator> scalars;
+
+    public boolean isUDFComparatorUsed = false;
+
+    public int requestedParallelism = -1;
+
+    private OPER_FEATURE feature = OPER_FEATURE.NONE;
+
+    private boolean splitter = false;
+
+    // Name of the partition file generated by sampling process,
+    // Used by Skewed Join
+    private String skewedJoinPartitionFile;
+
+    private boolean usingTypedComparator = false;
+
+    private boolean combineSmallSplits = true;
+
+    public SparkOper(OperatorKey k) {
+        super(k);
+        plan = new PhysicalPlan();
+        UDFs = new HashSet<String>();
+        scalars = new HashSet<PhysicalOperator>();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return true;
+    }
+
+    @Override
+    public String name() {
+        String udfStr = getUDFsAsStr();
+        StringBuilder sb = new StringBuilder("Spark" + "(" + requestedParallelism +
+                (udfStr.equals("")? "" : ",") + udfStr + ")" + " - " + mKey.toString());
+        return sb.toString();
+    }
+
+    private String getUDFsAsStr() {
+        StringBuilder sb = new StringBuilder();
+        if(UDFs!=null && UDFs.size()>0){
+            for (String str : UDFs) {
+                sb.append(str.substring(str.lastIndexOf('.')+1));
+                sb.append(',');
+            }
+            sb.deleteCharAt(sb.length()-1);
+        }
+        return sb.toString();
+    }
+
+    public void add(PhysicalOperator physicalOper){
+        this.plan.add(physicalOper);
+    }
+
+    @Override
+    public void visit(SparkOpPlanVisitor v) throws VisitorException {
+        v.visitSparkOp(this);
+    }
+
+
+    public boolean isGroupBy() {
+        return (feature == OPER_FEATURE.GROUPBY);
+    }
+
+    public void markGroupBy() {
+        feature = OPER_FEATURE.GROUPBY;
+    }
+
+    public boolean isCogroup() {
+        return (feature == OPER_FEATURE.COGROUP);
+    }
+
+    public void markCogroup() {
+        feature = OPER_FEATURE.COGROUP;
+    }
+
+    public boolean isRegularJoin() {
+        return (feature == OPER_FEATURE.HASHJOIN);
+    }
+
+    public void markRegularJoin() {
+        feature = OPER_FEATURE.HASHJOIN;
+    }
+
+    public int getRequestedParallelism() {
+        return requestedParallelism;
+    }
+
+    public void setSplitter(boolean spl) {
+        splitter = spl;
+    }
+
+    public boolean isSplitter() {
+        return splitter;
+    }
+
+    public boolean isSampler() {
+        return (feature == OPER_FEATURE.SAMPLER);
+    }
+
+    public void markSampler() {
+        feature = OPER_FEATURE.SAMPLER;
+    }
+
+    public void setSkewedJoinPartitionFile(String file) {
+        skewedJoinPartitionFile = file;
+    }
+
+    public String getSkewedJoinPartitionFile() {
+        return skewedJoinPartitionFile;
+    }
+
+    protected boolean usingTypedComparator() {
+        return usingTypedComparator;
+    }
+
+    protected void useTypedComparator(boolean useTypedComparator) {
+        this.usingTypedComparator = useTypedComparator;
+    }
+
+    protected void noCombineSmallSplits() {
+        combineSmallSplits = false;
+    }
+
+    public boolean combineSmallSplits() {
+        return combineSmallSplits;
+    }
+
+    public boolean isIndexer() {
+        return (feature == OPER_FEATURE.INDEXER);
+    }
+
+    public void markIndexer() {
+        feature = OPER_FEATURE.INDEXER;
+    }
+}

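A minimal sketch of constructing a SparkOper and marking its feature, using only the members added above; the scope string, the numeric id and the UDF name are illustrative placeholders for values the compiler would normally supply:

    import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOper;
    import org.apache.pig.impl.plan.OperatorKey;

    public class SparkOperExample {
        public static SparkOper newGroupByOper() {
            // "scope" and the id 1 stand in for values issued by the compiler.
            SparkOper sparkOp = new SparkOper(new OperatorKey("scope", 1));
            sparkOp.markGroupBy();                             // tag this operator as a group-by job
            sparkOp.UDFs.add("org.apache.pig.builtin.COUNT");  // UDFs referenced by the hosted plan
            return sparkOp;
        }
    }
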
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A plan representing the graph of Spark
+ * operators that make up a Spark job.
+ */
+public class SparkOperPlan extends OperatorPlan<SparkOper> {
+
+    @Override
+    public String toString() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        SparkPrinter printer = new SparkPrinter(ps, this);
+        printer.setVerbose(true);
+        try {
+            printer.visit();
+        } catch (VisitorException e) {
+            // Wrap the checked visitor exception so toString() callers need not handle it.
+            throw new RuntimeException("Unable to get String representation of plan: " + e, e);
+        }
+        return baos.toString();
+    }
+}

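A small sketch of how such a plan could be assembled, using the add/connect methods inherited from OperatorPlan; the operator keys are illustrative and the two-node shape is only an example:

    import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOper;
    import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
    import org.apache.pig.impl.plan.OperatorKey;
    import org.apache.pig.impl.plan.PlanException;

    public class SparkOperPlanExample {
        public static SparkOperPlan buildTwoNodePlan() throws PlanException {
            SparkOperPlan plan = new SparkOperPlan();
            SparkOper load = new SparkOper(new OperatorKey("scope", 1));
            SparkOper group = new SparkOper(new OperatorKey("scope", 2));
            plan.add(load);
            plan.add(group);
            plan.connect(load, group);   // load feeds into group
            System.out.println(plan);    // toString() pretty-prints via SparkPrinter
            return plan;
        }
    }
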
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * This visitor walks the SparkOperPlan and does the following
+ * for each SparkOper:
+ * it visits the POPackage in the plan and finds the corresponding
+ * POLocalRearrange(s). It then annotates the POPackage
+ * with information about which columns in the "value" are present in the
+ * "key" and will need to be stitched back into the "value".
+ */
+public class SparkPOPackageAnnotator extends SparkOpPlanVisitor {
+    public SparkPOPackageAnnotator(SparkOperPlan plan) {
+        super(plan, new DepthFirstWalker<SparkOper, SparkOperPlan>(plan));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOper sparkOp) throws VisitorException {
+        if(!sparkOp.plan.isEmpty()) {
+            PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(sparkOp.plan);
+            pkgDiscoverer.visit();
+            POPackage pkg = pkgDiscoverer.getPkg();
+            if(pkg != null) {
+                handlePackage(sparkOp, pkg);
+            }
+        }
+    }
+
+    private void handlePackage(SparkOper pkgSparkOp, POPackage pkg) throws VisitorException {
+        int lrFound = 0;
+        List<SparkOper> predecessors = this.mPlan.getPredecessors(pkgSparkOp);
+        if (predecessors != null && predecessors.size() > 0) {
+            for (SparkOper pred : predecessors) {
+                lrFound += patchPackage(pred, pkgSparkOp, pkg);
+                if(lrFound == pkg.getNumInps()) {
+                    break;
+                }
+            }
+        }
+        if (lrFound != pkg.getNumInps()) {
+            int errCode = 2086;
+            String msg = "Unexpected problem during optimization. Could not find all LocalRearrange operators.";
+            throw new OptimizerException(msg, errCode, PigException.BUG);
+        }
+    }
+
+    private int patchPackage(SparkOper pred , SparkOper pkgSparkOp, POPackage pkg) throws VisitorException {
+        LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(pred.plan, pkg);
+        lrDiscoverer.visit();
+        // let our caller know if we managed to patch
+        // the package
+        return lrDiscoverer.getLoRearrangeFound();
+    }
+
+
+    static class PackageDiscoverer extends PhyPlanVisitor {
+
+        private POPackage pkg;
+
+        public PackageDiscoverer(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+
+        @Override
+        public void visitPackage(POPackage pkg) throws VisitorException {
+            this.pkg = pkg;
+        };
+
+        /**
+         * @return the pkg
+         */
+        public POPackage getPkg() {
+            return pkg;
+        }
+
+    }
+
+
+    static class LoRearrangeDiscoverer extends PhyPlanVisitor {
+
+        private int loRearrangeFound = 0;
+        private POPackage pkg;
+
+        public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+            this.pkg = pkg;
+        }
+
+        @Override
+        public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
+            loRearrangeFound++;
+            Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+
+            if (pkg.getPkgr() instanceof LitePackager) {
+                if(lrearrange.getIndex() != 0) {
+                    // A LitePackager implies a single input, so any index other than 0 is a bug.
+                    throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
+                }
+            }
+
+            // annotate the package with information from the LORearrange
+            // update the keyInfo information if already present in the POPackage
+            keyInfo = pkg.getPkgr().getKeyInfo();
+            if(keyInfo == null)
+                keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+
+            if(keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
+                // something is wrong - we should not be getting key info
+                // for the same index from two different Local Rearranges
+                int errCode = 2087;
+                String msg = "Unexpected problem during optimization." +
+                        " Found index:" + lrearrange.getIndex() +
+                        " in multiple LocalRearrange operators.";
+                throw new OptimizerException(msg, errCode, PigException.BUG);
+
+            }
+            keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
+                    new Pair<Boolean, Map<Integer, Integer>>(
+                            lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+            pkg.getPkgr().setKeyInfo(keyInfo);
+            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+        }
+
+        /**
+         * @return the loRearrangeFound
+         */
+        public int getLoRearrangeFound() {
+            return loRearrangeFound;
+        }
+
+    }
+}

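The annotator is intended to be run over the compiled plan before execution; a minimal sketch of invoking it (the wrapper class and method names are illustrative):

    import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
    import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator;
    import org.apache.pig.impl.plan.VisitorException;

    public class AnnotatorExample {
        public static void annotate(SparkOperPlan sparkPlan) throws VisitorException {
            // Walks every SparkOper and patches each POPackage with key/value column info.
            new SparkPOPackageAnnotator(sparkPlan).visit();
        }
    }
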
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java?rev=1661625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java Mon Feb 23 09:40:27 2015
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.io.PrintStream;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOper;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor mechanism for printing out the Spark plan.
+ */
+public class SparkPrinter extends SparkOpPlanVisitor {
+
+
+    private PrintStream mStream = null;
+    private boolean isVerbose = true;
+
+    public SparkPrinter(PrintStream ps, SparkOperPlan plan) {
+        super(plan, new DepthFirstWalker<SparkOper, SparkOperPlan>(plan));
+        mStream = ps;
+        mStream.println("#--------------------------------------------------");
+        mStream.println("# Spark Plan                                  ");
+        mStream.println("#--------------------------------------------------");
+    }
+
+    public void setVerbose(boolean verbose) {
+        isVerbose = verbose;
+    }
+
+    @Override
+    public void visitSparkOp(SparkOper sparkOp) throws VisitorException {
+        mStream.println("");
+        mStream.println("Spark node " + sparkOp.getOperatorKey().toString());
+        if(sparkOp instanceof NativeSparkOper) {
+            mStream.println("--------");
+            mStream.println();
+            return;
+        }
+        if (sparkOp.plan != null && sparkOp.plan.size() > 0) {
+            PlanPrinter<PhysicalOperator, PhysicalPlan> printer = new PlanPrinter<PhysicalOperator, PhysicalPlan>(sparkOp.plan, mStream);
+            printer.setVerbose(isVerbose);
+            printer.visit();
+            mStream.println("--------");
+        }
+    }
+}

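For completeness, a sketch of printing a plan directly with SparkPrinter; this is essentially what SparkOperPlan.toString() above does, and the wrapper class here is only illustrative:

    import java.io.PrintStream;

    import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
    import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
    import org.apache.pig.impl.plan.VisitorException;

    public class SparkPrinterExample {
        public static void explain(SparkOperPlan plan, PrintStream out) throws VisitorException {
            SparkPrinter printer = new SparkPrinter(out, plan);
            printer.setVerbose(true);   // include full operator details
            printer.visit();            // walks the plan depth-first and prints each Spark node
        }
    }
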
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1661625&r1=1661624&r2=1661625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Mon Feb 23 09:40:27 2015
@@ -18,7 +18,12 @@
 
 package org.apache.pig.tools.pigstats.spark;
 
+import java.util.List;
+import java.util.Map;
+import scala.Option;
+
 import com.google.common.collect.Maps;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
@@ -28,13 +33,10 @@ import org.apache.pig.newplan.PlanVisito
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
+
 import org.apache.spark.executor.ShuffleReadMetrics;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.executor.TaskMetrics;
-import scala.Option;
-
-import java.util.List;
-import java.util.Map;
 
 public class SparkJobStats extends JobStats {
 
@@ -59,7 +61,9 @@ public class SparkJobStats extends JobSt
           bytes, 1, success);
       outputStats.setPOStore(poStore);
       outputStats.setConf(conf);
-      outputs.add(outputStats);
+      if (!poStore.isTmpStore()) {
+          outputs.add(outputStats);
+      }
   }
 
   public void collectStats(JobMetricsListener jobMetricsListener) {