Posted to issues@hive.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/08/03 23:03:00 UTC

[jira] [Work logged] (HIVE-23716) Support Anti Join in Hive

     [ https://issues.apache.org/jira/browse/HIVE-23716?focusedWorklogId=465942&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-465942 ]

ASF GitHub Bot logged work on HIVE-23716:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Aug/20 23:02
            Start Date: 03/Aug/20 23:02
    Worklog Time Spent: 10m 
      Work Description: jcamachor commented on a change in pull request #1147:
URL: https://github.com/apache/hive/pull/1147#discussion_r464673502



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOptUtil.java
##########
@@ -747,6 +747,8 @@ public static RewritablePKFKJoinInfo isRewritablePKFKJoin(Join join,
     final RelNode nonFkInput = leftInputPotentialFK ? join.getRight() : join.getLeft();
     final RewritablePKFKJoinInfo nonRewritable = RewritablePKFKJoinInfo.of(false, null);
 
+    // TODO : Need to handle Anti join.

Review comment:
       Thanks for creating HIVE-23906. Can we simply return `nonRewritable` if it is an anti-join for the time being, rather than proceeding? This certainly requires a bit of extra thinking and specific tests to make sure it is working as expected (for which we already have HIVE-23906).

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinConstraintsRule.java
##########
@@ -183,6 +189,7 @@ public void onMatch(RelOptRuleCall call) {
     switch (joinType) {
     case SEMI:
     case INNER:
+    case ANTI:

Review comment:
       This should be removed to avoid confusion, since we bail out above.

##########
File path: ql/src/test/queries/clientpositive/subquery_in_having.q
##########
@@ -140,6 +140,22 @@ CREATE TABLE src_null_n4 (key STRING COMMENT 'default', value STRING COMMENT 'de
 LOAD DATA LOCAL INPATH "../../data/files/kv1.txt" INTO TABLE src_null_n4;
 INSERT INTO src_null_n4 values('5444', null);
 
+explain
+select key, value, count(*)

Review comment:
       Should we execute this query with conversion=true?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
##########
@@ -1233,4 +1233,21 @@ public FixNullabilityShuttle(RexBuilder rexBuilder,
     }
   }
 
+  // Checks if any of the expression given as list expressions are from right side of the join.

Review comment:
       nit: change this comment to Javadoc.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveAntiSemiJoinRule.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAntiJoin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Planner rule that converts a join plus filter to anti join.
+ */
+public class HiveAntiSemiJoinRule extends RelOptRule {
+  protected static final Logger LOG = LoggerFactory.getLogger(HiveAntiSemiJoinRule.class);
+  public static final HiveAntiSemiJoinRule INSTANCE = new HiveAntiSemiJoinRule();
+
+  //    HiveProject(fld=[$0])
+  //      HiveFilter(condition=[IS NULL($1)])
+  //        HiveJoin(condition=[=($0, $1)], joinType=[left], algorithm=[none], cost=[not available])
+  //
+  // TO
+  //
+  //    HiveProject(fld_tbl=[$0])
+  //      HiveAntiJoin(condition=[=($0, $1)], joinType=[anti])
+  //
+  public HiveAntiSemiJoinRule() {
+    super(operand(Project.class, operand(Filter.class, operand(Join.class, RelOptRule.any()))),
+            "HiveJoinWithFilterToAntiJoinRule:filter");
+  }
+
+  // is null filter over a left join.
+  public void onMatch(final RelOptRuleCall call) {
+    final Project project = call.rel(0);
+    final Filter filter = call.rel(1);
+    final Join join = call.rel(2);
+    perform(call, project, filter, join);
+  }
+
+  protected void perform(RelOptRuleCall call, Project project, Filter filter, Join join) {
+    LOG.debug("Start Matching HiveAntiJoinRule");
+
+    //TODO : Need to support this scenario.
+    if (join.getCondition().isAlwaysTrue()) {
+      return;
+    }
+
+    //We support conversion from left outer join only.
+    if (join.getJoinType() != JoinRelType.LEFT) {
+      return;
+    }
+
+    assert (filter != null);
+
+    // If null filter is not present from right side then we can not convert to anti join.
+    List<RexNode> aboveFilters = RelOptUtil.conjunctions(filter.getCondition());
+    Stream<RexNode> nullFilters = aboveFilters.stream().filter(filterNode -> filterNode.getKind() == SqlKind.IS_NULL);
+    boolean hasNullFilter = HiveCalciteUtil.hasAnyExpressionFromRightSide(join, nullFilters.collect(Collectors.toList()));
+    if (!hasNullFilter) {
+      return;
+    }
+
+    // If any projection is there from right side, then we can not convert to anti join.
+    boolean hasProjection = HiveCalciteUtil.hasAnyExpressionFromRightSide(join, project.getProjects());
+    if (hasProjection) {
+      return;
+    }
+
+    LOG.debug("Matched HiveAntiJoinRule");
+
+    // Build anti join with same left, right child and condition as original left outer join.
+    Join anti = HiveAntiJoin.getAntiJoin(join.getLeft().getCluster(), join.getLeft().getTraitSet(),
+            join.getLeft(), join.getRight(), join.getCondition());
+    RelNode newProject = project.copy(project.getTraitSet(), anti, project.getProjects(), project.getRowType());
+    call.transformTo(newProject);

Review comment:
       It seems the other conjuncts in `aboveFilters` are ignored, but they should not be. For instance, if the condition is `AND(func(x,y), z IS NULL)`, where `x` comes from the left input and `y` and `z` come from the right input, wouldn't we still need to apply the first conjunct for correctness? Could we add a test for this case too?
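
       To make the concern concrete, here is a minimal, self-contained sketch of one way the conjuncts could be split. It is not the patch's code and does not use Calcite: `Conjunct` and `residualAfterConversion` are hypothetical names, and the two boolean flags stand in for what would really be derived from a `RexNode`. The sketch assumes the null-checked column can only be NULL for unmatched rows (for example, the join key), and it assumes the safest choice for a conjunct such as `func(x,y)` that needs right-side columns is to skip the conversion, since the anti join no longer outputs those columns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ConjunctSplitSketch {

  /** Hypothetical stand-in for one conjunct of the filter condition (not a Calcite RexNode). */
  static final class Conjunct {
    final String text;
    final boolean isNullCheck; // true for "col IS NULL"
    final boolean refsRight;   // true if it references any right-input column

    Conjunct(String text, boolean isNullCheck, boolean refsRight) {
      this.text = text;
      this.isNullCheck = isNullCheck;
      this.refsRight = refsRight;
    }
  }

  /**
   * Returns the conjuncts that must stay in a filter above the anti join,
   * or null if the conversion should be skipped.
   */
  static List<String> residualAfterConversion(List<Conjunct> conjuncts) {
    boolean hasRightNullFilter = false;
    List<String> residual = new ArrayList<>();
    for (Conjunct c : conjuncts) {
      if (c.isNullCheck && c.refsRight) {
        hasRightNullFilter = true; // absorbed by the anti join itself
      } else if (c.refsRight) {
        return null;               // needs right columns the anti join does not output
      } else {
        residual.add(c.text);      // left-only conjunct: keep it above the anti join
      }
    }
    // Without a right-side IS NULL conjunct there is nothing to convert.
    return hasRightNullFilter ? residual : null;
  }

  public static void main(String[] args) {
    Conjunct zIsNull = new Conjunct("z IS NULL", true, true);
    // AND(func(x, y), z IS NULL): conversion is skipped in this sketch.
    System.out.println(residualAfterConversion(
        Arrays.asList(new Conjunct("func(x, y)", false, true), zIsNull)));
    // AND(x > 10, z IS NULL): convert, but keep "x > 10" as a filter.
    System.out.println(residualAfterConversion(
        Arrays.asList(new Conjunct("x > 10", false, false), zIsNull)));
  }
}
```

       Whether to bail out or to handle such conjuncts some other way is a design decision for the rule; the sketch only shows that they cannot simply be dropped.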

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOptUtil.java
##########
@@ -854,7 +856,7 @@ public static RewritablePKFKJoinInfo isRewritablePKFKJoin(Join join,
             if (ecT.getEquivalenceClassesMap().containsKey(uniqueKeyColumnRef) &&
                 ecT.getEquivalenceClassesMap().get(uniqueKeyColumnRef).contains(foreignKeyColumnRef)) {
               if (foreignKeyColumnType.isNullable()) {
-                if (joinType == JoinRelType.INNER || join.isSemiJoin()) {
+                if (joinType == JoinRelType.INNER || join.isSemiJoin() || joinType == JoinRelType.ANTI) {

Review comment:
       Can we remove this change? We will address this in HIVE-23906.

##########
File path: ql/src/test/results/clientpositive/llap/subquery_notexists_having.q.out
##########
@@ -31,7 +31,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)

Review comment:
       This is resulting in an additional task. Is this expected?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
##########
@@ -2129,6 +2133,16 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv
             HiveRemoveSqCountCheck.INSTANCE);
       }
 
+      // 10. Convert left outer join + null filter on right side table column to anti join. Add this
+      // rule after all the optimization for which calcite support for anti join is missing.
+      // Needs to be done before ProjectRemoveRule as it expect a project over filter.
+      // This is done before join re-ordering as join re-ordering is converting the left outer

Review comment:
       This is concerning. In fact, I believe we should apply this rule after join reordering instead: the reordering algorithm has no built-in support for ANTIJOIN, while it does support outer joins, so converting earlier could lead to a worse join order.
   If the problem is that the inputs are being swapped, we can add a variant of `HiveJoinCommuteRule` / `JoinCommuteRule` to this program that is applied on RIGHT OUTER joins.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinAddNotNullRule.java
##########
@@ -92,7 +104,7 @@ public void onMatch(RelOptRuleCall call) {
     Set<String> rightPushedPredicates = Sets.newHashSet(registry.getPushedPredicates(join, 1));
 
     boolean genPredOnLeft = join.getJoinType() == JoinRelType.RIGHT || join.getJoinType() == JoinRelType.INNER || join.isSemiJoin();
-    boolean genPredOnRight = join.getJoinType() == JoinRelType.LEFT || join.getJoinType() == JoinRelType.INNER || join.isSemiJoin();
+    boolean genPredOnRight = join.getJoinType() == JoinRelType.LEFT || join.getJoinType() == JoinRelType.INNER || join.isSemiJoin()|| join.getJoinType() == JoinRelType.ANTI;

Review comment:
       I did not check the physical implementation in much detail; I just wanted to confirm that it handles an empty (0 rows) right input, correct? This came to mind because the introduced filter could filter out all rows.
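
       As an illustration of the case in question, the following in-memory sketch models an anti join on a single key column. It is not Hive's operator code; `filterNotNull` and `antiJoin` are hypothetical helpers, and plain string lists stand in for the join inputs. Under SQL equality semantics, a right input that becomes empty after the generated IS NOT NULL predicate should make the anti join return every left row.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

final class AntiJoinEmptyRightSketch {

  /** Models the generated IS NOT NULL predicate on the right-side join key. */
  static List<String> filterNotNull(List<String> rightKeys) {
    return rightKeys.stream().filter(Objects::nonNull).collect(Collectors.toList());
  }

  /** Anti join on key equality: keep left rows that have no equal key on the right. */
  static List<String> antiJoin(List<String> leftKeys, List<String> rightKeys) {
    Set<String> seen = new HashSet<>(rightKeys);
    return leftKeys.stream()
        // A NULL left key never equals anything, so it is always kept.
        .filter(k -> k == null || !seen.contains(k))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> left = Arrays.asList("a", "b", null);
    List<String> right = Arrays.asList(null, null);
    // The not-null predicate removes every right row ...
    System.out.println(filterNotNull(right).isEmpty());
    // ... and the anti join must then return all left rows.
    System.out.println(antiJoin(left, filterNotNull(right)).equals(left));
    System.out.println(antiJoin(left, Collections.<String>emptyList()).equals(left));
  }
}
```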

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java
##########
@@ -203,6 +203,7 @@ private boolean filterExists(ReduceSinkOperator target, ExprNodeDesc replaced) {
           vector.add(right, left);
           break;
         case JoinDesc.LEFT_OUTER_JOIN:
+        case JoinDesc.ANTI_JOIN: //TODO : need to test

Review comment:
       test?






Issue Time Tracking
-------------------

    Worklog Id:     (was: 465942)
    Time Spent: 13h 50m  (was: 13h 40m)

> Support Anti Join in Hive 
> --------------------------
>
>                 Key: HIVE-23716
>                 URL: https://issues.apache.org/jira/browse/HIVE-23716
>             Project: Hive
>          Issue Type: Bug
>            Reporter: mahesh kumar behera
>            Assignee: mahesh kumar behera
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-23716.01.patch
>
>          Time Spent: 13h 50m
>  Remaining Estimate: 0h
>
> Currently Hive does not support anti join. A query that needs an anti join is converted to a left outer join, and a null filter on the right-side join key is added to get the desired result. This causes:
>  # Extra computation - The left outer join projects redundant columns from the right side, and a filter is then needed to remove the redundant rows. An anti join avoids this, since it projects only the required columns and rows from the left-side table.
>  # Extra shuffle - With an anti join, duplicate records need not be moved to the join node; they can be eliminated at the child node. This can significantly reduce data movement when many rows share the same join key.
>  # Extra memory usage - For a map-based anti join, a hash set is sufficient, because only the key is needed to check whether a record matches the join condition. A left join also needs the non-key columns, and thus requires a hash table.
> For a query like
> {code:java}
>  select wr_order_number FROM web_returns LEFT JOIN web_sales  ON wr_order_number = ws_order_number WHERE ws_order_number IS NULL;{code}
> The number of distinct ws_order_number values in the web_sales table in a typical 10TB TPC-DS setup is just 10% of the total records. So when this query is converted to an anti join, only 600 million rows are moved to the join node instead of 7 billion.
> In the current patch, just one conversion is done: the pattern project->filter->left-join is converted to project->anti-join. This takes care of subqueries with a “not exists” clause. Queries with “not exists” are first converted to filter + left-join and then to an anti join. Queries with “not in” are not handled in the current patch.
> On the execution side, both merge join and map join, with vectorized execution, are supported for anti join.
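
The equivalence the description relies on can be sketched in memory as follows. This is an illustrative model only, not Hive code: the class and method names are hypothetical, and lists of nullable integers stand in for the join-key columns of the two tables. It compares a left outer join followed by an IS NULL filter on the right-side key with an anti join that builds only a hash set of distinct right-side keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class AntiJoinSketch {

  /** LEFT JOIN on key equality, then keep only rows whose right-side key IS NULL. */
  static List<Integer> leftJoinThenNullFilter(List<Integer> left, List<Integer> right) {
    List<Integer> out = new ArrayList<>();
    for (Integer l : left) {
      boolean matched = false;
      for (Integer r : right) {
        if (l != null && l.equals(r)) {
          // Joined row (l, r) has a non-null r, so the IS NULL filter drops it.
          matched = true;
        }
      }
      if (!matched) {
        out.add(l); // null-extended row (l, NULL) passes the IS NULL filter
      }
    }
    return out;
  }

  /** Anti join: a hash set of distinct, non-null right keys is all that is needed. */
  static List<Integer> antiJoin(List<Integer> left, List<Integer> right) {
    Set<Integer> keys = new HashSet<>();
    for (Integer r : right) {
      if (r != null) {
        keys.add(r); // duplicates collapse, NULL keys can never match
      }
    }
    List<Integer> out = new ArrayList<>();
    for (Integer l : left) {
      if (l == null || !keys.contains(l)) {
        out.add(l);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> left = Arrays.asList(1, 2, 2, null, 5);
    List<Integer> right = Arrays.asList(2, 2, 3, null);
    System.out.println(leftJoinThenNullFilter(left, right).equals(antiJoin(left, right)));
  }
}
```

In this model the anti join only ever touches the distinct right-side keys, which is the source of the shuffle and memory savings listed in the description.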



--
This message was sent by Atlassian Jira
(v8.3.4#803005)