You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/04/25 01:28:23 UTC

svn commit: r1792564 - /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java

Author: zly
Date: Tue Apr 25 01:28:23 2017
New Revision: 1792564

URL: http://svn.apache.org/viewvc?rev=1792564&view=rev
Log:
PIG-5047: support outer join for skewedjoin in spark mode (Xianda via Liyun)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1792564&r1=1792563&r2=1792564&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Tue Apr 25 01:28:23 2017
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -109,11 +110,10 @@ public class SkewedJoinConverter impleme
                 streamIdxKeyJavaRDD.rdd(), SparkUtil.getManifest(PartitionIndexedKey.class),
                 SparkUtil.getManifest(Tuple.class));
 
-        JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Tuple>> result_KeyValue = skewIndexedJavaPairRDD
-                .join(streamIndexedJavaPairRDD, buildPartitioner(keyDist, defaultParallelism));
-
-        JavaRDD<Tuple> result = result_KeyValue
-                .mapPartitions(new ToValueFunction());
+        JavaRDD<Tuple> result = doJoin(skewIndexedJavaPairRDD,
+                streamIndexedJavaPairRDD,
+                buildPartitioner(keyDist, defaultParallelism),
+                keyDist);
 
         // return type is RDD<Tuple>, so take it from JavaRDD<Tuple>
         return result.rdd();
@@ -141,48 +141,99 @@ public class SkewedJoinConverter impleme
         return new OperatorKey(poSkewedJoin.getOperatorKey().scope, NodeIdGenerator.getGenerator().getNextNodeId(poSkewedJoin.getOperatorKey().scope));
     }
 
