You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@pig.apache.org by xu...@apache.org on 2015/05/15 15:00:03 UTC

svn commit: r1679557 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/spark/ src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ src/org/apache/pig/backend/hadoop/executionengine/spark/operator/ src/org/apache/...

Author: xuefu
Date: Fri May 15 13:00:02 2015
New Revision: 1679557

URL: http://svn.apache.org/r1679557
Log:
PIG-4504: Enable Secondary key sort feature in spark mode (Liyun via Xuefu)

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
    pig/branches/spark/test/org/apache/pig/spark/
    pig/branches/spark/test/org/apache/pig/spark/TestSecondarySortSpark.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
    pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
    pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java
    pig/branches/spark/test/spark-tests

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri May 15 13:00:02 2015
@@ -42,6 +42,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.BackendException;
@@ -55,7 +57,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -87,7 +88,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -173,9 +176,9 @@ public class SparkLauncher extends Launc
 		convertMap.put(POFilter.class, new FilterConverter());
 		convertMap.put(POPackage.class, new PackageConverter(confBytes));
 		convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
-		convertMap.put(POGlobalRearrange.class, new GlobalRearrangeConverter());
-		convertMap.put(POLimit.class, new LimitConverter());
-		convertMap.put(PODistinct.class, new DistinctConverter());
+        convertMap.put(POGlobalRearrangeSpark.class, new GlobalRearrangeConverter());
+        convertMap.put(POLimit.class, new LimitConverter());
+        convertMap.put(PODistinct.class, new DistinctConverter());
 		convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
 		convertMap.put(POSort.class, new SortConverter());
 		convertMap.put(POSplit.class, new SplitConverter());
@@ -193,14 +196,20 @@ public class SparkLauncher extends Launc
 		return sparkStats;
 	}
 
