You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2015/05/29 15:16:24 UTC

svn commit: r1682452 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: ./ converter/ plan/

Author: xuefu
Date: Fri May 29 13:16:24 2015
New Revision: 1682452

URL: http://svn.apache.org/r1682452
Log:
PIG-4565: Support custom MR partitioners for Spark engine (Mohit via Xuefu)

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java?rev=1682452&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java Fri May 29 13:16:24 2015
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.lang.reflect.Method;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.spark.Partitioner;
+
+/**
+ * Spark Partitioner that wraps a custom partitioner that implements
+ * org.apache.hadoop.mapreduce.Partitioner interface.
+ *
+ * Since Spark's shuffle API takes a different parititioner class
+ * (@see org.apache.spark.Partitioner) compared to MapReduce, we need to
+ * wrap custom partitioners written for MapReduce inside this Spark Partitioner.
+ *
+ * MR Custom partitioners are expected to implement getPartition() with
+ * specific arguments:
+ *   public int getPartition(PigNullableWritable key, Writable value, int numPartitions)
+ * For an example of such a partitioner,
+ * @see org.apache.pig.test.utils.SimpleCustomPartitioner
+ */
+public class MapReducePartitionerWrapper extends Partitioner {
+    private static final Log LOG = LogFactory.getLog(MapReducePartitionerWrapper.class);
+
+    private int numPartitions;
+    private String partitionerName;
+    // MR's Partitioner interface is not serializable.
+    // And since it is not serializable, it cannot be initialized in the constructor
+    // (in Spark's DAG scheduler thread in Spark Driver),
+    // To workaround this, It will be lazily initialized inside the map task
+    // (Executor thread) first time that getPartitions() gets called.
+    transient private org.apache.hadoop.mapreduce.Partitioner<PigNullableWritable, Writable>
+            mapredPartitioner = null;
+    transient private Method getPartitionMethod = null;
+
+    public MapReducePartitionerWrapper(String partitionerName,
+                      int numPartitions) {
+        if (partitionerName == null) {
+            throw new RuntimeException("MapReduce Partitioner cannot be null.");
+        }
+
+        this.partitionerName = partitionerName;
+        this.numPartitions = numPartitions;
+    }
+
+    public int numPartitions() {
+        return numPartitions;
+    }
+
+    public int getPartition(final Object key) {
+        try {
+
+            PigNullableWritable writeableKey = new PigNullableWritable() {
+                public Object getValueAsPigType() {
+                    return key;
+                }
+            };
+
+            // Lazy initialization
+            // Synchronized because multiple (map) tasks in the same Spark Executor
+            // may call getPartition, attempting to initialize at the same time.
+            if (mapredPartitioner == null) {
+                synchronized (this) {
+                    // check again for race condition
+                    if (mapredPartitioner == null) {
+                        Class<?> mapredPartitionerClass =
+                                PigContext.resolveClassName(partitionerName);
+                        Configuration conf = new Configuration();
+                        mapredPartitioner = (org.apache.hadoop.mapreduce.Partitioner<PigNullableWritable, Writable>)
+                                ReflectionUtils.newInstance(mapredPartitionerClass, conf);
+                        getPartitionMethod = mapredPartitionerClass.getMethod(
+                                "getPartition",
+                                PigNullableWritable.class,
+                                org.apache.hadoop.io.Writable.class,
+                                int.class);
+                    }
+                }
+            }
+
+            // MR Parititioner getPartition takes a value argument as well, but
+            // Spark's Partitioner only accepts the key argument.
+            // In practice, MR Partitioners ignore the value. However, it's
+            // possible that some don't.
+            // TODO: We could handle this case by packaging the value inside the
+            // key (conditioned on some config option, since this will balloon
+            // memory usage). PIG-4575.
+            int partition = (Integer) getPartitionMethod.invoke(mapredPartitioner,
+                    writeableKey, null, numPartitions);
+
+            if (LOG.isDebugEnabled())
+                LOG.debug("MapReduce Partitioner partition number for key " + key +
+                        " is " + partition);
+            return partition;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public boolean equals(Object other) {
+        boolean var4;
+        if(other instanceof MapReducePartitionerWrapper) {
+            MapReducePartitionerWrapper var3 = (MapReducePartitionerWrapper)other;
+            var4 = var3.numPartitions() == this.numPartitions();
+        } else {
+            var4 = false;
+        }
+
+        return var4;
+    }
+
+    public int hashCode() {
+        return this.numPartitions();
+    }
+}

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1682452&r1=1682451&r2=1682452&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Fri May 29 13:16:24 2015
@@ -19,24 +19,25 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.List;
-import scala.Product2;
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
 
 import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
-
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.Partitioner;
 import org.apache.spark.rdd.RDD;
 
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
 public class SparkUtil {
 
     public static <T> ClassTag<T> getManifest(Class<T> clazz) {
@@ -88,12 +89,27 @@ public class SparkUtil {
 
     public static int getParallelism(List<RDD<Tuple>> predecessors,
             PhysicalOperator physicalOperator) {
-      int parallelism = physicalOperator.getRequestedParallelism();
-      if (parallelism <= 0) {
-        // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
-        // is reasonable.
-        parallelism = predecessors.get(0).context().defaultParallelism();
-      }
-      return parallelism;
+
+        String numReducers = System.getenv("SPARK_REDUCERS");
+        if (numReducers != null) {
+            return Integer.parseInt(numReducers);
+        }
+
+        int parallelism = physicalOperator.getRequestedParallelism();
+        if (parallelism <= 0) {
+            // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
+            // is reasonable.
+            parallelism = predecessors.get(0).context().defaultParallelism();
+        }
+
+        return parallelism;
+    }
+
+    public static Partitioner getPartitioner(String customPartitioner, int parallelism) {
+        if (customPartitioner != null) {
+            return new MapReducePartitionerWrapper(customPartitioner, parallelism);
+        }
+        // No custom partitioner was named: use Spark's HashPartitioner.
+        return new HashPartitioner(parallelism);
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java?rev=1682452&r1=1682451&r2=1682452&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java Fri May 29 13:16:24 2015
@@ -29,10 +29,7 @@ import org.apache.pig.data.Tuple;
 import org.apache.spark.rdd.PairRDDFunctions;
 import org.apache.spark.rdd.RDD;
 
-import scala.Function1;
-import scala.Function2;
 import scala.Tuple2;
-import scala.reflect.ClassTag;
 import scala.runtime.AbstractFunction1;
 import scala.runtime.AbstractFunction2;
 
@@ -40,30 +37,30 @@ import scala.runtime.AbstractFunction2;
 public class DistinctConverter implements POConverter<Tuple, Tuple, PODistinct> {
     private static final Log LOG = LogFactory.getLog(DistinctConverter.class);
 
-    private static final Function1<Tuple, Tuple2<Tuple, Object>> TO_KEY_VALUE_FUNCTION = new ToKeyValueFunction();
-    private static final Function2<Object, Object, Object> MERGE_VALUES_FUNCTION = new MergeValuesFunction();
-    private static final Function1<Tuple2<Tuple, Object>, Tuple> TO_VALUE_FUNCTION = new ToValueFunction();
-
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
-            PODistinct poDistinct) throws IOException {
-        SparkUtil.assertPredecessorSize(predecessors, poDistinct, 1);
+            PODistinct op) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, op, 1);
         RDD<Tuple> rdd = predecessors.get(0);
 
-        ClassTag<Tuple2<Tuple, Object>> tuple2ClassManifest = SparkUtil
-                .<Tuple, Object> getTuple2Manifest();
-
-        RDD<Tuple2<Tuple, Object>> rddPairs = rdd.map(TO_KEY_VALUE_FUNCTION,
-                tuple2ClassManifest);
+        // In DISTINCT operation, the key is the entire tuple.
+        // RDD<Tuple> -> RDD<Tuple2<Tuple, null>>
+        RDD<Tuple2<Tuple, Object>> keyValRDD = rdd.map(new ToKeyValueFunction(),
+                SparkUtil.<Tuple, Object> getTuple2Manifest());
         PairRDDFunctions<Tuple, Object> pairRDDFunctions
-          = new PairRDDFunctions<Tuple, Object>(
-                rddPairs, SparkUtil.getManifest(Tuple.class),
+          = new PairRDDFunctions<Tuple, Object>(keyValRDD,
+                SparkUtil.getManifest(Tuple.class),
                 SparkUtil.getManifest(Object.class), null);
-        int parallelism = SparkUtil.getParallelism(predecessors, poDistinct);
-        return pairRDDFunctions.reduceByKey(MERGE_VALUES_FUNCTION, parallelism)
-                .map(TO_VALUE_FUNCTION, SparkUtil.getManifest(Tuple.class));
+        int parallelism = SparkUtil.getParallelism(predecessors, op);
+        return pairRDDFunctions.reduceByKey(
+                SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism),
+                new MergeValuesFunction())
+                .map(new ToValueFunction(), SparkUtil.getManifest(Tuple.class));
     }
 
+    /**
+     * Tuple -> Tuple2<Tuple, null>
+     */
     private static final class ToKeyValueFunction extends
             AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
             Serializable {
@@ -71,16 +68,20 @@ public class DistinctConverter implement
         public Tuple2<Tuple, Object> apply(Tuple t) {
             if (LOG.isDebugEnabled())
                 LOG.debug("DistinctConverter.ToKeyValueFunction in " + t);
+
             Tuple key = t;
-            Object value = null; // value
-            // (key, value)
+            Object value = null;
             Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value);
+
             if (LOG.isDebugEnabled())
                 LOG.debug("DistinctConverter.ToKeyValueFunction out " + out);
             return out;
         }
     }
 
+    /**
+     * No-op
+     */
     private static final class MergeValuesFunction extends
             AbstractFunction2<Object, Object, Object> implements Serializable {
         @Override
@@ -89,6 +90,9 @@ public class DistinctConverter implement
         }
     }
 
+    /**
+     * Tuple2<Tuple, null> -> Tuple
+     */
     private static final class ToValueFunction extends
             AbstractFunction1<Tuple2<Tuple, Object>, Tuple> implements
             Serializable {
@@ -97,4 +101,4 @@ public class DistinctConverter implement
             return input._1;
         }
     }
-}
+}
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1682452&r1=1682451&r2=1682452&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Fri May 29 13:16:24 2015
@@ -29,14 +29,15 @@ import org.apache.commons.logging.LogFac
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.builtin.LOG;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.rdd.CoGroupedRDD;
 import org.apache.spark.rdd.RDD;
 
