Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/08/04 04:26:08 UTC

[GitHub] [hive] jcamachor commented on a change in pull request #1325: HIVE-21196 HIVE-23940 Multi-column semijoin reducers and TPC-H datasets

jcamachor commented on a change in pull request #1325:
URL: https://github.com/apache/hive/pull/1325#discussion_r464764059



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
##########
@@ -53,6 +53,34 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(OperatorUtils.class);
 
+  /**
+   * Return the ancestor of the specified operator at the provided path or null if the path is invalid.
+   *
+   * The method is equivalent to the following code:
+   * <pre>{@code
+   *     op.getParentOperators().get(path[0])
+   *     .getParentOperators().get(path[1])
+   *     ...
+   *     .getParentOperators().get(path[n])
+   * }</pre>
+   * with additional checks about the validity of the provided path and the type of the ancestor.
+   *
+   * @param op the operator whose ancestor is requested
+   * @param clazz the class of the ancestor operator
+   * @param path the path leading to the desired ancestor
+   * @param <T> the type of the ancestor
+   * @return the ancestor of the specified operator at the provided path or null if the path is invalid.
+   */
+  public static <T> T ancestor(Operator<?> op, Class<T> clazz, int... path) {
+    Operator<?> target = op;
+    for (int i = 0; i < path.length; i++) {
+      if (target.getParentOperators() == null || path[i] > target.getParentOperators().size())

Review comment:
    nit. We use `{` `}` even for single-line statements.
   
    Please also check the other code changes in this PR, since I have seen the same issue elsewhere.
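    For reference, a minimal sketch of the requested style applied to the quoted check (the body is assumed to return null, as the Javadoc above suggests; the hunk is cut off before it):
    ```java
    if (target.getParentOperators() == null || path[i] > target.getParentOperators().size()) {
      return null; // assumed body, per the Javadoc ("... or null if the path is invalid")
    }
    ```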

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
##########
@@ -53,6 +53,34 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(OperatorUtils.class);
 
+  /**
+   * Return the ancestor of the specified operator at the provided path or null if the path is invalid.
+   *
+   * The method is equivalent to following code:
+   * <pre>{@code
+   *     op.getParentOperators().get(path[0])

Review comment:
       Neat! Interesting method... Reminds me of our good old times with XPath 😄 
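    For readers of the archive, a short usage sketch of the new helper as it is exercised later in this patch (`rs2` is a hypothetical variable standing for the final ReduceSink of a semijoin branch shaped `<Parent>-SEL-GB1-RS1-GB2-RS2`):
    ```java
    // Walk four single-parent hops up from RS2 (RS2 -> GB2 -> RS1 -> GB1 -> SEL).
    SelectOperator selOp = OperatorUtils.ancestor(rs2, SelectOperator.class, 0, 0, 0, 0);
    if (selOp == null) {
      // Per the Javadoc, null means the provided path was invalid.
    }
    ```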

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.GenTezUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class SemiJoinReductionMerge extends Transform {

Review comment:
    Please add a class-level comment with a general explanation of its purpose.
   
    It would also be useful to add comments to the methods below.
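    A rough sketch of what such a class comment could say, based only on my reading of the methods below (wording is mine, not authoritative):
    ```java
    /**
     * Merges multiple semijoin reduction branches that start from the same source
     * operator and target the same TableScan into a single multi-column branch.
     * The merged branch computes min/max per original column plus one bloom filter
     * over hash(col1, ..., colN); the target side is then filtered with per-column
     * BETWEEN predicates and a single in_bloom_filter over the same hash.
     */
    public class SemiJoinReductionMerge extends Transform {
    ```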

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.GenTezUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class SemiJoinReductionMerge extends Transform {
+
+  public ParseContext transform(ParseContext parseContext) throws SemanticException {
+    Map<ReduceSinkOperator, SemiJoinBranchInfo> map = parseContext.getRsToSemiJoinBranchInfo();
+    if (map.isEmpty()) {
+      return parseContext;
+    }
+    HiveConf hiveConf = parseContext.getConf();
+
+    // Order does not really matter but it is necessary to keep plans stable
+    SortedMap<SJSourceTarget, List<ReduceSinkOperator>> sameTableSJ =
+        new TreeMap<>(Comparator.comparing(SJSourceTarget::toString));
+    for (Map.Entry<ReduceSinkOperator, SemiJoinBranchInfo> smjEntry : map.entrySet()) {
+      TableScanOperator ts = smjEntry.getValue().getTsOp();
+      // Semijoin optimization branch should look like <Parent>-SEL-GB1-RS1-GB2-RS2
+      SelectOperator selOp = OperatorUtils.ancestor(smjEntry.getKey(), SelectOperator.class, 0, 0, 0, 0);
+      assert selOp != null;
+      assert selOp.getParentOperators().size() == 1;
+      Operator<?> source = selOp.getParentOperators().get(0);
+      SJSourceTarget sjKey = new SJSourceTarget(source, ts);
+      List<ReduceSinkOperator> ops = sameTableSJ.computeIfAbsent(sjKey, tableScanOperator -> new ArrayList<>());
+      ops.add(smjEntry.getKey());
+    }
+    for (Map.Entry<SJSourceTarget, List<ReduceSinkOperator>> sjMergeCandidate : sameTableSJ.entrySet()) {
+      final List<ReduceSinkOperator> sjBrances = sjMergeCandidate.getValue();
+      if (sjBrances.size() < 2) {
+        continue;
+      }
+      // Order does not really matter but it is necessary to keep plans stable
+      sjBrances.sort(Comparator.comparing(Operator::getIdentifier));
+
+      List<SelectOperator> selOps = new ArrayList<>(sjBrances.size());
+      for (ReduceSinkOperator rs : sjBrances) {
+        selOps.add(OperatorUtils.ancestor(rs, SelectOperator.class, 0, 0, 0, 0));
+      }
+      SelectOperator selectOp = mergeSelectOps(sjMergeCandidate.getKey().source, selOps);
+
+      GroupByOperator gbPartialOp = createGroupBy(selectOp, selectOp, GroupByDesc.Mode.HASH, hiveConf);
+
+      ReduceSinkOperator rsPartialOp = createReduceSink(gbPartialOp, NullOrdering.defaultNullOrder(hiveConf));
+      rsPartialOp.getConf().setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.QUICKSTART));
+
+      GroupByOperator gbCompleteOp = createGroupBy(selectOp, rsPartialOp, GroupByDesc.Mode.FINAL, hiveConf);
+
+      ReduceSinkOperator rsCompleteOp = createReduceSink(gbCompleteOp, NullOrdering.defaultNullOrder(hiveConf));
+
+      final TableScanOperator sjTargetTable = sjMergeCandidate.getKey().target;
+      SemiJoinBranchInfo sjInfo = new SemiJoinBranchInfo(sjTargetTable, false);
+      parseContext.getRsToSemiJoinBranchInfo().put(rsCompleteOp, sjInfo);
+
+      // Save the info that is required at query time to resolve dynamic/runtime values.
+      RuntimeValuesInfo valuesInfo = createRuntimeValuesInfo(rsCompleteOp, sjBrances, parseContext);
+      parseContext.getRsToRuntimeValuesInfoMap().put(rsCompleteOp, valuesInfo);
+
+      ExprNodeGenericFuncDesc sjPredicate = createSemiJoinPredicate(sjBrances, valuesInfo, parseContext);
+
+      // Update filter operators with the new semi-join predicate
+      for (Operator<?> op : sjTargetTable.getChildOperators()) {
+        if (op instanceof FilterOperator) {
+          FilterDesc filter = ((FilterOperator) op).getConf();
+          filter.setPredicate(and(Arrays.asList(filter.getPredicate(), sjPredicate)));
+        }
+      }
+      // Update tableScan with the new semi-join predicate
+      sjTargetTable.getConf().setFilterExpr(and(Arrays.asList(sjTargetTable.getConf().getFilterExpr(), sjPredicate)));
+
+      for (ReduceSinkOperator rs : sjBrances) {
+        GenTezUtils.removeSemiJoinOperator(parseContext, rs, sjTargetTable);
+        GenTezUtils.removeBranch(rs);
+      }
+
+      // TODO How to associate multi-cols with gb ?
+      // parseContext.getColExprToGBMap().put(key, gb);
+    }
+    return parseContext;
+  }
+
+  private static ExprNodeGenericFuncDesc createSemiJoinPredicate(List<ReduceSinkOperator> sjBranches,
+      RuntimeValuesInfo sjValueInfo, ParseContext context) {
+    // Performance note: To speed-up evaluation 'BETWEEN' predicates should come before the 'IN_BLOOM_FILTER'
+    Deque<String> dynamicIds = new ArrayDeque<>(sjValueInfo.getDynamicValueIDs());
+    List<ExprNodeDesc> sjPredicates = new ArrayList<>();
+    List<ExprNodeDesc> hashArgs = new ArrayList<>();
+    for (ReduceSinkOperator rs : sjBranches) {
+      RuntimeValuesInfo info = context.getRsToRuntimeValuesInfoMap().get(rs);
+      assert info.getTargetColumns().size() == 1;
+      final ExprNodeDesc targetColumn = info.getTargetColumns().get(0);
+      TypeInfo typeInfo = targetColumn.getTypeInfo();
+      DynamicValue minDynamic = new DynamicValue(dynamicIds.poll(), typeInfo);
+      DynamicValue maxDynamic = new DynamicValue(dynamicIds.poll(), typeInfo);
+
+      List<ExprNodeDesc> betweenArgs = Arrays.asList(
+          // Use false to not invert between result
+          new ExprNodeConstantDesc(Boolean.FALSE),
+          targetColumn,
+          new ExprNodeDynamicValueDesc(minDynamic),
+          new ExprNodeDynamicValueDesc(maxDynamic));
+      ExprNodeDesc betweenExp =
+          new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFBetween(), "between", betweenArgs);
+      sjPredicates.add(betweenExp);
+      hashArgs.add(targetColumn);
+    }
+
+    ExprNodeDesc hashExp =
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, new GenericUDFMurmurHash(), "hash", hashArgs);
+
+    assert dynamicIds.size() == 1 : "There should be one column left untreated: the one with the bloom filter";
+    DynamicValue bloomDynamic = new DynamicValue(dynamicIds.poll(), TypeInfoFactory.binaryTypeInfo);
+    sjPredicates.add(
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFInBloomFilter(), "in_bloom_filter",
+            Arrays.asList(hashExp, new ExprNodeDynamicValueDesc(bloomDynamic))));
+    return and(sjPredicates);
+  }
+
+  private static RuntimeValuesInfo createRuntimeValuesInfo(ReduceSinkOperator rs, List<ReduceSinkOperator> sjBranches,
+      ParseContext parseContext) {
+    List<ExprNodeDesc> valueCols = rs.getConf().getValueCols();
+    RuntimeValuesInfo info = new RuntimeValuesInfo();
+    TableDesc rsFinalTableDesc =
+        PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols, "_col"));
+    List<String> dynamicValueIDs = new ArrayList<>();
+    for (ExprNodeDesc rsCol : valueCols) {
+      dynamicValueIDs.add(rs.toString() + rsCol.getExprString());
+    }
+
+    info.setTableDesc(rsFinalTableDesc);
+    info.setDynamicValueIDs(dynamicValueIDs);
+    info.setColExprs(valueCols);
+    List<ExprNodeDesc> targetTableExpressions = new ArrayList<>();
+    for (ReduceSinkOperator sjBranch : sjBranches) {
+      RuntimeValuesInfo sjInfo = parseContext.getRsToRuntimeValuesInfoMap().get(sjBranch);
+      assert sjInfo.getTargetColumns().size() == 1;
+      targetTableExpressions.add(sjInfo.getTargetColumns().get(0));
+    }
+    info.setTargetColumns(targetTableExpressions);
+    return info;
+  }
+
+  private static SelectOperator mergeSelectOps(Operator<?> parent, List<SelectOperator> selectOperators) {
+    List<String> colNames = new ArrayList<>();
+    List<ExprNodeDesc> colDescs = new ArrayList<>();
+    List<ColumnInfo> columnInfos = new ArrayList<>();
+    Map<String, ExprNodeDesc> selectColumnExprMap = new HashMap<>();
+    for (SelectOperator sel : selectOperators) {
+      for (ExprNodeDesc col : sel.getConf().getColList()) {
+        String colName = HiveConf.getColumnInternalName(colDescs.size());
+        colNames.add(colName);
+        columnInfos.add(new ColumnInfo(colName, col.getTypeInfo(), "", false));
+        colDescs.add(col);
+        selectColumnExprMap.put(colName, col);
+      }
+    }
+    ExprNodeDesc hashExp =
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, new GenericUDFMurmurHash(), "hash", colDescs);
+    String hashName = HiveConf.getColumnInternalName(colDescs.size() + 1);
+    colNames.add(hashName);
+    columnInfos.add(new ColumnInfo(hashName, hashExp.getTypeInfo(), "", false));
+
+    List<ExprNodeDesc> selDescs = new ArrayList<>(colDescs);

Review comment:
    Add a comment mentioning that these are needed for min/max.
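    Something along these lines, perhaps (a sketch of the suggested comment; wording is mine):
    ```java
    // Keep the original column expressions as outputs of the merged SELECT since the
    // follow-up GroupBy computes min/max over each of them; the extra hash(col1, ..., colN)
    // column only feeds the bloom_filter aggregation.
    List<ExprNodeDesc> selDescs = new ArrayList<>(colDescs);
    selDescs.add(hashExp);
    ```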

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.GenTezUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class SemiJoinReductionMerge extends Transform {
+
+  public ParseContext transform(ParseContext parseContext) throws SemanticException {
+    Map<ReduceSinkOperator, SemiJoinBranchInfo> map = parseContext.getRsToSemiJoinBranchInfo();
+    if (map.isEmpty()) {
+      return parseContext;
+    }
+    HiveConf hiveConf = parseContext.getConf();
+
+    // Order does not really matter but it is necessary to keep plans stable
+    SortedMap<SJSourceTarget, List<ReduceSinkOperator>> sameTableSJ =
+        new TreeMap<>(Comparator.comparing(SJSourceTarget::toString));
+    for (Map.Entry<ReduceSinkOperator, SemiJoinBranchInfo> smjEntry : map.entrySet()) {
+      TableScanOperator ts = smjEntry.getValue().getTsOp();
+      // Semijoin optimization branch should look like <Parent>-SEL-GB1-RS1-GB2-RS2
+      SelectOperator selOp = OperatorUtils.ancestor(smjEntry.getKey(), SelectOperator.class, 0, 0, 0, 0);
+      assert selOp != null;
+      assert selOp.getParentOperators().size() == 1;
+      Operator<?> source = selOp.getParentOperators().get(0);
+      SJSourceTarget sjKey = new SJSourceTarget(source, ts);
+      List<ReduceSinkOperator> ops = sameTableSJ.computeIfAbsent(sjKey, tableScanOperator -> new ArrayList<>());
+      ops.add(smjEntry.getKey());
+    }
+    for (Map.Entry<SJSourceTarget, List<ReduceSinkOperator>> sjMergeCandidate : sameTableSJ.entrySet()) {
+      final List<ReduceSinkOperator> sjBrances = sjMergeCandidate.getValue();
+      if (sjBrances.size() < 2) {
+        continue;
+      }
+      // Order does not really matter but it is necessary to keep plans stable
+      sjBrances.sort(Comparator.comparing(Operator::getIdentifier));
+
+      List<SelectOperator> selOps = new ArrayList<>(sjBrances.size());
+      for (ReduceSinkOperator rs : sjBrances) {
+        selOps.add(OperatorUtils.ancestor(rs, SelectOperator.class, 0, 0, 0, 0));
+      }
+      SelectOperator selectOp = mergeSelectOps(sjMergeCandidate.getKey().source, selOps);
+
+      GroupByOperator gbPartialOp = createGroupBy(selectOp, selectOp, GroupByDesc.Mode.HASH, hiveConf);
+
+      ReduceSinkOperator rsPartialOp = createReduceSink(gbPartialOp, NullOrdering.defaultNullOrder(hiveConf));
+      rsPartialOp.getConf().setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.QUICKSTART));
+
+      GroupByOperator gbCompleteOp = createGroupBy(selectOp, rsPartialOp, GroupByDesc.Mode.FINAL, hiveConf);
+
+      ReduceSinkOperator rsCompleteOp = createReduceSink(gbCompleteOp, NullOrdering.defaultNullOrder(hiveConf));
+
+      final TableScanOperator sjTargetTable = sjMergeCandidate.getKey().target;
+      SemiJoinBranchInfo sjInfo = new SemiJoinBranchInfo(sjTargetTable, false);
+      parseContext.getRsToSemiJoinBranchInfo().put(rsCompleteOp, sjInfo);
+
+      // Save the info that is required at query time to resolve dynamic/runtime values.
+      RuntimeValuesInfo valuesInfo = createRuntimeValuesInfo(rsCompleteOp, sjBrances, parseContext);
+      parseContext.getRsToRuntimeValuesInfoMap().put(rsCompleteOp, valuesInfo);
+
+      ExprNodeGenericFuncDesc sjPredicate = createSemiJoinPredicate(sjBrances, valuesInfo, parseContext);
+
+      // Update filter operators with the new semi-join predicate
+      for (Operator<?> op : sjTargetTable.getChildOperators()) {
+        if (op instanceof FilterOperator) {
+          FilterDesc filter = ((FilterOperator) op).getConf();
+          filter.setPredicate(and(Arrays.asList(filter.getPredicate(), sjPredicate)));
+        }
+      }
+      // Update tableScan with the new semi-join predicate
+      sjTargetTable.getConf().setFilterExpr(and(Arrays.asList(sjTargetTable.getConf().getFilterExpr(), sjPredicate)));
+
+      for (ReduceSinkOperator rs : sjBrances) {
+        GenTezUtils.removeSemiJoinOperator(parseContext, rs, sjTargetTable);
+        GenTezUtils.removeBranch(rs);
+      }
+
+      // TODO How to associate multi-cols with gb ?
+      // parseContext.getColExprToGBMap().put(key, gb);
+    }
+    return parseContext;
+  }
+
+  private static ExprNodeGenericFuncDesc createSemiJoinPredicate(List<ReduceSinkOperator> sjBranches,
+      RuntimeValuesInfo sjValueInfo, ParseContext context) {
+    // Performance note: To speed-up evaluation 'BETWEEN' predicates should come before the 'IN_BLOOM_FILTER'
+    Deque<String> dynamicIds = new ArrayDeque<>(sjValueInfo.getDynamicValueIDs());
+    List<ExprNodeDesc> sjPredicates = new ArrayList<>();
+    List<ExprNodeDesc> hashArgs = new ArrayList<>();
+    for (ReduceSinkOperator rs : sjBranches) {
+      RuntimeValuesInfo info = context.getRsToRuntimeValuesInfoMap().get(rs);
+      assert info.getTargetColumns().size() == 1;
+      final ExprNodeDesc targetColumn = info.getTargetColumns().get(0);
+      TypeInfo typeInfo = targetColumn.getTypeInfo();
+      DynamicValue minDynamic = new DynamicValue(dynamicIds.poll(), typeInfo);
+      DynamicValue maxDynamic = new DynamicValue(dynamicIds.poll(), typeInfo);
+
+      List<ExprNodeDesc> betweenArgs = Arrays.asList(
+          // Use false to not invert between result
+          new ExprNodeConstantDesc(Boolean.FALSE),
+          targetColumn,
+          new ExprNodeDynamicValueDesc(minDynamic),
+          new ExprNodeDynamicValueDesc(maxDynamic));
+      ExprNodeDesc betweenExp =
+          new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFBetween(), "between", betweenArgs);
+      sjPredicates.add(betweenExp);
+      hashArgs.add(targetColumn);
+    }
+
+    ExprNodeDesc hashExp =
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, new GenericUDFMurmurHash(), "hash", hashArgs);
+
+    assert dynamicIds.size() == 1 : "There should be one column left untreated: the one with the bloom filter";
+    DynamicValue bloomDynamic = new DynamicValue(dynamicIds.poll(), TypeInfoFactory.binaryTypeInfo);
+    sjPredicates.add(
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFInBloomFilter(), "in_bloom_filter",
+            Arrays.asList(hashExp, new ExprNodeDynamicValueDesc(bloomDynamic))));
+    return and(sjPredicates);
+  }
+
+  private static RuntimeValuesInfo createRuntimeValuesInfo(ReduceSinkOperator rs, List<ReduceSinkOperator> sjBranches,
+      ParseContext parseContext) {
+    List<ExprNodeDesc> valueCols = rs.getConf().getValueCols();
+    RuntimeValuesInfo info = new RuntimeValuesInfo();
+    TableDesc rsFinalTableDesc =
+        PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols, "_col"));
+    List<String> dynamicValueIDs = new ArrayList<>();
+    for (ExprNodeDesc rsCol : valueCols) {
+      dynamicValueIDs.add(rs.toString() + rsCol.getExprString());
+    }
+
+    info.setTableDesc(rsFinalTableDesc);
+    info.setDynamicValueIDs(dynamicValueIDs);
+    info.setColExprs(valueCols);
+    List<ExprNodeDesc> targetTableExpressions = new ArrayList<>();
+    for (ReduceSinkOperator sjBranch : sjBranches) {
+      RuntimeValuesInfo sjInfo = parseContext.getRsToRuntimeValuesInfoMap().get(sjBranch);
+      assert sjInfo.getTargetColumns().size() == 1;
+      targetTableExpressions.add(sjInfo.getTargetColumns().get(0));
+    }
+    info.setTargetColumns(targetTableExpressions);
+    return info;
+  }
+
+  private static SelectOperator mergeSelectOps(Operator<?> parent, List<SelectOperator> selectOperators) {
+    List<String> colNames = new ArrayList<>();
+    List<ExprNodeDesc> colDescs = new ArrayList<>();
+    List<ColumnInfo> columnInfos = new ArrayList<>();
+    Map<String, ExprNodeDesc> selectColumnExprMap = new HashMap<>();
+    for (SelectOperator sel : selectOperators) {
+      for (ExprNodeDesc col : sel.getConf().getColList()) {
+        String colName = HiveConf.getColumnInternalName(colDescs.size());
+        colNames.add(colName);
+        columnInfos.add(new ColumnInfo(colName, col.getTypeInfo(), "", false));
+        colDescs.add(col);
+        selectColumnExprMap.put(colName, col);
+      }
+    }
+    ExprNodeDesc hashExp =
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, new GenericUDFMurmurHash(), "hash", colDescs);
+    String hashName = HiveConf.getColumnInternalName(colDescs.size() + 1);
+    colNames.add(hashName);
+    columnInfos.add(new ColumnInfo(hashName, hashExp.getTypeInfo(), "", false));
+
+    List<ExprNodeDesc> selDescs = new ArrayList<>(colDescs);
+    selDescs.add(hashExp);
+
+    SelectDesc select = new SelectDesc(selDescs, colNames);
+    SelectOperator selectOp =
+        (SelectOperator) OperatorFactory.getAndMakeChild(select, new RowSchema(columnInfos), parent);
+    selectOp.setColumnExprMap(selectColumnExprMap);
+    return selectOp;
+  }
+
+  private static ReduceSinkOperator createReduceSink(Operator<?> parentOp, NullOrdering nullOrder)
+      throws SemanticException {
+    List<ExprNodeDesc> valueCols = new ArrayList<>();
+    RowSchema parentSchema = parentOp.getSchema();
+    List<String> outColNames = new ArrayList<>();
+    for (int i = 0; i < parentSchema.getSignature().size(); i++) {
+      ColumnInfo colInfo = parentSchema.getSignature().get(i);
+      ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo.getInternalName(), "", false);
+      valueCols.add(colExpr);
+      outColNames.add(SemanticAnalyzer.getColumnInternalName(i));
+    }
+
+    ReduceSinkDesc rsDesc = PlanUtils
+        .getReduceSinkDesc(Collections.emptyList(), valueCols, outColNames, false, -1, 0, 1,
+            AcidUtils.Operation.NOT_ACID, nullOrder);
+    rsDesc.setColumnExprMap(Collections.emptyMap());
+    return (ReduceSinkOperator) OperatorFactory.getAndMakeChild(rsDesc, new RowSchema(parentSchema), parentOp);
+  }
+
+  private static GroupByOperator createGroupBy(SelectOperator selectOp, Operator<?> parentOp, GroupByDesc.Mode gbMode,
+      HiveConf hiveConf) {
+
+    final List<ExprNodeDesc> params;
+    final GenericUDAFEvaluator.Mode udafMode = SemanticAnalyzer.groupByDescModeToUDAFMode(gbMode, false);
+    switch (gbMode) {
+    case FINAL:
+      params = createGroupByAggregationParameters((ReduceSinkOperator) parentOp);
+      break;
+    case HASH:
+      params = createGroupByAggregationParameters(selectOp);
+      break;
+    default:
+      throw new AssertionError(gbMode.toString() + " is not supported");
+    }
+
+    List<AggregationDesc> gbAggs = new ArrayList<>();
+    Deque<ExprNodeDesc> paramsCopy = new ArrayDeque<>(params);
+    while (paramsCopy.size() > 1) {
+      gbAggs.add(minAggregation(udafMode, paramsCopy.poll()));
+      gbAggs.add(maxAggregation(udafMode, paramsCopy.poll()));
+    }
+    gbAggs.add(bloomFilterAggregation(udafMode, paramsCopy.poll(), selectOp, hiveConf));
+    assert paramsCopy.size() == 0;
+
+    List<String> gbOutputNames = new ArrayList<>(gbAggs.size());
+    List<ColumnInfo> gbColInfos = new ArrayList<>(gbAggs.size());
+    for (int i = 0; i < params.size(); i++) {
+      String colName = HiveConf.getColumnInternalName(i);
+      gbOutputNames.add(colName);
+      final TypeInfo colType;
+      if (i == params.size() - 1) {
+        colType = TypeInfoFactory.binaryTypeInfo; // Bloom type
+      } else {
+        colType = params.get(i).getTypeInfo(); // Min/Max type
+      }
+      gbColInfos.add(new ColumnInfo(colName, colType, "", false));
+    }
+
+    float groupByMemoryUsage = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
+    float memoryThreshold = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+    float minReductionHashAggr = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
+    GroupByDesc groupBy =
+        new GroupByDesc(gbMode, gbOutputNames, Collections.emptyList(), gbAggs, false, groupByMemoryUsage,
+            memoryThreshold, minReductionHashAggr, null, false, -1, false);
+    groupBy.setColumnExprMap(Collections.emptyMap());
+    return (GroupByOperator) OperatorFactory.getAndMakeChild(groupBy, new RowSchema(gbColInfos), parentOp);
+  }
+
+  private static List<ExprNodeDesc> createGroupByAggregationParameters(SelectOperator selectOp) {
+    List<ExprNodeDesc> params = new ArrayList<>();
+    // The first n-1 cols are used as parameters for min & max so we need two expressions
+    for (ColumnInfo c : selectOp.getSchema().getSignature()) {
+      String name = c.getInternalName();
+      ExprNodeColumnDesc p = new ExprNodeColumnDesc(new ColumnInfo(name, c.getType(), "", false));
+      params.add(p);
+      params.add(p);
+    }
+    // The last col is used as parameter for bloom so we need only one expression
+    params.remove(params.size() - 1);
+    return params;
+  }
+
+  private static List<ExprNodeDesc> createGroupByAggregationParameters(ReduceSinkOperator reduceOp) {
+    List<ExprNodeDesc> params = new ArrayList<>();
+    // There is a 1-1 mapping between columns and parameters for the aggregation functions min, max, bloom
+    for (ColumnInfo c : reduceOp.getSchema().getSignature()) {
+      String name = Utilities.ReduceField.VALUE + "." + c.getInternalName();
+      params.add(new ExprNodeColumnDesc(new ColumnInfo(name, c.getType(), "", false)));
+    }
+    return params;
+  }
+
+  private static ExprNodeGenericFuncDesc and(List<ExprNodeDesc> args) {
+    return new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFOPAnd(), "and", args);

Review comment:
       It would be interesting to flatten here in case any of the args is an `AND`.
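    A rough sketch of the suggested flattening, assuming one level of unwrapping is enough for the call sites in this patch (the nested predicates are produced by this same method):
    ```java
    private static ExprNodeGenericFuncDesc and(List<ExprNodeDesc> args) {
      List<ExprNodeDesc> flat = new ArrayList<>();
      for (ExprNodeDesc arg : args) {
        boolean isAnd = arg instanceof ExprNodeGenericFuncDesc
            && ((ExprNodeGenericFuncDesc) arg).getGenericUDF() instanceof GenericUDFOPAnd;
        if (isAnd) {
          // Unwrap the arguments of a nested AND instead of nesting it again.
          flat.addAll(((ExprNodeGenericFuncDesc) arg).getChildren());
        } else {
          flat.add(arg);
        }
      }
      return new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFOPAnd(), "and", flat);
    }
    ```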

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.GenTezUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class SemiJoinReductionMerge extends Transform {
+
+  public ParseContext transform(ParseContext parseContext) throws SemanticException {
+    Map<ReduceSinkOperator, SemiJoinBranchInfo> map = parseContext.getRsToSemiJoinBranchInfo();
+    if (map.isEmpty()) {
+      return parseContext;
+    }
+    HiveConf hiveConf = parseContext.getConf();
+
+    // Order does not really matter but it is necessary to keep plans stable
+    SortedMap<SJSourceTarget, List<ReduceSinkOperator>> sameTableSJ =
+        new TreeMap<>(Comparator.comparing(SJSourceTarget::toString));
+    for (Map.Entry<ReduceSinkOperator, SemiJoinBranchInfo> smjEntry : map.entrySet()) {
+      TableScanOperator ts = smjEntry.getValue().getTsOp();
+      // Semijoin optimization branch should look like <Parent>-SEL-GB1-RS1-GB2-RS2
+      SelectOperator selOp = OperatorUtils.ancestor(smjEntry.getKey(), SelectOperator.class, 0, 0, 0, 0);
+      assert selOp != null;
+      assert selOp.getParentOperators().size() == 1;
+      Operator<?> source = selOp.getParentOperators().get(0);
+      SJSourceTarget sjKey = new SJSourceTarget(source, ts);
+      List<ReduceSinkOperator> ops = sameTableSJ.computeIfAbsent(sjKey, tableScanOperator -> new ArrayList<>());
+      ops.add(smjEntry.getKey());
+    }
+    for (Map.Entry<SJSourceTarget, List<ReduceSinkOperator>> sjMergeCandidate : sameTableSJ.entrySet()) {
+      final List<ReduceSinkOperator> sjBrances = sjMergeCandidate.getValue();
+      if (sjBrances.size() < 2) {
+        continue;
+      }
+      // Order does not really matter but it is necessary to keep plans stable
+      sjBrances.sort(Comparator.comparing(Operator::getIdentifier));
+
+      List<SelectOperator> selOps = new ArrayList<>(sjBrances.size());
+      for (ReduceSinkOperator rs : sjBrances) {
+        selOps.add(OperatorUtils.ancestor(rs, SelectOperator.class, 0, 0, 0, 0));
+      }
+      SelectOperator selectOp = mergeSelectOps(sjMergeCandidate.getKey().source, selOps);
+
+      GroupByOperator gbPartialOp = createGroupBy(selectOp, selectOp, GroupByDesc.Mode.HASH, hiveConf);
+
+      ReduceSinkOperator rsPartialOp = createReduceSink(gbPartialOp, NullOrdering.defaultNullOrder(hiveConf));
+      rsPartialOp.getConf().setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.QUICKSTART));
+
+      GroupByOperator gbCompleteOp = createGroupBy(selectOp, rsPartialOp, GroupByDesc.Mode.FINAL, hiveConf);
+
+      ReduceSinkOperator rsCompleteOp = createReduceSink(gbCompleteOp, NullOrdering.defaultNullOrder(hiveConf));
+
+      final TableScanOperator sjTargetTable = sjMergeCandidate.getKey().target;
+      SemiJoinBranchInfo sjInfo = new SemiJoinBranchInfo(sjTargetTable, false);
+      parseContext.getRsToSemiJoinBranchInfo().put(rsCompleteOp, sjInfo);
+
+      // Save the info that is required at query time to resolve dynamic/runtime values.
+      RuntimeValuesInfo valuesInfo = createRuntimeValuesInfo(rsCompleteOp, sjBrances, parseContext);
+      parseContext.getRsToRuntimeValuesInfoMap().put(rsCompleteOp, valuesInfo);
+
+      ExprNodeGenericFuncDesc sjPredicate = createSemiJoinPredicate(sjBrances, valuesInfo, parseContext);
+
+      // Update filter operators with the new semi-join predicate
+      for (Operator<?> op : sjTargetTable.getChildOperators()) {
+        if (op instanceof FilterOperator) {
+          FilterDesc filter = ((FilterOperator) op).getConf();
+          filter.setPredicate(and(Arrays.asList(filter.getPredicate(), sjPredicate)));
+        }
+      }
+      // Update tableScan with the new semi-join predicate
+      sjTargetTable.getConf().setFilterExpr(and(Arrays.asList(sjTargetTable.getConf().getFilterExpr(), sjPredicate)));
+
+      for (ReduceSinkOperator rs : sjBrances) {
+        GenTezUtils.removeSemiJoinOperator(parseContext, rs, sjTargetTable);
+        GenTezUtils.removeBranch(rs);
+      }
+
+      // TODO How to associate multi-cols with gb ?
+      // parseContext.getColExprToGBMap().put(key, gb);
+    }
+    return parseContext;
+  }
+
+  private static ExprNodeGenericFuncDesc createSemiJoinPredicate(List<ReduceSinkOperator> sjBranches,
+      RuntimeValuesInfo sjValueInfo, ParseContext context) {
+    // Performance note: To speed-up evaluation 'BETWEEN' predicates should come before the 'IN_BLOOM_FILTER'
+    Deque<String> dynamicIds = new ArrayDeque<>(sjValueInfo.getDynamicValueIDs());
+    List<ExprNodeDesc> sjPredicates = new ArrayList<>();
+    List<ExprNodeDesc> hashArgs = new ArrayList<>();
+    for (ReduceSinkOperator rs : sjBranches) {
+      RuntimeValuesInfo info = context.getRsToRuntimeValuesInfoMap().get(rs);
+      assert info.getTargetColumns().size() == 1;
+      final ExprNodeDesc targetColumn = info.getTargetColumns().get(0);
+      TypeInfo typeInfo = targetColumn.getTypeInfo();
+      DynamicValue minDynamic = new DynamicValue(dynamicIds.poll(), typeInfo);
+      DynamicValue maxDynamic = new DynamicValue(dynamicIds.poll(), typeInfo);
+
+      List<ExprNodeDesc> betweenArgs = Arrays.asList(
+          // Use false to not invert between result
+          new ExprNodeConstantDesc(Boolean.FALSE),
+          targetColumn,
+          new ExprNodeDynamicValueDesc(minDynamic),
+          new ExprNodeDynamicValueDesc(maxDynamic));
+      ExprNodeDesc betweenExp =
+          new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFBetween(), "between", betweenArgs);
+      sjPredicates.add(betweenExp);
+      hashArgs.add(targetColumn);
+    }
+
+    ExprNodeDesc hashExp =
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, new GenericUDFMurmurHash(), "hash", hashArgs);
+
+    assert dynamicIds.size() == 1 : "There should be one column left untreated: the one with the bloom filter";
+    DynamicValue bloomDynamic = new DynamicValue(dynamicIds.poll(), TypeInfoFactory.binaryTypeInfo);
+    sjPredicates.add(
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFInBloomFilter(), "in_bloom_filter",
+            Arrays.asList(hashExp, new ExprNodeDynamicValueDesc(bloomDynamic))));
+    return and(sjPredicates);
+  }
+
+  private static RuntimeValuesInfo createRuntimeValuesInfo(ReduceSinkOperator rs, List<ReduceSinkOperator> sjBranches,
+      ParseContext parseContext) {
+    List<ExprNodeDesc> valueCols = rs.getConf().getValueCols();
+    RuntimeValuesInfo info = new RuntimeValuesInfo();
+    TableDesc rsFinalTableDesc =
+        PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols, "_col"));
+    List<String> dynamicValueIDs = new ArrayList<>();
+    for (ExprNodeDesc rsCol : valueCols) {
+      dynamicValueIDs.add(rs.toString() + rsCol.getExprString());
+    }
+
+    info.setTableDesc(rsFinalTableDesc);
+    info.setDynamicValueIDs(dynamicValueIDs);
+    info.setColExprs(valueCols);
+    List<ExprNodeDesc> targetTableExpressions = new ArrayList<>();
+    for (ReduceSinkOperator sjBranch : sjBranches) {
+      RuntimeValuesInfo sjInfo = parseContext.getRsToRuntimeValuesInfoMap().get(sjBranch);
+      assert sjInfo.getTargetColumns().size() == 1;
+      targetTableExpressions.add(sjInfo.getTargetColumns().get(0));
+    }
+    info.setTargetColumns(targetTableExpressions);
+    return info;
+  }
+
+  private static SelectOperator mergeSelectOps(Operator<?> parent, List<SelectOperator> selectOperators) {
+    List<String> colNames = new ArrayList<>();
+    List<ExprNodeDesc> colDescs = new ArrayList<>();
+    List<ColumnInfo> columnInfos = new ArrayList<>();
+    Map<String, ExprNodeDesc> selectColumnExprMap = new HashMap<>();
+    for (SelectOperator sel : selectOperators) {
+      for (ExprNodeDesc col : sel.getConf().getColList()) {
+        String colName = HiveConf.getColumnInternalName(colDescs.size());
+        colNames.add(colName);
+        columnInfos.add(new ColumnInfo(colName, col.getTypeInfo(), "", false));
+        colDescs.add(col);
+        selectColumnExprMap.put(colName, col);
+      }
+    }
+    ExprNodeDesc hashExp =
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, new GenericUDFMurmurHash(), "hash", colDescs);
+    String hashName = HiveConf.getColumnInternalName(colDescs.size() + 1);
+    colNames.add(hashName);
+    columnInfos.add(new ColumnInfo(hashName, hashExp.getTypeInfo(), "", false));
+
+    List<ExprNodeDesc> selDescs = new ArrayList<>(colDescs);
+    selDescs.add(hashExp);
+
+    SelectDesc select = new SelectDesc(selDescs, colNames);
+    SelectOperator selectOp =
+        (SelectOperator) OperatorFactory.getAndMakeChild(select, new RowSchema(columnInfos), parent);
+    selectOp.setColumnExprMap(selectColumnExprMap);
+    return selectOp;
+  }
+
+  private static ReduceSinkOperator createReduceSink(Operator<?> parentOp, NullOrdering nullOrder)
+      throws SemanticException {
+    List<ExprNodeDesc> valueCols = new ArrayList<>();
+    RowSchema parentSchema = parentOp.getSchema();
+    List<String> outColNames = new ArrayList<>();
+    for (int i = 0; i < parentSchema.getSignature().size(); i++) {
+      ColumnInfo colInfo = parentSchema.getSignature().get(i);
+      ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo.getInternalName(), "", false);
+      valueCols.add(colExpr);
+      outColNames.add(SemanticAnalyzer.getColumnInternalName(i));
+    }
+
+    ReduceSinkDesc rsDesc = PlanUtils
+        .getReduceSinkDesc(Collections.emptyList(), valueCols, outColNames, false, -1, 0, 1,
+            AcidUtils.Operation.NOT_ACID, nullOrder);
+    rsDesc.setColumnExprMap(Collections.emptyMap());
+    return (ReduceSinkOperator) OperatorFactory.getAndMakeChild(rsDesc, new RowSchema(parentSchema), parentOp);
+  }
+
+  private static GroupByOperator createGroupBy(SelectOperator selectOp, Operator<?> parentOp, GroupByDesc.Mode gbMode,
+      HiveConf hiveConf) {
+
+    final List<ExprNodeDesc> params;
+    final GenericUDAFEvaluator.Mode udafMode = SemanticAnalyzer.groupByDescModeToUDAFMode(gbMode, false);
+    switch (gbMode) {
+    case FINAL:
+      params = createGroupByAggregationParameters((ReduceSinkOperator) parentOp);
+      break;
+    case HASH:
+      params = createGroupByAggregationParameters(selectOp);
+      break;
+    default:
+      throw new AssertionError(gbMode.toString() + " is not supported");
+    }
+
+    List<AggregationDesc> gbAggs = new ArrayList<>();
+    Deque<ExprNodeDesc> paramsCopy = new ArrayDeque<>(params);
+    while (paramsCopy.size() > 1) {
+      gbAggs.add(minAggregation(udafMode, paramsCopy.poll()));
+      gbAggs.add(maxAggregation(udafMode, paramsCopy.poll()));
+    }
+    gbAggs.add(bloomFilterAggregation(udafMode, paramsCopy.poll(), selectOp, hiveConf));
+    assert paramsCopy.size() == 0;
+
+    List<String> gbOutputNames = new ArrayList<>(gbAggs.size());
+    List<ColumnInfo> gbColInfos = new ArrayList<>(gbAggs.size());
+    for (int i = 0; i < params.size(); i++) {
+      String colName = HiveConf.getColumnInternalName(i);
+      gbOutputNames.add(colName);
+      final TypeInfo colType;
+      if (i == params.size() - 1) {
+        colType = TypeInfoFactory.binaryTypeInfo; // Bloom type
+      } else {
+        colType = params.get(i).getTypeInfo(); // Min/Max type
+      }
+      gbColInfos.add(new ColumnInfo(colName, colType, "", false));
+    }
+
+    float groupByMemoryUsage = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
+    float memoryThreshold = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+    float minReductionHashAggr = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
+    GroupByDesc groupBy =
+        new GroupByDesc(gbMode, gbOutputNames, Collections.emptyList(), gbAggs, false, groupByMemoryUsage,
+            memoryThreshold, minReductionHashAggr, null, false, -1, false);
+    groupBy.setColumnExprMap(Collections.emptyMap());
+    return (GroupByOperator) OperatorFactory.getAndMakeChild(groupBy, new RowSchema(gbColInfos), parentOp);
+  }
+
+  private static List<ExprNodeDesc> createGroupByAggregationParameters(SelectOperator selectOp) {
+    List<ExprNodeDesc> params = new ArrayList<>();
+    // The first n-1 cols are used as parameters for min & max so we need two expressions
+    for (ColumnInfo c : selectOp.getSchema().getSignature()) {
+      String name = c.getInternalName();
+      ExprNodeColumnDesc p = new ExprNodeColumnDesc(new ColumnInfo(name, c.getType(), "", false));
+      params.add(p);
+      params.add(p);
+    }
+    // The last col is used as parameter for bloom so we need only one expression
+    params.remove(params.size() - 1);
+    return params;
+  }
+
+  private static List<ExprNodeDesc> createGroupByAggregationParameters(ReduceSinkOperator reduceOp) {
+    List<ExprNodeDesc> params = new ArrayList<>();
+    // There is a 1-1 mapping between columns and parameters for the aggregation functions min, max, bloom
+    for (ColumnInfo c : reduceOp.getSchema().getSignature()) {
+      String name = Utilities.ReduceField.VALUE + "." + c.getInternalName();
+      params.add(new ExprNodeColumnDesc(new ColumnInfo(name, c.getType(), "", false)));
+    }
+    return params;
+  }
+
+  private static ExprNodeGenericFuncDesc and(List<ExprNodeDesc> args) {
+    return new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFOPAnd(), "and", args);
+  }
+
+  private static AggregationDesc minAggregation(GenericUDAFEvaluator.Mode mode, ExprNodeDesc col) {
+    List<ExprNodeDesc> p = Collections.singletonList(col);
+    return new AggregationDesc("min", new GenericUDAFMin.GenericUDAFMinEvaluator(), p, false, mode);
+  }
+
+  private static AggregationDesc maxAggregation(GenericUDAFEvaluator.Mode mode, ExprNodeDesc col) {
+    List<ExprNodeDesc> p = Collections.singletonList(col);
+    return new AggregationDesc("max", new GenericUDAFMax.GenericUDAFMaxEvaluator(), p, false, mode);
+  }
+
+  private static AggregationDesc bloomFilterAggregation(GenericUDAFEvaluator.Mode mode, ExprNodeDesc col,
+      SelectOperator source, HiveConf conf) {
+    GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator bloomFilterEval =
+        new GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator();
+    bloomFilterEval.setSourceOperator(source);
+    bloomFilterEval.setMaxEntries(conf.getLongVar(HiveConf.ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES));
+    bloomFilterEval.setMinEntries(conf.getLongVar(HiveConf.ConfVars.TEZ_MIN_BLOOM_FILTER_ENTRIES));
+    bloomFilterEval.setFactor(conf.getFloatVar(HiveConf.ConfVars.TEZ_BLOOM_FILTER_FACTOR));
+    // TODO Setup hints

Review comment:
       What's the current behavior in the presence of hints? It seems we do not propagate them... Should we create a follow-up JIRA to keep track?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.GenTezUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class SemiJoinReductionMerge extends Transform {
+
+  public ParseContext transform(ParseContext parseContext) throws SemanticException {
+    Map<ReduceSinkOperator, SemiJoinBranchInfo> map = parseContext.getRsToSemiJoinBranchInfo();
+    if (map.isEmpty()) {
+      return parseContext;
+    }
+    HiveConf hiveConf = parseContext.getConf();
+
+    // Order does not really matter but it is necessary to keep plans stable
+    SortedMap<SJSourceTarget, List<ReduceSinkOperator>> sameTableSJ =
+        new TreeMap<>(Comparator.comparing(SJSourceTarget::toString));
+    for (Map.Entry<ReduceSinkOperator, SemiJoinBranchInfo> smjEntry : map.entrySet()) {
+      TableScanOperator ts = smjEntry.getValue().getTsOp();
+      // Semijoin optimization branch should look like <Parent>-SEL-GB1-RS1-GB2-RS2
+      SelectOperator selOp = OperatorUtils.ancestor(smjEntry.getKey(), SelectOperator.class, 0, 0, 0, 0);
+      assert selOp != null;

Review comment:
       Wondering whether we should turn these checks into `Preconditions` rather than `assert` statements.
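       For illustration only, a minimal sketch of what these checks could look like with Guava's `Preconditions` (the helper, its name, and the messages are invented for the example; only the `checkNotNull`/`checkState` calls are the actual Guava API):
       ```java
       import com.google.common.base.Preconditions;

       public class PreconditionsSketch {
         // Hypothetical stand-in for the checks in transform(); 'selOp' and
         // 'parentCount' mirror the ancestor operator and its parent list size.
         static <T> T requireSingleParentAncestor(T selOp, int parentCount) {
           // Unlike 'assert', these checks run even when -ea is not enabled.
           Preconditions.checkNotNull(selOp, "Expected a SelectOperator ancestor in the semijoin branch");
           Preconditions.checkState(parentCount == 1, "Expected exactly one parent, found %s", parentCount);
           return selOp;
         }

         public static void main(String[] args) {
           requireSingleParentAncestor("SEL_42", 1); // passes silently
         }
       }
       ```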

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.GenTezUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class SemiJoinReductionMerge extends Transform {
+
+  public ParseContext transform(ParseContext parseContext) throws SemanticException {
+    Map<ReduceSinkOperator, SemiJoinBranchInfo> map = parseContext.getRsToSemiJoinBranchInfo();
+    if (map.isEmpty()) {
+      return parseContext;
+    }
+    HiveConf hiveConf = parseContext.getConf();
+
+    // Order does not really matter but it is necessary to keep plans stable
+    SortedMap<SJSourceTarget, List<ReduceSinkOperator>> sameTableSJ =
+        new TreeMap<>(Comparator.comparing(SJSourceTarget::toString));
+    for (Map.Entry<ReduceSinkOperator, SemiJoinBranchInfo> smjEntry : map.entrySet()) {
+      TableScanOperator ts = smjEntry.getValue().getTsOp();
+      // Semijoin optimization branch should look like <Parent>-SEL-GB1-RS1-GB2-RS2
+      SelectOperator selOp = OperatorUtils.ancestor(smjEntry.getKey(), SelectOperator.class, 0, 0, 0, 0);
+      assert selOp != null;
+      assert selOp.getParentOperators().size() == 1;
+      Operator<?> source = selOp.getParentOperators().get(0);
+      SJSourceTarget sjKey = new SJSourceTarget(source, ts);
+      List<ReduceSinkOperator> ops = sameTableSJ.computeIfAbsent(sjKey, tableScanOperator -> new ArrayList<>());
+      ops.add(smjEntry.getKey());
+    }
+    for (Map.Entry<SJSourceTarget, List<ReduceSinkOperator>> sjMergeCandidate : sameTableSJ.entrySet()) {
+      final List<ReduceSinkOperator> sjBrances = sjMergeCandidate.getValue();
+      if (sjBrances.size() < 2) {
+        continue;
+      }
+      // Order does not really matter but it is necessary to keep plans stable
+      sjBrances.sort(Comparator.comparing(Operator::getIdentifier));
+
+      List<SelectOperator> selOps = new ArrayList<>(sjBrances.size());
+      for (ReduceSinkOperator rs : sjBrances) {
+        selOps.add(OperatorUtils.ancestor(rs, SelectOperator.class, 0, 0, 0, 0));
+      }
+      SelectOperator selectOp = mergeSelectOps(sjMergeCandidate.getKey().source, selOps);
+
+      GroupByOperator gbPartialOp = createGroupBy(selectOp, selectOp, GroupByDesc.Mode.HASH, hiveConf);
+
+      ReduceSinkOperator rsPartialOp = createReduceSink(gbPartialOp, NullOrdering.defaultNullOrder(hiveConf));
+      rsPartialOp.getConf().setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.QUICKSTART));
+
+      GroupByOperator gbCompleteOp = createGroupBy(selectOp, rsPartialOp, GroupByDesc.Mode.FINAL, hiveConf);
+
+      ReduceSinkOperator rsCompleteOp = createReduceSink(gbCompleteOp, NullOrdering.defaultNullOrder(hiveConf));
+
+      final TableScanOperator sjTargetTable = sjMergeCandidate.getKey().target;
+      SemiJoinBranchInfo sjInfo = new SemiJoinBranchInfo(sjTargetTable, false);
+      parseContext.getRsToSemiJoinBranchInfo().put(rsCompleteOp, sjInfo);
+
+      // Save the info that is required at query time to resolve dynamic/runtime values.
+      RuntimeValuesInfo valuesInfo = createRuntimeValuesInfo(rsCompleteOp, sjBrances, parseContext);
+      parseContext.getRsToRuntimeValuesInfoMap().put(rsCompleteOp, valuesInfo);
+
+      ExprNodeGenericFuncDesc sjPredicate = createSemiJoinPredicate(sjBrances, valuesInfo, parseContext);
+
+      // Update filter operators with the new semi-join predicate
+      for (Operator<?> op : sjTargetTable.getChildOperators()) {
+        if (op instanceof FilterOperator) {
+          FilterDesc filter = ((FilterOperator) op).getConf();
+          filter.setPredicate(and(Arrays.asList(filter.getPredicate(), sjPredicate)));
+        }
+      }
+      // Update tableScan with the new semi-join predicate
+      sjTargetTable.getConf().setFilterExpr(and(Arrays.asList(sjTargetTable.getConf().getFilterExpr(), sjPredicate)));
+
+      for (ReduceSinkOperator rs : sjBrances) {
+        GenTezUtils.removeSemiJoinOperator(parseContext, rs, sjTargetTable);
+        GenTezUtils.removeBranch(rs);
+      }
+
+      // TODO How to associate multi-cols with gb ?

Review comment:
       Can the TODO be removed?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.GenTezUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class SemiJoinReductionMerge extends Transform {
+
+  public ParseContext transform(ParseContext parseContext) throws SemanticException {
+    Map<ReduceSinkOperator, SemiJoinBranchInfo> map = parseContext.getRsToSemiJoinBranchInfo();
+    if (map.isEmpty()) {
+      return parseContext;
+    }
+    HiveConf hiveConf = parseContext.getConf();
+
+    // Order does not really matter but it is necessary to keep plans stable
+    SortedMap<SJSourceTarget, List<ReduceSinkOperator>> sameTableSJ =
+        new TreeMap<>(Comparator.comparing(SJSourceTarget::toString));
+    for (Map.Entry<ReduceSinkOperator, SemiJoinBranchInfo> smjEntry : map.entrySet()) {
+      TableScanOperator ts = smjEntry.getValue().getTsOp();
+      // Semijoin optimization branch should look like <Parent>-SEL-GB1-RS1-GB2-RS2
+      SelectOperator selOp = OperatorUtils.ancestor(smjEntry.getKey(), SelectOperator.class, 0, 0, 0, 0);
+      assert selOp != null;
+      assert selOp.getParentOperators().size() == 1;
+      Operator<?> source = selOp.getParentOperators().get(0);
+      SJSourceTarget sjKey = new SJSourceTarget(source, ts);
+      List<ReduceSinkOperator> ops = sameTableSJ.computeIfAbsent(sjKey, tableScanOperator -> new ArrayList<>());
+      ops.add(smjEntry.getKey());
+    }
+    for (Map.Entry<SJSourceTarget, List<ReduceSinkOperator>> sjMergeCandidate : sameTableSJ.entrySet()) {
+      final List<ReduceSinkOperator> sjBrances = sjMergeCandidate.getValue();
+      if (sjBrances.size() < 2) {
+        continue;
+      }
+      // Order does not really matter but it is necessary to keep plans stable
+      sjBrances.sort(Comparator.comparing(Operator::getIdentifier));
+
+      List<SelectOperator> selOps = new ArrayList<>(sjBrances.size());
+      for (ReduceSinkOperator rs : sjBrances) {
+        selOps.add(OperatorUtils.ancestor(rs, SelectOperator.class, 0, 0, 0, 0));
+      }
+      SelectOperator selectOp = mergeSelectOps(sjMergeCandidate.getKey().source, selOps);
+
+      GroupByOperator gbPartialOp = createGroupBy(selectOp, selectOp, GroupByDesc.Mode.HASH, hiveConf);
+
+      ReduceSinkOperator rsPartialOp = createReduceSink(gbPartialOp, NullOrdering.defaultNullOrder(hiveConf));
+      rsPartialOp.getConf().setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.QUICKSTART));
+
+      GroupByOperator gbCompleteOp = createGroupBy(selectOp, rsPartialOp, GroupByDesc.Mode.FINAL, hiveConf);
+
+      ReduceSinkOperator rsCompleteOp = createReduceSink(gbCompleteOp, NullOrdering.defaultNullOrder(hiveConf));
+
+      final TableScanOperator sjTargetTable = sjMergeCandidate.getKey().target;
+      SemiJoinBranchInfo sjInfo = new SemiJoinBranchInfo(sjTargetTable, false);
+      parseContext.getRsToSemiJoinBranchInfo().put(rsCompleteOp, sjInfo);
+
+      // Save the info that is required at query time to resolve dynamic/runtime values.
+      RuntimeValuesInfo valuesInfo = createRuntimeValuesInfo(rsCompleteOp, sjBrances, parseContext);
+      parseContext.getRsToRuntimeValuesInfoMap().put(rsCompleteOp, valuesInfo);
+
+      ExprNodeGenericFuncDesc sjPredicate = createSemiJoinPredicate(sjBrances, valuesInfo, parseContext);
+
+      // Update filter operators with the new semi-join predicate
+      for (Operator<?> op : sjTargetTable.getChildOperators()) {
+        if (op instanceof FilterOperator) {
+          FilterDesc filter = ((FilterOperator) op).getConf();
+          filter.setPredicate(and(Arrays.asList(filter.getPredicate(), sjPredicate)));
+        }
+      }
+      // Update tableScan with the new semi-join predicate
+      sjTargetTable.getConf().setFilterExpr(and(Arrays.asList(sjTargetTable.getConf().getFilterExpr(), sjPredicate)));
+
+      for (ReduceSinkOperator rs : sjBrances) {
+        GenTezUtils.removeSemiJoinOperator(parseContext, rs, sjTargetTable);
+        GenTezUtils.removeBranch(rs);
+      }
+
+      // TODO How to associate multi-cols with gb ?
+      // parseContext.getColExprToGBMap().put(key, gb);
+    }
+    return parseContext;
+  }
+
+  private static ExprNodeGenericFuncDesc createSemiJoinPredicate(List<ReduceSinkOperator> sjBranches,
+      RuntimeValuesInfo sjValueInfo, ParseContext context) {
+    // Performance note: To speed-up evaluation 'BETWEEN' predicates should come before the 'IN_BLOOM_FILTER'
+    Deque<String> dynamicIds = new ArrayDeque<>(sjValueInfo.getDynamicValueIDs());
+    List<ExprNodeDesc> sjPredicates = new ArrayList<>();
+    List<ExprNodeDesc> hashArgs = new ArrayList<>();
+    for (ReduceSinkOperator rs : sjBranches) {
+      RuntimeValuesInfo info = context.getRsToRuntimeValuesInfoMap().get(rs);
+      assert info.getTargetColumns().size() == 1;
+      final ExprNodeDesc targetColumn = info.getTargetColumns().get(0);
+      TypeInfo typeInfo = targetColumn.getTypeInfo();
+      DynamicValue minDynamic = new DynamicValue(dynamicIds.poll(), typeInfo);
+      DynamicValue maxDynamic = new DynamicValue(dynamicIds.poll(), typeInfo);
+
+      List<ExprNodeDesc> betweenArgs = Arrays.asList(
+          // Use false to not invert between result
+          new ExprNodeConstantDesc(Boolean.FALSE),
+          targetColumn,
+          new ExprNodeDynamicValueDesc(minDynamic),
+          new ExprNodeDynamicValueDesc(maxDynamic));
+      ExprNodeDesc betweenExp =
+          new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFBetween(), "between", betweenArgs);
+      sjPredicates.add(betweenExp);
+      hashArgs.add(targetColumn);
+    }
+
+    ExprNodeDesc hashExp =
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, new GenericUDFMurmurHash(), "hash", hashArgs);
+
+    assert dynamicIds.size() == 1 : "There should be one column left untreated the one with the bloom filter";
+    DynamicValue bloomDynamic = new DynamicValue(dynamicIds.poll(), TypeInfoFactory.binaryTypeInfo);
+    sjPredicates.add(
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFInBloomFilter(), "in_bloom_filter",
+            Arrays.asList(hashExp, new ExprNodeDynamicValueDesc(bloomDynamic))));
+    return and(sjPredicates);
+  }
+
+  private static RuntimeValuesInfo createRuntimeValuesInfo(ReduceSinkOperator rs, List<ReduceSinkOperator> sjBranches,
+      ParseContext parseContext) {
+    List<ExprNodeDesc> valueCols = rs.getConf().getValueCols();
+    RuntimeValuesInfo info = new RuntimeValuesInfo();
+    TableDesc rsFinalTableDesc =
+        PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols, "_col"));
+    List<String> dynamicValueIDs = new ArrayList<>();
+    for (ExprNodeDesc rsCol : valueCols) {
+      dynamicValueIDs.add(rs.toString() + rsCol.getExprString());
+    }
+
+    info.setTableDesc(rsFinalTableDesc);
+    info.setDynamicValueIDs(dynamicValueIDs);
+    info.setColExprs(valueCols);
+    List<ExprNodeDesc> targetTableExpressions = new ArrayList<>();
+    for (ReduceSinkOperator sjBranch : sjBranches) {
+      RuntimeValuesInfo sjInfo = parseContext.getRsToRuntimeValuesInfoMap().get(sjBranch);
+      assert sjInfo.getTargetColumns().size() == 1;
+      targetTableExpressions.add(sjInfo.getTargetColumns().get(0));
+    }
+    info.setTargetColumns(targetTableExpressions);
+    return info;
+  }
+
+  private static SelectOperator mergeSelectOps(Operator<?> parent, List<SelectOperator> selectOperators) {
+    List<String> colNames = new ArrayList<>();
+    List<ExprNodeDesc> colDescs = new ArrayList<>();
+    List<ColumnInfo> columnInfos = new ArrayList<>();
+    Map<String, ExprNodeDesc> selectColumnExprMap = new HashMap<>();
+    for (SelectOperator sel : selectOperators) {
+      for (ExprNodeDesc col : sel.getConf().getColList()) {

Review comment:
       Can these SEL operators have multiple columns? I thought they would project a single column for the SJ?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.GenTezUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class SemiJoinReductionMerge extends Transform {
+
+  public ParseContext transform(ParseContext parseContext) throws SemanticException {
+    Map<ReduceSinkOperator, SemiJoinBranchInfo> map = parseContext.getRsToSemiJoinBranchInfo();
+    if (map.isEmpty()) {
+      return parseContext;
+    }
+    HiveConf hiveConf = parseContext.getConf();
+
+    // Order does not really matter but it is necessary to keep plans stable
+    SortedMap<SJSourceTarget, List<ReduceSinkOperator>> sameTableSJ =
+        new TreeMap<>(Comparator.comparing(SJSourceTarget::toString));
+    for (Map.Entry<ReduceSinkOperator, SemiJoinBranchInfo> smjEntry : map.entrySet()) {
+      TableScanOperator ts = smjEntry.getValue().getTsOp();
+      // Semijoin optimization branch should look like <Parent>-SEL-GB1-RS1-GB2-RS2
+      SelectOperator selOp = OperatorUtils.ancestor(smjEntry.getKey(), SelectOperator.class, 0, 0, 0, 0);
+      assert selOp != null;
+      assert selOp.getParentOperators().size() == 1;
+      Operator<?> source = selOp.getParentOperators().get(0);
+      SJSourceTarget sjKey = new SJSourceTarget(source, ts);
+      List<ReduceSinkOperator> ops = sameTableSJ.computeIfAbsent(sjKey, tableScanOperator -> new ArrayList<>());
+      ops.add(smjEntry.getKey());
+    }
+    for (Map.Entry<SJSourceTarget, List<ReduceSinkOperator>> sjMergeCandidate : sameTableSJ.entrySet()) {
+      final List<ReduceSinkOperator> sjBrances = sjMergeCandidate.getValue();

Review comment:
       typo? `sjBrances` -> `sjBranches`

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.GenTezUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class SemiJoinReductionMerge extends Transform {
+
+  public ParseContext transform(ParseContext parseContext) throws SemanticException {
+    Map<ReduceSinkOperator, SemiJoinBranchInfo> map = parseContext.getRsToSemiJoinBranchInfo();
+    if (map.isEmpty()) {
+      return parseContext;
+    }
+    HiveConf hiveConf = parseContext.getConf();
+
+    // Order does not really matter but it is necessary to keep plans stable
+    SortedMap<SJSourceTarget, List<ReduceSinkOperator>> sameTableSJ =
+        new TreeMap<>(Comparator.comparing(SJSourceTarget::toString));
+    for (Map.Entry<ReduceSinkOperator, SemiJoinBranchInfo> smjEntry : map.entrySet()) {
+      TableScanOperator ts = smjEntry.getValue().getTsOp();
+      // Semijoin optimization branch should look like <Parent>-SEL-GB1-RS1-GB2-RS2
+      SelectOperator selOp = OperatorUtils.ancestor(smjEntry.getKey(), SelectOperator.class, 0, 0, 0, 0);
+      assert selOp != null;
+      assert selOp.getParentOperators().size() == 1;
+      Operator<?> source = selOp.getParentOperators().get(0);
+      SJSourceTarget sjKey = new SJSourceTarget(source, ts);
+      List<ReduceSinkOperator> ops = sameTableSJ.computeIfAbsent(sjKey, tableScanOperator -> new ArrayList<>());
+      ops.add(smjEntry.getKey());
+    }
+    for (Map.Entry<SJSourceTarget, List<ReduceSinkOperator>> sjMergeCandidate : sameTableSJ.entrySet()) {
+      final List<ReduceSinkOperator> sjBrances = sjMergeCandidate.getValue();
+      if (sjBrances.size() < 2) {
+        continue;
+      }
+      // Order does not really matter but it is necessary to keep plans stable
+      sjBrances.sort(Comparator.comparing(Operator::getIdentifier));

Review comment:
       Maybe using a `LinkedHashMap` for `rsToSemiJoinBranchInfo` from the outset would eliminate the need for sorting here and above (just a thought).
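       Just to illustrate the idea, a toy sketch (class and key names invented) showing that a `LinkedHashMap` already gives deterministic, insertion-ordered iteration, so the explicit `TreeMap`/sort would not be needed as long as the map is populated deterministically during planning:
       ```java
       import java.util.LinkedHashMap;
       import java.util.Map;

       public class StableOrderSketch {
         public static void main(String[] args) {
           // LinkedHashMap iterates in insertion order on every run, so plans stay
           // stable without re-sorting, provided entries are inserted deterministically.
           Map<String, String> rsToSemiJoinInfo = new LinkedHashMap<>();
           rsToSemiJoinInfo.put("RS_10", "branch on col_a");
           rsToSemiJoinInfo.put("RS_7", "branch on col_b");
           rsToSemiJoinInfo.put("RS_3", "branch on col_c");
           // Always prints RS_10, RS_7, RS_3 in exactly this order.
           rsToSemiJoinInfo.forEach((rs, info) -> System.out.println(rs + " -> " + info));
         }
       }
       ```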

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.GenTezUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class SemiJoinReductionMerge extends Transform {
+
+  public ParseContext transform(ParseContext parseContext) throws SemanticException {
+    Map<ReduceSinkOperator, SemiJoinBranchInfo> map = parseContext.getRsToSemiJoinBranchInfo();
+    if (map.isEmpty()) {
+      return parseContext;
+    }
+    HiveConf hiveConf = parseContext.getConf();
+
+    // Order does not really matter but it is necessary to keep plans stable
+    SortedMap<SJSourceTarget, List<ReduceSinkOperator>> sameTableSJ =
+        new TreeMap<>(Comparator.comparing(SJSourceTarget::toString));
+    for (Map.Entry<ReduceSinkOperator, SemiJoinBranchInfo> smjEntry : map.entrySet()) {
+      TableScanOperator ts = smjEntry.getValue().getTsOp();
+      // Semijoin optimization branch should look like <Parent>-SEL-GB1-RS1-GB2-RS2
+      SelectOperator selOp = OperatorUtils.ancestor(smjEntry.getKey(), SelectOperator.class, 0, 0, 0, 0);
+      assert selOp != null;
+      assert selOp.getParentOperators().size() == 1;
+      Operator<?> source = selOp.getParentOperators().get(0);
+      SJSourceTarget sjKey = new SJSourceTarget(source, ts);
+      List<ReduceSinkOperator> ops = sameTableSJ.computeIfAbsent(sjKey, tableScanOperator -> new ArrayList<>());
+      ops.add(smjEntry.getKey());
+    }
+    for (Map.Entry<SJSourceTarget, List<ReduceSinkOperator>> sjMergeCandidate : sameTableSJ.entrySet()) {
+      final List<ReduceSinkOperator> sjBrances = sjMergeCandidate.getValue();
+      if (sjBrances.size() < 2) {
+        continue;
+      }
+      // Order does not really matter but it is necessary to keep plans stable
+      sjBrances.sort(Comparator.comparing(Operator::getIdentifier));
+
+      List<SelectOperator> selOps = new ArrayList<>(sjBrances.size());
+      for (ReduceSinkOperator rs : sjBrances) {
+        selOps.add(OperatorUtils.ancestor(rs, SelectOperator.class, 0, 0, 0, 0));
+      }
+      SelectOperator selectOp = mergeSelectOps(sjMergeCandidate.getKey().source, selOps);
+
+      GroupByOperator gbPartialOp = createGroupBy(selectOp, selectOp, GroupByDesc.Mode.HASH, hiveConf);
+
+      ReduceSinkOperator rsPartialOp = createReduceSink(gbPartialOp, NullOrdering.defaultNullOrder(hiveConf));
+      rsPartialOp.getConf().setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.QUICKSTART));
+
+      GroupByOperator gbCompleteOp = createGroupBy(selectOp, rsPartialOp, GroupByDesc.Mode.FINAL, hiveConf);
+
+      ReduceSinkOperator rsCompleteOp = createReduceSink(gbCompleteOp, NullOrdering.defaultNullOrder(hiveConf));
+
+      final TableScanOperator sjTargetTable = sjMergeCandidate.getKey().target;
+      SemiJoinBranchInfo sjInfo = new SemiJoinBranchInfo(sjTargetTable, false);
+      parseContext.getRsToSemiJoinBranchInfo().put(rsCompleteOp, sjInfo);
+
+      // Save the info that is required at query time to resolve dynamic/runtime values.
+      RuntimeValuesInfo valuesInfo = createRuntimeValuesInfo(rsCompleteOp, sjBrances, parseContext);
+      parseContext.getRsToRuntimeValuesInfoMap().put(rsCompleteOp, valuesInfo);
+
+      ExprNodeGenericFuncDesc sjPredicate = createSemiJoinPredicate(sjBrances, valuesInfo, parseContext);
+
+      // Update filter operators with the new semi-join predicate
+      for (Operator<?> op : sjTargetTable.getChildOperators()) {
+        if (op instanceof FilterOperator) {
+          FilterDesc filter = ((FilterOperator) op).getConf();
+          filter.setPredicate(and(Arrays.asList(filter.getPredicate(), sjPredicate)));
+        }
+      }
+      // Update tableScan with the new semi-join predicate
+      sjTargetTable.getConf().setFilterExpr(and(Arrays.asList(sjTargetTable.getConf().getFilterExpr(), sjPredicate)));
+
+      for (ReduceSinkOperator rs : sjBrances) {
+        GenTezUtils.removeSemiJoinOperator(parseContext, rs, sjTargetTable);
+        GenTezUtils.removeBranch(rs);
+      }
+
+      // TODO How to associate multi-cols with gb ?
+      // parseContext.getColExprToGBMap().put(key, gb);
+    }
+    return parseContext;
+  }
+
+  private static ExprNodeGenericFuncDesc createSemiJoinPredicate(List<ReduceSinkOperator> sjBranches,
+      RuntimeValuesInfo sjValueInfo, ParseContext context) {
+    // Performance note: To speed-up evaluation 'BETWEEN' predicates should come before the 'IN_BLOOM_FILTER'
+    Deque<String> dynamicIds = new ArrayDeque<>(sjValueInfo.getDynamicValueIDs());
+    List<ExprNodeDesc> sjPredicates = new ArrayList<>();
+    List<ExprNodeDesc> hashArgs = new ArrayList<>();
+    for (ReduceSinkOperator rs : sjBranches) {
+      RuntimeValuesInfo info = context.getRsToRuntimeValuesInfoMap().get(rs);
+      assert info.getTargetColumns().size() == 1;
+      final ExprNodeDesc targetColumn = info.getTargetColumns().get(0);
+      TypeInfo typeInfo = targetColumn.getTypeInfo();
+      DynamicValue minDynamic = new DynamicValue(dynamicIds.poll(), typeInfo);
+      DynamicValue maxDynamic = new DynamicValue(dynamicIds.poll(), typeInfo);
+
+      List<ExprNodeDesc> betweenArgs = Arrays.asList(
+          // Use false to not invert between result
+          new ExprNodeConstantDesc(Boolean.FALSE),
+          targetColumn,
+          new ExprNodeDynamicValueDesc(minDynamic),
+          new ExprNodeDynamicValueDesc(maxDynamic));
+      ExprNodeDesc betweenExp =
+          new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFBetween(), "between", betweenArgs);
+      sjPredicates.add(betweenExp);
+      hashArgs.add(targetColumn);
+    }
+
+    ExprNodeDesc hashExp =
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, new GenericUDFMurmurHash(), "hash", hashArgs);
+
+    assert dynamicIds.size() == 1 : "There should be one column left untreated the one with the bloom filter";
+    DynamicValue bloomDynamic = new DynamicValue(dynamicIds.poll(), TypeInfoFactory.binaryTypeInfo);
+    sjPredicates.add(
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFInBloomFilter(), "in_bloom_filter",
+            Arrays.asList(hashExp, new ExprNodeDynamicValueDesc(bloomDynamic))));
+    return and(sjPredicates);
+  }
+
+  private static RuntimeValuesInfo createRuntimeValuesInfo(ReduceSinkOperator rs, List<ReduceSinkOperator> sjBranches,
+      ParseContext parseContext) {
+    List<ExprNodeDesc> valueCols = rs.getConf().getValueCols();
+    RuntimeValuesInfo info = new RuntimeValuesInfo();
+    TableDesc rsFinalTableDesc =
+        PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols, "_col"));
+    List<String> dynamicValueIDs = new ArrayList<>();
+    for (ExprNodeDesc rsCol : valueCols) {
+      dynamicValueIDs.add(rs.toString() + rsCol.getExprString());
+    }
+
+    info.setTableDesc(rsFinalTableDesc);
+    info.setDynamicValueIDs(dynamicValueIDs);
+    info.setColExprs(valueCols);
+    List<ExprNodeDesc> targetTableExpressions = new ArrayList<>();
+    for (ReduceSinkOperator sjBranch : sjBranches) {
+      RuntimeValuesInfo sjInfo = parseContext.getRsToRuntimeValuesInfoMap().get(sjBranch);
+      assert sjInfo.getTargetColumns().size() == 1;
+      targetTableExpressions.add(sjInfo.getTargetColumns().get(0));
+    }
+    info.setTargetColumns(targetTableExpressions);
+    return info;
+  }
+
+  private static SelectOperator mergeSelectOps(Operator<?> parent, List<SelectOperator> selectOperators) {
+    List<String> colNames = new ArrayList<>();
+    List<ExprNodeDesc> colDescs = new ArrayList<>();
+    List<ColumnInfo> columnInfos = new ArrayList<>();
+    Map<String, ExprNodeDesc> selectColumnExprMap = new HashMap<>();
+    for (SelectOperator sel : selectOperators) {
+      for (ExprNodeDesc col : sel.getConf().getColList()) {
+        String colName = HiveConf.getColumnInternalName(colDescs.size());
+        colNames.add(colName);
+        columnInfos.add(new ColumnInfo(colName, col.getTypeInfo(), "", false));
+        colDescs.add(col);
+        selectColumnExprMap.put(colName, col);
+      }
+    }
+    ExprNodeDesc hashExp =
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, new GenericUDFMurmurHash(), "hash", colDescs);
+    String hashName = HiveConf.getColumnInternalName(colDescs.size() + 1);
+    colNames.add(hashName);
+    columnInfos.add(new ColumnInfo(hashName, hashExp.getTypeInfo(), "", false));
+
+    List<ExprNodeDesc> selDescs = new ArrayList<>(colDescs);
+    selDescs.add(hashExp);
+
+    SelectDesc select = new SelectDesc(selDescs, colNames);
+    SelectOperator selectOp =
+        (SelectOperator) OperatorFactory.getAndMakeChild(select, new RowSchema(columnInfos), parent);
+    selectOp.setColumnExprMap(selectColumnExprMap);
+    return selectOp;
+  }
+
+  private static ReduceSinkOperator createReduceSink(Operator<?> parentOp, NullOrdering nullOrder)
+      throws SemanticException {
+    List<ExprNodeDesc> valueCols = new ArrayList<>();
+    RowSchema parentSchema = parentOp.getSchema();
+    List<String> outColNames = new ArrayList<>();
+    for (int i = 0; i < parentSchema.getSignature().size(); i++) {
+      ColumnInfo colInfo = parentSchema.getSignature().get(i);
+      ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo.getInternalName(), "", false);
+      valueCols.add(colExpr);
+      outColNames.add(SemanticAnalyzer.getColumnInternalName(i));
+    }
+
+    ReduceSinkDesc rsDesc = PlanUtils
+        .getReduceSinkDesc(Collections.emptyList(), valueCols, outColNames, false, -1, 0, 1,
+            AcidUtils.Operation.NOT_ACID, nullOrder);
+    rsDesc.setColumnExprMap(Collections.emptyMap());
+    return (ReduceSinkOperator) OperatorFactory.getAndMakeChild(rsDesc, new RowSchema(parentSchema), parentOp);
+  }
+
+  private static GroupByOperator createGroupBy(SelectOperator selectOp, Operator<?> parentOp, GroupByDesc.Mode gbMode,
+      HiveConf hiveConf) {
+
+    final List<ExprNodeDesc> params;
+    final GenericUDAFEvaluator.Mode udafMode = SemanticAnalyzer.groupByDescModeToUDAFMode(gbMode, false);
+    switch (gbMode) {
+    case FINAL:
+      params = createGroupByAggregationParameters((ReduceSinkOperator) parentOp);
+      break;
+    case HASH:
+      params = createGroupByAggregationParameters(selectOp);
+      break;
+    default:
+      throw new AssertionError(gbMode.toString() + " is not supported");
+    }
+
+    List<AggregationDesc> gbAggs = new ArrayList<>();
+    Deque<ExprNodeDesc> paramsCopy = new ArrayDeque<>(params);
+    while (paramsCopy.size() > 1) {
+      gbAggs.add(minAggregation(udafMode, paramsCopy.poll()));
+      gbAggs.add(maxAggregation(udafMode, paramsCopy.poll()));
+    }
+    gbAggs.add(bloomFilterAggregation(udafMode, paramsCopy.poll(), selectOp, hiveConf));
+    assert paramsCopy.size() == 0;
+
+    List<String> gbOutputNames = new ArrayList<>(gbAggs.size());
+    List<ColumnInfo> gbColInfos = new ArrayList<>(gbAggs.size());
+    for (int i = 0; i < params.size(); i++) {
+      String colName = HiveConf.getColumnInternalName(i);
+      gbOutputNames.add(colName);
+      final TypeInfo colType;
+      if (i == params.size() - 1) {
+        colType = TypeInfoFactory.binaryTypeInfo; // Bloom type
+      } else {
+        colType = params.get(i).getTypeInfo(); // Min/Max type
+      }
+      gbColInfos.add(new ColumnInfo(colName, colType, "", false));
+    }
+
+    float groupByMemoryUsage = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
+    float memoryThreshold = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+    float minReductionHashAggr = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
+    GroupByDesc groupBy =
+        new GroupByDesc(gbMode, gbOutputNames, Collections.emptyList(), gbAggs, false, groupByMemoryUsage,
+            memoryThreshold, minReductionHashAggr, null, false, -1, false);
+    groupBy.setColumnExprMap(Collections.emptyMap());
+    return (GroupByOperator) OperatorFactory.getAndMakeChild(groupBy, new RowSchema(gbColInfos), parentOp);
+  }
+
+  private static List<ExprNodeDesc> createGroupByAggregationParameters(SelectOperator selectOp) {
+    List<ExprNodeDesc> params = new ArrayList<>();
+    // The first n-1 cols are used as parameters for min & max so we need two expressions
+    for (ColumnInfo c : selectOp.getSchema().getSignature()) {
+      String name = c.getInternalName();
+      ExprNodeColumnDesc p = new ExprNodeColumnDesc(new ColumnInfo(name, c.getType(), "", false));
+      params.add(p);
+      params.add(p);
+    }
+    // The last col is used as parameter for bloom so we need only one expression
+    params.remove(params.size() - 1);
+    return params;
+  }
+
+  private static List<ExprNodeDesc> createGroupByAggregationParameters(ReduceSinkOperator reduceOp) {
+    List<ExprNodeDesc> params = new ArrayList<>();
+    // There is a 1-1 mapping between columns and parameters for the aggregation functions min, max, bloom
+    for (ColumnInfo c : reduceOp.getSchema().getSignature()) {
+      String name = Utilities.ReduceField.VALUE + "." + c.getInternalName();
+      params.add(new ExprNodeColumnDesc(new ColumnInfo(name, c.getType(), "", false)));
+    }
+    return params;
+  }
+
+  private static ExprNodeGenericFuncDesc and(List<ExprNodeDesc> args) {
+    return new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFOPAnd(), "and", args);
+  }
+
+  private static AggregationDesc minAggregation(GenericUDAFEvaluator.Mode mode, ExprNodeDesc col) {
+    List<ExprNodeDesc> p = Collections.singletonList(col);
+    return new AggregationDesc("min", new GenericUDAFMin.GenericUDAFMinEvaluator(), p, false, mode);
+  }
+
+  private static AggregationDesc maxAggregation(GenericUDAFEvaluator.Mode mode, ExprNodeDesc col) {
+    List<ExprNodeDesc> p = Collections.singletonList(col);
+    return new AggregationDesc("max", new GenericUDAFMax.GenericUDAFMaxEvaluator(), p, false, mode);
+  }
+
+  private static AggregationDesc bloomFilterAggregation(GenericUDAFEvaluator.Mode mode, ExprNodeDesc col,
+      SelectOperator source, HiveConf conf) {
+    GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator bloomFilterEval =
+        new GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator();
+    bloomFilterEval.setSourceOperator(source);
+    bloomFilterEval.setMaxEntries(conf.getLongVar(HiveConf.ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES));
+    bloomFilterEval.setMinEntries(conf.getLongVar(HiveConf.ConfVars.TEZ_MIN_BLOOM_FILTER_ENTRIES));
+    bloomFilterEval.setFactor(conf.getFloatVar(HiveConf.ConfVars.TEZ_BLOOM_FILTER_FACTOR));
+    // TODO Setup hints
+    List<ExprNodeDesc> p = Collections.singletonList(col);
+    AggregationDesc bloom = new AggregationDesc("bloom_filter", bloomFilterEval, p, false, mode);
+    // TODO Why do we need to set it explicitly?

Review comment:
       As for the question in the TODO, I am not sure... I would assume it gets initialized correctly.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.GenTezUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class SemiJoinReductionMerge extends Transform {
+
+  public ParseContext transform(ParseContext parseContext) throws SemanticException {
+    Map<ReduceSinkOperator, SemiJoinBranchInfo> map = parseContext.getRsToSemiJoinBranchInfo();
+    if (map.isEmpty()) {
+      return parseContext;
+    }
+    HiveConf hiveConf = parseContext.getConf();
+
+    // Order does not really matter but it is necessary to keep plans stable
+    SortedMap<SJSourceTarget, List<ReduceSinkOperator>> sameTableSJ =
+        new TreeMap<>(Comparator.comparing(SJSourceTarget::toString));
+    for (Map.Entry<ReduceSinkOperator, SemiJoinBranchInfo> smjEntry : map.entrySet()) {
+      TableScanOperator ts = smjEntry.getValue().getTsOp();
+      // Semijoin optimization branch should look like <Parent>-SEL-GB1-RS1-GB2-RS2
+      SelectOperator selOp = OperatorUtils.ancestor(smjEntry.getKey(), SelectOperator.class, 0, 0, 0, 0);
+      assert selOp != null;
+      assert selOp.getParentOperators().size() == 1;
+      Operator<?> source = selOp.getParentOperators().get(0);
+      SJSourceTarget sjKey = new SJSourceTarget(source, ts);
+      List<ReduceSinkOperator> ops = sameTableSJ.computeIfAbsent(sjKey, tableScanOperator -> new ArrayList<>());
+      ops.add(smjEntry.getKey());
+    }
+    for (Map.Entry<SJSourceTarget, List<ReduceSinkOperator>> sjMergeCandidate : sameTableSJ.entrySet()) {
+      final List<ReduceSinkOperator> sjBrances = sjMergeCandidate.getValue();
+      if (sjBrances.size() < 2) {
+        continue;
+      }
+      // Order does not really matter but it is necessary to keep plans stable
+      sjBrances.sort(Comparator.comparing(Operator::getIdentifier));
+
+      List<SelectOperator> selOps = new ArrayList<>(sjBrances.size());
+      for (ReduceSinkOperator rs : sjBrances) {
+        selOps.add(OperatorUtils.ancestor(rs, SelectOperator.class, 0, 0, 0, 0));
+      }
+      SelectOperator selectOp = mergeSelectOps(sjMergeCandidate.getKey().source, selOps);

Review comment:
       It seems we are duplicating part of the logic that creates the SJ branches below. We could open a follow-up JIRA to consolidate this logic with the creation path for single-column SJs.
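       To record the idea for that follow-up: both paths build the same SEL -> GB(HASH) -> RS -> GB(FINAL) -> RS chain, so a shared helper roughly like the one below could work. This is purely a sketch, the method name is hypothetical, and it simply reuses the `createGroupBy`/`createReduceSink` helpers already defined in this class:
       ```java
       // Sketch only: shared construction of the semijoin reducer chain, callable from
       // both the single-column creation path and this merge pass.
       private static ReduceSinkOperator buildSemiJoinReducerChain(SelectOperator keySource, HiveConf conf)
           throws SemanticException {
         NullOrdering nullOrder = NullOrdering.defaultNullOrder(conf);
         GroupByOperator partialGb = createGroupBy(keySource, keySource, GroupByDesc.Mode.HASH, conf);
         ReduceSinkOperator partialRs = createReduceSink(partialGb, nullOrder);
         partialRs.getConf().setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.QUICKSTART));
         GroupByOperator finalGb = createGroupBy(keySource, partialRs, GroupByDesc.Mode.FINAL, conf);
         return createReduceSink(finalGb, nullOrder);
       }
       ```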

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/SemiJoinReductionMerge.java
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.GenTezUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicValue;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicValueDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMurmurHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.util.NullOrdering;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class SemiJoinReductionMerge extends Transform {
+
+  public ParseContext transform(ParseContext parseContext) throws SemanticException {
+    Map<ReduceSinkOperator, SemiJoinBranchInfo> map = parseContext.getRsToSemiJoinBranchInfo();
+    if (map.isEmpty()) {
+      return parseContext;
+    }
+    HiveConf hiveConf = parseContext.getConf();
+
+    // Order does not really matter but it is necessary to keep plans stable
+    SortedMap<SJSourceTarget, List<ReduceSinkOperator>> sameTableSJ =
+        new TreeMap<>(Comparator.comparing(SJSourceTarget::toString));
+    for (Map.Entry<ReduceSinkOperator, SemiJoinBranchInfo> smjEntry : map.entrySet()) {
+      TableScanOperator ts = smjEntry.getValue().getTsOp();
+      // Semijoin optimization branch should look like <Parent>-SEL-GB1-RS1-GB2-RS2
+      SelectOperator selOp = OperatorUtils.ancestor(smjEntry.getKey(), SelectOperator.class, 0, 0, 0, 0);
+      assert selOp != null;
+      assert selOp.getParentOperators().size() == 1;
+      Operator<?> source = selOp.getParentOperators().get(0);
+      SJSourceTarget sjKey = new SJSourceTarget(source, ts);
+      List<ReduceSinkOperator> ops = sameTableSJ.computeIfAbsent(sjKey, tableScanOperator -> new ArrayList<>());
+      ops.add(smjEntry.getKey());
+    }
+    for (Map.Entry<SJSourceTarget, List<ReduceSinkOperator>> sjMergeCandidate : sameTableSJ.entrySet()) {
+      final List<ReduceSinkOperator> sjBrances = sjMergeCandidate.getValue();
+      if (sjBrances.size() < 2) {
+        continue;
+      }
+      // Order does not really matter but it is necessary to keep plans stable
+      sjBrances.sort(Comparator.comparing(Operator::getIdentifier));
+
+      List<SelectOperator> selOps = new ArrayList<>(sjBrances.size());
+      for (ReduceSinkOperator rs : sjBrances) {
+        selOps.add(OperatorUtils.ancestor(rs, SelectOperator.class, 0, 0, 0, 0));
+      }
+      SelectOperator selectOp = mergeSelectOps(sjMergeCandidate.getKey().source, selOps);
+
+      GroupByOperator gbPartialOp = createGroupBy(selectOp, selectOp, GroupByDesc.Mode.HASH, hiveConf);
+
+      ReduceSinkOperator rsPartialOp = createReduceSink(gbPartialOp, NullOrdering.defaultNullOrder(hiveConf));
+      rsPartialOp.getConf().setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.QUICKSTART));
+
+      GroupByOperator gbCompleteOp = createGroupBy(selectOp, rsPartialOp, GroupByDesc.Mode.FINAL, hiveConf);
+
+      ReduceSinkOperator rsCompleteOp = createReduceSink(gbCompleteOp, NullOrdering.defaultNullOrder(hiveConf));
+
+      final TableScanOperator sjTargetTable = sjMergeCandidate.getKey().target;
+      SemiJoinBranchInfo sjInfo = new SemiJoinBranchInfo(sjTargetTable, false);
+      parseContext.getRsToSemiJoinBranchInfo().put(rsCompleteOp, sjInfo);
+
+      // Save the info that is required at query time to resolve dynamic/runtime values.
+      RuntimeValuesInfo valuesInfo = createRuntimeValuesInfo(rsCompleteOp, sjBrances, parseContext);
+      parseContext.getRsToRuntimeValuesInfoMap().put(rsCompleteOp, valuesInfo);
+
+      ExprNodeGenericFuncDesc sjPredicate = createSemiJoinPredicate(sjBrances, valuesInfo, parseContext);
+
+      // Update filter operators with the new semi-join predicate
+      for (Operator<?> op : sjTargetTable.getChildOperators()) {
+        if (op instanceof FilterOperator) {
+          FilterDesc filter = ((FilterOperator) op).getConf();
+          filter.setPredicate(and(Arrays.asList(filter.getPredicate(), sjPredicate)));
+        }
+      }
+      // Update tableScan with the new semi-join predicate
+      sjTargetTable.getConf().setFilterExpr(and(Arrays.asList(sjTargetTable.getConf().getFilterExpr(), sjPredicate)));
+
+      for (ReduceSinkOperator rs : sjBrances) {
+        GenTezUtils.removeSemiJoinOperator(parseContext, rs, sjTargetTable);
+        GenTezUtils.removeBranch(rs);
+      }
+
+      // TODO How to associate multi-cols with gb ?
+      // parseContext.getColExprToGBMap().put(key, gb);
+    }
+    return parseContext;
+  }
+
+  private static ExprNodeGenericFuncDesc createSemiJoinPredicate(List<ReduceSinkOperator> sjBranches,
+      RuntimeValuesInfo sjValueInfo, ParseContext context) {
+    // Performance note: To speed-up evaluation 'BETWEEN' predicates should come before the 'IN_BLOOM_FILTER'
+    Deque<String> dynamicIds = new ArrayDeque<>(sjValueInfo.getDynamicValueIDs());
+    List<ExprNodeDesc> sjPredicates = new ArrayList<>();
+    List<ExprNodeDesc> hashArgs = new ArrayList<>();
+    for (ReduceSinkOperator rs : sjBranches) {
+      RuntimeValuesInfo info = context.getRsToRuntimeValuesInfoMap().get(rs);
+      assert info.getTargetColumns().size() == 1;
+      final ExprNodeDesc targetColumn = info.getTargetColumns().get(0);
+      TypeInfo typeInfo = targetColumn.getTypeInfo();
+      DynamicValue minDynamic = new DynamicValue(dynamicIds.poll(), typeInfo);
+      DynamicValue maxDynamic = new DynamicValue(dynamicIds.poll(), typeInfo);
+
+      List<ExprNodeDesc> betweenArgs = Arrays.asList(
+          // Use false to not invert between result
+          new ExprNodeConstantDesc(Boolean.FALSE),
+          targetColumn,
+          new ExprNodeDynamicValueDesc(minDynamic),
+          new ExprNodeDynamicValueDesc(maxDynamic));
+      ExprNodeDesc betweenExp =
+          new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFBetween(), "between", betweenArgs);
+      sjPredicates.add(betweenExp);
+      hashArgs.add(targetColumn);
+    }
+
+    ExprNodeDesc hashExp =
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, new GenericUDFMurmurHash(), "hash", hashArgs);
+
+    assert dynamicIds.size() == 1 : "There should be one column left untreated the one with the bloom filter";
+    DynamicValue bloomDynamic = new DynamicValue(dynamicIds.poll(), TypeInfoFactory.binaryTypeInfo);
+    sjPredicates.add(
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFInBloomFilter(), "in_bloom_filter",
+            Arrays.asList(hashExp, new ExprNodeDynamicValueDesc(bloomDynamic))));
+    return and(sjPredicates);
+  }
+
+  private static RuntimeValuesInfo createRuntimeValuesInfo(ReduceSinkOperator rs, List<ReduceSinkOperator> sjBranches,
+      ParseContext parseContext) {
+    List<ExprNodeDesc> valueCols = rs.getConf().getValueCols();
+    RuntimeValuesInfo info = new RuntimeValuesInfo();
+    TableDesc rsFinalTableDesc =
+        PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols, "_col"));
+    List<String> dynamicValueIDs = new ArrayList<>();
+    for (ExprNodeDesc rsCol : valueCols) {
+      dynamicValueIDs.add(rs.toString() + rsCol.getExprString());
+    }
+
+    info.setTableDesc(rsFinalTableDesc);
+    info.setDynamicValueIDs(dynamicValueIDs);
+    info.setColExprs(valueCols);
+    List<ExprNodeDesc> targetTableExpressions = new ArrayList<>();
+    for (ReduceSinkOperator sjBranch : sjBranches) {
+      RuntimeValuesInfo sjInfo = parseContext.getRsToRuntimeValuesInfoMap().get(sjBranch);
+      assert sjInfo.getTargetColumns().size() == 1;
+      targetTableExpressions.add(sjInfo.getTargetColumns().get(0));
+    }
+    info.setTargetColumns(targetTableExpressions);
+    return info;
+  }
+
+  private static SelectOperator mergeSelectOps(Operator<?> parent, List<SelectOperator> selectOperators) {
+    List<String> colNames = new ArrayList<>();
+    List<ExprNodeDesc> colDescs = new ArrayList<>();
+    List<ColumnInfo> columnInfos = new ArrayList<>();
+    Map<String, ExprNodeDesc> selectColumnExprMap = new HashMap<>();
+    for (SelectOperator sel : selectOperators) {
+      for (ExprNodeDesc col : sel.getConf().getColList()) {
+        String colName = HiveConf.getColumnInternalName(colDescs.size());
+        colNames.add(colName);
+        columnInfos.add(new ColumnInfo(colName, col.getTypeInfo(), "", false));
+        colDescs.add(col);
+        selectColumnExprMap.put(colName, col);
+      }
+    }
+    ExprNodeDesc hashExp =
+        new ExprNodeGenericFuncDesc(TypeInfoFactory.intTypeInfo, new GenericUDFMurmurHash(), "hash", colDescs);
+    String hashName = HiveConf.getColumnInternalName(colDescs.size() + 1);
+    colNames.add(hashName);
+    columnInfos.add(new ColumnInfo(hashName, hashExp.getTypeInfo(), "", false));
+
+    List<ExprNodeDesc> selDescs = new ArrayList<>(colDescs);
+    selDescs.add(hashExp);
+
+    SelectDesc select = new SelectDesc(selDescs, colNames);
+    SelectOperator selectOp =
+        (SelectOperator) OperatorFactory.getAndMakeChild(select, new RowSchema(columnInfos), parent);
+    selectOp.setColumnExprMap(selectColumnExprMap);
+    return selectOp;
+  }
+
+  private static ReduceSinkOperator createReduceSink(Operator<?> parentOp, NullOrdering nullOrder)
+      throws SemanticException {
+    List<ExprNodeDesc> valueCols = new ArrayList<>();
+    RowSchema parentSchema = parentOp.getSchema();
+    List<String> outColNames = new ArrayList<>();
+    for (int i = 0; i < parentSchema.getSignature().size(); i++) {
+      ColumnInfo colInfo = parentSchema.getSignature().get(i);
+      ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo.getInternalName(), "", false);
+      valueCols.add(colExpr);
+      outColNames.add(SemanticAnalyzer.getColumnInternalName(i));
+    }
+
+    ReduceSinkDesc rsDesc = PlanUtils
+        .getReduceSinkDesc(Collections.emptyList(), valueCols, outColNames, false, -1, 0, 1,
+            AcidUtils.Operation.NOT_ACID, nullOrder);
+    rsDesc.setColumnExprMap(Collections.emptyMap());
+    return (ReduceSinkOperator) OperatorFactory.getAndMakeChild(rsDesc, new RowSchema(parentSchema), parentOp);
+  }
+
+  private static GroupByOperator createGroupBy(SelectOperator selectOp, Operator<?> parentOp, GroupByDesc.Mode gbMode,
+      HiveConf hiveConf) {
+
+    final List<ExprNodeDesc> params;
+    final GenericUDAFEvaluator.Mode udafMode = SemanticAnalyzer.groupByDescModeToUDAFMode(gbMode, false);
+    switch (gbMode) {
+    case FINAL:
+      params = createGroupByAggregationParameters((ReduceSinkOperator) parentOp);
+      break;
+    case HASH:
+      params = createGroupByAggregationParameters(selectOp);
+      break;
+    default:
+      throw new AssertionError(gbMode.toString() + " is not supported");
+    }
+
+    List<AggregationDesc> gbAggs = new ArrayList<>();
+    Deque<ExprNodeDesc> paramsCopy = new ArrayDeque<>(params);
+    while (paramsCopy.size() > 1) {
+      gbAggs.add(minAggregation(udafMode, paramsCopy.poll()));
+      gbAggs.add(maxAggregation(udafMode, paramsCopy.poll()));
+    }
+    gbAggs.add(bloomFilterAggregation(udafMode, paramsCopy.poll(), selectOp, hiveConf));
+    assert paramsCopy.size() == 0;
+
+    List<String> gbOutputNames = new ArrayList<>(gbAggs.size());
+    List<ColumnInfo> gbColInfos = new ArrayList<>(gbAggs.size());
+    for (int i = 0; i < params.size(); i++) {
+      String colName = HiveConf.getColumnInternalName(i);
+      gbOutputNames.add(colName);
+      final TypeInfo colType;
+      if (i == params.size() - 1) {
+        colType = TypeInfoFactory.binaryTypeInfo; // Bloom type
+      } else {
+        colType = params.get(i).getTypeInfo(); // Min/Max type
+      }
+      gbColInfos.add(new ColumnInfo(colName, colType, "", false));
+    }
+
+    float groupByMemoryUsage = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
+    float memoryThreshold = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+    float minReductionHashAggr = HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVEMAPAGGRHASHMINREDUCTION);
+    GroupByDesc groupBy =
+        new GroupByDesc(gbMode, gbOutputNames, Collections.emptyList(), gbAggs, false, groupByMemoryUsage,
+            memoryThreshold, minReductionHashAggr, null, false, -1, false);
+    groupBy.setColumnExprMap(Collections.emptyMap());
+    return (GroupByOperator) OperatorFactory.getAndMakeChild(groupBy, new RowSchema(gbColInfos), parentOp);
+  }
+
+  private static List<ExprNodeDesc> createGroupByAggregationParameters(SelectOperator selectOp) {
+    List<ExprNodeDesc> params = new ArrayList<>();
+    // The first n-1 cols are used as parameters for min & max so we need two expressions
+    for (ColumnInfo c : selectOp.getSchema().getSignature()) {
+      String name = c.getInternalName();
+      ExprNodeColumnDesc p = new ExprNodeColumnDesc(new ColumnInfo(name, c.getType(), "", false));
+      params.add(p);
+      params.add(p);
+    }
+    // The last col is used as parameter for bloom so we need only one expression
+    params.remove(params.size() - 1);
+    return params;
+  }
+
+  private static List<ExprNodeDesc> createGroupByAggregationParameters(ReduceSinkOperator reduceOp) {
+    List<ExprNodeDesc> params = new ArrayList<>();
+    // There is a 1-1 mapping between columns and parameters for the aggregation functions min, max, bloom
+    for (ColumnInfo c : reduceOp.getSchema().getSignature()) {
+      String name = Utilities.ReduceField.VALUE + "." + c.getInternalName();
+      params.add(new ExprNodeColumnDesc(new ColumnInfo(name, c.getType(), "", false)));
+    }
+    return params;
+  }
+
+  private static ExprNodeGenericFuncDesc and(List<ExprNodeDesc> args) {
+    return new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, new GenericUDFOPAnd(), "and", args);
+  }
+
+  private static AggregationDesc minAggregation(GenericUDAFEvaluator.Mode mode, ExprNodeDesc col) {
+    List<ExprNodeDesc> p = Collections.singletonList(col);
+    return new AggregationDesc("min", new GenericUDAFMin.GenericUDAFMinEvaluator(), p, false, mode);
+  }
+
+  private static AggregationDesc maxAggregation(GenericUDAFEvaluator.Mode mode, ExprNodeDesc col) {
+    List<ExprNodeDesc> p = Collections.singletonList(col);
+    return new AggregationDesc("max", new GenericUDAFMax.GenericUDAFMaxEvaluator(), p, false, mode);
+  }
+
+  private static AggregationDesc bloomFilterAggregation(GenericUDAFEvaluator.Mode mode, ExprNodeDesc col,
+      SelectOperator source, HiveConf conf) {
+    GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator bloomFilterEval =
+        new GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator();
+    bloomFilterEval.setSourceOperator(source);
+    bloomFilterEval.setMaxEntries(conf.getLongVar(HiveConf.ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES));
+    bloomFilterEval.setMinEntries(conf.getLongVar(HiveConf.ConfVars.TEZ_MIN_BLOOM_FILTER_ENTRIES));
+    bloomFilterEval.setFactor(conf.getFloatVar(HiveConf.ConfVars.TEZ_BLOOM_FILTER_FACTOR));
+    // TODO Setup hints
+    List<ExprNodeDesc> p = Collections.singletonList(col);
+    AggregationDesc bloom = new AggregationDesc("bloom_filter", bloomFilterEval, p, false, mode);
+    // TODO Why do we need to set it explicitly?
+    bloom.setGenericUDAFWritableEvaluator(bloomFilterEval);
+    return bloom;
+  }
+
+  private static final class SJSourceTarget {
+    private final Operator<?> source;
+    private final TableScanOperator target;
+
+    public SJSourceTarget(Operator<?> source, TableScanOperator target) {
+      this.source = source;
+      this.target = target;
+    }
+
+    @Override public boolean equals(Object o) {
+      if (this == o)
+        return true;
+      if (o == null || getClass() != o.getClass())
+        return false;
+
+      SJSourceTarget that = (SJSourceTarget) o;
+
+      if (!source.equals(that.source))
+        return false;
+      return target.equals(that.target);
+    }
+
+    @Override public int hashCode() {
+      int result = source.hashCode();

Review comment:
       `Objects.hash(source, target);`
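       With `java.util.Objects` the whole equals/hashCode pair could shrink to something like this (just a sketch):
       ```java
       // requires: import java.util.Objects;
       @Override public boolean equals(Object o) {
         if (this == o) {
           return true;
         }
         if (o == null || getClass() != o.getClass()) {
           return false;
         }
         SJSourceTarget that = (SJSourceTarget) o;
         return Objects.equals(source, that.source) && Objects.equals(target, that.target);
       }

       @Override public int hashCode() {
         return Objects.hash(source, target);
       }
       ```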

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
##########
@@ -1737,35 +1737,46 @@ private static double getBloomFilterBenefit(
       }
     }
 
-    // Selectivity: key cardinality of semijoin / domain cardinality
-    // Benefit (rows filtered from ts): (1 - selectivity) * # ts rows
-    double selectivity = selKeyCardinality / (double) keyDomainCardinality;
-    selectivity = Math.min(selectivity, 1);
-    benefit = tsRows * (1 - selectivity);
-
     if (LOG.isDebugEnabled()) {
-      LOG.debug("BloomFilter benefit for " + selCol + " to " + tsCol
-          + ", selKeyCardinality=" + selKeyCardinality
-          + ", tsKeyCardinality=" + tsKeyCardinality
-          + ", tsRows=" + tsRows
-          + ", keyDomainCardinality=" + keyDomainCardinality);
-      LOG.debug("SemiJoin key selectivity=" + selectivity
-          + ", benefit=" + benefit);
+      LOG.debug("BloomFilter selectivity for " + selCol + " to " + tsCol + ", selKeyCardinality=" + selKeyCardinality
+          + ", tsKeyCardinality=" + tsKeyCardinality + ", keyDomainCardinality=" + keyDomainCardinality);
     }
+    // Selectivity: key cardinality of semijoin / domain cardinality
+    return selKeyCardinality / (double) keyDomainCardinality;
+  }
 
-    return benefit;
+  private static double getBloomFilterBenefit(
+      SelectOperator sel, List<ExprNodeDesc> selExpr,
+      Statistics filStats, List<ExprNodeDesc> tsExpr) {
+    if (sel.getStatistics() == null || filStats == null) {
+      LOG.debug("No stats available to compute BloomFilter benefit");
+      return -1;
+    }
+    double selectivity = 0.0;
+    for (int i = 0; i < tsExpr.size(); i++) {
+      selectivity = Math.max(selectivity, getBloomFilterSelectivity(sel, selExpr.get(i), filStats, tsExpr.get(i)));

Review comment:
       Shouldn't this be `min`, with selectivity initialized to `1`? The existing code was overly complex because of the dichotomy between selectivity and benefit, but I think the goal is to pick the lowest selectivity so we obtain the highest benefit. Or am I getting confused?
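       To spell out what I mean, the min-based variant would look roughly like this (a sketch; it assumes each per-column selectivity returned by `getBloomFilterSelectivity` is already capped at 1):
       ```java
       double selectivity = 1.0;
       for (int i = 0; i < tsExpr.size(); i++) {
         // keep the lowest (most selective) per-column estimate so the computed benefit is the largest
         selectivity = Math.min(selectivity,
             getBloomFilterSelectivity(sel, selExpr.get(i), filStats, tsExpr.get(i)));
       }
       ```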

##########
File path: ql/src/test/queries/clientpositive/dynamic_semijoin_reduction_multicol.q
##########
@@ -0,0 +1,64 @@
+--! qt:dataset:tpch_0_001.lineitem
+--! qt:dataset:tpch_0_001.partsupp
+use tpch_0_001;
+-- The test is meant to verify the plan, results, and Tez execution stats/counters
+-- of the same query in three cases:
+--   case 1: no semi-join reducers
+--   case 2: one single column semi-join reducer
+--   case 3: one multi column semi-join reducer
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.tez.bigtable.minsize.semijoin.reduction=6000;
+-- Use TezSummaryPrinter hook to verify the impact of bloom filters by

Review comment:
       Neat!

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
##########
@@ -1887,13 +1898,14 @@ private void removeSemijoinOptimizationByBenefit(OptimizeTezProcContext procCtx)
         // Check the ndv/rows from the SEL vs the destination tablescan the semijoin opt is going to.
         TableScanOperator ts = sjInfo.getTsOp();
         RuntimeValuesInfo rti = procCtx.parseContext.getRsToRuntimeValuesInfoMap().get(rs);
-        ExprNodeDesc tsExpr = rti.getTsColExpr();
-        // In the SEL operator of the semijoin branch, there should be only one column in the operator
-        ExprNodeDesc selExpr = sel.getConf().getColList().get(0);
+        List<ExprNodeDesc> targetColumns = rti.getTargetColumns();
+        // In multi column semijoin branches the last column of the SEL operator is hash(c1, c2, ..., cn)
+        // so we shouldn't consider it.
+        List<ExprNodeDesc> sourceColumns = sel.getConf().getColList().subList(0, targetColumns.size());

Review comment:
       This does not seem to distinguish between single-column and multi-column branches. Could we clarify that in the comment?
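       For instance, something along these lines would make it explicit (just a wording suggestion for the comment; the code itself stays the same):
       ```java
       // The SEL of a semijoin branch projects the source key columns and, only in the
       // multi-column case, a trailing hash(c1, ..., cn) column. targetColumns contains the
       // real key columns only, so taking the first targetColumns.size() entries leaves the
       // single-column case untouched and drops the synthetic hash column otherwise.
       List<ExprNodeDesc> sourceColumns = sel.getConf().getColList().subList(0, targetColumns.size());
       ```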

##########
File path: ql/src/test/results/clientpositive/llap/dynamic_semijoin_reduction_2.q.out
##########
@@ -79,27 +79,25 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:

Review comment:
       Interesting: the new plan seems more compact (fewer stages), which is good. I am wondering whether this is due to the Shared Work Optimizer finding further optimization opportunities, or maybe something else? Have you checked?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
##########
@@ -2054,7 +2067,8 @@ private void markSemiJoinForDPP(OptimizeTezProcContext procCtx)
               // Lookup nDVs on TS side.
               RuntimeValuesInfo rti = procCtx.parseContext
                       .getRsToRuntimeValuesInfoMap().get(rs);
-              ExprNodeDesc tsExpr = rti.getTsColExpr();
+              // TODO Adapt for multi column semi-joins.

Review comment:
       TODO? Is this going to be handled here or tracked in a follow-up?