-	private void optimize(PigContext pc, SparkOperPlan plan) throws VisitorException {
-		boolean isAccumulator =
-				Boolean.valueOf(pc.getProperties().getProperty("opt.accumulator","true"));
-		if (isAccumulator) {
-			AccumulatorOptimizer accumulatorOptimizer = new AccumulatorOptimizer(plan);
-			accumulatorOptimizer.visit();
-		}
-	}
+    private void optimize(PigContext pc, SparkOperPlan plan) throws VisitorException {
+        String prop = pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
+        if (!pc.inIllustrator && !("true".equals(prop))) {
+            SecondaryKeyOptimizerSpark skOptimizer = new SecondaryKeyOptimizerSpark(plan);
+            skOptimizer.visit();
+        }
+
+        boolean isAccum =
+                Boolean.valueOf(pc.getProperties().getProperty("opt.accumulator", "true"));
+        if (isAccum) {
+            AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
+            accum.visit();
+        }
+    }
 
 	/**
 	 * In Spark, currently only async actions return job id. There is no async

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Fri May 15 13:00:02 2015
@@ -20,20 +20,23 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.rdd.CoGroupedRDD;
 import org.apache.spark.rdd.RDD;
 
@@ -41,26 +44,18 @@ import scala.Product2;
 import scala.Tuple2;
 import scala.collection.JavaConversions;
 import scala.collection.Seq;
-//import scala.reflect.ClassManifest;
+import scala.runtime.AbstractFunction1;
 
 @SuppressWarnings({ "serial" })
 public class GlobalRearrangeConverter implements
-        POConverter<Tuple, Tuple, POGlobalRearrange> {
+        POConverter<Tuple, Tuple, POGlobalRearrangeSpark> {
     private static final Log LOG = LogFactory
             .getLog(GlobalRearrangeConverter.class);
 
     private static final TupleFactory tf = TupleFactory.getInstance();
-
-    // GROUP FUNCTIONS
-    private static final ToKeyValueFunction TO_KEY_VALUE_FUNCTION = new ToKeyValueFunction();
-    private static final GetKeyFunction GET_KEY_FUNCTION = new GetKeyFunction();
-    // COGROUP FUNCTIONS
-    private static final GroupTupleFunction GROUP_TUPLE_FUNCTION = new GroupTupleFunction();
-    private static final ToGroupKeyValueFunction TO_GROUP_KEY_VALUE_FUNCTION = new ToGroupKeyValueFunction();
-
   @Override
     public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
-            POGlobalRearrange physicalOperator) throws IOException {
+                              POGlobalRearrangeSpark physicalOperator) throws IOException {
         SparkUtil.assertPredecessorSizeGreaterThan(predecessors,
                 physicalOperator, 0);
         int parallelism = SparkUtil.getParallelism(predecessors,
@@ -74,15 +69,32 @@ public class GlobalRearrangeConverter im
 
         if (predecessors.size() == 1) {
             // GROUP
-            JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
-            JavaPairRDD<Object, Iterable<Tuple>> prdd = jrdd.groupBy(GET_KEY_FUNCTION, parallelism);
-            JavaRDD<Tuple> jrdd2 = prdd.map(GROUP_TUPLE_FUNCTION);
+            JavaPairRDD<Object, Iterable<Tuple>> prdd = null;
+            if (physicalOperator.isUseSecondaryKey()) {
+                RDD<Tuple> rdd = predecessors.get(0);
+                RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new ToKeyNullValueFunction(),
+                        SparkUtil.<Tuple, Object>getTuple2Manifest());
+
+                JavaPairRDD<Tuple, Object> pairRDD = new JavaPairRDD<Tuple, Object>(rddPair,
+                        SparkUtil.getManifest(Tuple.class),
+                        SparkUtil.getManifest(Object.class));
+
+                //first sort the tuple by secondary key if enable useSecondaryKey sort
+                JavaPairRDD<Tuple, Object> sorted = pairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(parallelism), new PigSecondaryKeyComparatorSpark(physicalOperator.getSecondarySortOrder()));
+                JavaRDD<Tuple> mapped = sorted.mapPartitions(new ToValueFunction());
+                prdd = mapped.groupBy(new GetKeyFunction(physicalOperator), parallelism);
+            } else {
+                JavaRDD<Tuple> jrdd = predecessors.get(0).toJavaRDD();
+                prdd = jrdd.groupBy(new GetKeyFunction(physicalOperator), parallelism);
+            }
+
+            JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
             return jrdd2.rdd();
         } else {
             List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<Object, Tuple>>>();
             for (RDD<Tuple> rdd : predecessors) {
                 JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
-                JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(TO_KEY_VALUE_FUNCTION);
+                JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
                 rddPairs.add(rddPair.rdd());
             }
 
@@ -95,18 +107,155 @@ public class GlobalRearrangeConverter im
 
             RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd =
                 (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
-            return rdd.toJavaRDD().map(TO_GROUP_KEY_VALUE_FUNCTION).rdd();
+            return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
+        }
+    }
+
+    private static class ToValueFunction implements
+            FlatMapFunction<Iterator<Tuple2<Tuple, Object>>, Tuple>, Serializable {
+
+        private class Tuple2TransformIterable implements Iterable<Tuple> {
+
+            Iterator<Tuple2<Tuple, Object>> in;
+
+            Tuple2TransformIterable(Iterator<Tuple2<Tuple, Object>> input) {
+                in = input;
+            }
+
+            public Iterator<Tuple> iterator() {
+                return new IteratorTransform<Tuple2<Tuple, Object>, Tuple>(in) {
+                    @Override
+                    protected Tuple transform(Tuple2<Tuple, Object> next) {
+                        return next._1();
+                    }
+                };
+            }
+        }
+
+        @Override
+        public Iterable<Tuple> call(Iterator<Tuple2<Tuple, Object>> input) {
+            return new Tuple2TransformIterable(input);
+        }
+    }
+
+    private static class ToKeyNullValueFunction extends
+            AbstractFunction1<Tuple, Tuple2<Tuple, Object>> implements
+            Serializable {
+
+        @Override
+        public Tuple2<Tuple, Object> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Sort ToKeyValueFunction in " + t);
+            }
+            Tuple key = t;
+            Object value = null;
+            // (key, value)
+            Tuple2<Tuple, Object> out = new Tuple2<Tuple, Object>(key, value);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Sort ToKeyValueFunction out " + out);
+            }
+            return out;
+        }
+    }
+
+    private static class PigSecondaryKeyComparatorSpark implements Comparator, Serializable {
+        private static final long serialVersionUID = 1L;
+        // Instance field (not static): it must be serialized with the comparator to executors.
+        private final boolean[] secondarySortOrder;
+
+        public PigSecondaryKeyComparatorSpark(boolean[] pSecondarySortOrder) {
+            secondarySortOrder = pSecondarySortOrder;
+        }
+        // Orders (index, (firstKey, secondaryKey), value) tuples by secondaryKey only.
+        @Override
+        public int compare(Object o1, Object o2) {
+            Tuple t1 = (Tuple) o1;
+            Tuple t2 = (Tuple) o2;
+            try {
+                if ((t1.size() < 3) || (t2.size() < 3)) {
+                    throw new RuntimeException("tuple size must be at least 3: tuple[0] stands for index, tuple[1] " +
+                            "stands for the compound key, tuple[2] stands for the value");
+                }
+                Tuple compoundKey1 = (Tuple) t1.get(1);
+                Tuple compoundKey2 = (Tuple) t2.get(1);
+                if ((compoundKey1.size() < 2) || (compoundKey2.size() < 2)) {
+                    throw new RuntimeException("compoundKey size must be at least 2: compoundKey[0] stands for firstKey, " +
+                            "compoundKey[1] stands for secondaryKey");
+                }
+                Object secondaryKey1 = compoundKey1.get(1);
+                Object secondaryKey2 = compoundKey2.get(1);
+                int res = compareSecondaryKeys(secondaryKey1, secondaryKey2, secondarySortOrder);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("t1:" + t1 + "t2:" + t2 + " res:" + res);
+                }
+                return res;
+            } catch (ExecException e) {
+                throw new RuntimeException("Fail to get the compoundKey", e);
+            }
+        }
+        // Different-size tuples: shorter first. Else field by field, negated where asc[i] is false.
+        private int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
+            int rc = 0;
+            if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof Tuple) {
+                // objects are Tuples, we may need to apply sort order inside them
+                Tuple t1 = (Tuple) o1;
+                Tuple t2 = (Tuple) o2;
+                int sz1 = t1.size();
+                int sz2 = t2.size();
+                if (sz2 < sz1) {
+                    return 1;
+                } else if (sz2 > sz1) {
+                    return -1;
+                } else {
+                    for (int i = 0; i < sz1; i++) {
+                        try {
+                            rc = DataType.compare(t1.get(i), t2.get(i));
+                            if (rc != 0 && asc != null && asc.length > 1 && i < asc.length && !asc[i])
+                                rc *= -1;
+                            if ((t1.get(i) == null) || (t2.get(i) == null)) {
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("t1.get(i) is:" + t1.get(i) + " t2.get(i) is:" + t2.get(i));
+                                }
+                            }
+                            if (rc != 0) break;
+                        } catch (ExecException e) {
+                            throw new RuntimeException("Unable to compare tuples", e);
+                        }
+                    }
+                }
+            } else {
+                // objects are NOT Tuples, delegate to DataType.compare()
+                rc = DataType.compare(o1, o2);
+            }
+            // apply sort order for keys that are not tuples or for whole tuples
+            if (asc != null && asc.length == 1 && !asc[0])
+                rc *= -1;
+            return rc;
         }
     }
 
     private static class GetKeyFunction implements Function<Tuple, Object>, Serializable {
+        public final POGlobalRearrangeSpark glrSpark;
+
+        public GetKeyFunction(POGlobalRearrangeSpark globalRearrangeSpark) {
+            this.glrSpark = globalRearrangeSpark;
+        }
 
         public Object call(Tuple t) {
             try {
-                LOG.debug("GetKeyFunction in " + t);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("GetKeyFunction in " + t);
+                }
                 // see PigGenericMapReduce For the key
-                Object key = t.get(1);
-                LOG.debug("GetKeyFunction out " + key);
+                Object key = null;
+                if ((glrSpark != null) && (glrSpark.isUseSecondaryKey())) {
+                    key = ((Tuple) t.get(1)).get(0);
+                } else {
+                    key = t.get(1);
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("GetKeyFunction out " + key);
+                }
                 return key;
             } catch (ExecException e) {
                 throw new RuntimeException(e);
@@ -116,14 +265,23 @@ public class GlobalRearrangeConverter im
 
     private static class GroupTupleFunction implements Function<Tuple2<Object, Iterable<Tuple>>, Tuple>,
         Serializable {
+        public final POGlobalRearrangeSpark glrSpark;
+
+        public GroupTupleFunction(POGlobalRearrangeSpark globalRearrangeSpark) {
+            this.glrSpark = globalRearrangeSpark;
+        }
 
         public Tuple call(Tuple2<Object, Iterable<Tuple>> v1) {
             try {
-                LOG.debug("GroupTupleFunction in " + v1);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("GroupTupleFunction in " + v1);
+                }
                 Tuple tuple = tf.newTuple(2);
                 tuple.set(0, v1._1()); // the (index, key) tuple
                 tuple.set(1, v1._2().iterator()); // the Seq<Tuple> aka bag of values
-                LOG.debug("GroupTupleFunction out " + tuple);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("GroupTupleFunction out " + tuple);
+                }
                 return tuple;
             } catch (ExecException e) {
                 throw new RuntimeException(e);
@@ -138,13 +296,17 @@ public class GlobalRearrangeConverter im
         public Tuple2<Object, Tuple> call(Tuple t) {
             try {
                 // (index, key, value)
-                LOG.debug("ToKeyValueFunction in " + t);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ToKeyValueFunction in " + t);
+                }
                 Object key = t.get(1);
                 Tuple value = (Tuple) t.get(2); // value
                 // (key, value)
                 Tuple2<Object, Tuple> out = new Tuple2<Object, Tuple>(key,
                         value);
-                LOG.debug("ToKeyValueFunction out " + out);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ToKeyValueFunction out " + out);
+                }
                 return out;
             } catch (ExecException e) {
                 throw new RuntimeException(e);
@@ -158,7 +320,9 @@ public class GlobalRearrangeConverter im
         @Override
         public Tuple call(Tuple2<Object, Seq<Seq<Tuple>>> input) {
             try {
-                LOG.debug("ToGroupKeyValueFunction2 in " + input);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ToGroupKeyValueFunction2 in " + input);
+                }
                 final Object key = input._1();
                 Object obj = input._2();
                 // XXX this is a hack for Spark 1.1.0: the type is an Array, not Seq
@@ -190,7 +354,9 @@ public class GlobalRearrangeConverter im
                 Tuple out = tf.newTuple(2);
                 out.set(0, key);
                 out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator()));
-                LOG.debug("ToGroupKeyValueFunction2 out " + out);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ToGroupKeyValueFunction2 out " + out);
+                }
                 return out;
             } catch (Exception e) {
                 throw new RuntimeException(e);

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java?rev=1679557&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java Fri May 15 13:00:02 2015
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+
+/**
+ * POGlobalRearrange for spark mode. In addition to the base operator it carries the
+ * secondary-key sort settings that GlobalRearrangeConverter reads.
+ */
+public class POGlobalRearrangeSpark extends POGlobalRearrange {
+    // Use secondary key
+    private boolean useSecondaryKey;
+    // Sort order for secondary keys (true = ascending, per field)
+    private boolean[] secondarySortOrder;
+
+    /**
+     * Creates the Spark-mode replacement for {@code copy}.
+     *
+     * The requested parallelism, alias and custom partitioner are copied
+     * along with the operator key, so these settings survive the
+     * substitution done by SparkCompiler.
+     *
+     * @param copy the POGlobalRearrange being replaced
+     */
+    public POGlobalRearrangeSpark(POGlobalRearrange copy) {
+        super(copy.getOperatorKey(), copy.getRequestedParallelism());
+        setAlias(copy.getAlias());
+        setCustomPartitioner(copy.getCustomPartitioner());
+    }
+
+    /** @return true if records are sorted by the secondary key before grouping */
+    public boolean isUseSecondaryKey() {
+        return useSecondaryKey;
+    }
+
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
+
+    /** @return per-field secondary sort order (true = ascending); may be null */
+    public boolean[] getSecondarySortOrder() {
+        return secondarySortOrder;
+    }
+
+    public void setSecondarySortOrder(boolean[] secondarySortOrder) {
+        this.secondarySortOrder = secondarySortOrder;
+    }
+}

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java?rev=1679557&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java Fri May 15 13:00:02 2015
@@ -0,0 +1,218 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Secondary key sort optimization for spark mode
+ */
+public class SecondaryKeyOptimizerSpark extends SparkOpPlanVisitor implements SecondaryKeyOptimizer {
+    private static final Log LOG = LogFactory
+            .getLog(SecondaryKeyOptimizerSpark.class);
+
+    private int numSortRemoved = 0;
+    private int numDistinctChanged = 0;
+    private int numUseSecondaryKey = 0;
+
+    public SecondaryKeyOptimizerSpark(SparkOperPlan plan) {
+        super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+    }
+
+    /**
+     * Applies secondary key sort optimization to group + nested foreach, e.g. TestAccumulator#testAccumWithSort.
+     * SecondaryKeyOptimizerUtil.applySecondaryKeySort removes the POSort inside the POForEach from the spark plan;
+     * the ordering is then provided by the secondary key sort instead.
+     *
+     * @param sparkOperator the spark operator whose physical plan is examined
+     * @throws VisitorException if a map/reduce sub-plan cannot be built
+     */
+    @Override
+    public void visitSparkOp(SparkOperator sparkOperator) throws VisitorException {
+        List<POLocalRearrange> rearranges = PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, POLocalRearrange.class);
+        if (rearranges.isEmpty()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("No POLocalRearranges found in the sparkOperator.Secondary key optimization is no need");
+            }
+            return;
+        }
+
+        /**
+         * For every POLocalRearrange encountered in sparkOperator.physicalPlan,
+         * the sub-plan from the previous LR (or root) to currentLR is treated as the mapPlan (as in
+         * mapreduce) and the sub-plan from the POGlobalRearrange (the successor of currentLR) to the
+         * next LR (or leaf) is treated as the reducePlan. Both are then passed to
+         * SecondaryKeyOptimizerUtil.applySecondaryKeySort(mapPlan, reducePlan), which removes the POSort
+         * in the reducePlan's foreach or changes its PODistinct to POSortedDistinct.
+         * Each LR is handled independently: skipping one (e.g. a join input) does not skip the others.
+         */
+        for (POLocalRearrange currentLR : rearranges) {
+            PhysicalPlan mapPlan;
+            PhysicalPlan reducePlan;
+            try {
+                mapPlan = getMapPlan(sparkOperator.physicalPlan, currentLR);
+                reducePlan = getReducePlan(sparkOperator.physicalPlan, currentLR);
+            } catch (PlanException e) {
+                throw new VisitorException(e);
+            }
+
+            List<PhysicalOperator> rootsOfReducePlan = reducePlan.getRoots();
+            if (rootsOfReducePlan.isEmpty()) {
+                // Nothing follows currentLR, so there is no reduce side to optimize.
+                continue;
+            }
+            // Secondary key optimization is not enabled for joins (a GLR with two or more inputs).
+            PhysicalOperator glr = rootsOfReducePlan.get(0);
+            if (glr instanceof POGlobalRearrangeSpark) {
+                List<PhysicalOperator> predecessors = sparkOperator.physicalPlan.getPredecessors(glr);
+                if (predecessors != null && predecessors.size() >= 2) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Current code does not enable secondarykey optimization when  join case is encounted");
+                    }
+                    continue;
+                }
+            }
+
+            if (mapPlan.getOperator(currentLR.getOperatorKey()) == null) {
+                // The POLocalRearrange is sub-plan of a POSplit
+                mapPlan = PlanHelper.getLocalRearrangePlanFromSplit(mapPlan, currentLR.getOperatorKey());
+            }
+
+            SecondaryKeyOptimizerUtil.setIsSparkMode(true);
+            SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(mapPlan, reducePlan);
+            if (info != null) {
+                numSortRemoved += info.getNumSortRemoved();
+                numDistinctChanged += info.getNumDistinctChanged();
+                numUseSecondaryKey += info.getNumUseSecondaryKey();
+            }
+        }
+    }
+
+    /**
+     * Find the MRPlan of the physicalPlan which containing currentLR
+     * Backward search all the physicalOperators which precede currentLR until the previous POLocalRearrange
+     * is found or the root of physicalPlan is found.
+     *
+     * @param physicalPlan
+     * @param currentLR
+     * @return
+     * @throws VisitorException
+     * @throws PlanException
+     */
+    private PhysicalPlan getMapPlan(PhysicalPlan physicalPlan, POLocalRearrange currentLR) throws VisitorException, PlanException {
+        PhysicalPlan mapPlan = new PhysicalPlan();
+        mapPlan.addAsRoot(currentLR);
+        List<PhysicalOperator> preList = physicalPlan.getPredecessors(currentLR);
+        while (true) {
+            if (preList == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("there is nothing to backward search");
+                }
+                break;
+            }
+            if (preList.size() != 1) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("the size of predecessor of currentLR should be 1 but now it is not 1,physicalPlan:" + physicalPlan);
+                }
+                break;
+            }
+            PhysicalOperator pre = preList.get(0);
+            if (pre instanceof POLocalRearrange) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Finishing to find the mapPlan between preLR and currentLR.");
+                }
+                break;
+            }
+            mapPlan.addAsRoot(pre);
+            preList = physicalPlan.getPredecessors(pre);
+
+        }
+        return mapPlan;
+    }
+
+    /**
+     * Find the ReducePlan of the physicalPlan which containing currentLR
+     * Forward search all the physicalOperators which succeed currentLR until the next POLocalRearrange
+     * is found or the leave of physicalPlan is found.
+     *
+     * @param physicalPlan
+     * @param currentLR
+     * @return
+     * @throws PlanException
+     */
+    private PhysicalPlan getReducePlan(PhysicalPlan physicalPlan, POLocalRearrange currentLR) throws PlanException {
+        PhysicalPlan reducePlan = new PhysicalPlan();
+        List<PhysicalOperator> succList = physicalPlan.getSuccessors(currentLR);
+        while (true) {
+            if (succList == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("there is nothing to forward search");
+                }
+                break;
+            }
+            if (succList.size() != 1) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("the size of successors of currentLR should be 1 but now it is not 1,physicalPlan:" + physicalPlan);
+                }
+                break;
+            }
+            PhysicalOperator succ = succList.get(0);
+            if (succ instanceof POLocalRearrange) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Finishing to find the ReducePlan between currentLR and netxtLR.");
+                }
+                break;
+            }
+            reducePlan.addAsLeaf(succ);
+            succList = physicalPlan.getSuccessors(succ);
+        }
+        return reducePlan;
+    }
+
+    @Override
+    public int getNumSortRemoved() {
+        return numSortRemoved;
+    }
+
+    @Override
+    public int getNumDistinctChanged() {
+        return numDistinctChanged;
+    }
+
+    @Override
+    public int getNumUseSecondaryKey() {
+        return numUseSecondaryKey;
+    }
+}

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Fri May 15 13:00:02 2015
@@ -57,6 +57,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -551,9 +552,10 @@ public class SparkCompiler extends PhyPl
 	public void visitGlobalRearrange(POGlobalRearrange op)
 			throws VisitorException {
 		try {
-			addToPlan(op);
-			curSparkOp.customPartitioner = op.getCustomPartitioner();
-			phyToSparkOpMap.put(op, curSparkOp);
+            POGlobalRearrangeSpark glbOp = new POGlobalRearrangeSpark(op);
+            addToPlan(glbOp);
+            curSparkOp.customPartitioner = op.getCustomPartitioner();
+            phyToSparkOpMap.put(op, curSparkOp);
 		} catch (Exception e) {
 			int errCode = 2034;
 			String msg = "Error compiling operator "

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java Fri May 15 13:00:02 2015
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.plan;
 
+import java.util.BitSet;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -58,7 +59,7 @@ public class SparkOperator extends Opera
 
 	public int requestedParallelism = -1;
 
-	private OPER_FEATURE feature = OPER_FEATURE.NONE;
+    // One bit per OPER_FEATURE constant, indexed by its ordinal(). Unlike the
+    // former single-valued field, several features can be marked on one operator.
+    private BitSet feature = new BitSet();
 
 	private boolean splitter = false;
 
@@ -118,28 +119,28 @@ public class SparkOperator extends Opera
 	}
 
+    /**
+     * Returns whether the GROUPBY feature bit is set on this operator.
+     */
 	public boolean isGroupBy() {
-		return (feature == OPER_FEATURE.GROUPBY);
-	}
+        return feature.get(OPER_FEATURE.GROUPBY.ordinal());
+    }
 
