You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dj...@apache.org on 2018/08/01 21:07:37 UTC
hive git commit: HIVE-20252 : Semijoin Reduction : Cycles due to a semijoin
branch may remain undetected if the small table side has a map join
upstream. (Deepak Jaiswal, reviewed by Jesus Camacho Rodriguez)
Repository: hive
Updated Branches:
refs/heads/master 163e0de98 -> 2cabb8da1
HIVE-20252 : Semijoin Reduction : Cycles due to a semijoin branch may remain undetected if the small table side has a map join upstream. (Deepak Jaiswal, reviewed by Jesus Camacho Rodriguez)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2cabb8da
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2cabb8da
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2cabb8da
Branch: refs/heads/master
Commit: 2cabb8da150b8fb980223fbd6c2c93b842ca3ee5
Parents: 163e0de
Author: Deepak Jaiswal <dj...@apache.org>
Authored: Wed Aug 1 14:07:27 2018 -0700
Committer: Deepak Jaiswal <dj...@apache.org>
Committed: Wed Aug 1 14:07:27 2018 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/OperatorUtils.java | 60 ++++++
.../hadoop/hive/ql/parse/ParseContext.java | 11 +
.../hadoop/hive/ql/parse/TezCompiler.java | 210 +++++++------------
3 files changed, 146 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2cabb8da/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index 7b2ae40..456786c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.NodeUtils.Function;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -441,4 +442,63 @@ public class OperatorUtils {
}
return null;
}
+
+  /**
+   * Collects the operators reachable from {@code start} by walking parents (not crossing a
+   * {@link ReduceSinkOperator} parent) and children (stopping at terminal operators), which
+   * approximates the work that contains {@code start}.
+   *
+   * <p>Every terminal operator met on the walk is classified. If it is RS1 of a semijoin branch
+   * shaped {@code SEL->GBY1->RS1->GBY2->RS2}, where RS2 is a key of
+   * {@code rsToSemiJoinBranchInfo}, then GBY2 and RS2 are added to the result as well and RS2 is
+   * added to {@code semiJoinOps}. Every other terminal operator is added to {@code terminalOps}.
+   *
+   * @param start operator to start the walk from
+   * @param rsToSemiJoinBranchInfo semijoin branch info, looked up by the branch's final RS (RS2)
+   * @param semiJoinOps output: semijoin ReduceSinks (RS2) reached from this work
+   * @param terminalOps output: non-semijoin terminal operators of this work
+   * @return every operator visited, including GBY2/RS2 of any semijoin branch found
+   */
+  public static Set<Operator<?>> findWorkOperatorsAndSemiJoinEdges(Operator<?> start,
+      final Map<ReduceSinkOperator, SemiJoinBranchInfo> rsToSemiJoinBranchInfo,
+      Set<ReduceSinkOperator> semiJoinOps, Set<TerminalOperator<?>> terminalOps) {
+    Set<Operator<?>> found = new HashSet<>();
+    findWorkOperatorsAndSemiJoinEdges(start, found, rsToSemiJoinBranchInfo, semiJoinOps,
+        terminalOps);
+    return found;
+  }
+
+  /**
+   * Recursive worker for {@link #findWorkOperatorsAndSemiJoinEdges(Operator, Map, Set, Set)};
+   * {@code found} doubles as the visited set so each operator is expanded at most once.
+   */
+  private static void findWorkOperatorsAndSemiJoinEdges(Operator<?> start, Set<Operator<?>> found,
+      final Map<ReduceSinkOperator, SemiJoinBranchInfo> rsToSemiJoinBranchInfo,
+      Set<ReduceSinkOperator> semiJoinOps, Set<TerminalOperator<?>> terminalOps) {
+    found.add(start);
+
+    // Walk upwards; a ReduceSink parent belongs to an upstream work, so stop there.
+    if (start.getParentOperators() != null) {
+      for (Operator<?> parent : start.getParentOperators()) {
+        if (parent instanceof ReduceSinkOperator) {
+          continue;
+        }
+        if (!found.contains(parent)) {
+          findWorkOperatorsAndSemiJoinEdges(parent, found, rsToSemiJoinBranchInfo, semiJoinOps,
+              terminalOps);
+        }
+      }
+    }
+
+    if (start instanceof TerminalOperator) {
+      // This could be RS1 of a semijoin edge shaped SEL->GBY1->RS1->GBY2->RS2.
+      // Child lists are null-checked here as well, matching the child walk below.
+      boolean semiJoin = false;
+      if (start.getChildOperators() != null && start.getChildOperators().size() == 1) {
+        Operator<?> gb2 = start.getChildOperators().get(0);
+        if (gb2 instanceof GroupByOperator && gb2.getChildOperators() != null
+            && gb2.getChildOperators().size() == 1) {
+          Operator<?> rs2 = gb2.getChildOperators().get(0);
+          if (rs2 instanceof ReduceSinkOperator && rsToSemiJoinBranchInfo.get(rs2) != null) {
+            // Semijoin edge found; start (RS1) is already in found.
+            found.add(gb2);
+            found.add(rs2);
+            semiJoinOps.add((ReduceSinkOperator) rs2);
+            semiJoin = true;
+          }
+        }
+      }
+      if (!semiJoin) {
+        terminalOps.add((TerminalOperator<?>) start);
+      }
+      // Terminal operators end the downward walk.
+      return;
+    }
+
+    if (start.getChildOperators() != null) {
+      for (Operator<?> child : start.getChildOperators()) {
+        if (!found.contains(child)) {
+          findWorkOperatorsAndSemiJoinEdges(child, found, rsToSemiJoinBranchInfo, semiJoinOps,
+              terminalOps);
+        }
+      }
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2cabb8da/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 538aa5e..7b30b59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.parse;
+import com.google.common.collect.Multimap;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryProperties;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -137,6 +139,7 @@ public class ParseContext {
private Map<String, List<SemiJoinHint>> semiJoinHints;
private boolean disableMapJoin;
+ private Multimap<TerminalOperator<?>, ReduceSinkOperator> terminalOpToRSMap;
public ParseContext() {
}
@@ -714,4 +717,12 @@ public class ParseContext {
public boolean getDisableMapJoin() {
return disableMapJoin;
}
+
+  /**
+   * Returns the virtual edges from terminal operators to semijoin ReduceSink operators.
+   * NOTE(review): the field has no initializer, so this is presumably null until
+   * {@link #setTerminalOpToRSMap} is called — confirm against the other constructors.
+   */
+  public Multimap<TerminalOperator<?>, ReduceSinkOperator> getTerminalOpToRSMap() {
+    return this.terminalOpToRSMap;
+  }
+
+  /**
+   * Stores the terminal-operator-to-semijoin-ReduceSink edge map.
+   *
+   * @param edges map to keep; stored by reference, not copied
+   */
+  public void setTerminalOpToRSMap(Multimap<TerminalOperator<?>, ReduceSinkOperator> edges) {
+    terminalOpToRSMap = edges;
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2cabb8da/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index c3eb886..f316f09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -31,6 +31,8 @@ import java.util.Set;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -202,6 +205,8 @@ public class TezCompiler extends TaskCompiler {
private void runCycleAnalysisForPartitionPruning(OptimizeTezProcContext procCtx,
Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
+ // Semijoins may have created task level cycles, examine those
+ connectTerminalOps(procCtx.parseContext);
boolean cycleFree = false;
while (!cycleFree) {
cycleFree = true;
@@ -317,7 +322,6 @@ public class TezCompiler extends TaskCompiler {
+ ((DynamicPruningEventDesc) victim.getConf()).getTableScan().toString()
+ ". Needed to break cyclic dependency");
}
- return;
}
// Tarjan's algo
@@ -351,20 +355,25 @@ public class TezCompiler extends TaskCompiler {
List<Operator<?>> children;
if (o instanceof AppMasterEventOperator) {
- children = new ArrayList<Operator<?>>();
- children.addAll(o.getChildOperators());
+ children = new ArrayList<>((o.getChildOperators()));
TableScanOperator ts = ((DynamicPruningEventDesc) o.getConf()).getTableScan();
LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
children.add(ts);
- } else if (o instanceof ReduceSinkOperator){
- // semijoin case
- children = new ArrayList<Operator<?>>();
- children.addAll(o.getChildOperators());
- SemiJoinBranchInfo sjInfo = parseContext.getRsToSemiJoinBranchInfo().get(o);
- if (sjInfo != null ) {
- TableScanOperator ts = sjInfo.getTsOp();
- LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
- children.add(ts);
+ } else if (o instanceof TerminalOperator) {
+ children = new ArrayList<>((o.getChildOperators()));
+ for (ReduceSinkOperator rs : parseContext.getTerminalOpToRSMap().get((TerminalOperator<?>)o)) {
+ // add an edge
+ LOG.debug("Adding special edge: From terminal op to semijoin edge " + o.getName() + " --> " + rs.toString());
+ children.add(rs);
+ }
+ if (o instanceof ReduceSinkOperator) {
+ // semijoin case
+ SemiJoinBranchInfo sjInfo = parseContext.getRsToSemiJoinBranchInfo().get(o);
+ if (sjInfo != null) {
+ TableScanOperator ts = sjInfo.getTsOp();
+ LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
+ children.add(ts);
+ }
}
} else {
children = o.getChildOperators();
@@ -428,7 +437,8 @@ public class TezCompiler extends TaskCompiler {
final boolean dynamicPartitionPruningEnabled =
procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING);
final boolean semiJoinReductionEnabled = dynamicPartitionPruningEnabled &&
- procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION);
+ procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION) &&
+ procCtx.parseContext.getRsToSemiJoinBranchInfo().size() != 0;
final boolean extendedReductionEnabled = dynamicPartitionPruningEnabled &&
procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_EXTENDED);
@@ -438,46 +448,31 @@ public class TezCompiler extends TaskCompiler {
}
perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run remove dynamic pruning by size");
- perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
if (semiJoinReductionEnabled) {
+ perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
markSemiJoinForDPP(procCtx);
- }
- perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Mark certain semijoin edges important based ");
-
- // Removing semijoin optimization when it may not be beneficial
- perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
- if (semiJoinReductionEnabled) {
- removeSemijoinOptimizationByBenefit(procCtx);
- }
- perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove Semijoins based on cost benefits");
+ perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Mark certain semijoin edges important based ");
- // Remove any parallel edge between semijoin and mapjoin.
- perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
- if (semiJoinReductionEnabled) {
+ // Remove any parallel edge between semijoin and mapjoin.
+ perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
removeSemijoinsParallelToMapJoin(procCtx);
- }
- perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove any parallel edge between semijoin and mapjoin");
-
- // Remove semijoin optimization if it creates a cycle with mapside joins
- perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
- if (semiJoinReductionEnabled && procCtx.parseContext.getRsToSemiJoinBranchInfo().size() != 0) {
- removeSemiJoinCyclesDueToMapsideJoins(procCtx);
- }
- perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove semijoin optimizations if it creates a cycle with mapside join");
+ perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove any parallel edge between semijoin and mapjoin");
- // Remove semijoin optimization if SMB join is created.
- perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
- if (semiJoinReductionEnabled && procCtx.parseContext.getRsToSemiJoinBranchInfo().size() != 0) {
+ // Remove semijoin optimization if SMB join is created.
+ perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
removeSemijoinOptimizationFromSMBJoins(procCtx);
- }
- perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove semijoin optimizations if needed");
+ perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove semijoin optimizations if needed");
- // Remove bloomfilter if no stats generated
- perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
- if (semiJoinReductionEnabled && procCtx.parseContext.getRsToSemiJoinBranchInfo().size() != 0) {
+ // Remove bloomfilter if no stats generated
+ perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
removeSemiJoinIfNoStats(procCtx);
+ perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove bloom filter optimizations if needed");
+
+ // Removing semijoin optimization when it may not be beneficial
+ perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+ removeSemijoinOptimizationByBenefit(procCtx);
+ perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove Semijoins based on cost benefits");
}
- perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove bloom filter optimizations if needed");
// after the stats phase we might have some cyclic dependencies that we need
// to take care of.
@@ -842,107 +837,52 @@ public class TezCompiler extends TaskCompiler {
}
}
- private static class SemiJoinCycleRemovalDueTOMapsideJoinContext implements NodeProcessorCtx {
- HashMap<Operator<?>,Operator<?>> childParentMap = new HashMap<Operator<?>,Operator<?>>();
- }
-
- private static class SemiJoinCycleRemovalDueToMapsideJoins implements NodeProcessor {
+ /**
+ * Result of walking one work with OperatorUtils.findWorkOperatorsAndSemiJoinEdges:
+ * the work's non-semijoin terminal operators and the semijoin ReduceSinks (RS2)
+ * reached through it.
+ */
+ private static class TerminalOpsInfo {
+ // Non-semijoin terminal operators of the work.
+ public Set<TerminalOperator<?>> terminalOps;
+ // Semijoin RS2 operators found from the work.
+ // NOTE(review): not read anywhere in the visible patch.
+ public Set<ReduceSinkOperator> rsOps;
- @Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
-
- SemiJoinCycleRemovalDueTOMapsideJoinContext ctx =
- (SemiJoinCycleRemovalDueTOMapsideJoinContext) procCtx;
- ctx.childParentMap.put((Operator<?>)stack.get(stack.size() - 2), (Operator<?>) nd);
- return null;
+ TerminalOpsInfo(Set<TerminalOperator<?>> terminalOps, Set<ReduceSinkOperator> rsOps) {
+ this.terminalOps = terminalOps;
+ this.rsOps = rsOps;
}
}
- private static void removeSemiJoinCyclesDueToMapsideJoins(
- OptimizeTezProcContext procCtx) throws SemanticException {
- Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(
- new RuleRegExp("R1", MapJoinOperator.getOperatorName() + "%" +
- MapJoinOperator.getOperatorName() + "%"),
- new SemiJoinCycleRemovalDueToMapsideJoins());
- opRules.put(
- new RuleRegExp("R2", MapJoinOperator.getOperatorName() + "%" +
- CommonMergeJoinOperator.getOperatorName() + "%"),
- new SemiJoinCycleRemovalDueToMapsideJoins());
- opRules.put(
- new RuleRegExp("R3", CommonMergeJoinOperator.getOperatorName() + "%" +
- MapJoinOperator.getOperatorName() + "%"),
- new SemiJoinCycleRemovalDueToMapsideJoins());
- opRules.put(
- new RuleRegExp("R4", CommonMergeJoinOperator.getOperatorName() + "%" +
- CommonMergeJoinOperator.getOperatorName() + "%"),
- new SemiJoinCycleRemovalDueToMapsideJoins());
-
- SemiJoinCycleRemovalDueTOMapsideJoinContext ctx =
- new SemiJoinCycleRemovalDueTOMapsideJoinContext();
- Dispatcher disp = new DefaultRuleDispatcher(null, opRules, ctx);
- List<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(procCtx.parseContext.getTopOps().values());
- GraphWalker ogw = new PreOrderOnceWalker(disp);
- ogw.startWalking(topNodes, null);
+ /**
+ * Builds virtual edges from each non-semijoin terminal operator of a work to every
+ * semijoin ReduceSink (RS2) reachable from that same work, and stores them in
+ * pCtx via setTerminalOpToRSMap. Tarjan's cycle check adds these edges as extra
+ * children of the terminal operator.
+ *
+ * @param pCtx parse context providing getRsToSemiJoinBranchInfo(); receives the edge map
+ */
+ private void connectTerminalOps(ParseContext pCtx) {
+ // The map which contains the virtual edges from non-semijoin terminal ops to semijoin RSs.
+ Multimap<TerminalOperator<?>, ReduceSinkOperator> terminalOpToRSMap = ArrayListMultimap.create();
- // process the list
- ParseContext pCtx = procCtx.parseContext;
- for (Operator<?> parentJoin : ctx.childParentMap.keySet()) {
- Operator<?> childJoin = ctx.childParentMap.get(parentJoin);
+ // Map of semijoin RS to the TerminalOpsInfo of its work; acts as a visited set so no work is examined more than once.
+ Map<ReduceSinkOperator, TerminalOpsInfo> rsToTerminalOpsInfo = new HashMap<>();
- if (parentJoin.getChildOperators().size() == 1) {
- continue;
+ // Get all the terminal ops
+ for (ReduceSinkOperator rs : pCtx.getRsToSemiJoinBranchInfo().keySet()) {
+ TerminalOpsInfo terminalOpsInfo = rsToTerminalOpsInfo.get(rs);
+ if (terminalOpsInfo != null) {
+ continue; // done with this one
}
- for (Operator<?> child : parentJoin.getChildOperators()) {
- if (!(child instanceof SelectOperator)) {
- continue;
- }
-
- while(child.getChildOperators().size() > 0) {
- child = child.getChildOperators().get(0);
- }
-
- if (!(child instanceof ReduceSinkOperator)) {
- continue;
- }
-
- ReduceSinkOperator rs = ((ReduceSinkOperator) child);
- SemiJoinBranchInfo sjInfo = pCtx.getRsToSemiJoinBranchInfo().get(rs);
- if (sjInfo == null) {
- continue;
- }
-
- TableScanOperator ts = sjInfo.getTsOp();
- // This is a semijoin branch. Find if this is creating a potential
- // cycle with childJoin.
- for (Operator<?> parent : childJoin.getParentOperators()) {
- if (parent == parentJoin) {
- continue;
- }
-
- assert parent instanceof ReduceSinkOperator;
- while (parent.getParentOperators().size() > 0) {
- parent = parent.getParentOperators().get(0);
- }
-
- if (parent == ts) {
- // We have a cycle!
- if (sjInfo.getIsHint()) {
- throw new SemanticException("Removing hinted semijoin as it is creating cycles with mapside joins " + rs + " : " + ts);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Semijoin cycle due to mapjoin. Removing semijoin "
- + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(ts));
- }
- GenTezUtils.removeBranch(rs);
- GenTezUtils.removeSemiJoinOperator(pCtx, rs, ts);
- }
+ Set<ReduceSinkOperator> workRSOps = new HashSet<>();
+ Set<TerminalOperator<?>> workTerminalOps = new HashSet<>();
+ // Get the SEL Op in the semijoin-branch, SEL->GBY1->RS1->GBY2->RS2
+ // NOTE(review): assumes the branch still has exactly this shape; get(0) throws if any level has no parent.
+ Operator<?> selOp = rs.getParentOperators().get(0)
+ .getParentOperators().get(0)
+ .getParentOperators().get(0)
+ .getParentOperators().get(0);
+ OperatorUtils.findWorkOperatorsAndSemiJoinEdges(selOp,
+ pCtx.getRsToSemiJoinBranchInfo(), workRSOps, workTerminalOps);
+
+ TerminalOpsInfo candidate = new TerminalOpsInfo(workTerminalOps, workRSOps);
+
+ // A work may contain multiple semijoin edges, traverse rsOps and add for each
+ for (ReduceSinkOperator rsFound : workRSOps) {
+ rsToTerminalOpsInfo.put(rsFound, candidate);
+ for (TerminalOperator<?> terminalOp : candidate.terminalOps) {
+ terminalOpToRSMap.put(terminalOp, rsFound);
}
}
}
+
+ // Publish the edges for the Tarjan cycle check, which reads them via getTerminalOpToRSMap().
+ pCtx.setTerminalOpToRSMap(terminalOpToRSMap);
}
private void removeSemiJoinIfNoStats(OptimizeTezProcContext procCtx)
@@ -1558,7 +1498,7 @@ public class TezCompiler extends TaskCompiler {
private void removeSemijoinOptimizationByBenefit(OptimizeTezProcContext procCtx)
throws SemanticException {
- List<ReduceSinkOperator> semijoinRsToRemove = new ArrayList<ReduceSinkOperator>();
+ List<ReduceSinkOperator> semijoinRsToRemove = new ArrayList<>();
Map<ReduceSinkOperator, SemiJoinBranchInfo> map = procCtx.parseContext.getRsToSemiJoinBranchInfo();
double semijoinReductionThreshold = procCtx.conf.getFloatVar(
HiveConf.ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_THRESHOLD);