@@ -53,65 +54,84 @@ public class GlobalRearrangeConverter im
             .getLog(GlobalRearrangeConverter.class);
 
     private static final TupleFactory tf = TupleFactory.getInstance();
-  @Override
+ 
+    @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
-                              POGlobalRearrangeSpark physicalOperator) throws IOException {
+                              POGlobalRearrangeSpark op) throws IOException {
         SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
-                physicalOperator, 0);
+                op, 0);
         int parallelism = SparkUtil.getParallelism(predecessors,
-                physicalOperator);
+                op);
 
-        String reducers = System.getenv("SPARK_REDUCERS");
-        if (reducers != null) {
-            parallelism = Integer.parseInt(reducers);
-        }
-        LOG.info("Parallelism for Spark groupBy: " + parallelism);
-
-        if (predecessors.size() == 1) {
-            // GROUP
-            JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
-            if (physicalOperator.isUseSecondaryKey()) {
-                RDD<Tuple> rdd = predecessors.get(0);
-                RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
-                        SparkUtil.<Tuple, Object>getTuple2Manifest());
-
-                JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
-                        SparkUtil.getManifest(Tuple.class),
-                        SparkUtil.getManifest(Object.class));
-
-                //first sort the tuple by secondary key if enable useSecondaryKey sort
-                JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(parallelism), new PigSecondaryKeyComparatorSpark(physicalOperator.getSecondarySortOrder()));
-                JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
-                prdd = mapped.groupBy(new GetKeyFunction(physicalOperator), parallelism);
-            } else {
-                JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
-                prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
-            }
+//         TODO: Figure out the tradeoff of using CoGroupRDD (even for 1 input),
+//         vs using groupBy (like we do in this commented code), vs using
+//         reduceByKey(). This is a pending task in Pig on Spark Milestone 1
+//         Once we figure that out, we can allow custom partitioning for
+//         secondary sort case as well.
+//        if (predecessors.size() == 1) {
+//            // GROUP BY
+//            JavaPairRDD<Object, Iterable<Tuple>> prdd;
+//            if (op.isUseSecondaryKey()) {
+//                prdd = handleSecondarySort(predecessors.get(0), op, parallelism);
+//            } else {
+//                JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
+//                prdd = jrdd.groupBy(new GetKeyFunction(op), parallelism);
+//                prdd.groupByKey(new CustomPartitioner(op.getCustomPartitioner(),
+//                        parallelism));
+//            }
+//            JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
+//            return jrdd2.rdd();
+//
+//        if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
+//            return handleSecondarySort(predecessors.get(0), op, parallelism);
+//        }
+
+        if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
+            return handleSecondarySort(predecessors.get(0), op, parallelism);
+        }
+
+        List<RDD<Tuple2<IndexedKey, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
+        for (RDD<Tuple> rdd : predecessors) {
+            JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
+            JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
+            rddPairs.add(rddPair.rdd());
+        }
+
+        // Something's wrong with the type parameters of CoGroupedRDD
+        // key and value are the same type ???
+        CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
+                (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
+                        .asScalaBuffer(rddPairs).toSeq()),
+                SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism)
+        );
+
+        RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
+            (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
+        return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
+    }
 
