Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/07/08 14:23:45 UTC

[GitHub] [hive] zeroflag opened a new pull request #1228: HIVE-23817. Pushing TopN Key operator PKFK inner joins (amagyar)

zeroflag opened a new pull request #1228:
URL: https://github.com/apache/hive/pull/1228


   ## NOTICE (work in progress)
   
   ### Pushing the TopNKey operator through PK-FK inner joins.
   
   Example: 
   
   Customer table:
   
   ID (PK) | LAST_NAME
   -- | --
   1 | Robinson
   2 | Jones
   3 | Smith
   4 | Heisenberg
   
   Order table:
   
   CUSTOMER_ID (FK) | AMOUNT
   -- | --
   1 | 100
   1 | 50
   2 | 200
   3 | 30
   3 | 40
   
   #### Requirements for TopN Key pushdown
   
   * A PRIMARY KEY constraint on Customer.ID, which forbids NULL and duplicate values.
   * A NOT NULL constraint on Order.CUSTOMER_ID, which forbids NULL values.
   * A FOREIGN KEY constraint between Customer.ID and Order.CUSTOMER_ID. Together with the two constraints above, it ensures that exactly one row exists in the Customer table for any given row in the Order table.
   
   In general, if the first n ORDER BY columns come from the child (FK) table, we can copy the TopNKey operator with those first n columns and put the copy before the join. If all ORDER BY columns come from the child table, we can move the TopNKey operator below the join without keeping the original.
   
   ```
   SELECT * FROM Customer, Order 
   WHERE Customer.ID = Order.CUSTOMER_ID 
   ORDER BY Order.AMOUNT, [Order.*], [Customer.*] LIMIT 3;
   ```
   
   Result (all joined rows in sort order; with LIMIT 3 the query returns only the first three):
   
   CUSTOMER.ID (PK) | CUSTOMER.LAST_NAME | ORDER.AMOUNT
   -- | -- | --
   3 | Smith | 30
   3 | Smith | 40
   1 | Robinson | 50
   1 | Robinson | 100
   2 | Jones | 200
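
The result above can be checked with a small simulation. This is a minimal sketch, not Hive code: the class and method names are hypothetical, and `topNKey` is a simplified, non-streaming stand-in for the TopNKey operator (it keeps the rows whose AMOUNT is among the n smallest distinct values), so Hive's real operator may differ in detail. Because every order row matches exactly one customer row, filtering the orders before the join should not change the final top 3.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative simulation only; names are hypothetical and not part of Hive. */
final class PkFkTopNSimulation {

  // Order rows from the example: {CUSTOMER_ID, AMOUNT}.
  static final List<int[]> ORDERS = Arrays.asList(
      new int[] {1, 100}, new int[] {1, 50}, new int[] {2, 200},
      new int[] {3, 30}, new int[] {3, 40});

  // Customer rows from the example: ID -> LAST_NAME.
  static final Map<Integer, String> CUSTOMERS = new HashMap<>();
  static {
    CUSTOMERS.put(1, "Robinson");
    CUSTOMERS.put(2, "Jones");
    CUSTOMERS.put(3, "Smith");
    CUSTOMERS.put(4, "Heisenberg");
  }

  /** Simplified TopNKey: keep rows whose AMOUNT is among the n smallest distinct values. */
  static List<int[]> topNKey(List<int[]> orders, int n) {
    List<Integer> topKeys = orders.stream()
        .map(o -> o[1]).distinct().sorted().limit(n)
        .collect(Collectors.toList());
    return orders.stream()
        .filter(o -> topKeys.contains(o[1]))
        .collect(Collectors.toList());
  }

  /** Inner join on CUSTOMER_ID = ID, then ORDER BY AMOUNT LIMIT n. */
  static List<String> joinSortLimit(List<int[]> orders, int n) {
    return orders.stream()
        .filter(o -> CUSTOMERS.containsKey(o[0]))
        .sorted(Comparator.comparingInt((int[] o) -> o[1]))
        .limit(n)
        .map(o -> o[0] + " | " + CUSTOMERS.get(o[0]) + " | " + o[1])
        .collect(Collectors.toList());
  }
}
```

Calling `joinSortLimit(ORDERS, 3)` and `joinSortLimit(topNKey(ORDERS, 3), 3)` yields the same three rows, while the pre-filtered variant feeds only three order rows into the join instead of five.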
   
   Plan (in data-flow order: the pushed-down copy comes first, the original stays after the join):
   
   ```
   Top N Key Operator
             sort order: +
             keys: ORDER.AMOUNT, [ORDER.*]
             top n: 3
     Select Operator (Order)
     [...]
     Join
       [...]
        Top N Key Operator
             sort order: +
             keys: ORDER.AMOUNT, [ORDER.*], [Customer.*]
             top n: 3
    
   ```
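
The copy-versus-move rule behind this plan can be sketched as follows. This is an illustration under stated assumptions, not the code from the PR: `TopNKeyPushdownSketch`, `Action`, and the idea of describing each ORDER BY column by the name of its origin table are all hypothetical simplifications.

```java
import java.util.List;

/** Hypothetical helper, not Hive code: what to do with a TopNKey above a PK-FK inner join. */
final class TopNKeyPushdownSketch {

  enum Action { NONE, COPY_PREFIX, MOVE }

  /** Counts how many leading ORDER BY columns originate from the child (FK) table. */
  static int childPrefixLength(List<String> orderByOrigins, String childTable) {
    int prefix = 0;
    for (String origin : orderByOrigins) {
      if (!origin.equals(childTable)) {
        break; // the first column from another table ends the usable prefix
      }
      prefix++;
    }
    return prefix;
  }

  /**
   * NONE: the first ORDER BY column is not from the child table.
   * COPY_PREFIX: push a copy with the child-table prefix and keep the original.
   * MOVE: every ORDER BY column is from the child table, so the original can go.
   */
  static Action decide(List<String> orderByOrigins, String childTable) {
    int prefix = childPrefixLength(orderByOrigins, childTable);
    if (prefix == 0) {
      return Action.NONE;
    }
    return prefix == orderByOrigins.size() ? Action.MOVE : Action.COPY_PREFIX;
  }
}
```

For `ORDER BY Order.AMOUNT, Customer.LAST_NAME` the origins are `["Order", "Customer"]`, so the sketch returns `COPY_PREFIX` with a prefix length of 1, which corresponds to the two TopNKey operators in the plan.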
   
   #### Implementation notes
   
   PK-FK join information is extracted on the Calcite side and attached to the AST as a query hint (child table index & name).
   At the physical plan level we use this information to decide whether the TopNKey operator can be pushed through the join. We also need the origins of the ORDER BY columns to see whether they come from the child table.
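
As a rough illustration of the child table index carried by the hint, the sketch below assumes the convention 0 = left input and 1 = right input, flipped when the join inputs are swapped during conversion. `FkIndexSketch`, `FkSide`, and the `-1` return value for "no PK-FK join" are hypothetical and only serve the example.

```java
/** Hypothetical sketch of choosing the child (FK) input index stored in the hint. */
final class FkIndexSketch {

  /** Which join input holds the foreign key, if any. */
  enum FkSide { LEFT, RIGHT, NONE }

  /**
   * Returns 0 if the FK input ends up on the left, 1 if it ends up on the right.
   * Returns -1 when the join is not a PK-FK join (an assumption of this sketch).
   */
  static int fkTableIndex(FkSide fkSide, boolean swapSides) {
    switch (fkSide) {
      case LEFT:
        return swapSides ? 1 : 0;
      case RIGHT:
        return swapSides ? 0 : 1;
      default:
        return -1;
    }
  }
}
```

The physical optimizer can then use this index to pick the join input that a copy of the TopNKey operator would be pushed into.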
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] kasakrisz commented on a change in pull request #1228: HIVE-23817. Pushing TopN Key operator PKFK inner joins (amagyar)

Posted by GitBox <gi...@apache.org>.
kasakrisz commented on a change in pull request #1228:
URL: https://github.com/apache/hive/pull/1228#discussion_r458020139



##########
File path: ql/src/test/queries/clientpositive/topnkey_inner_join.q
##########
@@ -0,0 +1,49 @@
+drop table if exists customer;
+drop table if exists orders;
+
+create table customer (id int, name string, email string);
+create table orders (customer_id int not null enforced, amount int);
+
+alter table customer add constraint pk_customer_id primary key (id) disable novalidate rely;
+alter table orders add constraint fk_order_customer_id foreign key (customer_id) references customer(id) disable novalidate rely;
+
+insert into customer values
+  (4, 'Heisenberg', 'heisenberg@email.com'),
+  (3, 'Smith', 'smith@email.com'),
+  (2, 'Jones', 'jones@email.com'),
+  (1, 'Robinson', 'robinson@email.com');
+
+insert into orders values
+  (2, 200),
+  (3, 40),
+  (1, 100),
+  (1, 50),
+  (3, 30);
+
+set hive.optimize.topnkey=true;
+set hive.optimize.limittranspose=false;
+
+select 'positive: order by columns are coming from child table';
+explain select * from orders join customer on customer.id = orders.customer_id order by orders.amount limit 3;
+explain select * from orders join customer on customer.id = orders.customer_id order by orders.customer_id, orders.amount limit 3;
+explain select * from customer join orders on orders.customer_id = customer.id order by orders.amount, orders.customer_id limit 3;
+
+select 'negative: order by columns are coming from referenced table';
+explain select * from orders join customer on customer.id = orders.customer_id order by customer.name limit 3;
+explain select * from orders join customer on customer.id = orders.customer_id order by customer.email, customer.name limit 3;
+
+select 'negative: 1st order by columns is coming from referenced table';
+explain select * from orders join customer on customer.id = orders.customer_id order by customer.name, orders.amount limit 3;
+
+select 'mixed/positive: 1st n order by columns are coming from child table';
+explain select * from orders join customer on customer.id = orders.customer_id order by orders.amount, customer.name limit 3;
+
+select 'negative: no PK/FK';
+alter table customer drop constraint pk_customer_id;
+alter table orders drop constraint fk_order_customer_id;
+explain select * from customer join orders on customer.id = orders.customer_id order by customer.id limit 3;
+
+select 'negative: no RELY';
+alter table customer add constraint pk_customer_id primary key (id) disable novalidate;
+alter table orders add constraint fk_order_customer_id foreign key (customer_id) references customer(id) disable novalidate;
+explain select * from customer join orders on customer.id = orders.customer_id order by customer.id limit 3;

Review comment:
       It is also worth executing these queries to catch correctness issues.






[GitHub] [hive] kasakrisz merged pull request #1228: HIVE-23817. Pushing TopN Key operator PKFK inner joins (amagyar)

Posted by GitBox <gi...@apache.org>.
kasakrisz merged pull request #1228:
URL: https://github.com/apache/hive/pull/1228


   




[GitHub] [hive] jcamachor commented on a change in pull request #1228: HIVE-23817. Pushing TopN Key operator PKFK inner joins (amagyar)

Posted by GitBox <gi...@apache.org>.
jcamachor commented on a change in pull request #1228:
URL: https://github.com/apache/hive/pull/1228#discussion_r457824052



##########
File path: parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
##########
@@ -471,6 +471,8 @@ Number
     (Digit)+ ( DOT (Digit)* (Exponent)? | Exponent)?
     ;
 
+PKFK_JOIN: 'PKFK_JOIN';

Review comment:
       Can we prefix this with `KW_` and move it up with the rest of the keywords?

##########
File path: ql/src/test/queries/clientpositive/topnkey_inner_join.q
##########
@@ -0,0 +1,50 @@
+drop table if exists customer;
+drop table if exists orders;
+
+create table customer (id int, name string, email string);
+create table orders (customer_id int not null enforced, amount int);
+
+alter table customer add constraint pk_customer_id primary key (id) disable novalidate rely;
+alter table orders add constraint fk_order_customer_id foreign key (customer_id) references customer(id) disable novalidate rely;
+
+insert into customer values
+  (4, 'Heisenberg', 'heisenberg@email.com'),
+  (3, 'Smith', 'smith@email.com'),
+  (2, 'Jones', 'jones@email.com'),
+  (1, 'Robinson', 'robinson@email.com');
+
+insert into orders values
+  (2, 200),
+  (3, 40),
+  (1, 100),
+  (1, 50),
+  (3, 30);
+
+set hive.optimize.topnkey=true;
+set hive.optimize.limittranspose=false;
+
+select 'positive: order by columns are coming from child table';
+-- FIXME: explain select * from customer join orders on customer.id = orders.customer_id order by customer.id limit 3;

Review comment:
       I see this example appears below. Can the FIXME be removed?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
##########
@@ -442,6 +450,40 @@ private QueryBlockInfo convertSource(RelNode r) throws CalciteSemanticException
     return new QueryBlockInfo(s, ast);
   }
 
+  /**
+   * Add PK-FK join information to the AST as a query hint
+   * @param ast
+   * @param join
+   * @param swapSides whether the left and right input of the join is swapped
+   */
+  private void addPkFkInfoToAST(ASTNode ast, Join join, boolean swapSides) {
+    List<RexNode> joinFilters = new ArrayList<>(RelOptUtil.conjunctions(join.getCondition()));
+    RelMetadataQuery mq = join.getCluster().getMetadataQuery();
+    HiveRelOptUtil.PKFKJoinInfo rightInputResult =
+            HiveRelOptUtil.extractPKFKJoin(join, joinFilters, false, mq);
+    HiveRelOptUtil.PKFKJoinInfo leftInputResult =
+            HiveRelOptUtil.extractPKFKJoin(join, joinFilters, true, mq);
+    // Add the fkJoinIndex (0=left, 1=right, if swapSides is false) to the AST
+    // check if the nonFK side is filtered
+    if (leftInputResult.isPkFkJoin && leftInputResult.additionalPredicates.isEmpty()) {
+      RelNode nonFkInput = join.getRight();
+      ast.addChild(pkFkHint(swapSides ? 1 : 0, HiveRelOptUtil.isRowFilteringPlan(mq, nonFkInput)));
+    } else if (rightInputResult.isPkFkJoin && rightInputResult.additionalPredicates.isEmpty()) {
+      RelNode nonFkInput = join.getLeft();
+      ast.addChild(pkFkHint(swapSides ? 0 : 1, HiveRelOptUtil.isRowFilteringPlan(mq, nonFkInput)));
+    }
+  }
+
+  private ASTNode pkFkHint(int fkTableIndex, boolean nonFkSideIsFiltered) {
+    ParseDriver parseDriver = new ParseDriver();
+    try {
+      return parseDriver.parseHint(String.format("PKFK_JOIN(%d, %s)",
+              fkTableIndex, nonFkSideIsFiltered ? NON_FK_FILTERED : "notFiltered"));

Review comment:
       The naming (`NON_FK_FILTERED` vs. `"notFiltered"`) is a bit confusing. Can we simplify to `NON_FK_FILTERED` vs. `NON_FK_NOT_FILTERED`? Create a String constant in the converter for both (or an enum).
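
One way the enum variant of this suggestion might look is sketched below. The class and enum names are hypothetical, the token spellings are taken from the comment above, and whether the hint grammar accepts these tokens is an assumption; only the `PKFK_JOIN(%d, %s)` format string comes from the diff.

```java
import java.util.Locale;

/** Hypothetical sketch of the reviewer's enum suggestion; not the code that was merged. */
final class PkFkHintSketch {

  /** Whether the non-FK (referenced) side of the join is filtered. */
  enum NonFkSide {
    FILTERED("NON_FK_FILTERED"),
    NOT_FILTERED("NON_FK_NOT_FILTERED");

    final String token; // text written into the hint

    NonFkSide(String token) {
      this.token = token;
    }

    static NonFkSide of(boolean filtered) {
      return filtered ? FILTERED : NOT_FILTERED;
    }
  }

  /** Builds the hint text in the same shape as the format string used in the diff. */
  static String hintText(int fkTableIndex, boolean nonFkSideIsFiltered) {
    return String.format(Locale.ROOT, "PKFK_JOIN(%d, %s)",
        fkTableIndex, NonFkSide.of(nonFkSideIsFiltered).token);
  }
}
```

With both states named symmetrically, the producer and the consumer of the hint can share one definition instead of mixing a constant with a string literal.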

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyPushdownProcessor.java
##########
@@ -255,6 +260,88 @@ private void pushdownThroughLeftOuterJoin(TopNKeyOperator topNKey) throws Semant
     }
   }
 
+  private void pushdownInnerJoin(TopNKeyOperator topNKey, int fkJoinInputIndex, boolean nonFkSideIsFiltered) throws SemanticException {
+    TopNKeyDesc topNKeyDesc = topNKey.getConf();
+    CommonJoinOperator<? extends JoinDesc> join =
+            (CommonJoinOperator<? extends JoinDesc>) topNKey.getParentOperators().get(0);
+    List<Operator<? extends OperatorDesc>> joinInputs = join.getParentOperators();
+    ReduceSinkOperator fkJoinInput = (ReduceSinkOperator) joinInputs.get(fkJoinInputIndex);
+    if (nonFkSideIsFiltered) {
+      LOG.debug("Not pushing {} through {} as non FK side of the join is filtered", topNKey.getName(), join.getName());
+      return;
+    }
+    // Check column origins:
+    //  1. If all OrderBy columns are coming from the child (FK) table:
+    //    -> move TopNKeyOperator
+    //  2. If the first n OrderBy columns are coming from the child (FK) table:
+    //    -> copy TopNKeyOperator with the first n columns, and leave the original in place
+    int prefixLength = keyColumnPrefixLength(join, topNKey, fkJoinInputIndex, topNKey.getConf().getKeyColumns());
+    if (prefixLength == 0) {
+      LOG.debug("Not pushing {} through {} as common key column prefix length is 0", topNKey.getName(), join.getName());
+      return;
+    }
+    LOG.debug("Pushing a copy of {} through {} and {}",
+            topNKey.getName(), join.getName(), fkJoinInput.getName());
+    TopNKeyDesc newTopNKeyDesc = topNKeyDesc.withKeyColumns(prefixLength);
+    newTopNKeyDesc.setKeyColumns(remapColumns(join, fkJoinInput, newTopNKeyDesc.getKeyColumns()));
+    pushdown(copyDown(fkJoinInput, newTopNKeyDesc));
+    if (topNKeyDesc.getKeyColumns().size() == prefixLength) {
+      LOG.debug("Removing {} above {}", topNKey.getName(), join.getName());
+      join.removeChildAndAdoptItsChildren(topNKey);
+    }
+  }
+
+  private int keyColumnPrefixLength(CommonJoinOperator<? extends JoinDesc> join, TopNKeyOperator topNKeyOperator, int expectedTag, List<ExprNodeDesc> keyColumns) {

Review comment:
       Nit: some lines are very long. Please split them across multiple lines.
   
   Additionally, can we add some comments to these utility methods?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyPushdownProcessor.java
##########
@@ -255,6 +260,88 @@ private void pushdownThroughLeftOuterJoin(TopNKeyOperator topNKey) throws Semant
     }
   }
 
+  private void pushdownInnerJoin(TopNKeyOperator topNKey, int fkJoinInputIndex, boolean nonFkSideIsFiltered) throws SemanticException {

Review comment:
       Can we add a comment to this method? In which cases can we push down through an inner join?



