You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dj...@apache.org on 2018/08/01 21:07:37 UTC

hive git commit: HIVE-20252 : Semijoin Reduction : Cycles due to semi join branch may remain undetected if small table side has a map join upstream.(Deepak Jaiswal, reviewed by Jesus Camacho Rodriguez)

Repository: hive
Updated Branches:
  refs/heads/master 163e0de98 -> 2cabb8da1


HIVE-20252 : Semijoin Reduction : Cycles due to semi join branch may remain undetected if small table side has a map join upstream.(Deepak Jaiswal, reviewed by Jesus Camacho Rodriguez)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2cabb8da
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2cabb8da
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2cabb8da

Branch: refs/heads/master
Commit: 2cabb8da150b8fb980223fbd6c2c93b842ca3ee5
Parents: 163e0de
Author: Deepak Jaiswal <dj...@apache.org>
Authored: Wed Aug 1 14:07:27 2018 -0700
Committer: Deepak Jaiswal <dj...@apache.org>
Committed: Wed Aug 1 14:07:27 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/OperatorUtils.java      |  60 ++++++
 .../hadoop/hive/ql/parse/ParseContext.java      |  11 +
 .../hadoop/hive/ql/parse/TezCompiler.java       | 210 +++++++------------
 3 files changed, 146 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2cabb8da/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index 7b2ae40..456786c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.Stack;
 
 import org.apache.hadoop.hive.ql.exec.NodeUtils.Function;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -441,4 +442,63 @@ public class OperatorUtils {
     }
     return null;
   }
