You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/07/18 03:48:02 UTC

svn commit: r1753141 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter: IndexedKey.java JoinGroupSparkConverter.java PigSecondaryKeyComparatorSpark.java

Author: xuefu
Date: Mon Jul 18 03:48:02 2016
New Revision: 1753141

URL: http://svn.apache.org/viewvc?rev=1753141&view=rev
Log:
PIG-4553: Implement secondary sort using one shuffle (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java?rev=1753141&r1=1753140&r2=1753141&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java Mon Jul 18 03:48:02 2016
@@ -148,7 +148,7 @@ public class IndexedKey implements Seria
     }
 
     //firstly compare the index
-    //secondly compare the key
+    //secondly compare the key (both first and secondary key)
     @Override
     public int compareTo(Object o) {
         IndexedKey that = (IndexedKey) o;
@@ -160,15 +160,9 @@ public class IndexedKey implements Seria
         } else {
             if (useSecondaryKey) {
                 Tuple thisCompoundKey = (Tuple) key;
-                Tuple thatCompoundKey = (Tuple) that.getKey();
-                try {
-                    Object thisSecondary = thisCompoundKey.get(1);
-                    Object thatSecondaryKey = thatCompoundKey.get(1);
-                    return PigSecondaryKeyComparatorSpark.compareSecondaryKeys(thisSecondary, thatSecondaryKey, secondarySortOrder);
-
-                } catch (ExecException e) {
-                    throw new RuntimeException("IndexedKey#compareTo throws exception ", e);
-                }
+                Tuple thatCompoundKey = (Tuple)that.getKey();
+                return PigSecondaryKeyComparatorSpark.compareKeys(thisCompoundKey, thatCompoundKey,
+                        secondarySortOrder);
             } else {
                 return DataType.compare(key, that.getKey());
             }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1753141&r1=1753140&r2=1753141&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Mon Jul 18 03:48:02 2016
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 
 import scala.Product2;
 import scala.Tuple2;
@@ -46,10 +47,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.spark.HashPartitioner;
+import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.rdd.CoGroupedRDD;
 import org.apache.spark.rdd.RDD;
 
@@ -80,50 +81,172 @@ public class JoinGroupSparkConverter imp
                     SparkUtil.<IndexedKey, Tuple>getTuple2Manifest()));
         }
         if (rddAfterLRA.size() == 1 && useSecondaryKey) {
-            rddAfterLRA.set(0, handleSecondarySort(rddAfterLRA.get(0), parallelism));
-        }
+            return handleSecondarySort(rddAfterLRA.get(0), pkgOp);
+        } else {
 
-        CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
-                (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
-                        .asScalaBuffer(rddAfterLRA).toSeq()),
-                SparkUtil.getPartitioner(glaOp.getCustomPartitioner(), parallelism),
-		SparkUtil.getManifest(Object.class));
-
-        RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
-                (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
-        return rdd.toJavaRDD().map(new GroupPkgFunction(pkgOp, this.confBytes)).rdd();
+            CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
+                    (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
+                            .asScalaBuffer(rddAfterLRA).toSeq()),
+                    SparkUtil.getPartitioner(glaOp.getCustomPartitioner(), parallelism),
+                    SparkUtil.getManifest(Object.class));
+
+            RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
+                    (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
+            return rdd.toJavaRDD().map(new GroupPkgFunction(pkgOp, this.confBytes)).rdd();
+        }
     }
 
