Posted to commits@pig.apache.org by xu...@apache.org on 2016/09/14 04:01:13 UTC

svn commit: r1760625 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: converter/ operator/ optimizer/

Author: xuefu
Date: Wed Sep 14 04:01:13 2016
New Revision: 1760625

URL: http://svn.apache.org/viewvc?rev=1760625&view=rev
Log:
PIG-4969: Optimize combine case for spark mode (Liyun via Xuefu)
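
The combiner rewrite previously emitted a standalone local rearrange between the initial foreach and the reduceBy
operator. After this change, POReduceBySpark carries both the POPackage and the POLocalRearrange, ReduceByConverter
applies the rearrange itself through the shared LocalRearrangeFunction, and the resulting (IndexedKey, value) pairs
feed directly into Spark's reduceByKey; the secondary-key sort path is factored out into SecondaryKeySortUtil so that
JoinGroupSparkConverter and ReduceByConverter share one implementation.

As a rough illustration only (plain Spark Java API, none of Pig's classes; the class and variable names below are
invented for this sketch), reduceByKey is the primitive the combine case now maps onto, and it applies the combine
function map-side before the shuffle:

    import java.util.Arrays;

    import scala.Tuple2;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ReduceBySketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("local", "reduceBy-sketch");
            JavaPairRDD<String, Integer> counts = sc
                    .parallelizePairs(Arrays.asList(
                            new Tuple2<>("a", 1), new Tuple2<>("b", 1), new Tuple2<>("a", 1)))
                    // the combine function runs both map-side and reduce-side
                    .reduceByKey((x, y) -> x + y);
            System.out.println(counts.collectAsMap()); // {a=2, b=1} (map order may vary)
            sc.stop();
        }
    }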

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeFunction.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1760625&r1=1760624&r2=1760625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Wed Sep 14 04:01:13 2016
@@ -23,17 +23,14 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Objects;
 
 import scala.Product2;
 import scala.Tuple2;
 import scala.collection.JavaConversions;
 import scala.collection.Seq;
-import scala.runtime.AbstractFunction1;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -44,9 +41,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.spark.Partitioner;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.rdd.CoGroupedRDD;
 import org.apache.spark.rdd.RDD;
@@ -69,11 +63,12 @@ public class JoinGroupSparkConverter imp
 
         for (int i = 0; i < predecessors.size(); i++) {
             RDD<Tuple> rdd = predecessors.get(i);
-            rddAfterLRA.add(rdd.map(new LocalRearrangeFunction(lraOps.get(i), glaOp),
+            rddAfterLRA.add(rdd.map(new LocalRearrangeFunction(lraOps.get(i),
+                            glaOp.isUseSecondaryKey(), glaOp.getSecondarySortOrder()),
                     SparkUtil.<IndexedKey, Tuple>getTuple2Manifest()));
         }
         if (rddAfterLRA.size() == 1 && useSecondaryKey) {
-            return handleSecondarySort(rddAfterLRA.get(0), pkgOp);
+            return SecondaryKeySortUtil.handleSecondarySort(rddAfterLRA.get(0), pkgOp);
         } else {
 
             CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
@@ -88,218 +83,6 @@ public class JoinGroupSparkConverter imp
         }
     }
 
-    private RDD<Tuple> handleSecondarySort(
-            RDD<Tuple2<IndexedKey, Tuple>> rdd, POPackage pkgOp) {
-        JavaPairRDD<IndexedKey, Tuple> pairRDD = JavaPairRDD.fromRDD(rdd, SparkUtil.getManifest(IndexedKey.class),
-                SparkUtil.getManifest(Tuple.class));
-
-        int partitionNums = pairRDD.partitions().size();
-        //repartition to group tuples with same indexedkey to same partition
-        JavaPairRDD<IndexedKey, Tuple> sorted = pairRDD.repartitionAndSortWithinPartitions(
-                new IndexedKeyPartitioner(partitionNums));
-        //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...))
-        return sorted.mapPartitions(new AccumulateByKey(pkgOp), true).rdd();
-    }
-
-    //Group tuples with same IndexKey into same partition
-    private static class IndexedKeyPartitioner extends Partitioner {
-        private int partition;
-        public IndexedKeyPartitioner(int partition) {
-            this.partition = partition;
-        }
-        @Override
-        public int getPartition(Object obj) {
-            IndexedKey indexedKey = (IndexedKey) obj;
-            Tuple key = (Tuple) indexedKey.getKey();
-
-            int hashCode = 0;
-            try {
-                hashCode = Objects.hashCode(key.get(0));
-            } catch (ExecException e) {
-                throw new RuntimeException("IndexedKeyPartitioner#getPartition: ", e);
-            }
-            return Math.abs(hashCode) % partition;
-        }
-
-        @Override
-        public int numPartitions() {
-            return partition;
-        }
-    }
-
-    //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...))
-    //Send (key,Iterator) to POPackage, use POPackage#getNextTuple to get the result
-    private static class AccumulateByKey implements FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple>,
-            Serializable {
-        private POPackage pkgOp;
-
-        public AccumulateByKey(POPackage pkgOp) {
-            this.pkgOp = pkgOp;
-        }
-
-        @Override
-        public Iterable<Tuple> call(final Iterator<Tuple2<IndexedKey, Tuple>> it) throws Exception {
-            return new Iterable<Tuple>() {
-                Object curKey = null;
-                ArrayList curValues = new ArrayList();
-
-                @Override
-                public Iterator<Tuple> iterator() {
-                    return new Iterator<Tuple>() {
-
-                        @Override
-                        public boolean hasNext() {
-                            return it.hasNext() || curKey != null;
-                        }
-
-                        @Override
-                        public Tuple next() {
-                            while (it.hasNext()) {
-                                Tuple2<IndexedKey, Tuple> t = it.next();
-                                //key changes, restruct the last tuple by curKey, curValues and return
-                                Object tMainKey = null;
-                                try {
-                                    tMainKey = ((Tuple) (t._1()).getKey()).get(0);
-                                    if (curKey != null && !curKey.equals(tMainKey)) {
-                                        Tuple result = restructTuple(curKey, new ArrayList(curValues));
-                                        curValues.clear();
-                                        curKey = tMainKey;
-                                        curValues.add(t._2());
-                                        return result;
-                                    }
-                                    curKey = tMainKey;
-                                    //if key does not change, just append the value to the same key
-                                    curValues.add(t._2());
-
-                                } catch (ExecException e) {
-                                    throw new RuntimeException("AccumulateByKey throw exception: ", e);
-                                }
-                            }
-                            if (curKey == null) {
-                                throw new RuntimeException("AccumulateByKey curKey is null");
-                            }
-
-                            //if we get here, this should be the last record
-                            Tuple res = restructTuple(curKey, curValues);
-                            curKey = null;
-                            return res;
-                        }
-
-
-                        @Override
-                        public void remove() {
-                            // Not implemented.
-                            // throw Unsupported Method Invocation Exception.
-                            throw new UnsupportedOperationException();
-                        }
-                    };
-                }
-            };
-        }
-
-        private Tuple restructTuple(final Object curKey, final ArrayList<Tuple> curValues) {
-            try {
-                Tuple retVal = null;
-                PigNullableWritable retKey = new PigNullableWritable() {
-
-                    public Object getValueAsPigType() {
-                        return curKey;
-                    }
-                };
-
-                //Here restruct a tupleIterator, later POPackage#tupIter will use it.
-                final Iterator<Tuple> tupleItearator = curValues.iterator();
-                Iterator<NullableTuple> iterator = new Iterator<NullableTuple>() {
-                    public boolean hasNext() {
-                        return tupleItearator.hasNext();
-                    }
-
-                    public NullableTuple next() {
-                        Tuple t = tupleItearator.next();
-                        return new NullableTuple(t);
-                    }
-
-                    public void remove() {
-                        throw new UnsupportedOperationException();
-                    }
-                };
-                pkgOp.setInputs(null);
-                pkgOp.attachInput(retKey, iterator);
-                Result res = pkgOp.getNextTuple();
-                if (res.returnStatus == POStatus.STATUS_OK) {
-                    retVal = (Tuple) res.result;
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("AccumulateByKey out: " + retVal);
-                }
-                return retVal;
-            } catch (ExecException e) {
-                throw new RuntimeException("AccumulateByKey#restructTuple throws exception: ", e);
-            }
-        }
-    }
-
-    private static class LocalRearrangeFunction extends
-            AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
-
-        private final POLocalRearrange lra;
-
-        private boolean useSecondaryKey;
-        private boolean[] secondarySortOrder;
-
-        public LocalRearrangeFunction(POLocalRearrange lra, POGlobalRearrangeSpark glaOp) {
-            if( glaOp.isUseSecondaryKey()) {
-                this.useSecondaryKey = glaOp.isUseSecondaryKey();
-                this.secondarySortOrder = glaOp.getSecondarySortOrder();
-            }
-            this.lra = lra;
-        }
-
-        @Override
-        public Tuple2<IndexedKey, Tuple> apply(Tuple t) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("LocalRearrangeFunction in " + t);
-            }
-            Result result;
-            try {
-                lra.setInputs(null);
-                lra.attachInput(t);
-                result = lra.getNextTuple();
-
-                if (result == null) {
-                    throw new RuntimeException(
-                            "Null response found for LocalRearange on tuple: "
-                                    + t);
-                }
-
-                switch (result.returnStatus) {
-                    case POStatus.STATUS_OK:
-                        // (index, key, value without keys)
-                        Tuple resultTuple = (Tuple) result.result;
-                        Object key = resultTuple.get(1);
-                        IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key);
-                        if( useSecondaryKey) {
-                            indexedKey.setUseSecondaryKey(useSecondaryKey);
-                            indexedKey.setSecondarySortOrder(secondarySortOrder);
-                        }
-                        Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
-                                (Tuple) resultTuple.get(2));
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("LocalRearrangeFunction out " + out);
-                        }
-                        return out;
-                    default:
-                        throw new RuntimeException(
-                                "Unexpected response code from operator "
-                                        + lra + " : " + result);
-                }
-            } catch (ExecException e) {
-                throw new RuntimeException(
-                        "Couldn't do LocalRearange on tuple: " + t, e);
-            }
-        }
-
-    }
 
     /**
      * Send cogroup output where each element is {key, bag[]} to POPackage

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeFunction.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeFunction.java?rev=1760625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeFunction.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeFunction.java Wed Sep 14 04:01:13 2016
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.Serializable;
+
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Used by JoinGroupSparkConverter and ReduceByConverter to convert an incoming locally rearranged tuple of the
+ * form Tuple(index, key, value) into a Tuple2<IndexedKey, value> pair.
+ */
+public class LocalRearrangeFunction extends
+        AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
+    private static final Log LOG = LogFactory
+            .getLog(LocalRearrangeFunction.class);
+    private final POLocalRearrange lra;
+
+    private boolean useSecondaryKey;
+    private boolean[] secondarySortOrder;
+
+    public LocalRearrangeFunction(POLocalRearrange lra, boolean useSecondaryKey, boolean[] secondarySortOrder) {
+        this.useSecondaryKey = useSecondaryKey;
+        this.secondarySortOrder = secondarySortOrder;
+        this.lra = lra;
+    }
+
+    //in:Tuple(index,key,value)
+    //out:<IndexedKey, value>
+    @Override
+    public Tuple2<IndexedKey, Tuple> apply(Tuple t) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("LocalRearrangeFunction in " + t);
+        }
+        Result result;
+        try {
+            lra.setInputs(null);
+            lra.attachInput(t);
+            result = lra.getNextTuple();
+
+            if (result == null) {
+                throw new RuntimeException(
+                        "Null response found for LocalRearrange on tuple: "
+                                + t);
+            }
+
+            switch (result.returnStatus) {
+                case POStatus.STATUS_OK:
+                    // (index, key, value without keys)
+                    Tuple resultTuple = (Tuple) result.result;
+                    Object key = resultTuple.get(1);
+                    IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key);
+                    if (useSecondaryKey) {
+                        indexedKey.setUseSecondaryKey(useSecondaryKey);
+                        indexedKey.setSecondarySortOrder(secondarySortOrder);
+                    }
+                    Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
+                            (Tuple) resultTuple.get(2));
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("LocalRearrangeFunction out " + out);
+                    }
+                    return out;
+                default:
+                    throw new RuntimeException(
+                            "Unexpected response code from operator "
+                                    + lra + " : " + result);
+            }
+        } catch (ExecException e) {
+            throw new RuntimeException(
+                    "Couldn't do LocalRearrange on tuple: " + t, e);
+        }
+    }
+
+}
+
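
Both converters now build the keyed RDD the same way. Condensed from the two call sites in this patch (lra, op and
rdd stand in for the rearrange operator, the enclosing physical operator and the predecessor RDD that each converter
already holds):

    RDD<Tuple2<IndexedKey, Tuple>> rddPair = rdd.map(
            new LocalRearrangeFunction(lra, op.isUseSecondaryKey(), op.getSecondarySortOrder()),
            SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());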

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java?rev=1760625&r1=1760624&r2=1760625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java Wed Sep 14 04:01:13 2016
@@ -55,25 +55,24 @@ public class ReduceByConverter implement
         int parallelism = SparkUtil.getParallelism(predecessors, op);
 
         RDD<Tuple> rdd = predecessors.get(0);
-
-        JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair;
+        RDD<Tuple2<IndexedKey, Tuple>> rddPair = rdd.map(
+                new LocalRearrangeFunction(op.getLgr(), op.isUseSecondaryKey(), op.getSecondarySortOrder()),
+                SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
         if (op.isUseSecondaryKey()) {
-            rddPair = handleSecondarySort(rdd, op, parallelism);
+            return SecondaryKeySortUtil.handleSecondarySort(rddPair, op.getPkg());
         } else {
-            JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
-            rddPair = jrdd.map(new ToKeyValueFunction(op));
-        }
-        PairRDDFunctions<IndexedKey, Tuple> pairRDDFunctions
-                = new PairRDDFunctions<>(rddPair.rdd(),
-                SparkUtil.getManifest(IndexedKey.class),
-                SparkUtil.getManifest(Tuple.class), null);
-
-        RDD<Tuple2<IndexedKey, Tuple>> tupleRDD = pairRDDFunctions.reduceByKey(
-                SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism),
-                new MergeValuesFunction(op));
-        LOG.debug("Custom Partitioner and parallelims used : " + op.getCustomPartitioner() + ", " + parallelism);
+            PairRDDFunctions<IndexedKey, Tuple> pairRDDFunctions
+                    = new PairRDDFunctions<>(rddPair,
+                    SparkUtil.getManifest(IndexedKey.class),
+                    SparkUtil.getManifest(Tuple.class), null);
+
+            RDD<Tuple2<IndexedKey, Tuple>> tupleRDD = pairRDDFunctions.reduceByKey(
+                    SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism),
+                    new MergeValuesFunction(op));
+            LOG.debug("Custom Partitioner and parallelism used: " + op.getCustomPartitioner() + ", " + parallelism);
 
-        return tupleRDD.map(new ToTupleFunction(op), SparkUtil.getManifest(Tuple.class));
+            return tupleRDD.map(new ToTupleFunction(op), SparkUtil.getManifest(Tuple.class));
+        }
     }
 
     private JavaRDD<Tuple2<IndexedKey, Tuple>> handleSecondarySort(
@@ -188,8 +187,8 @@ public class ReduceByConverter implement
                 t.append(key);
                 t.append(bag);
 
-                poReduce.getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
-                Tuple packagedTuple = (Tuple) poReduce.getPkgr().getNext().result;
+                poReduce.getPkg().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
+                Tuple packagedTuple = (Tuple) poReduce.getPkg().getPkgr().getNext().result;
 
                 // Perform the operation
                 LOG.debug("MergeValuesFunction packagedTuple : " + t);
@@ -241,8 +240,8 @@ public class ReduceByConverter implement
                 bag.add((Tuple) v1._2().get(1));
                 t.append(key);
                 t.append(bag);
-                poReduce.getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
-                packagedTuple = (Tuple) poReduce.getPkgr().getNext().result;
+                poReduce.getPkg().getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
+                packagedTuple = (Tuple) poReduce.getPkg().getPkgr().getNext().result;
             } catch (ExecException e) {
                 throw new RuntimeException(e);
             }

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java?rev=1760625&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java Wed Sep 14 04:01:13 2016
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Objects;
+
+import scala.Tuple2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
+
+/**
+ * Provides utility functions used by ReduceByConverter and JoinGroupSparkConverter.
+ */
+public class SecondaryKeySortUtil {
+    private static final Log LOG = LogFactory
+            .getLog(SecondaryKeySortUtil.class);
+
+    public static RDD<Tuple> handleSecondarySort(
+            RDD<Tuple2<IndexedKey, Tuple>> rdd, POPackage pkgOp) {
+        JavaPairRDD<IndexedKey, Tuple> pairRDD = JavaPairRDD.fromRDD(rdd, SparkUtil.getManifest(IndexedKey.class),
+                SparkUtil.getManifest(Tuple.class));
+
+        int partitionNums = pairRDD.partitions().size();
+        //repartition so that tuples with the same indexedkey end up in the same partition
+        JavaPairRDD<IndexedKey, Tuple> sorted = pairRDD.repartitionAndSortWithinPartitions(
+                new IndexedKeyPartitioner(partitionNums));
+        //Package tuples sharing the same indexedkey into the result: (key,(val1,val2,val3,...))
+        return sorted.mapPartitions(new AccumulateByKey(pkgOp), true).rdd();
+    }
+
+    //Package tuples sharing the same indexedkey into the result: (key,(val1,val2,val3,...)).
+    //Send (key, Iterator) to POPackage and use POPackage#getNextTuple to produce the result.
+    private static class AccumulateByKey implements FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple>,
+            Serializable {
+        private POPackage pkgOp;
+
+        public AccumulateByKey(POPackage pkgOp) {
+            this.pkgOp = pkgOp;
+        }
+
+        @Override
+        public Iterable<Tuple> call(final Iterator<Tuple2<IndexedKey, Tuple>> it) throws Exception {
+            return new Iterable<Tuple>() {
+                Object curKey = null;
+                ArrayList curValues = new ArrayList();
+
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new Iterator<Tuple>() {
+
+                        @Override
+                        public boolean hasNext() {
+                            return it.hasNext() || curKey != null;
+                        }
+
+                        @Override
+                        public Tuple next() {
+                            while (it.hasNext()) {
+                                Tuple2<IndexedKey, Tuple> t = it.next();
+                                //when the key changes, reconstruct the previous tuple from curKey and curValues and return it
+                                Object tMainKey = null;
+                                try {
+                                    tMainKey = ((Tuple) (t._1()).getKey()).get(0);
+                                    if (curKey != null && !curKey.equals(tMainKey)) {
+                                        Tuple result = restructTuple(curKey, new ArrayList(curValues));
+                                        curValues.clear();
+                                        curKey = tMainKey;
+                                        curValues.add(t._2());
+                                        return result;
+                                    }
+                                    curKey = tMainKey;
+                                    //if the key does not change, just append the value under the current key
+                                    curValues.add(t._2());
+
+                                } catch (ExecException e) {
+                                    throw new RuntimeException("AccumulateByKey threw exception: ", e);
+                                }
+                            }
+                            if (curKey == null) {
+                                throw new RuntimeException("AccumulateByKey curKey is null");
+                            }
+
+                            //if we get here, this should be the last record
+                            Tuple res = restructTuple(curKey, curValues);
+                            curKey = null;
+                            return res;
+                        }
+
+
+                        @Override
+                        public void remove() {
+                            // Not implemented;
+                            // throws UnsupportedOperationException instead.
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+                }
+            };
+        }
+
+        private Tuple restructTuple(final Object curKey, final ArrayList<Tuple> curValues) {
+            try {
+                Tuple retVal = null;
+                PigNullableWritable retKey = new PigNullableWritable() {
+
+                    public Object getValueAsPigType() {
+                        return curKey;
+                    }
+                };
+
+                //Reconstruct a tuple iterator here; POPackage#tupIter will consume it later.
+                final Iterator<Tuple> tupleIterator = curValues.iterator();
+                Iterator<NullableTuple> iterator = new Iterator<NullableTuple>() {
+                    public boolean hasNext() {
+                        return tupleIterator.hasNext();
+                    }
+
+                    public NullableTuple next() {
+                        Tuple t = tupleIterator.next();
+                        return new NullableTuple(t);
+                    }
+
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+                pkgOp.setInputs(null);
+                pkgOp.attachInput(retKey, iterator);
+                Result res = pkgOp.getNextTuple();
+                if (res.returnStatus == POStatus.STATUS_OK) {
+                    retVal = (Tuple) res.result;
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("AccumulateByKey out: " + retVal);
+                }
+                return retVal;
+            } catch (ExecException e) {
+                throw new RuntimeException("AccumulateByKey#restructTuple throws exception: ", e);
+            }
+        }
+    }
+
+    //Group tuples with the same IndexedKey into the same partition
+    private static class IndexedKeyPartitioner extends Partitioner {
+        private int partition;
+
+        public IndexedKeyPartitioner(int partition) {
+            this.partition = partition;
+        }
+
+        @Override
+        public int getPartition(Object obj) {
+            IndexedKey indexedKey = (IndexedKey) obj;
+            Tuple key = (Tuple) indexedKey.getKey();
+
+            int hashCode = 0;
+            try {
+                hashCode = Objects.hashCode(key.get(0));
+            } catch (ExecException e) {
+                throw new RuntimeException("IndexedKeyPartitioner#getPartition: ", e);
+            }
+            return (hashCode & Integer.MAX_VALUE) % partition; //mask the sign bit: Math.abs(Integer.MIN_VALUE) is negative
+        }
+
+        @Override
+        public int numPartitions() {
+            return partition;
+        }
+    }
+}
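
The flow above is: partition by the main key only (IndexedKeyPartitioner hashes key.get(0)), let
repartitionAndSortWithinPartitions order each partition by the full IndexedKey (whose ordering, defined elsewhere,
supplies the secondary sort), then have AccumulateByKey walk each sorted partition and hand every run of equal main
keys to POPackage. A minimal standalone sketch of the same idea, not part of the patch, using plain strings instead
of IndexedKey/Tuple and a single partition so that both "a" keys are co-located:

    import java.util.Arrays;

    import scala.Tuple2;

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SecondarySortSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("local", "secondary-sort-sketch");
            // key = mainKey#secondaryKey
            JavaPairRDD<String, Integer> in = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>("a#2", 20), new Tuple2<>("a#1", 10), new Tuple2<>("b#1", 30)));
            // sort within each partition by the whole key; natural String order
            // yields the secondary sort once rows sharing a main key co-reside
            JavaPairRDD<String, Integer> sorted =
                    in.repartitionAndSortWithinPartitions(new HashPartitioner(1));
            System.out.println(sorted.collect()); // [(a#1,10), (a#2,20), (b#1,30)]
            sc.stop();
        }
    }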

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java?rev=1760625&r1=1760624&r2=1760625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java Wed Sep 14 04:01:13 2016
@@ -21,7 +21,8 @@ import java.util.List;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 
@@ -31,16 +32,19 @@ import org.apache.pig.impl.plan.Operator
  */
 public class POReduceBySpark extends POForEach {
     private String customPartitioner;
+    protected POLocalRearrange lr;
+    protected POPackage pkg;
 
-    protected Packager pkgr;
-
-    public POReduceBySpark(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean> isToBeFlattened, Packager pkgr){
+    public POReduceBySpark(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean> isToBeFlattened,
+            POPackage pkg, POLocalRearrange lr) {
         super(k, rp, inp, isToBeFlattened);
-        this.pkgr = pkgr;
+        this.pkg = pkg;
+        this.lr = lr;
+        this.addOriginalLocation(lr.getAlias(), lr.getOriginalLocations());
     }
 
-    public Packager getPkgr() {
-        return pkgr;
+    public POPackage getPkg() {
+        return pkg;
     }
 
     @Override
@@ -93,4 +97,9 @@ public class POReduceBySpark extends POF
     public void setCustomPartitioner(String customPartitioner) {
         this.customPartitioner = customPartitioner;
     }
+
+    public POLocalRearrange getLgr() {
+        return lr;
+    }
+
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java?rev=1760625&r1=1760624&r2=1760625&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java Wed Sep 14 04:01:13 2016
@@ -86,7 +86,6 @@ public class CombinerOptimizer extends S
     // Output:
     // foreach (using algebraicOp.Final)
     //   -> reduceBy (uses algebraicOp.Intermediate)
-    //      -> localRearrange
     //         -> foreach (using algebraicOp.Initial)
     //             -> CombinerRearrange
     private void addCombiner(PhysicalPlan phyPlan) throws VisitorException, PlanException, CloneNotSupportedException {
@@ -251,7 +250,7 @@ public class CombinerOptimizer extends S
                     // Create a reduceBy operator.
                     POReduceBySpark reduceOperator = new POReduceBySpark(cfe.getOperatorKey(), cfe
                             .getRequestedParallelism(),
-                            cfe.getInputPlans(), cfe.getToBeFlattened(), combinePack.getPkgr());
+                            cfe.getInputPlans(), cfe.getToBeFlattened(), combinePack, newRearrange);
                     reduceOperator.setCustomPartitioner(glr.getCustomPartitioner());
                     fixReduceSideFE(postReduceFE, algebraicOps);
                     CombinerOptimizerUtil.changeFunc(reduceOperator, POUserFunc.INTERMEDIATE);
@@ -259,14 +258,11 @@ public class CombinerOptimizer extends S
 
                     // Add the new operators
                     phyPlan.add(reduceOperator);
-                    phyPlan.add(newRearrange);
                     phyPlan.add(mfe);
                     // Connect the new operators as follows:
                     // reduceBy (using algebraicOp.Intermediate)
-                    //   -> rearrange
                     //      -> foreach (using algebraicOp.Initial)
-                    phyPlan.connect(mfe, newRearrange);
-                    phyPlan.connect(newRearrange, reduceOperator);
+                    phyPlan.connect(mfe, reduceOperator);
 
                     // Insert the reduce stage between combiner rearrange and its successor.
                     phyPlan.disconnect(combinerLocalRearrange, packageSuccessor);
@@ -329,7 +325,7 @@ public class CombinerOptimizer extends S
 
     // Update the ReduceBy Operator with the packaging used by Local rearrange.
     private void updatePackager(POReduceBySpark reduceOperator, POLocalRearrange lrearrange) throws OptimizerException {
-        Packager pkgr = reduceOperator.getPkgr();
+        Packager pkgr = reduceOperator.getPkg().getPkgr();
         // annotate the package with information from the LORearrange
         // update the keyInfo information if already present in the POPackage
         Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkgr.getKeyInfo();
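
Net effect of the addCombiner changes above on the physical plan (condensed from the comment and connect() edits;
operator names as in the patch):

    before: mfe (algebraicOp.Initial) -> newRearrange -> reduceOperator (algebraicOp.Intermediate)
    after:  mfe (algebraicOp.Initial) -> reduceOperator (algebraicOp.Intermediate)

The local rearrange no longer appears as a standalone operator in the plan; ReduceByConverter reaches it through
POReduceBySpark#getLgr() when it builds the keyed RDD.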