You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/07/18 03:48:02 UTC
svn commit: r1753141 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter: IndexedKey.java JoinGroupSparkConverter.java PigSecondaryKeyComparatorSpark.java
Author: xuefu
Date: Mon Jul 18 03:48:02 2016
New Revision: 1753141
URL: http://svn.apache.org/viewvc?rev=1753141&view=rev
Log:
PIG-4553: Implement secondary sort using one shuffle (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java?rev=1753141&r1=1753140&r2=1753141&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java Mon Jul 18 03:48:02 2016
@@ -148,7 +148,7 @@ public class IndexedKey implements Seria
}
//firstly compare the index
- //secondly compare the key
+ //secondly compare the key (both first and secondary key)
@Override
public int compareTo(Object o) {
IndexedKey that = (IndexedKey) o;
@@ -160,15 +160,9 @@ public class IndexedKey implements Seria
} else {
if (useSecondaryKey) {
Tuple thisCompoundKey = (Tuple) key;
- Tuple thatCompoundKey = (Tuple) that.getKey();
- try {
- Object thisSecondary = thisCompoundKey.get(1);
- Object thatSecondaryKey = thatCompoundKey.get(1);
- return PigSecondaryKeyComparatorSpark.compareSecondaryKeys(thisSecondary, thatSecondaryKey, secondarySortOrder);
-
- } catch (ExecException e) {
- throw new RuntimeException("IndexedKey#compareTo throws exception ", e);
- }
+ Tuple thatCompoundKey = (Tuple)that.getKey();
+ return PigSecondaryKeyComparatorSpark.compareKeys(thisCompoundKey, thatCompoundKey,
+ secondarySortOrder);
} else {
return DataType.compare(key, that.getKey());
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java?rev=1753141&r1=1753140&r2=1753141&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java Mon Jul 18 03:48:02 2016
@@ -23,6 +23,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import scala.Product2;
import scala.Tuple2;
@@ -46,10 +47,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.spark.HashPartitioner;
+import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.CoGroupedRDD;
import org.apache.spark.rdd.RDD;
@@ -80,50 +81,172 @@ public class JoinGroupSparkConverter imp
SparkUtil.<IndexedKey, Tuple>getTuple2Manifest()));
}
if (rddAfterLRA.size() == 1 && useSecondaryKey) {
- rddAfterLRA.set(0, handleSecondarySort(rddAfterLRA.get(0), parallelism));
- }
+ return handleSecondarySort(rddAfterLRA.get(0), pkgOp);
+ } else {
- CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
- (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
- .asScalaBuffer(rddAfterLRA).toSeq()),
- SparkUtil.getPartitioner(glaOp.getCustomPartitioner(), parallelism),
- SparkUtil.getManifest(Object.class));
-
- RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
- (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
- return rdd.toJavaRDD().map(new GroupPkgFunction(pkgOp, this.confBytes)).rdd();
+ CoGroupedRDD<Object> coGroupedRDD = new CoGroupedRDD<Object>(
+ (Seq<RDD<? extends Product2<Object, ?>>>) (Object) (JavaConversions
+ .asScalaBuffer(rddAfterLRA).toSeq()),
+ SparkUtil.getPartitioner(glaOp.getCustomPartitioner(), parallelism),
+ SparkUtil.getManifest(Object.class));
+
+ RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
+ (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
+ return rdd.toJavaRDD().map(new GroupPkgFunction(pkgOp, this.confBytes)).rdd();
+ }
}
+ private RDD<Tuple> handleSecondarySort(
+ RDD<Tuple2<IndexedKey, Tuple>> rdd, POPackage pkgOp) {
+ JavaPairRDD<IndexedKey, Tuple> pairRDD = JavaPairRDD.fromRDD(rdd, SparkUtil.getManifest(IndexedKey.class),
+ SparkUtil.getManifest(Tuple.class));
- private RDD<Tuple2<IndexedKey, Tuple>> handleSecondarySort(
- RDD<Tuple2<IndexedKey, Tuple>> rdd, int parallelism) {
+ int partitionNums = pairRDD.partitions().size();
+ //repartition to group tuples with same indexedkey to same partition
+ JavaPairRDD<IndexedKey, Tuple> sorted = pairRDD.repartitionAndSortWithinPartitions(
+ new IndexedKeyPartitioner(partitionNums));
+ //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...))
+ return sorted.mapPartitions(new AccumulateByKey(pkgOp), true).rdd();
+ }
- JavaPairRDD<IndexedKey, Tuple> pairRDD = JavaPairRDD.fromRDD(rdd, SparkUtil.getManifest(IndexedKey.class), SparkUtil.getManifest(Tuple.class));
+ //Group tuples with same IndexKey into same partition
+ private static class IndexedKeyPartitioner extends Partitioner {
+ private int partition;
+ public IndexedKeyPartitioner(int partition) {
+ this.partition = partition;
+ }
+ @Override
+ public int getPartition(Object obj) {
+ IndexedKey indexedKey = (IndexedKey) obj;
+ Tuple key = (Tuple) indexedKey.getKey();
+
+ int hashCode = 0;
+ try {
+ hashCode = Objects.hashCode(key.get(0));
+ } catch (ExecException e) {
+ throw new RuntimeException("IndexedKeyPartitioner#getPartition: ", e);
+ }
+ return Math.abs(hashCode) % partition;
+ }
- //first sort the tuple by secondary key if enable useSecondaryKey sort
- JavaPairRDD<IndexedKey, Tuple> sorted = pairRDD.repartitionAndSortWithinPartitions(
- new HashPartitioner(parallelism));
- JavaPairRDD<IndexedKey, Tuple> sortByKey = sorted.sortByKey();
- return sortByKey.mapToPair(new RemoveSecondaryKey()).rdd();
+ @Override
+ public int numPartitions() {
+ return partition;
+ }
}
- private static class RemoveSecondaryKey implements
- PairFunction<Tuple2<IndexedKey, Tuple>, IndexedKey, Tuple>,
+ //Package tuples with same indexedkey as the result: (key,(val1,val2,val3,...))
+ //Send (key,Iterator) to POPackage, use POPackage#getNextTuple to get the result
+ private static class AccumulateByKey implements FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple>,
Serializable {
+ private POPackage pkgOp;
+
+ public AccumulateByKey(POPackage pkgOp) {
+ this.pkgOp = pkgOp;
+ }
@Override
- public Tuple2<IndexedKey, Tuple> call(Tuple2<IndexedKey, Tuple> t) throws Exception {
- IndexedKey key = t._1();
- Tuple compoundKey = (Tuple) key.getKey();
- if (compoundKey.size() < 2) {
- throw new RuntimeException("compoundKey.size() should be more than 2");
+ public Iterable<Tuple> call(final Iterator<Tuple2<IndexedKey, Tuple>> it) throws Exception {
+ return new Iterable<Tuple>() {
+ Object curKey = null;
+ ArrayList curValues = new ArrayList();
+
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new Iterator<Tuple>() {
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext() || curKey != null;
+ }
+
+ @Override
+ public Tuple next() {
+ while (it.hasNext()) {
+ Tuple2<IndexedKey, Tuple> t = it.next();
+ //key changes, restruct the last tuple by curKey, curValues and return
+ Object tMainKey = null;
+ try {
+ tMainKey = ((Tuple) (t._1()).getKey()).get(0);
+ if (curKey != null && !curKey.equals(tMainKey)) {
+ Tuple result = restructTuple(curKey, new ArrayList(curValues));
+ curValues.clear();
+ curKey = tMainKey;
+ curValues.add(t._2());
+ return result;
+ }
+ curKey = tMainKey;
+ //if key does not change, just append the value to the same key
+ curValues.add(t._2());
+
+ } catch (ExecException e) {
+ throw new RuntimeException("AccumulateByKey throw exception: ", e);
+ }
+ }
+ if (curKey == null) {
+ throw new RuntimeException("AccumulateByKey curKey is null");
+ }
+
+ //if we get here, this should be the last record
+ Tuple res = restructTuple(curKey, curValues);
+ curKey = null;
+ return res;
+ }
+
+
+ @Override
+ public void remove() {
+ // Not implemented.
+ // throw Unsupported Method Invocation Exception.
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
+ private Tuple restructTuple(final Object curKey, final ArrayList<Tuple> curValues) {
+ try {
+ Tuple retVal = null;
+ PigNullableWritable retKey = new PigNullableWritable() {
+
+ public Object getValueAsPigType() {
+ return curKey;
+ }
+ };
+
+ //Here restruct a tupleIterator, later POPackage#tupIter will use it.
+ final Iterator<Tuple> tupleItearator = curValues.iterator();
+ Iterator<NullableTuple> iterator = new Iterator<NullableTuple>() {
+ public boolean hasNext() {
+ return tupleItearator.hasNext();
+ }
+
+ public NullableTuple next() {
+ Tuple t = tupleItearator.next();
+ return new NullableTuple(t);
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ pkgOp.setInputs(null);
+ pkgOp.attachInput(retKey, iterator);
+ Result res = pkgOp.getNextTuple();
+ if (res.returnStatus == POStatus.STATUS_OK) {
+ retVal = (Tuple) res.result;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AccumulateByKey out: " + retVal);
+ }
+ return retVal;
+ } catch (ExecException e) {
+ throw new RuntimeException("AccumulateByKey#restructTuple throws exception: ", e);
}
- IndexedKey newKey = new IndexedKey(key.getIndex(), compoundKey.get(0));
- return new Tuple2<IndexedKey, Tuple>(newKey, t._2());
}
}
-
private static class LocalRearrangeFunction extends
AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements Serializable {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java?rev=1753141&r1=1753140&r2=1753141&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java Mon Jul 18 03:48:02 2016
@@ -66,7 +66,11 @@ class PigSecondaryKeyComparatorSpark imp
}
}
- public static int compareSecondaryKeys(Object o1, Object o2, boolean[] asc) {
+ private int compareSecondaryKeys(Object o1, Object o2, boolean[] asc){
+ return compareKeys(o1, o2, asc);
+ }
+
+ public static int compareKeys(Object o1, Object o2, boolean[] asc) {
int rc = 0;
if (o1 != null && o2 != null && o1 instanceof Tuple && o2 instanceof Tuple) {
// objects are Tuples, we may need to apply sort order inside them