You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2014/11/21 07:16:26 UTC

svn commit: r1640859 - /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java

Author: szehon
Date: Fri Nov 21 06:16:26 2014
New Revision: 1640859

URL: http://svn.apache.org/r1640859
Log:
HIVE-8908 : Investigate test failure on join34.q [Spark Branch] (Chao Sun via Szehon)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java?rev=1640859&r1=1640858&r2=1640859&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java Fri Nov 21 06:16:26 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
 import org.apache.hadoop.hive.ql.exec.MuxOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -338,6 +339,29 @@ public class SparkMapJoinOptimizer imple
         return -1;
       }
 
+      // Union is hard to handle. For instance, the following case:
+      //  TS    TS
+      //  |      |
+      //  FIL   FIL
+      //  |      |
+      //  SEL   SEL
+      //    \   /
+      //    UNION
+      //      |
+      //      RS
+      //      |
+      //     JOIN
+      // If we treat this as an MJ case, then after the RS is removed we would
+      // create two MapWorks, one for each TS. Each of these MapWorks would contain
+      // an MJ operator, which is wrong.
+      // Alternatively, we could break the op tree at the UNION and create two MapWorks
+      // for the branches above it; the MJ would then be in the following ReduceWork.
+      // However, this is tricky to implement, so we leave it as future work for now.
+      // TODO: handle this as a MJ case
+      if (containUnionWithoutRS(parentOp.getParentOperators().get(0))) {
+        return -1;
+      }
+
       long inputSize = currInputStat.getDataSize();
       if ((bigInputStat == null) ||
           ((bigInputStat != null) &&
@@ -420,19 +444,6 @@ public class SparkMapJoinOptimizer imple
     Operator<? extends OperatorDesc> parentBigTableOp =
         mapJoinOp.getParentOperators().get(bigTablePosition);
     if (parentBigTableOp instanceof ReduceSinkOperator) {
-      for (Operator<?> p : parentBigTableOp.getParentOperators()) {
-        // we might have generated a dynamic partition operator chain. Since
-        // we're removing the reduce sink we need do remove that too.
-        Set<Operator<?>> dynamicPartitionOperators = new HashSet<Operator<?>>();
-        for (Operator<?> c : p.getChildOperators()) {
-          if (hasDynamicPartitionBroadcast(c)) {
-            dynamicPartitionOperators.add(c);
-          }
-        }
-        for (Operator<?> c : dynamicPartitionOperators) {
-          p.removeChild(c);
-        }
-      }
       mapJoinOp.getParentOperators().remove(bigTablePosition);
       if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) {
         mapJoinOp.getParentOperators().add(bigTablePosition,
@@ -450,31 +461,25 @@ public class SparkMapJoinOptimizer imple
     return mapJoinOp;
   }
 
-  private boolean hasDynamicPartitionBroadcast(Operator<?> parent) {
-    boolean hasDynamicPartitionPruning = false;
-
-    for (Operator<?> op: parent.getChildOperators()) {
-      while (op != null) {
-        if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
-          // found dynamic partition pruning operator
-          hasDynamicPartitionPruning = true;
+  private boolean containUnionWithoutRS(Operator<? extends OperatorDesc> op) {
+    boolean result = false;
+    if (op instanceof UnionOperator) {
+      for (Operator<? extends OperatorDesc> pop : op.getParentOperators()) {
+        if (!(pop instanceof ReduceSinkOperator)) {
+          result = true;
           break;
         }
-
-        if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) {
-          // crossing reduce sink or file sink means the pruning isn't for this parent.
-          break;
-        }
-
-        if (op.getChildOperators().size() != 1) {
-          // dynamic partition pruning pipeline doesn't have multiple children
+      }
+    } else if (op instanceof ReduceSinkOperator) {
+      result = false;
+    } else {
+      for (Operator<? extends OperatorDesc> pop : op.getParentOperators()) {
+        if (containUnionWithoutRS(pop)) {
+          result = true;
           break;
         }
-
-        op = op.getChildOperators().get(0);
       }
     }
-
-    return hasDynamicPartitionPruning;
+    return result;
   }
 }