+    /**
+     * Sets the GROUPBY feature bit. Bits already set for other features are
+     * left unchanged, so this no longer replaces a previously marked feature.
+     */
 	public void markGroupBy() {
-		feature = OPER_FEATURE.GROUPBY;
-	}
+        feature.set(OPER_FEATURE.GROUPBY.ordinal());
+    }
 
+    /**
+     * Returns whether the COGROUP feature bit is set on this operator.
+     */
 	public boolean isCogroup() {
-		return (feature == OPER_FEATURE.COGROUP);
-	}
+        return feature.get(OPER_FEATURE.COGROUP.ordinal());
+    }
 
+    /**
+     * Sets the COGROUP feature bit. Bits already set for other features are
+     * left unchanged.
+     */
 	public void markCogroup() {
-		feature = OPER_FEATURE.COGROUP;
-	}
+        feature.set(OPER_FEATURE.COGROUP.ordinal());
+    }
 
+    /**
+     * Returns whether the HASHJOIN feature bit is set on this operator.
+     */
 	public boolean isRegularJoin() {
-		return (feature == OPER_FEATURE.HASHJOIN);
-	}
+        return feature.get(OPER_FEATURE.HASHJOIN.ordinal());
+    }
 
+    /**
+     * Sets the HASHJOIN feature bit. Bits already set for other features are
+     * left unchanged.
+     */
 	public void markRegularJoin() {
-		feature = OPER_FEATURE.HASHJOIN;
-	}
+        feature.set(OPER_FEATURE.HASHJOIN.ordinal());
+    }
 
 	public int getRequestedParallelism() {
 		return requestedParallelism;
@@ -154,12 +155,12 @@ public class SparkOperator extends Opera
 	}
 
