You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2017/04/20 17:18:54 UTC

[3/5] hive git commit: HIVE-16423: Add hint to enforce semi join optimization (Deepak Jaiswal, reviewed by Jason Dere)

http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index d58f447..83e89af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -266,11 +266,14 @@ public class GenTezUtils {
             }
           }
           // This TableScanOperator could be part of semijoin optimization.
-          Map<ReduceSinkOperator, TableScanOperator> rsOpToTsOpMap =
-                  context.parseContext.getRsOpToTsOpMap();
-          for (ReduceSinkOperator rs : rsOpToTsOpMap.keySet()) {
-            if (rsOpToTsOpMap.get(rs) == orig) {
-              rsOpToTsOpMap.put(rs, (TableScanOperator) newRoot);
+          Map<ReduceSinkOperator, SemiJoinBranchInfo> rsToSemiJoinBranchInfo =
+                  context.parseContext.getRsToSemiJoinBranchInfo();
+          for (ReduceSinkOperator rs : rsToSemiJoinBranchInfo.keySet()) {
+            SemiJoinBranchInfo sjInfo = rsToSemiJoinBranchInfo.get(rs);
+            if (sjInfo.getTsOp() == orig) {
+              SemiJoinBranchInfo newSJInfo = new SemiJoinBranchInfo(
+                      (TableScanOperator)newRoot, sjInfo.getIsHint());
+              rsToSemiJoinBranchInfo.put(rs, newSJInfo);
             }
           }
         }
@@ -516,19 +519,18 @@ public class GenTezUtils {
     return EdgeType.SIMPLE_EDGE;
   }
 
-  public static void processDynamicMinMaxPushDownOperator(
+  public static void processDynamicSemiJoinPushDownOperator(
           GenTezProcContext procCtx, RuntimeValuesInfo runtimeValuesInfo,
           ReduceSinkOperator rs)
           throws SemanticException {
-    TableScanOperator ts = procCtx.parseContext.getRsOpToTsOpMap().get(rs);
+    SemiJoinBranchInfo sjInfo = procCtx.parseContext.getRsToSemiJoinBranchInfo().get(rs);
 
     List<BaseWork> rsWorkList = procCtx.childToWorkMap.get(rs);
-    if (ts == null || rsWorkList == null) {
+    if (sjInfo == null || rsWorkList == null) {
       // This happens when the ReduceSink's edge has been removed by cycle
       // detection logic. Nothing to do here.
       return;
     }
-    LOG.debug("ResduceSink " + rs + " to TableScan " + ts);
 
     if (rsWorkList.size() != 1) {
       StringBuilder sb = new StringBuilder();
@@ -541,6 +543,9 @@ public class GenTezUtils {
       throw new SemanticException(rs + " belongs to multiple BaseWorks: " + sb.toString());
     }
 
+    TableScanOperator ts = sjInfo.getTsOp();
+    LOG.debug("ResduceSink " + rs + " to TableScan " + ts);
+
     BaseWork parentWork = rsWorkList.get(0);
     BaseWork childWork = procCtx.rootToWorkMap.get(ts);
 
@@ -611,7 +616,7 @@ public class GenTezUtils {
         skip = true;
       }
     }
-    context.getRsOpToTsOpMap().remove(rs);
+    context.getRsToSemiJoinBranchInfo().remove(rs);
   }
 
   private static class DynamicValuePredicateContext implements NodeProcessorCtx {

http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/parse/HintParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HintParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HintParser.g
index 8e70a46..e110fb3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HintParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HintParser.g
@@ -31,6 +31,7 @@ tokens {
   TOK_MAPJOIN;
   TOK_STREAMTABLE;
   TOK_HINTARGLIST;
+  TOK_LEFTSEMIJOIN;
 }
 
 @header {
@@ -69,6 +70,7 @@ hintItem
 hintName
     :
     KW_MAPJOIN -> TOK_MAPJOIN
+    | KW_SEMI -> TOK_LEFTSEMIJOIN
     | KW_STREAMTABLE -> TOK_STREAMTABLE
     ;
 
@@ -80,4 +82,5 @@ hintArgs
 hintArgName
     :
     Identifier
+    | Number
     ;

http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 3f9f76c..9a69f90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -33,17 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -126,11 +116,10 @@ public class ParseContext {
   private boolean needViewColumnAuthorization;
   private Set<FileSinkDesc> acidFileSinks = Collections.emptySet();
 
-  // Map to store mapping between reduce sink Operator and TS Operator for semijoin
-  private Map<ReduceSinkOperator, TableScanOperator> rsOpToTsOpMap =
-          new HashMap<ReduceSinkOperator, TableScanOperator>();
   private Map<ReduceSinkOperator, RuntimeValuesInfo> rsToRuntimeValuesInfo =
           new HashMap<ReduceSinkOperator, RuntimeValuesInfo>();
+  private Map<ReduceSinkOperator, SemiJoinBranchInfo> rsToSemiJoinBranchInfo =
+          new HashMap<>();
 
   public ParseContext() {
   }
@@ -666,11 +655,11 @@ public class ParseContext {
     return rsToRuntimeValuesInfo;
   }
 
-  public void setRsOpToTsOpMap(Map<ReduceSinkOperator, TableScanOperator> rsOpToTsOpMap) {
-    this.rsOpToTsOpMap = rsOpToTsOpMap;
+  public void setRsToSemiJoinBranchInfo(Map<ReduceSinkOperator, SemiJoinBranchInfo> rsToSemiJoinBranchInfo) {
+    this.rsToSemiJoinBranchInfo = rsToSemiJoinBranchInfo;
   }
 
-  public Map<ReduceSinkOperator, TableScanOperator> getRsOpToTsOpMap() {
-    return rsOpToTsOpMap;
+  public Map<ReduceSinkOperator, SemiJoinBranchInfo> getRsToSemiJoinBranchInfo() {
+    return rsToSemiJoinBranchInfo;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
index ec76fb7..bcef252 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import java.util.Arrays;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -82,6 +84,7 @@ public class QBJoinTree implements Serializable, Cloneable {
    * We then add a Filter Operator after the Join Operator for this QBJoinTree.
    */
   private final List<ASTNode> postJoinFilters;
+  private Map<String, SemiJoinHint> semiJoinHint;
 
   /**
    * constructor.
@@ -429,4 +432,17 @@ public class QBJoinTree implements Serializable, Cloneable {
 
     return cloned;
   }
+
+  public void setSemiJoinHint(Map<String, SemiJoinHint> semiJoinHint) {
+    this.semiJoinHint = semiJoinHint;
+  }
+
+  public Map<String, SemiJoinHint> getSemiJoinHint() {
+    return semiJoinHint;
+  }
+
+  @Override
+  public String toString() {
+    return "QBJoinTree [leftAlias=" + leftAlias + ", rightAliases=" + Arrays.toString(rightAliases) + ", leftAliases=" + Arrays.toString(leftAliases) + ", semiJoinHint=" + semiJoinHint + "]";
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index b5a5645..e4ca25b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -8122,6 +8122,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     JoinDesc desc = new JoinDesc(exprMap, outputColumnNames,
         join.getNoOuterJoin(), joinCondns, filterMap, joinKeys);
+    desc.setSemiJoinHints(join.getSemiJoinHint());
     desc.setReversedExprs(reversedExprs);
     desc.setFilterMap(join.getFilterMap());
     // For outer joins, add filters that apply to more than one input
@@ -8669,6 +8670,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       parseStreamTables(joinTree, qb);
     }
 
+    if (qb.getParseInfo().getHints() != null) {
+      // TODO: do we need this for unique join?
+      joinTree.setSemiJoinHint(parseSemiJoinHint(qb.getParseInfo().getHints()));
+    }
     return joinTree;
   }
 
@@ -8967,6 +8972,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       if ((conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) == false) {
         parseStreamTables(joinTree, qb);
       }
+
+      joinTree.setSemiJoinHint(parseSemiJoinHint(qb.getParseInfo().getHints()));
     }
 
     return joinTree;
@@ -9014,6 +9021,85 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     joinTree.setStreamAliases(streamAliases);
   }
 
+  /**
+   * Parses the semijoin hints in the query.
+   *
+   * <p>Within one hint, the arguments are read left to right as a sequence of entries, each in
+   * one of 3 formats:
+   * <ol>
+   *   <li>TableName, ColumnName, bloom filter entries</li>
+   *   <li>TableName, bloom filter entries</li>
+   *   <li>TableName, ColumnName</li>
+   * </ol>
+   * If the same table alias is given more than once, the last entry wins.
+   *
+   * @param hints root of the parsed hints; may be null
+   * @return map from table alias to its {@link SemiJoinHint}, or null if {@code hints} is null
+   *         or holds no semijoin hint. A column name or bloom filter entry count that was not
+   *         specified is passed to the {@link SemiJoinHint} constructor as null.
+   * @throws SemanticException if a number appears where a table alias is expected
+   */
+  public Map<String, SemiJoinHint> parseSemiJoinHint(ASTNode hints) throws SemanticException {
+    if (hints == null) {
+      return null;
+    }
+    Map<String, SemiJoinHint> result = null;
+    for (Node hintNode : hints.getChildren()) {
+      ASTNode hint = (ASTNode) hintNode;
+      if (hint.getChild(0).getType() != HintParser.TOK_LEFTSEMIJOIN) {
+        continue;
+      }
+      if (result == null) {
+        result = new HashMap<>();
+      }
+      Tree args = hint.getChild(1);
+      if (args == null) {
+        // The hint was written without an argument list; there is nothing to record.
+        continue;
+      }
+      String alias = null;
+      String colName = null;
+      for (int i = 0; i < args.getChildCount(); i++) {
+        // We can have table names, column names or sizes here (or incorrect hint if the user is so inclined).
+        String text = args.getChild(i).getText();
+        Integer number = null;
+        try {
+          number = Integer.parseInt(text);
+        } catch (NumberFormatException ignored) {
+          // Not a number, so the argument is a table alias or a column name.
+        }
+        if (number != null) {
+          if (alias == null) {
+            throw new SemanticException("Invalid semijoin hint - arg " + i + " ("
+                + text + ") is a number but the previous one is not an alias");
+          }
+          result.put(alias, new SemiJoinHint(alias, colName, number));
+          alias = null;
+          colName = null;
+        } else if (alias == null) {
+          alias = text;
+        } else if (colName == null) {
+          colName = text;
+        } else {
+          // A third name in a row starts the next entry, so the pending
+          // (alias, column) pair had no bloom filter entries provided.
+          result.put(alias, new SemiJoinHint(alias, colName, null));
+          alias = text;
+          colName = null;
+        }
+      }
+      if (alias != null && colName != null) {
+        // The argument list ended on a "TableName, ColumnName" entry (format 3); without this
+        // flush the last entry of the hint would be silently dropped.
+        result.put(alias, new SemiJoinHint(alias, colName, null));
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Semijoin hint parsed: " + result);
+    }
+    return result;
+  }
+
   /**
    * Merges node to target
    */