You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2014/11/21 07:16:26 UTC
svn commit: r1640859 - /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
Author: szehon
Date: Fri Nov 21 06:16:26 2014
New Revision: 1640859
URL: http://svn.apache.org/r1640859
Log:
HIVE-8908 : Investigate test failure on join34.q [Spark Branch] (Chao Sun via Szehon)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java?rev=1640859&r1=1640858&r2=1640859&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java Fri Nov 21 06:16:26 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
import org.apache.hadoop.hive.ql.exec.MuxOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -338,6 +339,29 @@ public class SparkMapJoinOptimizer imple
return -1;
}
+ // Union is hard to handle. For instance, the following case:
+ // TS TS
+ // | |
+ // FIL FIL
+ // | |
+ // SEL SEL
+ // \ /
+ // UNION
+ // |
+ // RS
+ // |
+ // JOIN
+ // If we treat this as a MJ case, then after the RS is removed, we would
+ // create two MapWorks, for each of the TS. Each of these MapWork will contain
+ // a MJ operator, which is wrong.
+ // Otherwise, we could try to break the op tree at the UNION, and create two MapWorks
+ // for the branches above. Then, MJ will be in the following ReduceWork.
+ // But, this is tricky to implement, and we'll leave it as a future work for now.
+ // TODO: handle this as a MJ case
+ if (containUnionWithoutRS(parentOp.getParentOperators().get(0))) {
+ return -1;
+ }
+
long inputSize = currInputStat.getDataSize();
if ((bigInputStat == null) ||
((bigInputStat != null) &&
@@ -420,19 +444,6 @@ public class SparkMapJoinOptimizer imple
Operator<? extends OperatorDesc> parentBigTableOp =
mapJoinOp.getParentOperators().get(bigTablePosition);
if (parentBigTableOp instanceof ReduceSinkOperator) {
- for (Operator<?> p : parentBigTableOp.getParentOperators()) {
- // we might have generated a dynamic partition operator chain. Since
- // we're removing the reduce sink we need do remove that too.
- Set<Operator<?>> dynamicPartitionOperators = new HashSet<Operator<?>>();
- for (Operator<?> c : p.getChildOperators()) {
- if (hasDynamicPartitionBroadcast(c)) {
- dynamicPartitionOperators.add(c);
- }
- }
- for (Operator<?> c : dynamicPartitionOperators) {
- p.removeChild(c);
- }
- }
mapJoinOp.getParentOperators().remove(bigTablePosition);
if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) {
mapJoinOp.getParentOperators().add(bigTablePosition,
@@ -450,31 +461,25 @@ public class SparkMapJoinOptimizer imple
return mapJoinOp;
}
- private boolean hasDynamicPartitionBroadcast(Operator<?> parent) {
- boolean hasDynamicPartitionPruning = false;
-
- for (Operator<?> op: parent.getChildOperators()) {
- while (op != null) {
- if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
- // found dynamic partition pruning operator
- hasDynamicPartitionPruning = true;
+ private boolean containUnionWithoutRS(Operator<? extends OperatorDesc> op) {
+ boolean result = false;
+ if (op instanceof UnionOperator) {
+ for (Operator<? extends OperatorDesc> pop : op.getParentOperators()) {
+ if (!(pop instanceof ReduceSinkOperator)) {
+ result = true;
break;
}
-
- if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) {
- // crossing reduce sink or file sink means the pruning isn't for this parent.
- break;
- }
-
- if (op.getChildOperators().size() != 1) {
- // dynamic partition pruning pipeline doesn't have multiple children
+ }
+ } else if (op instanceof ReduceSinkOperator) {
+ result = false;
+ } else {
+ for (Operator<? extends OperatorDesc> pop : op.getParentOperators()) {
+ if (containUnionWithoutRS(pop)) {
+ result = true;
break;
}
-
- op = op.getChildOperators().get(0);
}
}
-
- return hasDynamicPartitionPruning;
+ return result;
}
}