+    /**
+     * Returns whether the SAMPLER feature bit is set on this operator.
+     */
 	public boolean isSampler() {
-		return (feature == OPER_FEATURE.SAMPLER);
-	}
+        return feature.get(OPER_FEATURE.SAMPLER.ordinal());
+    }
 
+    /**
+     * Sets the SAMPLER feature bit. Bits already set for other features are
+     * left unchanged.
+     */
 	public void markSampler() {
-		feature = OPER_FEATURE.SAMPLER;
-	}
+        feature.set(OPER_FEATURE.SAMPLER.ordinal());
+    }
 
 	public void setSkewedJoinPartitionFile(String file) {
 		skewedJoinPartitionFile = file;
@@ -186,10 +187,10 @@ public class SparkOperator extends Opera
 	}
 
+    /**
+     * Returns whether the INDEXER feature bit is set on this operator.
+     */
 	public boolean isIndexer() {
-		return (feature == OPER_FEATURE.INDEXER);
-	}
+        return feature.get(OPER_FEATURE.INDEXER.ordinal());
+    }
 
-	public void markIndexer() {
-		feature = OPER_FEATURE.INDEXER;
-	}
+    /**
+     * Sets the INDEXER feature bit. Bits already set for other features are
+     * left unchanged.
+     */
+    public void markIndexer() {
+        feature.set(OPER_FEATURE.INDEXER.ordinal());
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java Fri May 15 13:00:02 2015
@@ -18,11 +18,14 @@
 package org.apache.pig.backend.hadoop.executionengine.spark.plan;
 
 import java.io.PrintStream;
+import java.util.List;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -46,21 +49,27 @@ public class SparkPrinter extends SparkO
 		isVerbose = verbose;
 	}
 
