You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/02 02:03:37 UTC
[38/50] [abbrv] hive git commit: HIVE-15760: TezCompiler throws
ConcurrentModificationException during cycle detection (Deepak Jaiswal via
Jason Dere)
HIVE-15760: TezCompiler throws ConcurrentModificationException during cycle detection (Deepak Jaiswal via Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d178b195
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d178b195
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d178b195
Branch: refs/heads/hive-14535
Commit: d178b195e7c9707b16c345cafe43c3ffc433e983
Parents: d9cb93a
Author: Jason Dere <jd...@hortonworks.com>
Authored: Wed Feb 1 14:14:08 2017 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Wed Feb 1 14:14:08 2017 -0800
----------------------------------------------------------------------
.../hadoop/hive/ql/parse/TezCompiler.java | 146 +++++++++----------
1 file changed, 67 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d178b195/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index cf8e843..47b229f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -571,53 +571,52 @@ public class TezCompiler extends TaskCompiler {
return;
}
- private static class SemijoinRemovalContext implements NodeProcessorCtx {
- List<Operator<?>> parents = new ArrayList<Operator<?>>();
+ private static class SMBJoinOpProcContext implements NodeProcessorCtx {
+ HashMap<CommonMergeJoinOperator, TableScanOperator> JoinOpToTsOpMap = new HashMap<CommonMergeJoinOperator, TableScanOperator>();
}
- private static class SemijoinRemovalProc implements NodeProcessor {
+ private static class SMBJoinOpProc implements NodeProcessor {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
- SemijoinRemovalContext ctx = (SemijoinRemovalContext) procCtx;
- Operator<?> parent = (Operator<?>) stack.get(stack.size() - 2);
- ctx.parents.add(parent);
+ SMBJoinOpProcContext ctx = (SMBJoinOpProcContext) procCtx;
+ ctx.JoinOpToTsOpMap.put((CommonMergeJoinOperator) nd,
+ (TableScanOperator) stack.get(0));
return null;
}
}
- private static void collectSemijoinOps(Operator<?> ts, NodeProcessorCtx ctx) throws SemanticException {
- // create a walker which walks the tree in a DFS manner while maintaining
- // the operator stack. The dispatcher
- // generates the plan from the operator tree
+ private static void removeSemijoinOptimizationFromSMBJoins(
+ OptimizeTezProcContext procCtx) throws SemanticException {
+ if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION) ||
+ procCtx.parseContext.getRsOpToTsOpMap().size() == 0) {
+ return;
+ }
+
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp("R1", SelectOperator.getOperatorName() + "%" +
- TezDummyStoreOperator.getOperatorName() + "%"),
- new SemijoinRemovalProc());
- opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() + "%" +
+ opRules.put(
+ new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%" +
+ ".*" + TezDummyStoreOperator.getOperatorName() + "%" +
CommonMergeJoinOperator.getOperatorName() + "%"),
- new SemijoinRemovalProc());
+ new SMBJoinOpProc());
+
+ SMBJoinOpProcContext ctx = new SMBJoinOpProcContext();
+ // The dispatcher finds SMB and if there is semijoin optimization before it, removes it.
Dispatcher disp = new DefaultRuleDispatcher(null, opRules, ctx);
+ List<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(procCtx.parseContext.getTopOps().values());
GraphWalker ogw = new PreOrderOnceWalker(disp);
- List<Node> startNodes = new ArrayList<Node>();
- startNodes.add(ts);
-
- HashMap<Node, Object> outputMap = new HashMap<Node, Object>();
- ogw.startWalking(startNodes, null);
- }
-
- private static class SMBJoinOpProc implements NodeProcessor {
+ ogw.startWalking(topNodes, null);
- @Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
+ // Iterate over the map and remove semijoin optimizations if needed.
+ for (CommonMergeJoinOperator joinOp : ctx.JoinOpToTsOpMap.keySet()) {
List<TableScanOperator> tsOps = new ArrayList<TableScanOperator>();
// Get one top level TS Op directly from the stack
- tsOps.add((TableScanOperator)stack.get(0));
+ tsOps.add(ctx.JoinOpToTsOpMap.get(joinOp));
// Get the other one by examining Join Op
- List<Operator<?>> parents = ((CommonMergeJoinOperator) nd).getParentOperators();
+ List<Operator<?>> parents = joinOp.getParentOperators();
for (Operator<?> parent : parents) {
if (parent instanceof TezDummyStoreOperator) {
// already accounted for
@@ -636,7 +635,7 @@ public class TezCompiler extends TaskCompiler {
// Now the relevant TableScanOperators are known, find if there exists
// a semijoin filter on any of them, if so, remove it.
- ParseContext pctx = ((OptimizeTezProcContext) procCtx).parseContext;
+ ParseContext pctx = procCtx.parseContext;
for (TableScanOperator ts : tsOps) {
for (ReduceSinkOperator rs : pctx.getRsOpToTsOpMap().keySet()) {
if (ts == pctx.getRsOpToTsOpMap().get(rs)) {
@@ -646,11 +645,27 @@ public class TezCompiler extends TaskCompiler {
}
}
}
+ }
+ }
+
+ private static class SemiJoinCycleRemovalDueTOMapsideJoinContext implements NodeProcessorCtx {
+ HashMap<Operator<?>,Operator<?>> childParentMap = new HashMap<Operator<?>,Operator<?>>();
+ }
+
+ private static class SemiJoinCycleRemovalDueToMapsideJoins implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ SemiJoinCycleRemovalDueTOMapsideJoinContext ctx =
+ (SemiJoinCycleRemovalDueTOMapsideJoinContext) procCtx;
+ ctx.childParentMap.put((Operator<?>)stack.get(stack.size() - 2), (Operator<?>) nd);
return null;
}
}
- private static void removeSemijoinOptimizationFromSMBJoins(
+ private static void removeSemiJoinCyclesDueToMapsideJoins(
OptimizeTezProcContext procCtx) throws SemanticException {
if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION) ||
procCtx.parseContext.getRsOpToTsOpMap().size() == 0) {
@@ -659,31 +674,37 @@ public class TezCompiler extends TaskCompiler {
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
opRules.put(
- new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%" +
- ".*" + TezDummyStoreOperator.getOperatorName() + "%" +
+ new RuleRegExp("R1", MapJoinOperator.getOperatorName() + "%" +
+ MapJoinOperator.getOperatorName() + "%"),
+ new SemiJoinCycleRemovalDueToMapsideJoins());
+ opRules.put(
+ new RuleRegExp("R2", MapJoinOperator.getOperatorName() + "%" +
CommonMergeJoinOperator.getOperatorName() + "%"),
- new SMBJoinOpProc());
+ new SemiJoinCycleRemovalDueToMapsideJoins());
+ opRules.put(
+ new RuleRegExp("R3", CommonMergeJoinOperator.getOperatorName() + "%" +
+ MapJoinOperator.getOperatorName() + "%"),
+ new SemiJoinCycleRemovalDueToMapsideJoins());
+ opRules.put(
+ new RuleRegExp("R4", CommonMergeJoinOperator.getOperatorName() + "%" +
+ CommonMergeJoinOperator.getOperatorName() + "%"),
+ new SemiJoinCycleRemovalDueToMapsideJoins());
- // The dispatcher finds SMB and if there is semijoin optimization before it, removes it.
- Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+ SemiJoinCycleRemovalDueTOMapsideJoinContext ctx =
+ new SemiJoinCycleRemovalDueTOMapsideJoinContext();
+ Dispatcher disp = new DefaultRuleDispatcher(null, opRules, ctx);
List<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(procCtx.parseContext.getTopOps().values());
GraphWalker ogw = new PreOrderOnceWalker(disp);
ogw.startWalking(topNodes, null);
- }
- private static class SemiJoinCycleRemovalDueToMapsideJoins implements NodeProcessor {
-
- @Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
- ParseContext pCtx = ((OptimizeTezProcContext) procCtx).parseContext;
- Operator<?> childJoin = ((Operator<?>) nd);
- Operator<?> parentJoin = ((Operator<?>) stack.get(stack.size() - 2));
+ // process the list
+ ParseContext pCtx = procCtx.parseContext;
+ for (Operator<?> parentJoin : ctx.childParentMap.keySet()) {
+ Operator<?> childJoin = ctx.childParentMap.get(parentJoin);
if (parentJoin.getChildOperators().size() == 1) {
- // Nothing to do here
- return null;
+ continue;
}
for (Operator<?> child : parentJoin.getChildOperators()) {
@@ -723,40 +744,7 @@ public class TezCompiler extends TaskCompiler {
}
}
}
- return null;
- }
- }
-
- private static void removeSemiJoinCyclesDueToMapsideJoins(
- OptimizeTezProcContext procCtx) throws SemanticException {
- if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION) ||
- procCtx.parseContext.getRsOpToTsOpMap().size() == 0) {
- return;
}
-
- Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(
- new RuleRegExp("R1", MapJoinOperator.getOperatorName() + "%" +
- MapJoinOperator.getOperatorName() + "%"),
- new SemiJoinCycleRemovalDueToMapsideJoins());
- opRules.put(
- new RuleRegExp("R2", MapJoinOperator.getOperatorName() + "%" +
- CommonMergeJoinOperator.getOperatorName() + "%"),
- new SemiJoinCycleRemovalDueToMapsideJoins());
- opRules.put(
- new RuleRegExp("R3", CommonMergeJoinOperator.getOperatorName() + "%" +
- MapJoinOperator.getOperatorName() + "%"),
- new SemiJoinCycleRemovalDueToMapsideJoins());
- opRules.put(
- new RuleRegExp("R4", CommonMergeJoinOperator.getOperatorName() + "%" +
- CommonMergeJoinOperator.getOperatorName() + "%"),
- new SemiJoinCycleRemovalDueToMapsideJoins());
-
- Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
- List<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(procCtx.parseContext.getTopOps().values());
- GraphWalker ogw = new PreOrderOnceWalker(disp);
- ogw.startWalking(topNodes, null);
}
private static class SemiJoinRemovalIfNoStatsProc implements NodeProcessor {