-    private static class ToValueFunction implements
-            FlatMapFunction<Iterator<Tuple2<PartitionIndexedKey, Tuple2<Tuple, Tuple>>>, Tuple>, Serializable {
+    /**
+     * @param <L> be generic because it can be Optional<Tuple> or Tuple
+     * @param <R> be generic because it can be Optional<Tuple> or Tuple
+     */
+    private static class ToValueFunction<L, R> implements
+            FlatMapFunction<Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>>, Tuple>, Serializable {
+
+        private boolean[] innerFlags;
+        private int[] schemaSize;
+
+        private final Broadcast<List<Tuple>> keyDist;
+
+        transient private boolean initialized = false;
+        transient protected Map<Tuple, Pair<Integer, Integer>> reducerMap;
+
+        public ToValueFunction(boolean[] innerFlags, int[] schemaSize, Broadcast<List<Tuple>> keyDist) {
+            this.innerFlags = innerFlags;
+            this.schemaSize = schemaSize;
+            this.keyDist = keyDist;
+        }
 
         private class Tuple2TransformIterable implements Iterable<Tuple> {
 
-            Iterator<Tuple2<PartitionIndexedKey, Tuple2<Tuple, Tuple>>> in;
+            Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> in;
 
             Tuple2TransformIterable(
-                    Iterator<Tuple2<PartitionIndexedKey, Tuple2<Tuple, Tuple>>> input) {
+                    Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> input) {
                 in = input;
             }
 
             public Iterator<Tuple> iterator() {
-                return new IteratorTransform<Tuple2<PartitionIndexedKey, Tuple2<Tuple, Tuple>>, Tuple>(
+                return new IteratorTransform<Tuple2<PartitionIndexedKey, Tuple2<L, R>>, Tuple>(
                         in) {
                     @Override
                     protected Tuple transform(
-                            Tuple2<PartitionIndexedKey, Tuple2<Tuple, Tuple>> next) {
+                            Tuple2<PartitionIndexedKey, Tuple2<L, R>> next) {
                         try {
 
-                            Tuple leftTuple = next._2._1;
-                            Tuple rightTuple = next._2._2;
+                            L left = next._2._1;
+                            R right = next._2._2;
 
                             TupleFactory tf = TupleFactory.getInstance();
-                            Tuple result = tf.newTuple(leftTuple.size()
-                                    + rightTuple.size());
+                            Tuple result = tf.newTuple();
 
-                            // append the two tuples together to make a
-                            // resulting tuple
-                            for (int i = 0; i < leftTuple.size(); i++)
-                                result.set(i, leftTuple.get(i));
-                            for (int i = 0; i < rightTuple.size(); i++)
-                                result.set(i + leftTuple.size(),
-                                        rightTuple.get(i));
+                            Tuple leftTuple = tf.newTuple();
+                            if (!innerFlags[0]) {
+                                // left should be Optional<Tuple>
+                                Optional<Tuple> leftOption = (Optional<Tuple>) left;
+                                if (!leftOption.isPresent()) {
+                                    // Add an empty left record for RIGHT OUTER JOIN.
+                                    // Notice: if it is a skewed, only join the first reduce key
+                                    if (isFirstReduceKey(next._1)) {
+                                        for (int i = 0; i < schemaSize[0]; i++) {
+                                            leftTuple.append(null);
+                                        }
+                                    } else {
+                                        return this.next();
+                                    }
+                                } else {
+                                    leftTuple = leftOption.get();
+                                }
+                            } else {
+                                leftTuple = (Tuple) left;
+                            }
+                            for (int i = 0; i < leftTuple.size(); i++) {
+                                result.append(leftTuple.get(i));
+                            }
+
+                            Tuple rightTuple = tf.newTuple();
+                            if (!innerFlags[1]) {
+                                // right should be Optional<Tuple>
+                                Optional<Tuple> rightOption = (Optional<Tuple>) right;
+                                if (!rightOption.isPresent()) {
+                                    for (int i = 0; i < schemaSize[1]; i++) {
+                                        rightTuple.append(null);
+                                    }
+                                } else {
+                                    rightTuple = rightOption.get();
+                                }
+                            } else {
+                                rightTuple = (Tuple) right;
+                            }
+                            for (int i = 0; i < rightTuple.size(); i++) {
+                                result.append(rightTuple.get(i));
+                            }
 
                             if (log.isDebugEnabled()) {
                                 log.debug("MJC: Result = " + result.toDelimitedString(" "));
                             }
-                            return result;
 
+                            return result;
                         } catch (Exception e) {
-                            System.out.println(e);
+                            log.warn(e);
                         }
                         return null;
                     }
@@ -192,9 +243,31 @@ public class SkewedJoinConverter impleme
 
         @Override
         public Iterable<Tuple> call(
-                Iterator<Tuple2<PartitionIndexedKey, Tuple2<Tuple, Tuple>>> input) {
+                Iterator<Tuple2<PartitionIndexedKey, Tuple2<L, R>>> input) {
             return new Tuple2TransformIterable(input);
         }
+
+        private boolean isFirstReduceKey(PartitionIndexedKey pKey) {
+            // non-skewed key
+            if (pKey.getPartitionId() == -1) {
+                return true;
+            }
+
+            if (!initialized) {
+                Integer[] reducers = new Integer[1];
+                reducerMap = loadKeyDistribution(keyDist, reducers);
+                initialized = true;
+            }
+
+            Pair<Integer, Integer> indexes = reducerMap.get(pKey.getKey());
+            if (indexes != null && pKey.getPartitionId() != indexes.first) {
+                // return false only when the key is skewed
+                // and it is not the first reduce key.
+                return false;
+            }
+
+            return true;
+        }
     }
 
     /**
@@ -514,4 +587,55 @@ public class SkewedJoinConverter impleme
         return new SkewedJoinPartitioner(parallelism);
     }
 
+    /**
+     * do all kinds of Join (inner, left outer, right outer, full outer)
+     *
+     * @param skewIndexedJavaPairRDD
+     * @param streamIndexedJavaPairRDD
+     * @param partitioner
+     * @return
+     */
+    private JavaRDD<Tuple> doJoin(
+            JavaPairRDD<PartitionIndexedKey, Tuple> skewIndexedJavaPairRDD,
+            JavaPairRDD<PartitionIndexedKey, Tuple> streamIndexedJavaPairRDD,
+            SkewedJoinPartitioner partitioner,
+            Broadcast<List<Tuple>> keyDist) {
+
+        boolean[] innerFlags = poSkewedJoin.getInnerFlags();
+        int[] schemaSize = {0, 0};
+        for (int i = 0; i < 2; i++) {
+            if (poSkewedJoin.getSchema(i) != null) {
+                schemaSize[i] = poSkewedJoin.getSchema(i).size();
+            }
+        }
+
+        ToValueFunction toValueFun = new ToValueFunction(innerFlags, schemaSize, keyDist);
+
+        if (innerFlags[0] && innerFlags[1]) {
+            // inner join
+            JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Tuple>> resultKeyValue = skewIndexedJavaPairRDD.
+                    join(streamIndexedJavaPairRDD, partitioner);
+
+            return resultKeyValue.mapPartitions(toValueFun);
+        } else if (innerFlags[0] && !innerFlags[1]) {
+            // left outer join
+            JavaPairRDD<PartitionIndexedKey, Tuple2<Tuple, Optional<Tuple>>> resultKeyValue = skewIndexedJavaPairRDD.
+                    leftOuterJoin(streamIndexedJavaPairRDD, partitioner);
+
+            return resultKeyValue.mapPartitions(toValueFun);
+        } else if (!innerFlags[0] && innerFlags[1]) {
+            // right outer join
+            JavaPairRDD<PartitionIndexedKey, Tuple2<Optional<Tuple>, Tuple>> resultKeyValue = skewIndexedJavaPairRDD.
+                    rightOuterJoin(streamIndexedJavaPairRDD, partitioner);
+
+            return resultKeyValue.mapPartitions(toValueFun);
+        } else {
+            // full outer join
+            JavaPairRDD<PartitionIndexedKey, Tuple2<Optional<Tuple>, Optional<Tuple>>> resultKeyValue = skewIndexedJavaPairRDD.
+                    fullOuterJoin(streamIndexedJavaPairRDD, partitioner);
+
+            return resultKeyValue.mapPartitions(toValueFun);
+        }
+    }
+
 }