-	@Override
-	public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
-		mStream.println("");
-		mStream.println("Spark node " + sparkOp.getOperatorKey().toString());
-		if (sparkOp instanceof NativeSparkOperator) {
-			mStream.println("--------");
-			mStream.println();
-			return;
-		}
-		if (sparkOp.physicalPlan != null && sparkOp.physicalPlan.size() > 0) {
-			PlanPrinter<PhysicalOperator, PhysicalPlan> printer = new PlanPrinter<PhysicalOperator, PhysicalPlan>(
-					sparkOp.physicalPlan, mStream);
-			printer.setVerbose(isVerbose);
-			printer.visit();
-			mStream.println("--------");
-		}
-	}
+    /**
+     * Prints one Spark operator: its key, its physical plan (when non-empty),
+     * and a line for every POGlobalRearrangeSpark in the plan that has
+     * secondary-key sorting enabled.
+     *
+     * @param sparkOp the operator to print
+     * @throws VisitorException if walking the physical plan fails
+     */
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        mStream.println("");
+        mStream.println("Spark node " + sparkOp.getOperatorKey().toString());
+        if (sparkOp instanceof NativeSparkOperator) {
+            mStream.println("--------");
+            mStream.println();
+            return;
+        }
+        // physicalPlan can be null (it is null-checked for printing), and walking a
+        // null plan when searching for POGlobalRearrangeSpark operators would fail,
+        // so stop here in that case.
+        if (sparkOp.physicalPlan == null) {
+            return;
+        }
+        if (sparkOp.physicalPlan.size() > 0) {
+            PlanPrinter<PhysicalOperator, PhysicalPlan> printer = new PlanPrinter<PhysicalOperator, PhysicalPlan>(
+                    sparkOp.physicalPlan, mStream);
+            printer.setVerbose(isVerbose);
+            printer.visit();
+            mStream.println("--------");
+        }
+        List<POGlobalRearrangeSpark> glrList = PlanHelper.getPhysicalOperators(
+                sparkOp.physicalPlan, POGlobalRearrangeSpark.class);
+        for (POGlobalRearrangeSpark glr : glrList) {
+            if (glr.isUseSecondaryKey()) {
+                mStream.println("POGlobalRearrange(" + glr.getOperatorKey() + ") uses secondaryKey");
+            }
+        }
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Fri May 15 13:00:02 2015
@@ -38,6 +38,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
@@ -297,8 +298,8 @@ public class AccumulatorOptimizerUtil {
             return;
         }
 
