You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/04/25 01:28:23 UTC
svn commit: r1792564 -
/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
Author: zly
Date: Tue Apr 25 01:28:23 2017
New Revision: 1792564
URL: http://svn.apache.org/viewvc?rev=1792564&view=rev
Log:
PIG-5047:support outer join for skewedjoin in spark mode(Xianda via Liyun)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1792564&r1=1792563&r2=1792564&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Tue Apr 25 01:28:23 2017
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.HashMap;
+import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -109,11 +110,10 @@ public class SkewedJoinConverter impleme
streamIdxKeyJavaRDD.rdd(), SparkUtil.getManifest(PartitionIndexedKey.class),
SparkUtil.getManifest(Tuple.class));
- JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Tuple>> result_KeyValue = skewIndexedJavaPairRDD
- .join(streamIndexedJavaPairRDD, buildPartitioner(keyDist, defaultParallelism));
-
- JavaRDD<Tuple> result = result_KeyValue
- .mapPartitions(new ToValueFunction());
+ JavaRDD<Tuple> result = doJoin(skewIndexedJavaPairRDD, // doJoin picks inner/left/right/full outer join from poSkewedJoin's inner flags
+ streamIndexedJavaPairRDD,
+ buildPartitioner(keyDist, defaultParallelism),
+ keyDist);
// return type is RDD<Tuple>, so take it from JavaRDD<Tuple>
return result.rdd();
@@ -141,48 +141,99 @@ public class SkewedJoinConverter impleme
return new OperatorKey(poSkewedJoin.getOperatorKey().scope, NodeIdGenerator.getGenerator().getNextNodeId(poSkewedJoin.getOperatorKey().scope));
}
- private static class ToValueFunction implements
- FlatMapFunction<Iterator<Tuple2<PartitionIndexedKey, Tuple2<Tuple, Tuple>>>, Tuple>, Serializable {
+ /** Flattens each joined (left, right) pair into one output tuple, null-padding a missing outer side.
+ * @param <L> left value type: Tuple when the left side is inner, Optional<Tuple> when it is outer
+ * @param <R> right value type: Tuple when the right side is inner, Optional<Tuple> when it is outer
+ */
+ private static class ToValueFunction<L, R> implements
+ FlatMapFunction<Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>>, Tuple>, Serializable {
+
+ private boolean[] innerFlags; // [0] = left input, [1] = right input; false marks an outer side whose value is an Optional<Tuple>
+ private int[] schemaSize; // number of nulls used to pad a missing left ([0]) or right ([1]) side
+
+ private final Broadcast<List<Tuple>> keyDist; // read by isFirstReduceKey to build reducerMap
+
+ transient private boolean initialized = false; // reducerMap is not serialized; it is built lazily by isFirstReduceKey
+ transient protected Map<Tuple, Pair<Integer, Integer>> reducerMap;
+
+ public ToValueFunction(boolean[] innerFlags, int[] schemaSize, Broadcast<List<Tuple>> keyDist) {
+ this.innerFlags = innerFlags;
+ this.schemaSize = schemaSize;
+ this.keyDist = keyDist;
+ }
private class Tuple2TransformIterable implements Iterable<Tuple> {
- Iterator<Tuple2<PartitionIndexedKey, Tuple2<Tuple, Tuple>>> in;
+ Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> in;
Tuple2TransformIterable(
- Iterator<Tuple2<PartitionIndexedKey, Tuple2<Tuple, Tuple>>> input) {
+ Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> input) {
in = input;
}
public Iterator<Tuple> iterator() {
- return new IteratorTransform<Tuple2<PartitionIndexedKey, Tuple2<Tuple, Tuple>>, Tuple>(
+ return new IteratorTransform<Tuple2<PartitionIndexedKey, Tuple2<L, R>>, Tuple>(
in) {
@Override
protected Tuple transform(
- Tuple2<PartitionIndexedKey, Tuple2<Tuple, Tuple>> next) {
+ Tuple2<PartitionIndexedKey, Tuple2<L, R>> next) {
try {
- Tuple leftTuple = next._2._1;
- Tuple rightTuple = next._2._2;
+ L left = next._2._1;
+ R right = next._2._2;
TupleFactory tf = TupleFactory.getInstance();
- Tuple result = tf.newTuple(leftTuple.size()
- + rightTuple.size());
+ Tuple result = tf.newTuple();
- // append the two tuples together to make a
- // resulting tuple
- for (int i = 0; i < leftTuple.size(); i++)
- result.set(i, leftTuple.get(i));
- for (int i = 0; i < rightTuple.size(); i++)
- result.set(i + leftTuple.size(),
- rightTuple.get(i));
+ Tuple leftTuple = tf.newTuple();
+ if (!innerFlags[0]) {
+ // outer left side: left is an Optional<Tuple>
+ Optional<Tuple> leftOption = (Optional<Tuple>) left;
+ if (!leftOption.isPresent()) {
+ // No left match (right or full outer join): pad the left side with schemaSize[0] nulls.
+ // For a skewed key, emit the padded row only on the key's first reducer (presumably the right row reaches every reducer of that key - confirm).
+ if (isFirstReduceKey(next._1)) {
+ for (int i = 0; i < schemaSize[0]; i++) {
+ leftTuple.append(null);
+ }
+ } else {
+ return this.next(); // NOTE(review): skips this row by recursing into next(); long runs of skipped rows may overflow the stack, and on exhausted input next() may throw (caught below, yielding null) - confirm against IteratorTransform
+ }
+ } else {
+ leftTuple = leftOption.get();
+ }
+ } else {
+ leftTuple = (Tuple) left;
+ }
+ for (int i = 0; i < leftTuple.size(); i++) {
+ result.append(leftTuple.get(i));
+ }
+
+ Tuple rightTuple = tf.newTuple();
+ if (!innerFlags[1]) {
+ // outer right side: right is an Optional<Tuple>; pad with schemaSize[1] nulls when absent
+ Optional<Tuple> rightOption = (Optional<Tuple>) right;
+ if (!rightOption.isPresent()) {
+ for (int i = 0; i < schemaSize[1]; i++) {
+ rightTuple.append(null);
+ }
+ } else {
+ rightTuple = rightOption.get();
+ }
+ } else {
+ rightTuple = (Tuple) right;
+ }
+ for (int i = 0; i < rightTuple.size(); i++) {
+ result.append(rightTuple.get(i));
+ }
if (log.isDebugEnabled()) {
log.debug("MJC: Result = " + result.toDelimitedString(" "));
}
- return result;
+ return result;
} catch (Exception e) {
- System.out.println(e);
+ log.warn(e); // NOTE(review): the failure is swallowed and this row becomes null (return below); consider rethrowing
}
return null;
}
@@ -192,9 +243,31 @@ public class SkewedJoinConverter impleme
@Override
public Iterable<Tuple> call(
- Iterator<Tuple2<PartitionIndexedKey, Tuple2<Tuple, Tuple>>> input) {
+ Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> input) {
return new Tuple2TransformIterable(input);
}
+
+ private boolean isFirstReduceKey(PartitionIndexedKey pKey) { // false only for a skewed key arriving on a reducer other than its first
+ // partition id -1 marks a non-skewed key: always treated as the first reducer
+ if (pKey.getPartitionId() == -1) {
+ return true;
+ }
+
+ if (!initialized) {
+ Integer[] reducers = new Integer[1]; // presumably an out-parameter of loadKeyDistribution; its value is not used here
+ reducerMap = loadKeyDistribution(keyDist, reducers);
+ initialized = true;
+ }
+
+ Pair<Integer, Integer> indexes = reducerMap.get(pKey.getKey());
+ if (indexes != null && pKey.getPartitionId() != indexes.first) { // NOTE(review): if getPartitionId() returns Integer rather than int, != compares references - confirm
+ // key is in the skewed-key distribution and this partition is not
+ // its first reducer (indexes.first).
+ return false;
+ }
+
+ return true;
+ }
}
/**
@@ -514,4 +587,55 @@ public class SkewedJoinConverter impleme
return new SkewedJoinPartitioner(parallelism);
}
+ /** Joins the two indexed inputs as an inner, left outer, right outer or full outer join, as selected by poSkewedJoin.getInnerFlags().
+ *
+ * @param skewIndexedJavaPairRDD left-hand input of the join, keyed by PartitionIndexedKey
+ * @param streamIndexedJavaPairRDD right-hand input of the join, keyed by PartitionIndexedKey
+ * @param partitioner partitioner passed to the Spark join operation
+ * @param keyDist broadcast key distribution, handed to ToValueFunction to find a skewed key's first reducer
+ * @return flattened (left fields + right fields) tuples; a missing outer side is padded with nulls
+ */
+ private JavaRDD<Tuple> doJoin(
+ JavaPairRDD<PartitionIndexedKey, Tuple> skewIndexedJavaPairRDD,
+ JavaPairRDD<PartitionIndexedKey, Tuple> streamIndexedJavaPairRDD,
+ SkewedJoinPartitioner partitioner,
+ Broadcast<List<Tuple>> keyDist) {
+
+ boolean[] innerFlags = poSkewedJoin.getInnerFlags(); // [0] = skewed (left) input, [1] = streamed (right) input; false = outer side
+ int[] schemaSize = {0, 0}; // NOTE(review): stays 0 when a side has no schema, so a missing side is then padded with no nulls - confirm intended
+ for (int i = 0; i < 2; i++) {
+ if (poSkewedJoin.getSchema(i) != null) {
+ schemaSize[i] = poSkewedJoin.getSchema(i).size();
+ }
+ }
+
+ ToValueFunction toValueFun = new ToValueFunction(innerFlags, schemaSize, keyDist); // NOTE(review): raw type (unchecked); its L/R type arguments differ per branch below
+
+ if (innerFlags[0] && innerFlags[1]) {
+ // inner join: both values are plain Tuples
+ JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Tuple>> resultKeyValue = skewIndexedJavaPairRDD.
+ join(streamIndexedJavaPairRDD, partitioner);
+
+ return resultKeyValue.mapPartitions(toValueFun);
+ } else if (innerFlags[0] && !innerFlags[1]) {
+ // left outer join: the right value is an Optional<Tuple>
+ JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Optional<Tuple>>> resultKeyValue = skewIndexedJavaPairRDD.
+ leftOuterJoin(streamIndexedJavaPairRDD, partitioner);
+
+ return resultKeyValue.mapPartitions(toValueFun);
+ } else if (!innerFlags[0] && innerFlags[1]) {
+ // right outer join: the left value is an Optional<Tuple>
+ JavaPairRDD<PartitionIndexedKey, Tuple2<Optional<Tuple>, Tuple>> resultKeyValue = skewIndexedJavaPairRDD.
+ rightOuterJoin(streamIndexedJavaPairRDD, partitioner);
+
+ return resultKeyValue.mapPartitions(toValueFun);
+ } else {
+ // full outer join: both values are Optional<Tuple>
+ JavaPairRDD<PartitionIndexedKey, Tuple2<Optional<Tuple>, Optional<Tuple>>> resultKeyValue = skewIndexedJavaPairRDD.
+ fullOuterJoin(streamIndexedJavaPairRDD, partitioner);
+
+ return resultKeyValue.mapPartitions(toValueFun);
+ }
+ }
+
}