+    private RDD<Tuple> handleSecondarySort(
+            RDD<Tuple2<IndexedKey, Tuple>> rdd, POPackage pkgOp) {
+        JavaPairRDD<IndexedKey, Tuple> pairRDD = JavaPairRDD.fromRDD(rdd, SparkUtil.getManifest(IndexedKey.class),
+                SparkUtil.getManifest(Tuple.class));
 
-    private RDD<Tuple2<IndexedKey, Tuple>> handleSecondarySort(
-            RDD<Tuple2<IndexedKey, Tuple>> rdd, int parallelism) {
+        int partitionNums = pairRDD.partitions().size();
+        //repartition to group tuples with same indexedkey to same partition
+        JavaPairRDD<IndexedKey, Tuple> sorted = pairRDD.repartitionAndSortWithinPartitions(
+                new IndexedKeyPartitioner(partitionNums));
+        //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...))
+        return sorted.mapPartitions(new AccumulateByKey(pkgOp), true).rdd();
+    }
 
-        JavaPairRDD<IndexedKey, Tuple> pairRDD = JavaPairRDD.fromRDD(rdd, SparkUtil.getManifest(IndexedKey.class), SparkUtil.getManifest(Tuple.class));
+    //Group tuples with same IndexKey into same partition
+    private static class IndexedKeyPartitioner extends Partitioner {
+        private int partition;
+        public IndexedKeyPartitioner(int partition) {
+            this.partition = partition;
+        }
+        @Override
+        public int getPartition(Object obj) {
+            IndexedKey indexedKey = (IndexedKey) obj;
+            Tuple key = (Tuple) indexedKey.getKey();
+
+            int hashCode = 0;
+            try {
+                hashCode = Objects.hashCode(key.get(0));
+            } catch (ExecException e) {
+                throw new RuntimeException("IndexedKeyPartitioner#getPartition: ", e);
+            }
+            return Math.abs(hashCode) % partition;
+        }
 
-        //first sort the tuple by secondary key if enable useSecondaryKey sort
-        JavaPairRDD<IndexedKey, Tuple> sorted = pairRDD.repartitionAndSortWithinPartitions(
-                new HashPartitioner(parallelism));
-        JavaPairRDD<IndexedKey, Tuple> sortByKey = sorted.sortByKey();
-        return sortByKey.mapToPair(new RemoveSecondaryKey()).rdd();
+        @Override
+        public int numPartitions() {
+            return partition;
+        }
     }
 
-    private static class RemoveSecondaryKey implements
-            PairFunction<Tuple2<IndexedKey, Tuple>, IndexedKey, Tuple>,
+    //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...))
+    //Send (key,Iterator) to POPackage, use POPackage#getNextTuple to get the result
+    private static class AccumulateByKey implements FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple>,
             Serializable {
+        private POPackage pkgOp;
+
+        public AccumulateByKey(POPackage pkgOp) {
+            this.pkgOp = pkgOp;
+        }
 
         @Override
-        public Tuple2<IndexedKey, Tuple> call(Tuple2<IndexedKey, Tuple> t) throws Exception {
-            IndexedKey key = t._1();
-            Tuple compoundKey = (Tuple) key.getKey();
-            if (compoundKey.size() < 2) {
-                throw new RuntimeException("compoundKey.size() should be more than 2");
+        public Iterable<Tuple> call(final Iterator<Tuple2<IndexedKey, Tuple>> it) throws Exception {
+            return new Iterable<Tuple>() {
+                Object curKey = null;
+                ArrayList curValues = new ArrayList();
+
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new Iterator<Tuple>() {
+
+                        @Override
+                        public boolean hasNext() {
+                            return it.hasNext() || curKey != null;
+                        }
+
+                        @Override
+                        public Tuple next() {
+                            while (it.hasNext()) {
+                                Tuple2<IndexedKey, Tuple> t = it.next();
+                                //key changes, restruct the last tuple by curKey, curValues and return
+                                Object tMainKey = null;
+                                try {
+                                    tMainKey = ((Tuple) (t._1()).getKey()).get(0);
+                                    if (curKey != null && !curKey.equals(tMainKey)) {
+                                        Tuple result = restructTuple(curKey, new ArrayList(curValues));
+                                        curValues.clear();
+                                        curKey = tMainKey;
+                                        curValues.add(t._2());
+                                        return result;
+                                    }
+                                    curKey = tMainKey;
+                                    //if key does not change, just append the value to the same key
+                                    curValues.add(t._2());
+
+                                } catch (ExecException e) {
+                                    throw new RuntimeException("AccumulateByKey throw exception: ", e);
+                                }
+                            }
+                            if (curKey == null) {
+                                throw new RuntimeException("AccumulateByKey curKey is null");
+                            }
+
+                            //if we get here, this should be the last record
+                            Tuple res = restructTuple(curKey, curValues);
+                            curKey = null;
+                            return res;
+                        }
+
+
+                        @Override
+                        public void remove() {
+                            // Not implemented.
+                            // throw Unsupported Method Invocation Exception.
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+                }
+            };
+        }
+
+        private Tuple restructTuple(final Object curKey, final ArrayList<Tuple> curValues) {
+            try {
+                Tuple retVal = null;
+                PigNullableWritable retKey = new PigNullableWritable() {
+
+                    public Object getValueAsPigType() {
+                        return curKey;
+                    }
+                };
+
+                //Here restruct a tupleIterator, later POPackage#tupIter will use it.
+                final Iterator<Tuple> tupleItearator = curValues.iterator();
+                Iterator<NullableTuple> iterator = new Iterator<NullableTuple>() {
+                    public boolean hasNext() {
+                        return tupleItearator.hasNext();
+                    }
+
+                    public NullableTuple next() {
+                        Tuple t = tupleItearator.next();
+                        return new NullableTuple(t);
+                    }
+
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+                pkgOp.setInputs(null);
+                pkgOp.attachInput(retKey, iterator);
+                Result res = pkgOp.getNextTuple();
+                if (res.returnStatus == POStatus.STATUS_OK) {
+                    retVal = (Tuple) res.result;
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("AccumulateByKey out: " + retVal);
+                }
+                return retVal;
+            } catch (ExecException e) {
+                throw new RuntimeException("AccumulateByKey#restructTuple throws exception: ", e);
             }
-            IndexedKey newKey = new IndexedKey(key.getIndex(), compoundKey.get(0));
-            return new Tuple2<IndexedKey, Tuple>(newKey, t._2());
         }
     }
 
-
     private static class LocalRearrangeFunction extends
             AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java?rev=1753141&r1=1753140&r2=1753141&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java Mon Jul 18 03:48:02 2016
@@ -66,7 +66,11 @@ class PigSecondaryKeyComparatorSpark imp
         }
     }
 
-    public static int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
+    private int compareSecondaryKeys(Object o1, Object o2, boolean[] asc){
+        return compareKeys(o1, o2, asc);
+    }
+
+    public static int compareKeys(Object o1, Object o2, boolean[] asc) {
         int rc = 0;
         if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof Tuple) {
             // objects are Tuples, we may need to apply sort order inside them