-        List<POGlobalRearrange> gras = PlanHelper.getPhysicalOperators(plan,
-            POGlobalRearrange.class);
+        List<POGlobalRearrangeSpark> gras = PlanHelper.getPhysicalOperators(plan,
+                POGlobalRearrangeSpark.class);
 
         for (POGlobalRearrange gra : gras) {
             addAccumulatorSparkForGRASubDAG(plan, gra);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Fri May 15 13:00:02 2015
@@ -35,12 +35,14 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -53,6 +55,7 @@ import org.apache.pig.impl.plan.VisitorE
 @InterfaceAudience.Private
 public class SecondaryKeyOptimizerUtil {
     private static Log log = LogFactory.getLog(SecondaryKeyOptimizerUtil.class.getName());
+    // Static mode switch set through setIsSparkMode(), shared by every caller of this
+    // class. When true, the reduce plan is expected to be rooted at a
+    // POGlobalRearrange whose single successor is inspected, instead of a POPackage.
+    private static boolean isSparkMode;
 
     private SecondaryKeyOptimizerUtil() {
 
@@ -241,14 +244,33 @@ public class SecondaryKeyOptimizerUtil {
         }
 
         PhysicalOperator root = reduceRoots.get(0);
-        if (!(root instanceof POPackage)) {
-            log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
-            return null;
+        PhysicalOperator currentNode = null;
+        if (!isSparkMode) {
+            if (!(root instanceof POPackage)) {
+                log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
+                return null;
+            } else {
+                currentNode = root;
+            }
+        } else {
+            if (!(root instanceof POGlobalRearrange)) {
+                log.debug("Expected reduce root to be a POGlobalRearrange, skip secondary key optimizing");
+                return null;
+            } else {
+                List<PhysicalOperator> globalRearrangeSuccs = reducePlan
+                        .getSuccessors(root);
+                if (globalRearrangeSuccs.size() == 1) {
+                    currentNode = globalRearrangeSuccs.get(0);
+                } else {
+                    log.debug("Expected successor of a POGlobalRearrange is POPackage, skip secondary key optimizing");
+                    return null;
+                }
+            }
         }
 
         // visit the POForEach of the reduce plan. We can have Limit and Filter
         // in the middle
-        PhysicalOperator currentNode = root;
+
         POForEach foreach = null;
         while (currentNode != null) {
             if (currentNode instanceof POPackage
@@ -402,8 +424,15 @@ public class SecondaryKeyOptimizerUtil {
                     throw new VisitorException("Cannot find POLocalRearrange to set secondary plan", errorCode);
                 }
             }
-            POPackage pack = (POPackage) root;
-            pack.getPkgr().setUseSecondaryKey(true);
+
+            if (root instanceof POGlobalRearrangeSpark) {
+                POGlobalRearrangeSpark plg = (POGlobalRearrangeSpark) root;
+                plg.setUseSecondaryKey(true);
+                plg.setSecondarySortOrder(secondarySortKeyInfo.getAscs());
+            } else if (root instanceof POPackage) {
+                POPackage pack = (POPackage) root;
+                pack.getPkgr().setUseSecondaryKey(true);
+            }
         }
         return secKeyOptimizerInfo;
     }
@@ -658,4 +687,7 @@ public class SecondaryKeyOptimizerUtil {
         return false;
     }
 
+    /**
+     * Selects which reduce-plan shape the secondary key optimization expects.
+     * When {@code isSparkMode} is false, the reduce root must be a POPackage.
+     * When it is true, the root must be a POGlobalRearrange, and the secondary-key
+     * flag is set on the POGlobalRearrangeSpark itself.
+     * <p>
+     * NOTE(review): this writes a static field, so the setting applies to every
+     * later caller in the same JVM until it is changed again.
+     *
+     * @param isSparkMode true to optimize Spark-style reduce plans
+     */
+    public static void setIsSparkMode(boolean isSparkMode) {
+        SecondaryKeyOptimizerUtil.isSparkMode = isSparkMode;
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java Fri May 15 13:00:02 2015
@@ -534,6 +534,24 @@ public abstract class OperatorPlan<E ext
             connect(oper, leaf);
         }
     }
+
+    /**
+     * Adds {@code root} to the plan and connects it as the predecessor of every
+     * operator that was a root before this call.
+     *
+     * @param root the operator to insert in front of the current roots
+     * @throws PlanException if adding the operator or connecting it fails
+     */
+    public void addAsRoot(E root) throws PlanException {
+        // Take the snapshot before add(): the new operator has no predecessors yet,
+        // so it must not end up connected to itself.
+        List<E> previousRoots = new ArrayList<E>(getRoots());
+        add(root);
+        for (E previousRoot : previousRoots) {
+            connect(root, previousRoot);
+        }
+    }
     
     public boolean isSingleLeafPlan() {
         List<E> tmpList = getLeaves() ;

Added: pig/branches/spark/test/org/apache/pig/spark/TestSecondarySortSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/spark/TestSecondarySortSpark.java?rev=1679557&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/spark/TestSecondarySortSpark.java (added)
+++ pig/branches/spark/test/org/apache/pig/spark/TestSecondarySortSpark.java Fri May 15 13:00:02 2015
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.spark;
+
+import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.test.MiniGenericCluster;
+import org.apache.pig.test.TestSecondarySort;
+import org.apache.pig.test.Util;
+
+/**
+ * Runs the shared {@link TestSecondarySort} cases in Spark mode. Plans are
+ * compiled with {@link SparkCompiler} and optimized with
+ * {@link SecondaryKeyOptimizerSpark}.
+ */
+public class TestSecondarySortSpark extends TestSecondarySort {
+
+    public TestSecondarySortSpark() {
+        // No setup beyond the implicit superclass constructor is needed.
+    }
+
+    @Override
+    public MiniGenericCluster getCluster() {
+        return MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_SPARK);
+    }
+
+    /**
+     * Builds the physical plan for {@code query}, compiles it into a Spark
+     * operator plan, and runs the Spark secondary key optimizer over it.
+     */
+    @Override
+    public SecondaryKeyOptimizer visitSecondaryKeyOptimizer(String query) throws Exception {
+        PhysicalPlan physicalPlan = Util.buildPp(pigServer, query);
+        SparkCompiler compiler = new SparkCompiler(physicalPlan, pc);
+        compiler.compile();
+        SparkOperPlan operPlan = compiler.getSparkPlan();
+        SecondaryKeyOptimizerSpark secondaryKeyOptimizer = new SecondaryKeyOptimizerSpark(operPlan);
+        secondaryKeyOptimizer.visit();
+        return secondaryKeyOptimizer;
+    }
+}

Modified: pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java Fri May 15 13:00:02 2015
@@ -483,6 +483,9 @@ public abstract class TestSecondarySort
     @Test
     // Once custom partitioner is used, we cannot use secondary key optimizer, see PIG-3827
     public void testCustomPartitionerWithSort() throws Exception {
+        if( Util.isSparkExecType(cluster.getExecType())){
+            return;
+        }
         File tmpFile1 = Util.createTempFileDelOnExit("test", "txt");
         PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
         ps1.println("1\t2\t3");

Modified: pig/branches/spark/test/spark-tests
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/spark-tests?rev=1679557&r1=1679556&r2=1679557&view=diff
==============================================================================
--- pig/branches/spark/test/spark-tests (original)
+++ pig/branches/spark/test/spark-tests Fri May 15 13:00:02 2015
@@ -63,3 +63,4 @@
 **/TestMergeJoin.java
 **/TestNativeMapReduce.java
 **/TestPigProgressReporting.java
+**/TestSecondarySortSpark.java