You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/01/29 04:45:26 UTC

svn commit: r1727472 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/spark/ src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ src/org/apache/pig/backend/hadoop/executionengine/spark/operator/ src/org/apache/...

Author: xuefu
Date: Fri Jan 29 03:45:26 2016
New Revision: 1727472

URL: http://svn.apache.org/viewvc?rev=1727472&view=rev
Log:
PIG-4709: Improve performance of GROUPBY operator on Spark (Pallavi via Xuefu)

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
    pig/branches/spark/test/org/apache/pig/test/TestCombiner.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1727472&r1=1727471&r2=1727472&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri Jan 29 03:45:26 2016
@@ -45,6 +45,7 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -60,6 +61,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
@@ -82,6 +84,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.RankConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.ReduceByConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.SkewedJoinConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.SortConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
@@ -90,7 +93,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.CombinerOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.NoopFilterRemover;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.ParallelismSetter;
@@ -206,6 +211,8 @@ public class SparkLauncher extends Launc
         convertMap.put(PORank.class, new RankConverter());
         convertMap.put(POStream.class, new StreamConverter(confBytes));
         convertMap.put(POFRJoin.class, new FRJoinConverter());
+        convertMap.put(POReduceBySpark.class, new ReduceByConverter());
+        convertMap.put(POPreCombinerLocalRearrange.class, new LocalRearrangeConverter());
 
         sparkPlanToRDD(sparkplan, convertMap, sparkStats, jobConf);
         cleanUpSparkJob();
@@ -215,14 +222,27 @@ public class SparkLauncher extends Launc
     }
 
     private void optimize(PigContext pc, SparkOperPlan plan) throws IOException {
-        String prop = pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
-        if (!pc.inIllustrator && !("true".equals(prop))) {
+
+        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
+
+        // Should be the first optimizer as it introduces new operators to the plan.
+        boolean noCombiner = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_COMBINER, false);
+        if (!pc.inIllustrator && !noCombiner)  {
+            CombinerOptimizer combinerOptimizer = new CombinerOptimizer(plan);
+            combinerOptimizer.visit();
+            if (LOG.isDebugEnabled()) {
+                System.out.println("after combiner optimization:");
+                explain(plan, System.out, "text", true);
+            }
+        }
+
+        boolean noSecondaryKey = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, false);
+        if (!pc.inIllustrator && !noSecondaryKey) {
             SecondaryKeyOptimizerSpark skOptimizer = new SecondaryKeyOptimizerSpark(plan);
             skOptimizer.visit();
         }
 
-        boolean isAccum =
-                Boolean.valueOf(pc.getProperties().getProperty("opt.accumulator", "true"));
+        boolean isAccum = conf.getBoolean("opt.accumulator", true);
         if (isAccum) {
             AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
             accum.visit();
@@ -233,8 +253,7 @@ public class SparkLauncher extends Launc
         NoopFilterRemover fRem = new NoopFilterRemover(plan);
         fRem.visit();
 
-        boolean isMultiQuery =
-                Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "true"));
+        boolean isMultiQuery = conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true);
 
         if (LOG.isDebugEnabled()) {
             System.out.println("before multiquery optimization:");

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1727472&r1=1727471&r2=1727472&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Fri Jan 29 03:45:26 2016
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 
@@ -35,7 +34,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
-import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.spark.HashPartitioner;
@@ -174,82 +172,6 @@ public class GlobalRearrangeConverter im
         }
     }
 
-    private static class PigSecondaryKeyComparatorSpark implements Comparator, Serializable {
-        private static final long serialVersionUID = 1L;
-
-        private static boolean[] secondarySortOrder;
-
-        public PigSecondaryKeyComparatorSpark(boolean[] pSecondarySortOrder) {
-            secondarySortOrder = pSecondarySortOrder;
-        }
-
-        @Override
-        public int compare(Object o1, Object o2) {
-            Tuple t1 = (Tuple) o1;
-            Tuple t2 = (Tuple) o2;
-            try {
-                if ((t1.size() < 3) || (t2.size() < 3)) {
-                    throw new RuntimeException("tuple size must bigger than 3, tuple[0] stands for index, tuple[1]" +
-                            "stands for the compound key, tuple[3] stands for the value");
-                }
-                Tuple compoundKey1 = (Tuple) t1.get(1);
-                Tuple compoundKey2 = (Tuple) t2.get(1);
-                if ((compoundKey1.size() < 2) || (compoundKey2.size() < 2)) {
-                    throw new RuntimeException("compoundKey size must bigger than, compoundKey[0] stands for firstKey," +
-                            "compoundKey[1] stands for secondaryKey");
-                }
-                Object secondaryKey1 = compoundKey1.get(1);
-                Object secondaryKey2 = compoundKey2.get(1);
-                int res = compareSecondaryKeys(secondaryKey1, secondaryKey2, secondarySortOrder);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("t1:" + t1 + "t2:" + t2 + " res:" + res);
-                }
-                return res;
-            } catch (ExecException e) {
-                throw new RuntimeException("Fail to get the compoundKey", e);
-            }
-        }
-
-        private int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
-            int rc = 0;
-            if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof Tuple) {
-                // objects are Tuples, we may need to apply sort order inside them
-                Tuple t1 = (Tuple) o1;
-                Tuple t2 = (Tuple) o2;
-                int sz1 = t1.size();
-                int sz2 = t2.size();
-                if (sz2 < sz1) {
-                    return 1;
-                } else if (sz2 > sz1) {
-                    return -1;
-                } else {
-                    for (int i = 0; i < sz1; i++) {
-                        try {
-                            rc = DataType.compare(t1.get(i), t2.get(i));
-                            if (rc != 0 && asc != null && asc.length > 1 && !asc[i])
-                                rc *= -1;
-                            if ((t1.get(i) == null) || (t2.get(i) == null)) {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("t1.get(i) is:" + t1.get(i) + " t2.get(i) is:" + t2.get(i));
-                                }
-                            }
-                            if (rc != 0) break;
-                        } catch (ExecException e) {
-                            throw new RuntimeException("Unable to compare tuples", e);
-                        }
-                    }
-                }
-            } else {
-                // objects are NOT Tuples, delegate to DataType.compare()
-                rc = DataType.compare(o1, o2);
-            }
-            // apply sort order for keys that are not tuples or for whole tuples
-            if (asc != null && asc.length == 1 && !asc[0])
-                rc *= -1;
-            return rc;
-        }
-    }
-
     /**
      * Function that extract keys from locally rearranged tuples.
      */

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java?rev=1727472&r1=1727471&r2=1727472&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java Fri Jan 29 03:45:26 2016
@@ -27,21 +27,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.spark.rdd.RDD;
 
 @SuppressWarnings({ "serial" })
 public class LocalRearrangeConverter implements
