Posted to commits@pig.apache.org by xu...@apache.org on 2016/07/06 14:27:43 UTC

svn commit: r1751690 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: ./ converter/ operator/ optimizer/

Author: xuefu
Date: Wed Jul  6 14:27:42 2016
New Revision: 1751690

URL: http://svn.apache.org/viewvc?rev=1751690&view=rev
Log:
PIG-4797: Optimization for join/group case for spark mode (Liyun via Xuefu)

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1751690&r1=1751689&r2=1751690&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Wed Jul  6 14:27:42 2016
@@ -23,6 +23,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -48,6 +50,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -238,13 +241,8 @@ public class JobGraphBuilder extends Spa
                                Set<OperatorKey> predsFromPreviousSparkOper)
             throws IOException {
         RDD<Tuple> nextRDD = null;
-        List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = plan
-                .getPredecessors(physicalOperator);
-        if (predecessorsOfCurrentPhysicalOp != null && predecessorsOfCurrentPhysicalOp.size() > 1) {
-            Collections.sort(predecessorsOfCurrentPhysicalOp);
-        }
-
-        Set<OperatorKey> operatorKeysOfAllPreds = new HashSet<OperatorKey>();
+        List<PhysicalOperator> predecessorsOfCurrentPhysicalOp = getPredecessors(plan, physicalOperator);
+        Set<OperatorKey> operatorKeysOfAllPreds = new LinkedHashSet<OperatorKey>();
         addPredsFromPrevoiousSparkOp(sparkOperator, physicalOperator, operatorKeysOfAllPreds);
         if (predecessorsOfCurrentPhysicalOp != null) {
             for (PhysicalOperator predecessor : predecessorsOfCurrentPhysicalOp) {
@@ -296,12 +294,29 @@ public class JobGraphBuilder extends Spa
         }
     }
 
+    private List<PhysicalOperator> getPredecessors(PhysicalPlan plan, PhysicalOperator op) {
+        List<PhysicalOperator> preds = null;
+        if (!(op instanceof POJoinGroupSpark)) {
+            preds = plan.getPredecessors(op);
+            if (preds != null && preds.size() > 1) {
+                Collections.sort(preds);
+            }
+        } else {
+            //For POJoinGroupSpark, plan.getPredecessors(op) plus a sort does not yield the
+            //predecessors in the correct order; see JoinGroupOptimizerSpark#restructSparkOp for details
+            preds = ((POJoinGroupSpark) op).getPredecessors();
+        }
+        return preds;
+    }
+
     //get all rdds of predecessors sorted by the OperatorKey
     private List<RDD<Tuple>> sortPredecessorRDDs(Set<OperatorKey> operatorKeysOfAllPreds) {
         List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
-        List<OperatorKey> operatorKeyOfAllPreds = Lists.newArrayList(operatorKeysOfAllPreds);
-        Collections.sort(operatorKeyOfAllPreds);
-        for (OperatorKey operatorKeyOfAllPred : operatorKeyOfAllPreds) {
+        //No explicit sort is needed any more: operatorKeysOfAllPreds is a LinkedHashSet, which
+        //preserves insertion order, and its elements are inserted already sorted by OperatorKey
+        for (OperatorKey operatorKeyOfAllPred : operatorKeysOfAllPreds) {
             predecessorRDDs.add(physicalOpRdds.get(operatorKeyOfAllPred));
         }
         return predecessorRDDs;

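The LinkedHashSet change above makes the later sort unnecessary: iteration order now equals insertion order, and keys are inserted already sorted by OperatorKey. A minimal, self-contained sketch of that property (plain Strings stand in for OperatorKeys; the names are illustrative only):

    import java.util.LinkedHashSet;
    import java.util.Set;

    public class InsertionOrderSketch {
        public static void main(String[] args) {
            // keys added in an order the caller has already sorted
            Set<String> keys = new LinkedHashSet<String>();
            keys.add("scope-3");
            keys.add("scope-5");
            keys.add("scope-9");
            // iteration reproduces insertion order, so no further sort is needed
            for (String key : keys) {
                System.out.println(key); // prints scope-3, scope-5, scope-9
            }
        }
    }
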
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1751690&r1=1751689&r2=1751690&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Jul  6 14:27:42 2016
@@ -78,6 +78,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.FilterConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.JoinGroupSparkConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
@@ -94,9 +95,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.CombinerOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.JoinGroupOptimizerSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.MultiQueryOptimizerSpark;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.NoopFilterRemover;
 import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.ParallelismSetter;
@@ -114,6 +117,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
@@ -199,6 +203,7 @@ public class SparkLauncher extends Launc
         convertMap.put(POPackage.class, new PackageConverter(confBytes));
         convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
         convertMap.put(POGlobalRearrangeSpark.class, new GlobalRearrangeConverter());
+        convertMap.put(POJoinGroupSpark.class, new JoinGroupSparkConverter(confBytes));
         convertMap.put(POLimit.class, new LimitConverter());
         convertMap.put(PODistinct.class, new DistinctConverter());
         convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
@@ -273,6 +278,12 @@ public class SparkLauncher extends Launc
             mqOptimizer.visit();
         }
 
+        //JoinGroupOptimizerSpark collapses LRA+GLA+PKG into POJoinGroupSpark, while CombinerOptimizer
+        //collapses GLA+PKG into POReduceBySpark. If JoinGroupOptimizerSpark ran first, the Spark plan
+        //would already be rewritten and would no longer match what CombinerOptimizer expects. See PIG-4797 for details
+        JoinGroupOptimizerSpark joinOptimizer = new JoinGroupOptimizerSpark(plan);
+        joinOptimizer.visit();
+
         if (LOG.isDebugEnabled()) {
             System.out.println("after multiquery optimization:");
             explain(plan, System.out, "text", true);

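The required pass order can be pictured with a toy rewrite over pattern strings (this only illustrates the comment above; it is not the real optimizer API): once LRA+GLA+PKG is collapsed, the GLA+PKG pattern the combiner looks for no longer exists.

    public class PassOrderSketch {
        public static void main(String[] args) {
            String plan = "LRA>GLA>PKG";
            // correct order: CombinerOptimizer first, rewriting GLA+PKG where a combiner applies
            System.out.println(plan.replace("GLA>PKG", "ReduceBy"));        // LRA>ReduceBy
            // wrong order: collapsing LRA+GLA+PKG first leaves nothing for the combiner to match
            String joinFirst = plan.replace("LRA>GLA>PKG", "JoinGroupSpark");
            System.out.println(joinFirst.replace("GLA>PKG", "ReduceBy"));   // JoinGroupSpark, unchanged
        }
    }
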
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java?rev=1751690&r1=1751689&r2=1751690&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java Wed Jul  6 14:27:42 2016
@@ -19,7 +19,10 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.Serializable;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 
 /**
@@ -28,9 +31,12 @@ import org.apache.pig.data.Tuple;
  * either empty (or is a tuple with one or more empty fields). In this case,
  * we must respect the SQL standard as documented in the equals() method.
  */
-public class IndexedKey implements Serializable {
+public class IndexedKey implements Serializable, Comparable {
+    private static final Log LOG = LogFactory.getLog(IndexedKey.class);
     private byte index;
     private Object key;
+    private boolean useSecondaryKey;
+    private boolean[] secondarySortOrder;
 
     public IndexedKey(byte index, Object key) {
         this.index = index;
@@ -140,4 +146,40 @@ public class IndexedKey implements Seria
         }
         return result;
     }
+
+    //compare the index first, then the key
+    @Override
+    public int compareTo(Object o) {
+        IndexedKey that = (IndexedKey) o;
+        int res = index - that.getIndex();
+        if (res > 0) {
+            return 1;
+        } else if (res < 0) {
+            return -1;
+        } else {
+            if (useSecondaryKey) {
+                Tuple thisCompoundKey = (Tuple) key;
+                Tuple thatCompoundKey = (Tuple) that.getKey();
+                try {
+                    Object thisSecondary = thisCompoundKey.get(1);
+                    Object thatSecondaryKey = thatCompoundKey.get(1);
+                    return PigSecondaryKeyComparatorSpark.compareSecondaryKeys(thisSecondary, thatSecondaryKey, secondarySortOrder);
+
+                } catch (ExecException e) {
+                    throw new RuntimeException("IndexedKey#compareTo throws exception ", e);
+                }
+            } else {
+                return DataType.compare(key, that.getKey());
+            }
+        }
+    }
+
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
+
+    public void setSecondarySortOrder(boolean[] secondarySortOrder) {
+        this.secondarySortOrder = secondarySortOrder;
+    }
 }
\ No newline at end of file

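The new compareTo orders keys by input index first and by the key second (or by the secondary key when secondary-key sorting is enabled). A rough sketch of that two-level contract, ignoring the secondary-key path and using Strings in place of Pig keys:

    public class IndexedKeyOrderSketch {
        // index first, then key -- mirrors IndexedKey#compareTo without secondary keys
        static int compare(byte index1, String key1, byte index2, String key2) {
            int res = index1 - index2;
            if (res != 0) {
                return res > 0 ? 1 : -1;
            }
            return key1.compareTo(key2);
        }

        public static void main(String[] args) {
            System.out.println(compare((byte) 0, "b", (byte) 1, "a")); // -1: lower index wins
            System.out.println(compare((byte) 1, "a", (byte) 1, "b")); // negative: same index, key decides
        }
    }
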
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1751690&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Wed Jul  6 14:27:42 2016
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.runtime.AbstractFunction1;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.rdd.CoGroupedRDD;
+import org.apache.spark.rdd.RDD;
+
+
+public class JoinGroupSparkConverter implements RDDConverter<Tuple, Tuple, POJoinGroupSpark> {
+    private static final Log LOG = LogFactory
+            .getLog(JoinGroupSparkConverter.class);
+    private byte[] confBytes;
+
+    public JoinGroupSparkConverter(byte[] confBytes) {
+        this.confBytes = confBytes;
+    }
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POJoinGroupSpark op) throws IOException {
+        SparkUtil.assertPredecessorSizeGreaterThan(predecessors, op, 0);
+        List<POLocalRearrange> lraOps = op.getLraOps();
+        POGlobalRearrangeSpark glaOp = op.getGlaOp();
+        POPackage pkgOp = op.getPkgOp();
+        int parallelism = SparkUtil.getParallelism(predecessors, glaOp);
+        List<RDD<Tuple2<IndexedKey, Tuple>>> rddAfterLRA = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
+        boolean useSecondaryKey = glaOp.isUseSecondaryKey();
+
+        for (int i = 0; i < predecessors.size(); i++) {
+            RDD<Tuple> rdd = predecessors.get(i);
+            rddAfterLRA.add(rdd.map(new LocalRearrangeFunction(lraOps.get(i), glaOp),
+                    SparkUtil.<IndexedKey, Tuple>getTuple2Manifest()));
+        }
+        if (rddAfterLRA.size() == 1 && useSecondaryKey) {
+            return handleSecondarySort(rddAfterLRA.get(0), pkgOp);
+        } else {
+            CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
+                    (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
+                            .asScalaBuffer(rddAfterLRA).toSeq()),
+                    SparkUtil.getPartitioner(glaOp.getCustomPartitioner(), parallelism), SparkUtil.getManifest(Object
+                    .class));
+
+            RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
+                    (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
+            return rdd.toJavaRDD().map(new GroupPkgFunction(pkgOp, this.confBytes)).rdd();
+        }
+    }
+
+    private RDD<Tuple> handleSecondarySort(
+            RDD<Tuple2<IndexedKey, Tuple>> rdd, POPackage pkgOp) {
+        //first sort the tuples by the secondary key, if secondary-key sorting is enabled
+        JavaPairRDD<IndexedKey, Tuple> pairRDD = JavaPairRDD.fromRDD(rdd, SparkUtil.getManifest(IndexedKey.class), SparkUtil.getManifest(Tuple.class));
+        int partitionNums = pairRDD.partitions().size();
+        //repartition so that tuples with the same IndexedKey end up in the same partition
+        JavaPairRDD<IndexedKey, Tuple> sorted = pairRDD.repartitionAndSortWithinPartitions(
+                new IndexedKeyPartitioner(partitionNums));
+        //package tuples with the same IndexedKey into the result: (key,(val1,val2,val3,...))
+        return sorted.mapPartitions(new AccumulateByKey(pkgOp), true).rdd();
+    }
+
+    //Group tuples with the same IndexedKey into the same partition
+    private static class IndexedKeyPartitioner extends Partitioner {
+        private int partition;
+
+        public IndexedKeyPartitioner(int partition) {
+            this.partition = partition;
+        }
+
+        @Override
+        public int getPartition(Object obj) {
+            IndexedKey indexedKey = (IndexedKey) obj;
+            Tuple key = (Tuple) indexedKey.getKey();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("key:" + key);
+            }
+            int hashCode = 0;
+            try {
+                hashCode = Objects.hashCode(key.get(0));
+            } catch (ExecException e) {
+                throw new RuntimeException("IndexedKeyPartitioner#getPartition: ", e);
+            }
+            //mask the sign bit instead of Math.abs: Math.abs(Integer.MIN_VALUE) is still negative
+            return (hashCode & Integer.MAX_VALUE) % partition;
+        }
+
+        @Override
+        public int numPartitions() {
+            return partition;
+        }
+    }
+
+    //Package tuples with the same IndexedKey into the result: (key,(val1,val2,val3,...))
+    //Send (key, Iterator) to POPackage and use POPackage#getNextTuple to get the result
+    private static class AccumulateByKey implements FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple>, Serializable {
+        private POPackage pkgOp;
+
+        public AccumulateByKey(POPackage pkgOp) {
+            this.pkgOp = pkgOp;
+        }
+
+        @Override
+        public Iterable<Tuple> call(final Iterator<Tuple2<IndexedKey, Tuple>> it) throws Exception {
+            return new Iterable<Tuple>() {
+                IndexedKey curKey = null;
+                ArrayList<Tuple> curValues = new ArrayList<Tuple>();
+
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new Iterator<Tuple>() {
+
+                        @Override
+                        public boolean hasNext() {
+                            return it.hasNext() || curKey != null;
+                        }
+
+                        @Override
+                        public Tuple next() {
+                            while (it.hasNext()) {
+                                Tuple2<IndexedKey, Tuple> t = it.next();
+                                //the key changed: rebuild the tuple for the previous key from curKey and curValues, then return it
+                                if (curKey != null && !curKey.equals(t._1())) {
+                                    IndexedKey retKey = curKey;
+                                    Tuple result = restructTuple(retKey, new ArrayList<Tuple>(curValues));
+                                    curValues.clear();
+                                    curKey = t._1();
+                                    curValues.add(t._2());
+                                    return result;
+                                }
+                                curKey = t._1();
+                                //if key does not change, just append the value to the same key
+                                curValues.add(t._2());
+                            }
+                            if (curKey == null) {
+                                throw new RuntimeException("AccumulateByKey curKey is null");
+                            }
+
+                            //if we get here, this should be the last record
+                            Tuple res = restructTuple(curKey, curValues);
+                            curKey = null;
+                            return res;
+                        }
+
+
+                        @Override
+                        public void remove() {
+                            // not supported; this iterator is read-only
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+                }
+            };
+        }
+
+        private Tuple restructTuple(final IndexedKey curKey, final ArrayList<Tuple> curValues) {
+            try {
+                Tuple retVal = null;
+                PigNullableWritable retKey = new PigNullableWritable() {
+
+                    public Object getValueAsPigType() {
+                        Tuple compoundKey = (Tuple) curKey.getKey();
+                        Object keyTuple = null;
+                        try {
+                            keyTuple = compoundKey.get(0);
+                        } catch (ExecException e) {
+                            throw new RuntimeException("AccumulateByKey#restructTuple throws exception: ", e);
+                        }
+                        return keyTuple;
+                    }
+                };
+
+                //Build a tuple iterator here; POPackage#tupIter will consume it later.
+                final Iterator<Tuple> tupleIterator = curValues.iterator();
+                Iterator<NullableTuple> iterator = new Iterator<NullableTuple>() {
+                    public boolean hasNext() {
+                        return tupleIterator.hasNext();
+                    }
+
+                    public NullableTuple next() {
+                        Tuple t = tupleIterator.next();
+                        return new NullableTuple(t);
+                    }
+
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+                pkgOp.setInputs(null);
+                pkgOp.attachInput(retKey, iterator);
+                Result res = pkgOp.getNextTuple();
+                if (res.returnStatus == POStatus.STATUS_OK) {
+                    retVal = (Tuple) res.result;
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("AccumulateByKey out: " + retVal);
+                }
+                return retVal;
+            } catch (ExecException e) {
+                throw new RuntimeException("AccumulateByKey#restructTuple throws exception: ", e);
+            }
+        }
+    }
+
+    private static class LocalRearrangeFunction extends
+            AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
+
+        private final POLocalRearrange lra;
+
+        private boolean useSecondaryKey;
+        private boolean[] secondarySortOrder;
+
+        public LocalRearrangeFunction(POLocalRearrange lra, POGlobalRearrangeSpark glaOp) {
+            if (glaOp.isUseSecondaryKey()) {
+                this.useSecondaryKey = true;
+                this.secondarySortOrder = glaOp.getSecondarySortOrder();
+            }
+            this.lra = lra;
+        }
+
+        @Override
+        public Tuple2<IndexedKey, Tuple> apply(Tuple t) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("LocalRearrangeFunction in " + t);
+            }
+            Result result;
+            try {
+                lra.setInputs(null);
+                lra.attachInput(t);
+                result = lra.getNextTuple();
+
+                if (result == null) {
+                    throw new RuntimeException(
+                            "Null response found for LocalRearrange on tuple: "
+                                    + t);
+                }
+
+                switch (result.returnStatus) {
+                    case POStatus.STATUS_OK:
+                        // (index, key, value without keys)
+                        Tuple resultTuple = (Tuple) result.result;
+                        Object key = resultTuple.get(1);
+                        IndexedKey indexedKey = new IndexedKey((Byte) resultTuple.get(0), key);
+                        if (useSecondaryKey) {
+                            indexedKey.setUseSecondaryKey(useSecondaryKey);
+                            indexedKey.setSecondarySortOrder(secondarySortOrder);
+                        }
+                        Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
+                                (Tuple) resultTuple.get(2));
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("LocalRearrangeFunction out " + out);
+                        }
+                        return out;
+                    default:
+                        throw new RuntimeException(
+                                "Unexpected response code from operator "
+                                        + lra + " : " + result);
+                }
+            } catch (ExecException e) {
+                throw new RuntimeException(
+                        "Couldn't do LocalRearrange on tuple: " + t, e);
+            }
+        }
+
+    }
+
+    /**
+     * Send the cogroup output, where each element is (key, bag[]), to POPackage,
+     * then call POPackage#getNextTuple to get the result
+     */
+    private static class GroupPkgFunction implements
+            Function<Tuple2<IndexedKey, Seq<Seq<Tuple>>>, Tuple>, Serializable {
+
+        private final POPackage pkgOp;
+        private byte[] confBytes;
+        private JobConf jobConf = null;
+
+        public GroupPkgFunction(POPackage pkgOp, byte[] confBytes) {
+            this.pkgOp = pkgOp;
+            this.confBytes = confBytes;
+        }
+
+        void initializeJobConf() {
+            jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
+            jobConf.set("pig.cachedbag.type", "default");
+            PigMapReduce.sJobConfInternal.set(jobConf);
+        }
+
+        @Override
+        public Tuple call(final Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
+            if (jobConf == null) {
+                initializeJobConf();
+            }
+
+            try {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("GroupPkgFunction in " + input);
+                }
+
+                final PigNullableWritable key = new PigNullableWritable() {
+
+                    public Object getValueAsPigType() {
+                        IndexedKey keyTuple = input._1();
+                        return keyTuple.getKey();
+                    }
+                };
+                Object obj = input._2();
+                // XXX this is a hack for Spark 1.1.0: the type is an Array, not Seq
+                Seq<Tuple>[] bags = (Seq<Tuple>[]) obj;
+                List<Iterator<NullableTuple>> tupleIterators = new ArrayList<Iterator<NullableTuple>>();
+                for (int j = 0; j < bags.length; j++) {
+                    Seq<Tuple> bag = bags[j];
+                    Iterator<Tuple> iterator = JavaConversions
+                            .asJavaCollection(bag).iterator();
+                    //capture the bag index in a final local so the anonymous class can reference it
+                    final int index = j;
+                    tupleIterators.add(new IteratorTransform<Tuple, NullableTuple>(
+                            iterator) {
+                        @Override
+                        protected NullableTuple transform(Tuple next) {
+                            NullableTuple nullableTuple = new NullableTuple(next);
+                            nullableTuple.setIndex((byte) index);
+                            return nullableTuple;
+                        }
+                    });
+                }
+
+                pkgOp.setInputs(null);
+                pkgOp.attachInput(key, new IteratorUnion<NullableTuple>(tupleIterators.iterator()));
+                Result result = pkgOp.getNextTuple();
+                if (result == null) {
+                    throw new RuntimeException(
+                            "Null response found for Package on key: " + key);
+                }
+                Tuple out;
+                switch (result.returnStatus) {
+                    case POStatus.STATUS_OK:
+                        // (key, {(value)...})
+                        out = (Tuple) result.result;
+                        break;
+                    case POStatus.STATUS_NULL:
+                        out = null;
+                        break;
+                    default:
+                        throw new RuntimeException(
+                                "Unexpected response code from operator "
+                                        + pkgOp + " : " + result + " "
+                                        + result.returnStatus);
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("GroupPkgFunction out " + out);
+                }
+                return out;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+
+    private static class IteratorUnion<T> implements Iterator<T> {
+
+        private final Iterator<Iterator<T>> iterators;
+
+        private Iterator<T> current;
+
+        public IteratorUnion(Iterator<Iterator<T>> iterators) {
+            super();
+            this.iterators = iterators;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (current != null && current.hasNext()) {
+                return true;
+            } else if (iterators.hasNext()) {
+                current = iterators.next();
+                return hasNext();
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public T next() {
+            return current.next();
+        }
+
+        @Override
+        public void remove() {
+            current.remove();
+        }
+
+    }
+}

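handleSecondarySort above uses the standard Spark pattern of a single shuffle that both groups equal keys into one partition and sorts within each partition. A minimal runnable sketch of that pattern, assuming Spark 1.2+ where repartitionAndSortWithinPartitions is available; HashPartitioner stands in for IndexedKeyPartitioner, and String/Integer pairs stand in for IndexedKey/Tuple (in the converter the secondary key is folded into IndexedKey#compareTo, so sorting by the pair's key is enough):

    import java.util.Arrays;

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class SecondarySortSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("SecondarySortSketch").setMaster("local[2]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<String, Integer>("a", 3),
                    new Tuple2<String, Integer>("b", 1),
                    new Tuple2<String, Integer>("a", 1)));
            // one shuffle: equal keys land in the same partition and are sorted within it
            JavaPairRDD<String, Integer> sorted =
                    pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2));
            for (Tuple2<String, Integer> t : sorted.collect()) {
                System.out.println(t);
            }
            sc.stop();
        }
    }
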
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java?rev=1751690&r1=1751689&r2=1751690&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java Wed Jul  6 14:27:42 2016
@@ -66,7 +66,7 @@ class PigSecondaryKeyComparatorSpark imp
         }
     }
 
-    private int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
+    public static int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
         int rc = 0;
         if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof Tuple) {
             // objects are Tuples, we may need to apply sort order inside them

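compareSecondaryKeys is made public static so that IndexedKey#compareTo can reuse it. Its core behavior, comparing field by field and flipping the result wherever the sort-order flag is descending, can be sketched independently; the real method operates on Pig Tuples, while the sketch below uses int arrays purely for illustration:

    public class SecondaryOrderSketch {
        // compare field by field, negating the result where asc[i] is false (descending)
        static int compare(int[] t1, int[] t2, boolean[] asc) {
            for (int i = 0; i < t1.length; i++) {
                int rc = Integer.compare(t1[i], t2[i]);
                if (rc != 0) {
                    return asc[i] ? rc : -rc;
                }
            }
            return 0;
        }

        public static void main(String[] args) {
            boolean[] asc = new boolean[] { true, false };
            // first field ties, second field is descending: {1, 7} sorts before {1, 5}
            System.out.println(compare(new int[] { 1, 7 }, new int[] { 1, 5 }, asc)); // negative
        }
    }
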
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java?rev=1751690&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java Wed Jul  6 14:27:42 2016
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Collapse POLocalRearrange, POGlobalRearrange and POPackage into POJoinGroupSpark to avoid unnecessary map operations in join/group
+ */
+public class POJoinGroupSpark extends PhysicalOperator {
+    private List<POLocalRearrange> lraOps;
+    private POGlobalRearrangeSpark glaOp;
+    private POPackage pkgOp;
+    private List<PhysicalOperator> predecessors;
+
+    public POJoinGroupSpark(List<POLocalRearrange> lraOps, POGlobalRearrangeSpark glaOp, POPackage pkgOp){
+        super(glaOp.getOperatorKey());
+        this.lraOps = lraOps;
+        this.glaOp = glaOp;
+        this.pkgOp = pkgOp;
+    }
+
+    public List<POLocalRearrange> getLraOps() {
+        return lraOps;
+    }
+
+    public POGlobalRearrangeSpark getGlaOp() {
+        return glaOp;
+    }
+
+    public POPackage getPkgOp() {
+        return pkgOp;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        //no-op: POJoinGroupSpark is translated directly by JoinGroupSparkConverter
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "POJoinGroupSpark" + "["
+                + DataType.findTypeName(resultType) + "]" + " - "
+                + mKey.toString();
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return null;
+    }
+
+    public void setPredecessors(List<PhysicalOperator> predecessors) {
+        this.predecessors = predecessors;
+    }
+
+    public List<PhysicalOperator> getPredecessors() {
+        return predecessors;
+    }
+}

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java?rev=1751690&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java Wed Jul  6 14:27:42 2016
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Collapse POLocalRearrange, POGlobalRearrange and POPackage into POJoinGroupSpark to avoid
+ * unnecessary map operations in join/group. See PIG-4797 for details
+ */
+public class JoinGroupOptimizerSpark extends SparkOpPlanVisitor {
+    private static final Log LOG = LogFactory.getLog(JoinGroupOptimizerSpark.class);
+
+    public JoinGroupOptimizerSpark(SparkOperPlan plan) {
+        super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan, true));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        if (sparkOp.physicalPlan != null) {
+            GlobalRearrangeDiscover glrDiscover = new GlobalRearrangeDiscover(sparkOp.physicalPlan);
+            glrDiscover.visit();
+            List<PhysicalPlan> plans = glrDiscover.getPlansWithJoinAndGroup();
+            handlePlans(plans);
+        }
+
+    }
+
+    private void handlePlans(List<PhysicalPlan> plans) throws VisitorException {
+        for (PhysicalPlan planWithJoinAndGroup : plans) {
+            POGlobalRearrangeSpark glrSpark = PlanHelper.getPhysicalOperators(planWithJoinAndGroup, POGlobalRearrangeSpark.class).get(0);
+            if (verifyJoinOrGroupCase(planWithJoinAndGroup, glrSpark)) {
+                try {
+                    restructSparkOp(planWithJoinAndGroup, glrSpark);
+                } catch (PlanException e) {
+                    throw new RuntimeException("JoinGroupOptimizerSpark#handlePlans fails: ", e);
+                }
+            }
+        }
+    }
+
+    static class GlobalRearrangeDiscover extends PhyPlanVisitor {
+        private List<PhysicalPlan> plansWithJoinAndGroup = new ArrayList<PhysicalPlan>();
+        public GlobalRearrangeDiscover(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+        }
+
+        @Override
+        public void visitGlobalRearrange(POGlobalRearrange glr) throws VisitorException {
+            //if there is a POSplit, we need to traverse POSplit.getPlans(), so use mCurrentWalker.getPlan()
+            PhysicalPlan currentPlan = this.mCurrentWalker.getPlan();
+            if (currentPlan != null) {
+                plansWithJoinAndGroup.add(currentPlan);
+            } else {
+                LOG.info("GlobalRearrangeDiscover#currentPlan is null");
+            }
+        }
+
+        public List<PhysicalPlan> getPlansWithJoinAndGroup() {
+            return plansWithJoinAndGroup;
+        }
+    }
+
+    //collapse LRA, GLA and PKG into POJoinGroupSpark
+    private void restructSparkOp(PhysicalPlan plan, POGlobalRearrangeSpark glaOp) throws PlanException {
+
+        List<PhysicalOperator> predes = plan.getPredecessors(glaOp);
+        if (predes != null) {
+            List<POLocalRearrange> lraOps = new ArrayList<POLocalRearrange>();
+            List<PhysicalOperator> allPredsOfLRA = new ArrayList<PhysicalOperator>();
+
+            //Get the predecessors of POJoinGroupSpark in the correct order after JoinGroupOptimizerSpark runs.
+            //For other PhysicalOperators we usually call OperatorPlan#getPredecessors(op) and then sort the
+            //result (see JobGraphBuilder#getPredecessors), because in the common case a PhysicalOperator with
+            //a smaller OperatorKey must be executed before one with a bigger OperatorKey. That rule does not
+            //hold for POJoinGroupSpark. An example:
+            //original:
+            //  POLoad(scope-1)                         POLoad(scope-2)
+            //        |                                       |
+            //  POForEach(scope-3)                            |
+            //        |                                       |
+            //  POLocalRearrange(scope-4)            POLocalRearrange(scope-5)
+            //               \                              /
+            //             POGlobalRearrange(scope-6)
+            //                         |
+            //                 POPackage(scope-7)
+            //after JoinGroupOptimizerSpark:
+            //  POLoad(scope-1)                         POLoad(scope-2)
+            //        |                                      /
+            //  POForEach(scope-3)                          /
+            //               \                             /
+            //             POJoinGroupSpark(scope-8)
+            //
+            //The predecessors of POJoinGroupSpark(scope-8) must be POForEach(scope-3) and POLoad(scope-2),
+            //in that order, matching POLocalRearrange(scope-4) and POLocalRearrange(scope-5). Sorting the
+            //result of OperatorPlan#getPredecessors(op) would instead yield POLoad(scope-2) before
+            //POForEach(scope-3), which is the wrong input order.
+            Collections.sort(predes);
+            for (PhysicalOperator lra : predes) {
+                lraOps.add((POLocalRearrange) lra);
+                List<PhysicalOperator> predOfLRAList = plan.getPredecessors(lra);
+                if (predOfLRAList != null && predOfLRAList.size() == 1) {
+                    PhysicalOperator predOfLRA = predOfLRAList.get(0);
+                    plan.disconnect(predOfLRA, lra);
+                    allPredsOfLRA.add(predOfLRA);
+                }
+            }
+
+            POPackage pkgOp = (POPackage) plan.getSuccessors(glaOp).get(0);
+            PhysicalOperator pkgSuccessor = plan.getSuccessors(pkgOp).get(0);
+            POJoinGroupSpark joinSpark = new POJoinGroupSpark(lraOps, glaOp, pkgOp);
+            if (allPredsOfLRA.size() > 0) {
+                joinSpark.setPredecessors(allPredsOfLRA);
+            }
+            plan.add(joinSpark);
+
+            for (PhysicalOperator predOfLRA : allPredsOfLRA) {
+                plan.connect(predOfLRA, joinSpark);
+            }
+
+            plan.disconnect(pkgOp, pkgSuccessor);
+            plan.connect(joinSpark, pkgSuccessor);
+            for (POLocalRearrange lra : lraOps) {
+                plan.remove(lra);
+            }
+            plan.remove(glaOp);
+            plan.remove(pkgOp);
+        }
+    }
+
+    private boolean verifyJoinOrGroupCase(PhysicalPlan plan, POGlobalRearrangeSpark glaOp) {
+        List<PhysicalOperator> lraOps = plan.getPredecessors(glaOp);
+        List<PhysicalOperator> pkgOps = plan.getSuccessors(glaOp);
+        boolean isAllPredecessorLRA = isAllPredecessorLRA(lraOps);
+        boolean isSuccessorPKG = isSuccessorPKG(pkgOps);
+        return isAllPredecessorLRA && isSuccessorPKG;
+    }
+
+    private boolean isSuccessorPKG(List<PhysicalOperator> pkgOps) {
+        return pkgOps != null && pkgOps.size() == 1 && pkgOps.get(0) instanceof POPackage;
+    }
+
+    private boolean isAllPredecessorLRA(List<PhysicalOperator> lraOps) {
+        boolean result = true;
+        if (lraOps != null) {
+            for (PhysicalOperator lraOp : lraOps) {
+                if (!(lraOp instanceof POLocalRearrange)) {
+                    result = false;
+                    break;
+                }
+            }
+        } else {
+            result = false;
+        }
+
+        return result;
+    }
+}
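
The ordering problem restructSparkOp works around reduces to one observation: sorting predecessors by OperatorKey does not reproduce the per-LRA input order. Using the scope numbers from the example in the comment above (plain Integers stand in for OperatorKeys):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class PredecessorOrderSketch {
        public static void main(String[] args) {
            // predecessors in LRA order: POForEach(scope-3) feeds scope-4, POLoad(scope-2) feeds scope-5
            List<Integer> byLraOrder = Arrays.asList(3, 2);
            List<Integer> byKeyOrder = new ArrayList<Integer>(byLraOrder);
            Collections.sort(byKeyOrder); // [2, 3]: POLoad would come first, the wrong input order
            // hence POJoinGroupSpark#setPredecessors stores the correctly ordered list explicitly
            System.out.println(byLraOrder.equals(byKeyOrder)); // false
        }
    }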