+
+  public static Set<Operator<?>>
+  findWorkOperatorsAndSemiJoinEdges(Operator<?> start,
+                                    final Map<ReduceSinkOperator, SemiJoinBranchInfo> rsToSemiJoinBranchInfo,
+                                    Set<ReduceSinkOperator> semiJoinOps, Set<TerminalOperator<?>> terminalOps) {
+    Set<Operator<?>> found = new HashSet<>();
+    findWorkOperatorsAndSemiJoinEdges(start,
+            found, rsToSemiJoinBranchInfo, semiJoinOps, terminalOps);
+    return found;
+  }
+
+  private static void
+  findWorkOperatorsAndSemiJoinEdges(Operator<?> start, Set<Operator<?>> found,
+                                    final Map<ReduceSinkOperator, SemiJoinBranchInfo> rsToSemiJoinBranchInfo,
+                                    Set<ReduceSinkOperator> semiJoinOps, Set<TerminalOperator<?>> terminalOps) {
+    found.add(start);
+
+    if (start.getParentOperators() != null) {
+      for (Operator<?> parent : start.getParentOperators()) {
+        if (parent instanceof ReduceSinkOperator) {
+          continue;
+        }
+        if (!found.contains(parent)) {
+          findWorkOperatorsAndSemiJoinEdges(parent, found, rsToSemiJoinBranchInfo, semiJoinOps, terminalOps);
+        }
+      }
+    }
+    if (start instanceof TerminalOperator) {
+      // This could be RS1 of a semijoin edge, which has the shape
+      // SEL->GBY1->RS1->GBY2->RS2
+      boolean semiJoin = false;
+      if (start.getChildOperators().size() == 1) {
+        Operator<?> gb2 = start.getChildOperators().get(0);
+        if (gb2 instanceof GroupByOperator && gb2.getChildOperators().size() == 1) {
+          Operator<?> rs2 = gb2.getChildOperators().get(0);
+          if (rs2 instanceof ReduceSinkOperator && (rsToSemiJoinBranchInfo.get(rs2) != null)) {
+            // Semijoin edge found. Add all the operators to the set
+            found.add(start);
+            found.add(gb2);
+            found.add(rs2);
+            semiJoinOps.add((ReduceSinkOperator)rs2);
+            semiJoin = true;
+          }
+        }
+      }
+      if (!semiJoin) {
+        terminalOps.add((TerminalOperator)start);
+      }
+      return;
+    }
+    if (start.getChildOperators() != null) {
+      for (Operator<?> child : start.getChildOperators()) {
+        if (!found.contains(child)) {
+          findWorkOperatorsAndSemiJoinEdges(child, found, rsToSemiJoinBranchInfo, semiJoinOps, terminalOps);
+        }
+      }
+    }
+    return;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2cabb8da/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 538aa5e..7b30b59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import com.google.common.collect.Multimap;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryProperties;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -137,6 +139,7 @@ public class ParseContext {
 
   private Map<String, List<SemiJoinHint>> semiJoinHints;
   private boolean disableMapJoin;
+  private Multimap<TerminalOperator<?>, ReduceSinkOperator> terminalOpToRSMap;
 
   public ParseContext() {
   }
@@ -714,4 +717,12 @@ public class ParseContext {
   public boolean getDisableMapJoin() {
     return disableMapJoin;
   }
+
+  public void setTerminalOpToRSMap(Multimap<TerminalOperator<?>, ReduceSinkOperator> terminalOpToRSMap) {
+    this.terminalOpToRSMap = terminalOpToRSMap;
+  }
+
+  public Multimap<TerminalOperator<?>, ReduceSinkOperator> getTerminalOpToRSMap() {
+    return terminalOpToRSMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2cabb8da/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index c3eb886..f316f09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -31,6 +31,8 @@ import java.util.Set;
 import java.util.Stack;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
 import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -202,6 +205,8 @@ public class TezCompiler extends TaskCompiler {
 
   private void runCycleAnalysisForPartitionPruning(OptimizeTezProcContext procCtx,
       Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
+    // Semijoins may have created task-level cycles; examine those
+    connectTerminalOps(procCtx.parseContext);
     boolean cycleFree = false;
     while (!cycleFree) {
       cycleFree = true;
@@ -317,7 +322,6 @@ public class TezCompiler extends TaskCompiler {
           + ((DynamicPruningEventDesc) victim.getConf()).getTableScan().toString()
           + ". Needed to break cyclic dependency");
     }
-    return;
   }
 
   // Tarjan's algo
@@ -351,20 +355,25 @@ public class TezCompiler extends TaskCompiler {
 
     List<Operator<?>> children;
     if (o instanceof AppMasterEventOperator) {
-      children = new ArrayList<Operator<?>>();
-      children.addAll(o.getChildOperators());
+      children = new ArrayList<>((o.getChildOperators()));
       TableScanOperator ts = ((DynamicPruningEventDesc) o.getConf()).getTableScan();
       LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
       children.add(ts);
-    } else if (o instanceof ReduceSinkOperator){
-      // semijoin case
-      children = new ArrayList<Operator<?>>();
-      children.addAll(o.getChildOperators());
-      SemiJoinBranchInfo sjInfo = parseContext.getRsToSemiJoinBranchInfo().get(o);
-      if (sjInfo != null ) {
-        TableScanOperator ts = sjInfo.getTsOp();
-        LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
-        children.add(ts);
+    } else if (o instanceof TerminalOperator) {
+      children = new ArrayList<>((o.getChildOperators()));
+      for (ReduceSinkOperator rs : parseContext.getTerminalOpToRSMap().get((TerminalOperator<?>)o)) {
+        // add an edge
+        LOG.debug("Adding special edge: From terminal op to semijoin edge "  + o.getName() + " --> " + rs.toString());
+        children.add(rs);
+      }
+      if (o instanceof ReduceSinkOperator) {
+        // semijoin case
+        SemiJoinBranchInfo sjInfo = parseContext.getRsToSemiJoinBranchInfo().get(o);
+        if (sjInfo != null) {
+          TableScanOperator ts = sjInfo.getTsOp();
+          LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
+          children.add(ts);
+        }
       }
     } else {
       children = o.getChildOperators();
@@ -428,7 +437,8 @@ public class TezCompiler extends TaskCompiler {
     final boolean dynamicPartitionPruningEnabled =
         procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING);
     final boolean semiJoinReductionEnabled = dynamicPartitionPruningEnabled &&
-        procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION);
+        procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION) &&
+            procCtx.parseContext.getRsToSemiJoinBranchInfo().size() != 0;
     final boolean extendedReductionEnabled = dynamicPartitionPruningEnabled &&
         procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_EXTENDED);
 
@@ -438,46 +448,31 @@ public class TezCompiler extends TaskCompiler {
     }
     perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Run remove dynamic pruning by size");
 
-    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
     if (semiJoinReductionEnabled) {
+      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
       markSemiJoinForDPP(procCtx);
-    }
-    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Mark certain semijoin edges important based ");
-
-    // Removing semijoin optimization when it may not be beneficial
-    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
-    if (semiJoinReductionEnabled) {
-      removeSemijoinOptimizationByBenefit(procCtx);
-    }
-    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove Semijoins based on cost benefits");
+      perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Mark certain semijoin edges important based ");
 
-    // Remove any parallel edge between semijoin and mapjoin.
-    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
-    if (semiJoinReductionEnabled) {
+      // Remove any parallel edge between semijoin and mapjoin.
+      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
       removeSemijoinsParallelToMapJoin(procCtx);
-    }
-    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove any parallel edge between semijoin and mapjoin");
-
-    // Remove semijoin optimization if it creates a cycle with mapside joins
-    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
-    if (semiJoinReductionEnabled && procCtx.parseContext.getRsToSemiJoinBranchInfo().size() != 0) {
-      removeSemiJoinCyclesDueToMapsideJoins(procCtx);
-    }
-    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove semijoin optimizations if it creates a cycle with mapside join");
+      perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove any parallel edge between semijoin and mapjoin");
 
-    // Remove semijoin optimization if SMB join is created.
-    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
-    if (semiJoinReductionEnabled && procCtx.parseContext.getRsToSemiJoinBranchInfo().size() != 0) {
+      // Remove semijoin optimization if SMB join is created.
+      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
       removeSemijoinOptimizationFromSMBJoins(procCtx);
-    }
-    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove semijoin optimizations if needed");
+      perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove semijoin optimizations if needed");
 
-    // Remove bloomfilter if no stats generated
-    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
-    if (semiJoinReductionEnabled && procCtx.parseContext.getRsToSemiJoinBranchInfo().size() != 0) {
+      // Remove bloomfilter if no stats generated
+      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
       removeSemiJoinIfNoStats(procCtx);
+      perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove bloom filter optimizations if needed");
+
+      // Removing semijoin optimization when it may not be beneficial
+      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.TEZ_COMPILER);
+      removeSemijoinOptimizationByBenefit(procCtx);
+      perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove Semijoins based on cost benefits");
     }
-    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.TEZ_COMPILER, "Remove bloom filter optimizations if needed");
 
     // after the stats phase we might have some cyclic dependencies that we need
     // to take care of.
@@ -842,107 +837,52 @@ public class TezCompiler extends TaskCompiler {
     }
   }
 
-  private static class SemiJoinCycleRemovalDueTOMapsideJoinContext implements NodeProcessorCtx {
-    HashMap<Operator<?>,Operator<?>> childParentMap = new HashMap<Operator<?>,Operator<?>>();
-  }
-
-  private static class SemiJoinCycleRemovalDueToMapsideJoins implements NodeProcessor {
+  private static class TerminalOpsInfo {
+    public Set<TerminalOperator<?>> terminalOps;
+    public Set<ReduceSinkOperator> rsOps;
 
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-                          Object... nodeOutputs) throws SemanticException {
-
-      SemiJoinCycleRemovalDueTOMapsideJoinContext ctx =
-              (SemiJoinCycleRemovalDueTOMapsideJoinContext) procCtx;
-      ctx.childParentMap.put((Operator<?>)stack.get(stack.size() - 2), (Operator<?>) nd);
-      return null;
+    TerminalOpsInfo(Set<TerminalOperator<?>> terminalOps, Set<ReduceSinkOperator> rsOps) {
+      this.terminalOps = terminalOps;
+      this.rsOps = rsOps;
     }
   }
 
-  private static void removeSemiJoinCyclesDueToMapsideJoins(
-          OptimizeTezProcContext procCtx) throws SemanticException {
-    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(
-            new RuleRegExp("R1", MapJoinOperator.getOperatorName() + "%" +
-                    MapJoinOperator.getOperatorName() + "%"),
-            new SemiJoinCycleRemovalDueToMapsideJoins());
-    opRules.put(
-            new RuleRegExp("R2", MapJoinOperator.getOperatorName() + "%" +
-                    CommonMergeJoinOperator.getOperatorName() + "%"),
-            new SemiJoinCycleRemovalDueToMapsideJoins());
-    opRules.put(
-            new RuleRegExp("R3", CommonMergeJoinOperator.getOperatorName() + "%" +
-                    MapJoinOperator.getOperatorName() + "%"),
-            new SemiJoinCycleRemovalDueToMapsideJoins());
-    opRules.put(
-            new RuleRegExp("R4", CommonMergeJoinOperator.getOperatorName() + "%" +
-                    CommonMergeJoinOperator.getOperatorName() + "%"),
-            new SemiJoinCycleRemovalDueToMapsideJoins());
-
-    SemiJoinCycleRemovalDueTOMapsideJoinContext ctx =
-            new SemiJoinCycleRemovalDueTOMapsideJoinContext();
-    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, ctx);
-    List<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(procCtx.parseContext.getTopOps().values());
-    GraphWalker ogw = new PreOrderOnceWalker(disp);
-    ogw.startWalking(topNodes, null);
+  private void connectTerminalOps(ParseContext pCtx) {
+    // The map which contains the virtual edges from non-semijoin terminal ops to semijoin RSs.
+    Multimap<TerminalOperator<?>, ReduceSinkOperator> terminalOpToRSMap = ArrayListMultimap.create();
 
-    // process the list
-    ParseContext pCtx = procCtx.parseContext;
-    for (Operator<?> parentJoin : ctx.childParentMap.keySet()) {
-      Operator<?> childJoin = ctx.childParentMap.get(parentJoin);
+    // Map of semijoin RS to work ops to ensure no work is examined more than once.
+    Map<ReduceSinkOperator, TerminalOpsInfo> rsToTerminalOpsInfo = new HashMap<>();
 
-      if (parentJoin.getChildOperators().size() == 1) {
-        continue;
+    // Get all the terminal ops
+    for (ReduceSinkOperator rs : pCtx.getRsToSemiJoinBranchInfo().keySet()) {
+      TerminalOpsInfo terminalOpsInfo = rsToTerminalOpsInfo.get(rs);
+      if (terminalOpsInfo != null) {
+        continue; // done with this one
       }
 
-      for (Operator<?> child : parentJoin.getChildOperators()) {
-        if (!(child instanceof SelectOperator)) {
-          continue;
-        }
-
-        while(child.getChildOperators().size() > 0) {
-          child = child.getChildOperators().get(0);
-        }
-
-        if (!(child instanceof ReduceSinkOperator)) {
-          continue;
-        }
-
-        ReduceSinkOperator rs = ((ReduceSinkOperator) child);
-        SemiJoinBranchInfo sjInfo = pCtx.getRsToSemiJoinBranchInfo().get(rs);
-        if (sjInfo == null) {
-          continue;
-        }
-
-        TableScanOperator ts = sjInfo.getTsOp();
-        // This is a semijoin branch. Find if this is creating a potential
-        // cycle with childJoin.
-        for (Operator<?> parent : childJoin.getParentOperators()) {
-          if (parent == parentJoin) {
-            continue;
-          }
-
-          assert parent instanceof ReduceSinkOperator;
-          while (parent.getParentOperators().size() > 0) {
-            parent = parent.getParentOperators().get(0);
-          }
-
-          if (parent == ts) {
-            // We have a cycle!
-            if (sjInfo.getIsHint()) {
-              throw new SemanticException("Removing hinted semijoin as it is creating cycles with mapside joins " + rs + " : " + ts);
-            }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Semijoin cycle due to mapjoin. Removing semijoin "
-                  + OperatorUtils.getOpNamePretty(rs) + " - " + OperatorUtils.getOpNamePretty(ts));
-            }
-            GenTezUtils.removeBranch(rs);
-            GenTezUtils.removeSemiJoinOperator(pCtx, rs, ts);
-          }
+      Set<ReduceSinkOperator> workRSOps = new HashSet<>();
+      Set<TerminalOperator<?>> workTerminalOps = new HashSet<>();
+      // Get the SEL Op in the semijoin-branch, SEL->GBY1->RS1->GBY2->RS2
+      Operator<?> selOp = rs.getParentOperators().get(0)
+              .getParentOperators().get(0)
+              .getParentOperators().get(0)
+              .getParentOperators().get(0);
+      OperatorUtils.findWorkOperatorsAndSemiJoinEdges(selOp,
+              pCtx.getRsToSemiJoinBranchInfo(), workRSOps, workTerminalOps);
+
+      TerminalOpsInfo candidate = new TerminalOpsInfo(workTerminalOps, workRSOps);
+
+      // A work may contain multiple semijoin edges, traverse rsOps and add for each
+      for (ReduceSinkOperator rsFound : workRSOps) {
+        rsToTerminalOpsInfo.put(rsFound, candidate);
+        for (TerminalOperator<?> terminalOp : candidate.terminalOps) {
+          terminalOpToRSMap.put(terminalOp, rsFound);
         }
       }
     }
+
+    pCtx.setTerminalOpToRSMap(terminalOpToRSMap);
   }
 
   private void removeSemiJoinIfNoStats(OptimizeTezProcContext procCtx)
@@ -1558,7 +1498,7 @@ public class TezCompiler extends TaskCompiler {
 
   private void removeSemijoinOptimizationByBenefit(OptimizeTezProcContext procCtx)
       throws SemanticException {
-    List<ReduceSinkOperator> semijoinRsToRemove = new ArrayList<ReduceSinkOperator>();
+    List<ReduceSinkOperator> semijoinRsToRemove = new ArrayList<>();
     Map<ReduceSinkOperator, SemiJoinBranchInfo> map = procCtx.parseContext.getRsToSemiJoinBranchInfo();
     double semijoinReductionThreshold = procCtx.conf.getFloatVar(
         HiveConf.ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_THRESHOLD);