-        RDDConverter<Tuple, Tuple, POLocalRearrange> {
+        RDDConverter<Tuple, Tuple, PhysicalOperator> {
     private static final Log LOG = LogFactory
-            .getLog(GlobalRearrangeConverter.class);
+            .getLog(LocalRearrangeConverter.class);
 
     @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
-            POLocalRearrange physicalOperator) throws IOException {
+            PhysicalOperator physicalOperator) throws IOException {
         SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
         RDD<Tuple> rdd = predecessors.get(0);
         // call local rearrange to get key and value
@@ -53,14 +53,17 @@ public class LocalRearrangeConverter imp
     private static class LocalRearrangeFunction extends
             AbstractFunction1<Tuple, Tuple> implements Serializable {
 
-        private final POLocalRearrange physicalOperator;
+        private final PhysicalOperator physicalOperator;
 
-        public LocalRearrangeFunction(POLocalRearrange physicalOperator) {
+        public LocalRearrangeFunction(PhysicalOperator physicalOperator) {
             this.physicalOperator = physicalOperator;
         }
 
         @Override
         public Tuple apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("LocalRearrangeFunction in " + t);
+            }
             Result result;
             try {
                 physicalOperator.setInputs(null);

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java?rev=1727472&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java Fri Jan 29 03:45:26 2016
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Utility class that handles secondary key for sorting.
+ */
+class PigSecondaryKeyComparatorSpark implements Comparator, Serializable {
+    private static final Log LOG = LogFactory.getLog(PigSecondaryKeyComparatorSpark.class);
+    private static final long serialVersionUID = 1L;
+
+    private static boolean[] secondarySortOrder;
+
+    public PigSecondaryKeyComparatorSpark(boolean[] pSecondarySortOrder) {
+        secondarySortOrder = pSecondarySortOrder;
+    }
+
+    @Override
+    public int compare(Object o1, Object o2) {
+        Tuple t1 = (Tuple) o1;
+        Tuple t2 = (Tuple) o2;
+        try {
+            if ((t1.size() < 3) || (t2.size() < 3)) {
+                throw new RuntimeException("tuple size must bigger than 3, tuple[0] stands for index, tuple[1]" +
+                        "stands for the compound key, tuple[3] stands for the value");
+            }
+            Tuple compoundKey1 = (Tuple) t1.get(1);
+            Tuple compoundKey2 = (Tuple) t2.get(1);
+            if ((compoundKey1.size() < 2) || (compoundKey2.size() < 2)) {
+                throw new RuntimeException("compoundKey size must bigger than, compoundKey[0] stands for firstKey," +
+                        "compoundKey[1] stands for secondaryKey");
+            }
+            Object secondaryKey1 = compoundKey1.get(1);
+            Object secondaryKey2 = compoundKey2.get(1);
+            int res = compareSecondaryKeys(secondaryKey1, secondaryKey2, secondarySortOrder);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("t1:" + t1 + "t2:" + t2 + " res:" + res);
+            }
+            return res;
+        } catch (ExecException e) {
+            throw new RuntimeException("Fail to get the compoundKey", e);
+        }
+    }
+
+    private int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
+        int rc = 0;
+        if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof Tuple) {
+            // objects are Tuples, we may need to apply sort order inside them
+            Tuple t1 = (Tuple) o1;
+            Tuple t2 = (Tuple) o2;
+            int sz1 = t1.size();
+            int sz2 = t2.size();
+            if (sz2 < sz1) {
+                return 1;
+            } else if (sz2 > sz1) {
+                return -1;
+            } else {
+                for (int i = 0; i < sz1; i++) {
+                    try {
+                        rc = DataType.compare(t1.get(i), t2.get(i));
+                        if (rc != 0 && asc != null && asc.length > 1 && !asc[i])
+                            rc *= -1;
+                        if ((t1.get(i) == null) || (t2.get(i) == null)) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("t1.get(i) is:" + t1.get(i) + " t2.get(i) is:" + t2.get(i));
+                            }
+                        }
+                        if (rc != 0) break;
+                    } catch (ExecException e) {
+                        throw new RuntimeException("Unable to compare tuples", e);
+                    }
+                }
+            }
+        } else {
+            // objects are NOT Tuples, delegate to DataType.compare()
+            rc = DataType.compare(o1, o2);
+        }
+        // apply sort order for keys that are not tuples or for whole tuples
+        if (asc != null && asc.length == 1 && !asc[0])
+            rc *= -1;
+        return rc;
+    }
+}

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java?rev=1727472&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java Fri Jan 29 03:45:26 2016
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.AbstractFunction2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.rdd.PairRDDFunctions;
+import org.apache.spark.rdd.RDD;
+
+@SuppressWarnings({"serial"})
+public class ReduceByConverter implements RDDConverter<Tuple, Tuple, POReduceBySpark> {
+    private static final Log LOG = LogFactory.getLog(ReduceByConverter.class);
+
+    private static final TupleFactory tf = TupleFactory.getInstance();
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POReduceBySpark op) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, op, 1);
+        int parallelism = SparkUtil.getParallelism(predecessors, op);
+
+        RDD<Tuple> rdd = predecessors.get(0);
+
+        JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair;
+        if (op.isUseSecondaryKey()) {
+            rddPair = handleSecondarySort(rdd, op, parallelism);
+        } else {
+            JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
+            rddPair = jrdd.map(new ToKeyValueFunction(op));
+        }
+        PairRDDFunctions<IndexedKey, Tuple> pairRDDFunctions
+                = new PairRDDFunctions<>(rddPair.rdd(),
+                SparkUtil.getManifest(IndexedKey.class),
+                SparkUtil.getManifest(Tuple.class), null);
+
+        RDD<Tuple2<IndexedKey, Tuple>> tupleRDD = pairRDDFunctions.reduceByKey(
+                SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism),
+                new MergeValuesFunction(op));
+        LOG.debug("Custom Partitioner and parallelims used : " + op.getCustomPartitioner() + ", " + parallelism);
+
+        return tupleRDD.map(new ToTupleFunction(op), SparkUtil.getManifest(Tuple.class));
+    }
+
+    private JavaRDD<Tuple2<IndexedKey, Tuple>> handleSecondarySort(
+            RDD<Tuple> rdd, POReduceBySpark op, int parallelism) {
+
+        RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
+                SparkUtil.<Tuple, Object>getTuple2Manifest());
+
+        JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
+                SparkUtil.getManifest(Tuple.class),
+                SparkUtil.getManifest(Object.class));
+
+        //first sort the tuple by secondary key if enable useSecondaryKey sort
+        JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(
+                new HashPartitioner(parallelism),
+                new PigSecondaryKeyComparatorSpark(op.getSecondarySortOrder()));
+        JavaRDD<Tuple> jrdd = sorted.keys();
+        JavaRDD<Tuple2<IndexedKey, Tuple>> jrddPair = jrdd.map(new ToKeyValueFunction(op));
+        return jrddPair;
+    }
+
+    private static class ToKeyNullValueFunction extends
+            AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
+            Serializable {
+
+        @Override
+        public Tuple2<Tuple, Object> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("ToKeyNullValueFunction in " + t);
+            }
+
+            Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(t, null);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("ToKeyNullValueFunction out " + out);
+            }
+
+            return out;
+        }
+    }
+
+    /**
+     * Converts incoming locally rearranged tuple, which is of the form
+     * (index, key, value) into Tuple2<key, Tuple(key, value)>
+     */
+    private static class ToKeyValueFunction implements
+            Function<Tuple, Tuple2<IndexedKey, Tuple>>, Serializable {
+
+        private POReduceBySpark poReduce = null;
+
+        public ToKeyValueFunction(POReduceBySpark poReduce) {
+            this.poReduce = poReduce;
+        }
+
+        @Override
+        public Tuple2<IndexedKey, Tuple> call(Tuple t) {
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ToKeyValueFunction in " + t);
+                }
+
+                Object key;
+                if ((poReduce != null) && (poReduce.isUseSecondaryKey())) {
+                    key = ((Tuple) t.get(1)).get(0);
+                } else {
+                    key = t.get(1);
+                }
+
+                Tuple tupleWithKey = tf.newTuple();
+                tupleWithKey.append(key);
+                tupleWithKey.append(t.get(2));
+
+                Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(new IndexedKey((Byte) t.get(0), key), tupleWithKey);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ToKeyValueFunction out " + out);
+                }
+
+                return out;
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Given two input tuples, this function outputs the resultant tuple.
+     * Additionally, it packages the input tuples to ensure the Algebraic Functions can work on them.
+     */
+    private static final class MergeValuesFunction extends AbstractFunction2<Tuple, Tuple, Tuple>
+            implements Serializable {
+        private final POReduceBySpark poReduce;
+
+        public MergeValuesFunction(POReduceBySpark poReduce) {
+            this.poReduce = poReduce;
+        }
+
+        @Override
+        public Tuple apply(Tuple v1, Tuple v2) {
+            LOG.debug("MergeValuesFunction in : " + v1 + " , " + v2);
+            Tuple result = tf.newTuple();
+            DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
+            Tuple t = new DefaultTuple();
+            try {
+                // Package the input tuples so they can be processed by Algebraic functions.
+                Object key = v1.get(0);
+                bag.add((Tuple) v1.get(1));
+                bag.add((Tuple) v2.get(1));
+                t.append(key);
+                t.append(bag);
+
+                poReduce.getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
+                Tuple packagedTuple = (Tuple) poReduce.getPkgr().getNext().result;
+
+                // Perform the operation
+                LOG.debug("MergeValuesFunction packagedTuple : " + t);
+                poReduce.attachInput(packagedTuple);
+                Result r = poReduce.getNext(poReduce.getResultType());
+
+                // Ensure output is consistent with the output of KeyValueFunction
+                // If we return r.result, the result will be something like this:
+                // (ABC,(2),(3)) - A tuple with key followed by values.
+                // But, we want the result to look like this:
+                // (ABC,((2),(3))) - A tuple with key and a value tuple (containing values).
+                // Hence, the construction of a new value tuple
+                result.append(t.get(0));
+                Tuple valueTuple = tf.newTuple();
+                for (Object o : ((Tuple) r.result).getAll()) {
+                    if (!o.equals(key)) {
+                        valueTuple.append(o);
+                    }
+                }
+                result.append(valueTuple);
+                LOG.debug("MergeValuesFunction out : " + result);
+                return result;
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * This function transforms the Tuple to ensure it is packaged as per requirements of the Operator's packager.
+     */
+    private static final class ToTupleFunction extends AbstractFunction1<Tuple2<IndexedKey, Tuple>, Tuple>
+            implements Serializable {
+
+        private final POReduceBySpark poReduce;
+
+        public ToTupleFunction(POReduceBySpark poReduce) {
+            this.poReduce = poReduce;
+        }
+
+        @Override
+        public Tuple apply(Tuple2<IndexedKey, Tuple> v1) {
+            LOG.debug("ToTupleFunction in : " + v1);
+            DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
+            Tuple t = new DefaultTuple();
+            Tuple packagedTuple = null;
+            try {
+                Object key = v1._2().get(0);
+                bag.add((Tuple) v1._2().get(1));
+                t.append(key);
+                t.append(bag);
+                poReduce.getPkgr().attachInput(key, new DataBag[]{(DataBag) t.get(1)}, new boolean[]{true});
+                packagedTuple = (Tuple) poReduce.getPkgr().getNext().result;
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+            LOG.debug("ToTupleFunction out : " + packagedTuple);
+            return packagedTuple;
+        }
+    }
+}

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java?rev=1727472&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java Fri Jan 29 03:45:26 2016
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.OperatorKey;
+
+/**
+ * ReduceBy operator that maps to Sparks ReduceBy.
+ * Extends ForEach and adds packager, secondary sort and partitioner support.
+ */
+public class POReduceBySpark extends POForEach {
+    private String customPartitioner;
+
+    protected Packager pkgr;
+
+    public POReduceBySpark(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean> isToBeFlattened, Packager pkgr){
+        super(k, rp, inp, isToBeFlattened);
+        this.pkgr = pkgr;
+    }
+
+    public Packager getPkgr() {
+        return pkgr;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "Reduce By" + "(" + getFlatStr() + ")" + "["
+                + DataType.findTypeName(resultType) + "]" + " - "
+                + mKey.toString();
+    }
+
+    protected String getFlatStr() {
+        if(isToBeFlattenedArray ==null) {
+            return "";
+        }
+        StringBuilder sb = new StringBuilder();
+        for (Boolean b : isToBeFlattenedArray) {
+            sb.append(b);
+            sb.append(',');
+        }
+        if(sb.length()>0){
+            sb.deleteCharAt(sb.length()-1);
+        }
+        return sb.toString();
+    }
+
+    // Use secondary key
+    private boolean useSecondaryKey;
+    // Sort order for secondary keys;
+    private boolean[] secondarySortOrder;
+
+    public boolean isUseSecondaryKey() {
+        return useSecondaryKey;
+    }
+
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
+
+    public boolean[] getSecondarySortOrder() {
+        return secondarySortOrder;
+    }
+
+    public void setSecondarySortOrder(boolean[] secondarySortOrder) {
+        this.secondarySortOrder = secondarySortOrder;
+    }
+
+    public String getCustomPartitioner() {
+        return customPartitioner;
+    }
+
+    public void setCustomPartitioner(String customPartitioner) {
+        this.customPartitioner = customPartitioner;
+    }
+}

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java?rev=1727472&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java Fri Jan 29 03:45:26 2016
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.util.Pair;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * This class goes through the physical plan are replaces GlobalRearrange with ReduceBy
+ * where there are algebraic operations.
+ */
+public class CombinerOptimizer extends SparkOpPlanVisitor {
+
+    private static Log LOG = LogFactory.getLog(CombinerOptimizer.class);
+
+    public CombinerOptimizer(SparkOperPlan plan) {
+        super(plan, new DepthFirstWalker<>(plan));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        try {
+            addCombiner(sparkOp.physicalPlan);
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+    }
+
+    // Checks for algebraic operations and if they exist.
+    // Replaces global rearrange (cogroup) with reduceBy as follows:
+    // Input:
+    // foreach (using algebraicOp)
+    //   -> packager
+    //      -> globalRearrange
+    //          -> localRearrange
+    // Output:
+    // foreach (using algebraicOp.Final)
+    //   -> reduceBy (uses algebraicOp.Intermediate)
+    //      -> localRearrange
+    //         -> foreach (using algebraicOp.Initial)
+    //             -> CombinerRearrange
+    private void addCombiner(PhysicalPlan phyPlan) throws VisitorException, PlanException {
+
+        List<PhysicalOperator> leaves = phyPlan.getLeaves();
+        if (leaves == null || leaves.size() != 1) {
+            return;
+        }
+
+        // Ensure there is grouping.
+        List<POGlobalRearrange> glrs = PlanHelper.getPhysicalOperators(phyPlan, POGlobalRearrange.class);
+        if (glrs == null || glrs.size() == 0) {
+            return;
+        }
+        for (POGlobalRearrange glr : glrs) {
+            List<PhysicalOperator> glrSuccessors = phyPlan.getSuccessors(glr);
+            if (glrSuccessors == null || glrSuccessors.isEmpty()) {
+                continue;
+            }
+
+            if (!(glrSuccessors.get(0) instanceof POPackage)) {
+                continue;
+            }
+            POPackage poPackage = (POPackage) glrSuccessors.get(0);
+
+            List<PhysicalOperator> poPackageSuccessors = phyPlan.getSuccessors(poPackage);
+            if (poPackageSuccessors == null || poPackageSuccessors.size() != 1) {
+                continue;
+            }
+            PhysicalOperator successor = poPackageSuccessors.get(0);
+
+            if (successor instanceof POLimit) {
+                // POLimit is acceptable, as long as it has a single foreach as
+                // successor
+                List<PhysicalOperator> limitSucs = phyPlan.getSuccessors(successor);
+                if (limitSucs != null && limitSucs.size() == 1 &&
+                        limitSucs.get(0) instanceof POForEach) {
+                    // the code below will now further examine the foreach
+                    successor = limitSucs.get(0);
+                }
+            }
+            if (successor instanceof POForEach) {
+                POForEach foreach = (POForEach) successor;
+                List<PhysicalOperator> foreachSuccessors = phyPlan.getSuccessors(foreach);
+                // multi-query
+                if (foreachSuccessors == null || foreachSuccessors.size() != 1) {
+                    continue;
+                }
+                List<PhysicalPlan> feInners = foreach.getInputPlans();
+
+                // find algebraic operators and also check if the foreach statement
+                // is suitable for combiner use
+                List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = findAlgebraicOps(feInners);
+                if (algebraicOps == null || algebraicOps.size() == 0) {
+                    // the plan is not combinable or there is nothing to combine
+                    // we're done
+                    continue;
+                }
+                try {
+                    List<PhysicalOperator> glrPredecessors = phyPlan.getPredecessors(glr);
+                    if (glrPredecessors == null || glrPredecessors.isEmpty()) {
+                        continue;
+                    }
+
+                    if (!(glrPredecessors.get(0) instanceof POLocalRearrange)) {
+                        continue;
+                    }
+                    LOG.info("Algebraic operations found. Optimizing plan to use combiner.");
+
+                    POLocalRearrange rearrange = (POLocalRearrange) glrPredecessors.get(0);
+                    PhysicalOperator foreachSuccessor = foreachSuccessors.get(0);
+                    // Clone foreach so it can be modified to an operation post-reduce.
+                    POForEach postReduceFE = foreach.clone();
+
+                    // Trim the global rearrange and the preceeding package.
+                    convertToMapSideForEach(phyPlan, poPackage);
+
+                    // replace PODistinct->Project[*] with distinct udf (which is Algebraic)
+                    for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
+                        if (!(op2plan.first instanceof PODistinct)) {
+                            continue;
+                        }
+                        CombinerOptimizerUtil.DistinctPatcher distinctPatcher
+                                = new CombinerOptimizerUtil.DistinctPatcher(op2plan.second);
+                        distinctPatcher.visit();
+                        if (distinctPatcher.getDistinct() == null) {
+                            int errCode = 2073;
+                            String msg = "Problem with replacing distinct operator with distinct built-in function.";
+                            throw new PlanException(msg, errCode, PigException.BUG);
+                        }
+                        op2plan.first = distinctPatcher.getDistinct();
+                    }
+
+                    // create new map foreach -
+                    POForEach mfe = CombinerOptimizerUtil.createForEachWithGrpProj(foreach, poPackage.getPkgr()
+                            .getKeyType());
+                    Map<PhysicalOperator, Integer> op2newpos = Maps.newHashMap();
+                    Integer pos = 1;
+                    // create plan for each algebraic udf and add as inner plan in map-foreach
+                    for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
+                        PhysicalPlan udfPlan = CombinerOptimizerUtil.createPlanWithPredecessors(op2plan.first,
+                                op2plan.second);
+                        mfe.addInputPlan(udfPlan, false);
+                        op2newpos.put(op2plan.first, pos++);
+                    }
+                    CombinerOptimizerUtil.changeFunc(mfe, POUserFunc.INITIAL);
+
+                    // since we will only be creating SingleTupleBag as input to
+                    // the map foreach, we should flag the POProjects in the map
+                    // foreach inner plans to also use SingleTupleBag
+                    for (PhysicalPlan mpl : mfe.getInputPlans()) {
+                        try {
+                            new CombinerOptimizerUtil.fixMapProjects(mpl).visit();
+                        } catch (VisitorException e) {
+                            int errCode = 2089;
+                            String msg = "Unable to flag project operator to use single tuple bag.";
+                            throw new PlanException(msg, errCode, PigException.BUG, e);
+                        }
+                    }
+
+                    // create new combine foreach
+                    POForEach cfe = CombinerOptimizerUtil.createForEachWithGrpProj(foreach, poPackage.getPkgr()
+                            .getKeyType());
+                    // add algebraic functions with appropriate projection
+                    CombinerOptimizerUtil.addAlgebraicFuncToCombineFE(cfe, op2newpos);
+
+                    // we have modified the foreach inner plans - so set them again
+                    // for the foreach so that foreach can do any re-initialization
+                    // around them.
+                    mfe.setInputPlans(mfe.getInputPlans());
+                    cfe.setInputPlans(cfe.getInputPlans());
+
+                    // tell POCombinerPackage which fields need projected and which
+                    // placed in bags. First field is simple project rest need to go
+                    // into bags
+                    int numFields = algebraicOps.size() + 1; // algebraic funcs + group key
+                    boolean[] bags = new boolean[numFields];
+                    bags[0] = false;
+                    for (int i = 1; i < numFields; i++) {
+                        bags[i] = true;
+                    }
+
+                    // Use the POCombiner package in the combine plan
+                    // as it needs to act differently than the regular
+                    // package operator.
+                    CombinerPackager pkgr = new CombinerPackager(poPackage.getPkgr(), bags);
+                    POPackage combinePack = poPackage.clone();
+                    combinePack.setPkgr(pkgr);
+
+                    // A specialized local rearrange operator will replace
+                    // the normal local rearrange in the map plan.
+                    POLocalRearrange newRearrange = CombinerOptimizerUtil.getNewRearrange(rearrange);
+                    POPreCombinerLocalRearrange combinerLocalRearrange = CombinerOptimizerUtil.getPreCombinerLR
+                            (rearrange);
+                    phyPlan.replace(rearrange, combinerLocalRearrange);
+
+                    // Create a reduceBy operator.
+                    POReduceBySpark reduceOperator = new POReduceBySpark(cfe.getOperatorKey(), cfe
+                            .getRequestedParallelism(),
+                            cfe.getInputPlans(), cfe.getToBeFlattened(), combinePack.getPkgr());
+                    reduceOperator.setCustomPartitioner(glr.getCustomPartitioner());
+                    fixReduceSideFE(postReduceFE, cfe);
+                    CombinerOptimizerUtil.changeFunc(reduceOperator, POUserFunc.INTERMEDIATE);
+                    updatePackager(reduceOperator, newRearrange);
+
+                    // Add the new operators
+                    phyPlan.add(reduceOperator);
+                    phyPlan.add(newRearrange);
+                    phyPlan.add(postReduceFE);
+                    // Reconnect as follows :
+                    // foreach (using algebraicOp.Final)
+                    //   -> reduceBy (uses algebraicOp.Intermediate)
+                    //      -> foreach (using algebraicOp.Initial)
+                    phyPlan.disconnect(foreach, foreachSuccessor);
+                    phyPlan.connect(foreach, newRearrange);
+                    phyPlan.connect(newRearrange, reduceOperator);
+                    phyPlan.connect(reduceOperator, postReduceFE);
+                    phyPlan.replace(foreach, mfe);
+                    phyPlan.connect(postReduceFE, foreachSuccessor);
+
+                } catch (Exception e) {
+                    int errCode = 2018;
+                    String msg = "Internal error. Unable to introduce the combiner for optimization.";
+                    throw new OptimizerException(msg, errCode, PigException.BUG, e);
+                }
+            }
+        }
+    }
+
+    // Modifies the map side of foreach (before reduce).
+    private void convertToMapSideForEach(PhysicalPlan physicalPlan, POPackage poPackage)
+            throws PlanException {
+        LinkedList<PhysicalOperator> operatorsToRemove = new LinkedList<>();
+        for (PhysicalOperator physicalOperator : physicalPlan.getPredecessors(poPackage)) {
+            if (physicalOperator instanceof POGlobalRearrangeSpark) {
+                operatorsToRemove.add(physicalOperator);
+                break;
+            }
+        }
+        // Remove global rearranges preceeding POPackage
+        for (PhysicalOperator po : operatorsToRemove) {
+            physicalPlan.removeAndReconnect(po);
+        }
+        // Remove POPackage itself.
+        physicalPlan.removeAndReconnect(poPackage);
+    }
+
+
+    // TODO: Modify the post reduce plan in case of nested algebraic(ExpressionOperator) or logical operations.
+    private void fixReduceSideFE(POForEach postReduceFE, POForEach cfe) throws PlanException,
+            CloneNotSupportedException {
+        List<PhysicalPlan> plans = cfe.getInputPlans();
+        List<PhysicalPlan> newPlans = new ArrayList<>();
+        for (int i = 0; i < plans.size(); i++) {
+            PhysicalPlan inputPlan = plans.get(i).clone();
+            newPlans.add(inputPlan);
+        }
+        postReduceFE.setInputPlans(newPlans);
+        CombinerOptimizerUtil.changeFunc(postReduceFE, POUserFunc.FINAL);
+        postReduceFE.setResultType(DataType.TUPLE);
+    }
+
+    // Update the ReduceBy Operator with the packaging used by Local rearrange.
+    private void updatePackager(POReduceBySpark reduceOperator, POLocalRearrange lrearrange) throws OptimizerException {
+        Packager pkgr = reduceOperator.getPkgr();
+        // annotate the package with information from the LORearrange
+        // update the keyInfo information if already present in the POPackage
+        Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkgr.getKeyInfo();
+        if (keyInfo == null)
+            keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+
+        if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
+            // something is wrong - we should not be getting key info
+            // for the same index from two different Local Rearranges
+            int errCode = 2087;
+            String msg = "Unexpected problem during optimization." +
+                    " Found index:" + lrearrange.getIndex() +
+                    " in multiple LocalRearrange operators.";
+            throw new OptimizerException(msg, errCode, PigException.BUG);
+
+        }
+        keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
+                new Pair<Boolean, Map<Integer, Integer>>(
+                        lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+        pkgr.setKeyInfo(keyInfo);
+        pkgr.setKeyTuple(lrearrange.isKeyTuple());
+        pkgr.setKeyCompound(lrearrange.isKeyCompound());
+    }
+
+    /**
+     * find algebraic operators and also check if the foreach statement is
+     * suitable for combiner use
+     *
+     * @param feInners inner plans of foreach
+     * @return null if plan is not combinable, otherwise list of combinable operators
+     * @throws VisitorException
+     */
+    // TODO : Since all combinable cases are not handled, not using the utility method in CombinerOptimizerUtil
+    private static List<Pair<PhysicalOperator, PhysicalPlan>> findAlgebraicOps(List<PhysicalPlan> feInners)
+            throws VisitorException {
+        List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = Lists.newArrayList();
+
+        // check each foreach inner plan
+        for (PhysicalPlan pplan : feInners) {
+            // check for presence of non combinable operators
+            CombinerOptimizerUtil.AlgebraicPlanChecker algChecker = new CombinerOptimizerUtil.AlgebraicPlanChecker
+                    (pplan);
+            algChecker.visit();
+            if (algChecker.sawNonAlgebraic) {
+                return null;
+            }
+
+            // TODO : Distinct is combinable. Handle it.
+            if (algChecker.sawDistinctAgg) {
+                return null;
+            }
+
+            List<PhysicalOperator> roots = pplan.getRoots();
+            // combinable operators have to be attached to POProject root(s)
+            for (PhysicalOperator root : roots) {
+                if (root instanceof ConstantExpression) {
+                    continue;
+                }
+                if (!(root instanceof POProject)) {
+                    // how can this happen? - expect root of inner plan to be
+                    // constant or project. not combining it
+                    return null;
+                }
+                POProject proj = (POProject) root;
+                POUserFunc combineUdf = getAlgebraicSuccessor(pplan);
+                if (combineUdf == null) {
+                    if (proj.isProjectToEnd()) {
+                        // project-star or project to end
+                        // not combinable
+                        return null;
+                    }
+                    // Check to see if this is a projection of the grouping column.
+                    // If so, it will be a projection of col 0
+                    List<Integer> cols = proj.getColumns();
+                    if (cols != null && cols.size() == 1 && cols.get(0) == 0) {
+                        // it is project of grouping column, so the plan is
+                        // still combinable
+                        continue;
+                    } else {
+                        //not combinable
+                        return null;
+                    }
+                }
+
+                // The algebraic udf can have more than one input. Add the udf only once
+                boolean exist = false;
+                for (Pair<PhysicalOperator, PhysicalPlan> pair : algebraicOps) {
+                    if (pair.first.equals(combineUdf)) {
+                        exist = true;
+                        break;
+                    }
+                }
+                if (!exist)
+                    algebraicOps.add(new Pair<PhysicalOperator, PhysicalPlan>(combineUdf, pplan));
+            }
+        }
+
+        return algebraicOps;
+    }
+
+    /**
+     * Look for a algebraic POUserFunc that is the leaf of an input plan.
+     *
+     * @param pplan physical plan
+     * @return null if any operator other POProject or non-algebraic POUserFunc is
+     * found while going down the plan, otherwise algebraic POUserFunc is returned
+     */
+    private static POUserFunc getAlgebraicSuccessor(PhysicalPlan pplan) {
+        // check if it ends in an UDF
+        List<PhysicalOperator> leaves = pplan.getLeaves();
+        if (leaves == null || leaves.size() != 1) {
+            return null;
+        }
+
+        PhysicalOperator succ = leaves.get(0);
+        if (succ instanceof POUserFunc && ((POUserFunc) succ).combinable()) {
+            return (POUserFunc) succ;
+        }
+
+        // some other operator ? can't combine
+        return null;
+    }
+}

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1727472&r1=1727471&r2=1727472&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Fri Jan 29 03:45:26 2016
@@ -430,7 +430,7 @@ public class CombinerOptimizerUtil {
      * @param keyType type for group-by key
      * @return new POForeach
      */
-    private static POForEach createForEachWithGrpProj(POForEach foreach, byte keyType) {
+    public static POForEach createForEachWithGrpProj(POForEach foreach, byte keyType) {
         String scope = foreach.getOperatorKey().scope;
         POForEach newFE = new POForEach(createOperatorKey(scope), new ArrayList<PhysicalPlan>());
         newFE.addOriginalLocation(foreach.getAlias(), foreach.getOriginalLocations());
@@ -454,7 +454,7 @@ public class CombinerOptimizerUtil {
      * @throws CloneNotSupportedException
      * @throws PlanException
      */
-    private static PhysicalPlan createPlanWithPredecessors(PhysicalOperator algeOp, PhysicalPlan pplan)
+    public static PhysicalPlan createPlanWithPredecessors(PhysicalOperator algeOp, PhysicalPlan pplan)
             throws CloneNotSupportedException, PlanException {
         PhysicalPlan newplan = new PhysicalPlan();
         addPredecessorsToPlan(algeOp, pplan, newplan);
@@ -491,7 +491,7 @@ public class CombinerOptimizerUtil {
      * @throws CloneNotSupportedException
      * @throws PlanException
      */
-    private static void addAlgebraicFuncToCombineFE(POForEach cfe, Map<PhysicalOperator, Integer> op2newpos)
+    public static void addAlgebraicFuncToCombineFE(POForEach cfe, Map<PhysicalOperator, Integer> op2newpos)
             throws CloneNotSupportedException, PlanException {
         // an array that we will first populate with physical operators in order
         // of their position in input. Used while adding plans to combine
@@ -561,7 +561,7 @@ public class CombinerOptimizerUtil {
      * @param rearrange
      * @return
      */
-    private static POPreCombinerLocalRearrange getPreCombinerLR(POLocalRearrange rearrange) {
+    public static POPreCombinerLocalRearrange getPreCombinerLR(POLocalRearrange rearrange) {
         String scope = rearrange.getOperatorKey().scope;
         POPreCombinerLocalRearrange pclr = new POPreCombinerLocalRearrange(
                 createOperatorKey(scope),
@@ -602,7 +602,7 @@ public class CombinerOptimizerUtil {
      * @param type
      * @throws PlanException
      */
-    private static void changeFunc(POForEach fe, byte type) throws PlanException {
+    public static void changeFunc(POForEach fe, byte type) throws PlanException {
         for (PhysicalPlan plan : fe.getInputPlans()) {
             List<PhysicalOperator> leaves = plan.getLeaves();
             if (leaves == null || leaves.size() != 1) {
@@ -640,7 +640,7 @@ public class CombinerOptimizerUtil {
      * @throws PlanException
      * @throws CloneNotSupportedException
      */
-    private static POLocalRearrange getNewRearrange(POLocalRearrange rearrange)
+    public static POLocalRearrange getNewRearrange(POLocalRearrange rearrange)
             throws PlanException, CloneNotSupportedException {
         POLocalRearrange newRearrange = rearrange.clone();
 
@@ -663,13 +663,13 @@ public class CombinerOptimizerUtil {
      * Checks if there is something that prevents the use of algebraic interface,
      * and looks for the PODistinct that can be used as algebraic
      */
-    private static class AlgebraicPlanChecker extends PhyPlanVisitor {
-        boolean sawNonAlgebraic = false;
-        boolean sawDistinctAgg = false;
+    public static class AlgebraicPlanChecker extends PhyPlanVisitor {
+        public boolean sawNonAlgebraic = false;
+        public boolean sawDistinctAgg = false;
         private boolean sawForeach = false;
         private PODistinct distinct = null;
 
-        AlgebraicPlanChecker(PhysicalPlan plan) {
+        public AlgebraicPlanChecker(PhysicalPlan plan) {
             super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
         }
 
@@ -818,7 +818,7 @@ public class CombinerOptimizerUtil {
      * with
      * POUserFunc(org.apache.pig.builtin.Distinct)[DataBag]
      */
-    private static class DistinctPatcher extends PhyPlanVisitor {
+    public static class DistinctPatcher extends PhyPlanVisitor {
         private POUserFunc distinct = null;
         /**
          * @param plan
@@ -884,12 +884,12 @@ public class CombinerOptimizerUtil {
             }
         }
 
-        POUserFunc getDistinct() {
+        public POUserFunc getDistinct() {
             return distinct;
         }
     }
 
-    private static class fixMapProjects extends PhyPlanVisitor {
+    public static class fixMapProjects extends PhyPlanVisitor {
         public fixMapProjects(PhysicalPlan plan) {
             this(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
         }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1727472&r1=1727471&r2=1727472&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Fri Jan 29 03:45:26 2016
@@ -43,6 +43,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -432,6 +433,10 @@ public class SecondaryKeyOptimizerUtil {
             } else if (root instanceof POPackage) {
                 POPackage pack = (POPackage) root;
                 pack.getPkgr().setUseSecondaryKey(true);
+            } else if (root instanceof POReduceBySpark) {
+                POReduceBySpark reduceBySpark = (POReduceBySpark) root;
+                reduceBySpark.setUseSecondaryKey(true);
+                reduceBySpark.setSecondarySortOrder(secondarySortKeyInfo.getAscs());
             }
         }
         return secKeyOptimizerInfo;

Modified: pig/branches/spark/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCombiner.java?rev=1727472&r1=1727471&r2=1727472&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCombiner.java Fri Jan 29 03:45:26 2016
@@ -118,11 +118,15 @@ public class TestCombiner {
 
         pig.registerQuery("B = group A by ($0, $1);");
         pig.registerQuery("C = foreach B generate flatten(group), COUNT($1);");
+        // Since the input has no schema, using Util.getTuplesFromConstantTupleStrings fails assert.
+        List<String> resultTuples = new ArrayList<>();
+        resultTuples.add("(a,b,2)");
+        resultTuples.add("(a,c,1)");
         Iterator<Tuple> resultIterator = pig.openIterator("C");
         Tuple tuple = resultIterator.next();
-        assertEquals("(a,b,2)", tuple.toString());
+        assertTrue(resultTuples.contains(tuple.toString()));
         tuple = resultIterator.next();
-        assertEquals("(a,c,1)", tuple.toString());
+        assertTrue(resultTuples.contains(tuple.toString()));
 
         return inputFileName;
     }
@@ -185,7 +189,7 @@ public class TestCombiner {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);
         pigServer.explain("c", ps);
-        assertTrue(baos.toString().matches("(?si).*combine plan.*"));
+        checkCombinerUsed(pigServer, "c", true);
 
         Iterator<Tuple> it = pigServer.openIterator("c");
         Tuple t = it.next();
@@ -235,7 +239,7 @@ public class TestCombiner {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);
         pigServer.explain("c", ps);
-        assertTrue(baos.toString().matches("(?si).*combine plan.*"));
+        checkCombinerUsed(pigServer, "c", true);
 
         HashMap<String, Object[]> results = new HashMap<String, Object[]>();
         results.put("pig1", new Object[] { "pig1", 3L, 57L, 5.2, 75L, 9.4, 3L, 3L, 57L });
@@ -256,6 +260,56 @@ public class TestCombiner {
     }
 
     @Test
+    public void testGroupAndUnion() throws Exception {
+        // test use of combiner when group elements are accessed in the foreach
+        String input1[] = {
+                "ABC\t1\ta\t1",
+                "ABC\t1\tb\t2",
+                "ABC\t1\ta\t3",
+                "ABC\t2\tb\t4",
+        };
+
+        Util.createInputFile(cluster, "testGroupElements1.txt", input1);
+        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
+        pigServer.debugOn();
+        pigServer.registerQuery("a1 = load 'testGroupElements1.txt' " +
+                "as (str:chararray, num1:int, alph : chararray, num2 : int);");
+        pigServer.registerQuery("b1 = group a1 by str;");
+
+        // check if combiner is present or not for various forms of foreach
+        pigServer.registerQuery("c1 = foreach b1  generate flatten(group), COUNT(a1.alph), SUM(a1.num2); ");
+
+        String input2[] = {
+                "DEF\t2\ta\t3",
+                "DEF\t2\td\t5",
+        };
+
+        Util.createInputFile(cluster, "testGroupElements2.txt", input2);
+        pigServer.registerQuery("a2 = load 'testGroupElements2.txt' " +
+                "as (str:chararray, num1:int, alph : chararray, num2 : int);");
+        pigServer.registerQuery("b2 = group a2 by str;");
+
+        // check if combiner is present or not for various forms of foreach
+        pigServer.registerQuery("c2 = foreach b2  generate flatten(group), COUNT(a2.alph), SUM(a2.num2); ");
+
+        pigServer.registerQuery("c = union c1, c2;");
+
+        List<Tuple> expectedRes =
+                Util.getTuplesFromConstantTupleStrings(
+                        new String[]{
+                                "('ABC',4L,10L)",
+                                "('DEF',2L,8L)",
+                        });
+
+        Iterator<Tuple> it = pigServer.openIterator("c");
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+        Util.deleteFile(cluster, "testGroupElements1.txt");
+        Util.deleteFile(cluster, "testGroupElements2.txt");
+        pigServer.shutdown();
+    }
+
+    @Test
     public void testGroupElements() throws Exception {
         // test use of combiner when group elements are accessed in the foreach
         String input[] = {
@@ -352,12 +406,12 @@ public class TestCombiner {
         pigServer.shutdown();
     }
 
-    private void checkCombinerUsed(PigServer pigServer, String string, boolean combineExpected)
+    private void checkCombinerUsed(PigServer pigServer, String variable, boolean combineExpected)
             throws IOException {
         // make sure there is a combine plan in the explain output
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);
-        pigServer.explain("c", ps);
+        pigServer.explain(variable, ps);
         boolean combinerFound = baos.toString().matches("(?si).*combine plan.*");
         System.out.println(baos.toString());
         assertEquals("is combiner present as expected", combineExpected, combinerFound);