-            JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
-            return jrdd2.rdd();
-        } else {
-            List<RDD<Tuple2<IndexedKey, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
-            for (RDD<Tuple> rdd : predecessors) {
-                JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
-                JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
-                rddPairs.add(rddPair.rdd());
-            }
-
-            // Something's wrong with the type parameters of CoGroupedRDD
-            // key and value are the same type ???
-            CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
-                    (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
-                            .asScalaBuffer(rddPairs).toSeq()),
-                    new HashPartitioner(parallelism));
-
-            RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
-                (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
-            return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
-        }
+    private RDD<Tuple> handleSecondarySort(
+            RDD<Tuple> rdd, POGlobalRearrangeSpark op, int parallelism) {
+        // Pair every tuple with a null value so the whole tuple acts as the key.
+        RDD<Tuple2<Tuple, Object>> keyedByTuple = rdd.map(
+                new ToKeyNullValueFunction(),
+                SparkUtil.<Tuple, Object>getTuple2Manifest());
+        JavaPairRDD<Tuple, Object> keyed = new JavaPairRDD<Tuple, Object>(
+                keyedByTuple,
+                SparkUtil.getManifest(Tuple.class),
+                SparkUtil.getManifest(Object.class));
+
+        // Sort by secondary key within hash partitions before grouping.
+        JavaRDD<Tuple> sortedTuples = keyed
+                .repartitionAndSortWithinPartitions(
+                        new HashPartitioner(parallelism),
+                        new PigSecondaryKeyComparatorSpark(op.getSecondarySortOrder()))
+                .mapPartitions(new RemoveValueFunction());
+        return sortedTuples.groupBy(new GetKeyFunction(op), parallelism)
+                .map(new GroupTupleFunction(op)).rdd();
     }
 
-    private static class ToValueFunction implements
+    private static class RemoveValueFunction implements
             FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
 
         private class Tuple2TransformIterable implements Iterable<Tuple> {
@@ -145,15 +165,14 @@ public class GlobalRearrangeConverter im
         @Override
         public Tuple2<Tuple, Object> apply(Tuple t) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Sort ToKeyValueFunction in " + t);
+                LOG.debug("ToKeyNullValueFunction in " + t);
             }
-            Tuple key = t;
-            Object value = null;
-            // (key, value)
-            Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value);
+
+            Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(t, null);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Sort ToKeyValueFunction out " + out);
+                LOG.debug("ToKeyNullValueFunction out " + out);
             }
+
             return out;
         }
     }
@@ -234,6 +253,9 @@ public class GlobalRearrangeConverter im
         }
     }
 
+    /**
+     * Function that extracts keys from locally rearranged tuples.
+     */
     private static class GetKeyFunction implements Function<Tuple, Object>, Serializable {
         public final POGlobalRearrangeSpark glrSpark;
 
@@ -246,16 +268,18 @@ public class GlobalRearrangeConverter im
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("GetKeyFunction in " + t);
                 }
-                // see PigGenericMapReduce For the key
-                Object key = null;
+
+                Object key;
                 if ((glrSpark != null) && (glrSpark.isUseSecondaryKey())) {
                     key = ((Tuple) t.get(1)).get(0);
                 } else {
                     key = t.get(1);
                 }
+
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("GetKeyFunction out " + key);
                 }
+
                 return key;
             } catch (ExecException e) {
                 throw new RuntimeException(e);
@@ -263,8 +287,16 @@ public class GlobalRearrangeConverter im
         }
     }
 
-    private static class GroupTupleFunction implements Function<Tuple2<Object, Iterable<Tuple>>, Tuple>,
-        Serializable {
+    /**
+     * Function that converts elements of PairRDD to regular RDD.
+     * - Input (PairRDD) contains elements of the form
+     * Tuple2<key, Iterable<(index, key, value)>>.
+     * - Output (regular RDD) contains elements of the form
+     * Tuple<(key, iterator to (index, key, value))>
+     */
+    private static class GroupTupleFunction
+            implements Function<Tuple2<Object, Iterable<Tuple>>, Tuple>,
+            Serializable {
         public final POGlobalRearrangeSpark glrSpark;
 
         public GroupTupleFunction(POGlobalRearrangeSpark globalRearrangeSpark) {
@@ -276,12 +308,16 @@ public class GlobalRearrangeConverter im
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("GroupTupleFunction in " + v1);
                 }
+
                 Tuple tuple = tf.newTuple(2);
-                tuple.set(0, v1._1()); // the (index, key) tuple
-                tuple.set(1, v1._2().iterator()); // the Seq<Tuple> aka bag of values
+                tuple.set(0, v1._1()); // key
+                // Note that v1._2() is (index, key, value) tuple, and
+                // v1._2().iterator() is iterator to Seq<Tuple> (aka bag of values)
+                tuple.set(1, v1._2().iterator());
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("GroupTupleFunction out " + tuple);
                 }
+
                 return tuple;
             } catch (ExecException e) {
                 throw new RuntimeException(e);
@@ -290,7 +326,10 @@ public class GlobalRearrangeConverter im
     }
 
     /**
-     * IndexedKey records the index and key info of a tuple.
+     * IndexedKey records the index and key info.
+     * This is used as key for JOINs. It addresses the case where key is
+     * either empty (or is a tuple with one or more empty fields). In this case,
+     * we must respect the SQL standard as documented in the equals() method.
      */
     public static class IndexedKey implements Serializable {
         private byte index;
@@ -403,24 +442,27 @@ public class GlobalRearrangeConverter im
         }
     }
 
+    /**
+     * Converts incoming locally rearranged tuple, which is of the form
+     * (index, key, value) into Tuple2<key, value>
+     */
     private static class ToKeyValueFunction implements
             Function<Tuple, Tuple2<IndexedKey, Tuple>>, Serializable {
 
         @Override
         public Tuple2<IndexedKey, Tuple> call(Tuple t) {
             try {
-                // (index, key, value)
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("ToKeyValueFunction in " + t);
                 }
-                IndexedKey indexedKey = new IndexedKey((Byte) t.get(0), t.get(1));
-                Tuple value = (Tuple) t.get(2); // value
-                // (key, value)
-                Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
-                        value);
+
+                Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(
+                        new IndexedKey((Byte) t.get(0), t.get(1)),
+                        (Tuple) t.get(2));
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("ToKeyValueFunction out " + out);
                 }
+
                 return out;
             } catch (ExecException e) {
                 throw new RuntimeException(e);
@@ -428,6 +470,11 @@ public class GlobalRearrangeConverter im
         }
     }
 
+    /**
+     * Converts cogroup output where each element is {key, bag[]} represented
+     * as Tuple2<Object, Seq<Seq<Tuple>>> into tuple of form
+     * (key, Iterator<(index, key, value)>)
+     */
     private static class ToGroupKeyValueFunction implements
             Function<Tuple2<IndexedKey, Seq<Seq<Tuple>>>, Tuple>, Serializable {
 
@@ -435,15 +482,16 @@ public class GlobalRearrangeConverter im
         public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
             try {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("ToGroupKeyValueFunction2 in " + input);
+                    LOG.debug("ToGroupKeyValueFunction in " + input);
                 }
+
                 final Object key = input._1().getKey();
                 Object obj = input._2();
                 // XXX this is a hack for Spark 1.1.0: the type is an Array, not Seq
                 Seq<Tuple>[] bags = (Seq<Tuple>[])obj;
                 int i = 0;
                 List<Iterator<Tuple>> tupleIterators = new ArrayList<Iterator<Tuple>>();
-                for (int j=0; j<bags.length; j++) {
+                for (int j = 0; j < bags.length; j ++) {
                     Seq<Tuple> bag = bags[j];
                     Iterator<Tuple> iterator = JavaConversions
                             .asJavaCollection(bag).iterator();
@@ -463,14 +511,16 @@ public class GlobalRearrangeConverter im
                             }
                         }
                     });
-                    ++i;
+                    ++ i;
                 }
+
                 Tuple out = tf.newTuple(2);
                 out.set(0, key);
                 out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator()));
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("ToGroupKeyValueFunction2 out " + out);
+                    LOG.debug("ToGroupKeyValueFunction out " + out);
                 }
+
                 return out;
             } catch (Exception e) {
                 throw new RuntimeException(e);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java?rev=1682452&r1=1682451&r2=1682452&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java Fri May 29 13:16:24 2015
@@ -24,7 +24,6 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -36,9 +35,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.spark.rdd.RDD;
 
 import scala.runtime.AbstractFunction1;
-import org.apache.spark.rdd.RDD;
 
 @SuppressWarnings({ "serial" })
 public class PackageConverter implements POConverter<Tuple, Tuple, POPackage> {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1682452&r1=1682451&r2=1682452&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Fri May 29 13:16:24 2015
@@ -30,14 +30,11 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.CollectableLoadFunc;
-import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;