You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/09/29 18:36:00 UTC

[pinot] branch master updated: [multistage] support inequality JOIN (#9448)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3057712fdc [multistage] support inequality JOIN (#9448)
3057712fdc is described below

commit 3057712fdc468951d72be9d676c17b5a8f0af850
Author: Rong Rong <ro...@apache.org>
AuthorDate: Thu Sep 29 11:35:53 2022 -0700

    [multistage] support inequality JOIN (#9448)
    
    * support inequality JOIN
    
    * also support pure inequality join
    
    * address review comment on the diff and add a SEMI join test case for this as well
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../calcite/rel/rules/PinotFilterIntoJoinRule.java | 39 ----------------------
 .../rel/rules/PinotJoinExchangeNodeInsertRule.java | 18 ++++------
 .../calcite/rel/rules/PinotQueryRuleSets.java      |  4 +--
 .../query/planner/logical/RelToStageConverter.java |  7 ++--
 .../pinot/query/planner/logical/StagePlanner.java  |  6 ++--
 .../apache/pinot/query/planner/stage/JoinNode.java | 25 +++++++++-----
 .../runtime/executor/WorkerQueryExecutor.java      |  4 +--
 .../query/runtime/operator/HashJoinOperator.java   | 19 ++++++++---
 .../pinot/query/runtime/QueryRunnerTest.java       |  7 ++--
 9 files changed, 54 insertions(+), 75 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterIntoJoinRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterIntoJoinRule.java
deleted file mode 100644
index b505d3cdca..0000000000
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterIntoJoinRule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.calcite.rel.rules;
-
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.sql.SqlKind;
-
-
-/**
- * Customized rule extends {@link FilterJoinRule.FilterIntoJoinRule}, since Pinot only support equality JOIN.
- */
-public class PinotFilterIntoJoinRule extends FilterJoinRule.FilterIntoJoinRule {
-  public static final PinotFilterIntoJoinRule INSTANCE = new PinotFilterIntoJoinRule();
-  protected PinotFilterIntoJoinRule() {
-    super(ImmutableFilterIntoJoinRuleConfig.of((join, joinType, exp) ->
-            exp.getKind() == SqlKind.AND || exp.getKind() == SqlKind.EQUALS)
-        .withOperandSupplier(b0 ->
-            b0.operand(Filter.class).oneInput(b1 ->
-                b1.operand(Join.class).anyInputs()))
-        .withSmart(true));
-  }
-}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
index 3083f1cb1e..03e8d842bd 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
@@ -19,18 +19,15 @@
 package org.apache.calcite.rel.rules;
 
 import com.google.common.collect.ImmutableList;
-import java.util.List;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
-import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.logical.LogicalExchange;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.tools.RelBuilderFactory;
-import org.apache.pinot.query.planner.hints.PinotRelationalHints;
 
 
 /**
@@ -58,21 +55,20 @@ public class PinotJoinExchangeNodeInsertRule extends RelOptRule {
 
   @Override
   public void onMatch(RelOptRuleCall call) {
-    // TODO: this only works for single equality JOIN. add generic condition parser
     Join join = call.rel(0);
     RelNode leftInput = join.getInput(0);
     RelNode rightInput = join.getInput(1);
 
     RelNode leftExchange;
     RelNode rightExchange;
-    List<RelHint> hints = join.getHints();
-    if (hints.contains(PinotRelationalHints.USE_BROADCAST_DISTRIBUTE)) {
-      // TODO: determine which side should be the broadcast table based on table metadata
-      // TODO: support SINGLETON exchange if the non-broadcast table size is small enough to stay local.
-      leftExchange = LogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
+    JoinInfo joinInfo = join.analyzeCondition();
+
+    if (joinInfo.leftKeys.isEmpty()) {
+      // when there's no JOIN key, use a SINGLETON left input and broadcast the right input.
+      leftExchange = LogicalExchange.create(leftInput, RelDistributions.SINGLETON);
       rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
-    } else { // if (hints.contains(PinotRelationalHints.USE_HASH_DISTRIBUTE)) {
-      JoinInfo joinInfo = join.analyzeCondition();
+    } else {
+      // when join key exists, use hash distribution.
       leftExchange = LogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
       rightExchange = LogicalExchange.create(rightInput, RelDistributions.hash(joinInfo.rightKeys));
     }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index e5361bc8ec..c9a55e91cf 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -37,8 +37,8 @@ public class PinotQueryRuleSets {
           EnumerableRules.ENUMERABLE_PROJECT_RULE, EnumerableRules.ENUMERABLE_SORT_RULE,
           EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE,
 
-          // push a filter into a join, replaced CoreRules.FILTER_INTO_JOIN with special config
-          PinotFilterIntoJoinRule.INSTANCE,
+          // push a filter into a join
+          CoreRules.FILTER_INTO_JOIN,
           // push filter through an aggregation
           CoreRules.FILTER_AGGREGATE_TRANSPOSE,
           // push filter through set operation
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index 76015f38c8..e45b24944e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.query.planner.logical;
 
 import com.google.common.base.Preconditions;
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelNode;
@@ -126,9 +125,9 @@ public final class RelToStageConverter {
     JoinInfo joinInfo = node.analyzeCondition();
     FieldSelectionKeySelector leftFieldSelectionKeySelector = new FieldSelectionKeySelector(joinInfo.leftKeys);
     FieldSelectionKeySelector rightFieldSelectionKeySelector = new FieldSelectionKeySelector(joinInfo.rightKeys);
-    Preconditions.checkState(joinInfo.nonEquiConditions.isEmpty());
-    return new JoinNode(currentStageId, toDataSchema(node.getRowType()), joinType, Collections.singletonList(
-        new JoinNode.JoinClause(leftFieldSelectionKeySelector, rightFieldSelectionKeySelector)));
+    return new JoinNode(currentStageId, toDataSchema(node.getRowType()), joinType,
+        new JoinNode.JoinKeys(leftFieldSelectionKeySelector, rightFieldSelectionKeySelector),
+        joinInfo.nonEquiConditions.stream().map(RexExpression::toRexExpression).collect(Collectors.toList()));
   }
 
   private static DataSchema toDataSchema(RelDataType rowType) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index dd03a1cef8..8c940294ef 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -206,11 +206,11 @@ public class StagePlanner {
       int leftDataSchemaSize = node.getInputs().get(0).getDataSchema().size();
       Set<Integer> leftPartitionKeys = node.getInputs().get(0).getPartitionKeys();
       Set<Integer> rightPartitionKeys = node.getInputs().get(1).getPartitionKeys();
-      // TODO: currently JOIN criteria guarantee to only have one FieldSelectionKeySelector. Support more.
+      // Currently, JOIN criteria are guaranteed to have only one FieldSelectionKeySelector.
       FieldSelectionKeySelector leftJoinKeySelector =
-          (FieldSelectionKeySelector) ((JoinNode) node).getCriteria().get(0).getLeftJoinKeySelector();
+          (FieldSelectionKeySelector) ((JoinNode) node).getJoinKeys().getLeftJoinKeySelector();
       FieldSelectionKeySelector rightJoinKeySelector =
-          (FieldSelectionKeySelector) ((JoinNode) node).getCriteria().get(0).getRightJoinKeySelector();
+          (FieldSelectionKeySelector) ((JoinNode) node).getJoinKeys().getRightJoinKeySelector();
       Set<Integer> newPartitionKeys = new HashSet<>();
       for (int i = 0; i < leftJoinKeySelector.getColumnIndices().size(); i++) {
         int leftIndex = leftJoinKeySelector.getColumnIndices().get(i);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
index eeec1b31e9..222d04d9ca 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.planner.stage;
 import java.util.List;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
@@ -30,36 +31,44 @@ public class JoinNode extends AbstractStageNode {
   @ProtoProperties
   private JoinRelType _joinRelType;
   @ProtoProperties
-  private List<JoinClause> _criteria;
+  private JoinKeys _joinKeys;
+  @ProtoProperties
+  private List<RexExpression> _joinClause;
 
   public JoinNode(int stageId) {
     super(stageId);
   }
 
-  public JoinNode(int stageId, DataSchema dataSchema, JoinRelType joinRelType, List<JoinClause> criteria) {
+  public JoinNode(int stageId, DataSchema dataSchema, JoinRelType joinRelType, JoinKeys joinKeys,
+      List<RexExpression> joinClause) {
     super(stageId, dataSchema);
     _joinRelType = joinRelType;
-    _criteria = criteria;
+    _joinKeys = joinKeys;
+    _joinClause = joinClause;
   }
 
   public JoinRelType getJoinRelType() {
     return _joinRelType;
   }
 
-  public List<JoinClause> getCriteria() {
-    return _criteria;
+  public JoinKeys getJoinKeys() {
+    return _joinKeys;
+  }
+
+  public List<RexExpression> getJoinClauses() {
+    return _joinClause;
   }
 
-  public static class JoinClause {
+  public static class JoinKeys {
     @ProtoProperties
     private KeySelector<Object[], Object[]> _leftJoinKeySelector;
     @ProtoProperties
     private KeySelector<Object[], Object[]> _rightJoinKeySelector;
 
-    public JoinClause() {
+    public JoinKeys() {
     }
 
-    public JoinClause(FieldSelectionKeySelector leftKeySelector, FieldSelectionKeySelector rightKeySelector) {
+    public JoinKeys(FieldSelectionKeySelector leftKeySelector, FieldSelectionKeySelector rightKeySelector) {
       _leftJoinKeySelector = leftKeySelector;
       _rightJoinKeySelector = rightKeySelector;
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 95744c55a0..0bccbc3b36 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -125,8 +125,8 @@ public class WorkerQueryExecutor {
       BaseOperator<TransferableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap);
       BaseOperator<TransferableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap);
       return new HashJoinOperator(leftOperator, joinNode.getInputs().get(0).getDataSchema(), rightOperator,
-          joinNode.getInputs().get(1).getDataSchema(), joinNode.getDataSchema(), joinNode.getCriteria(),
-          joinNode.getJoinRelType());
+          joinNode.getInputs().get(1).getDataSchema(), joinNode.getDataSchema(), joinNode.getJoinKeys(),
+          joinNode.getJoinClauses(), joinNode.getJoinRelType());
     } else if (stageNode instanceof AggregateNode) {
       AggregateNode aggregateNode = (AggregateNode) stageNode;
       BaseOperator<TransferableBlock> inputOperator =
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index a3b3b215b1..0b2d0e2a7b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -29,10 +29,12 @@ import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.common.datablock.BaseDataBlock;
 import org.apache.pinot.core.common.datablock.DataBlockUtils;
 import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.operands.FilterOperand;
 
 
 /**
@@ -54,6 +56,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
   private final DataSchema _leftTableSchema;
   private final DataSchema _rightTableSchema;
   private final int _resultRowSize;
+  private final List<FilterOperand> _joinClauseEvaluators;
   private boolean _isHashTableBuilt;
   private TransferableBlock _upstreamErrorBlock;
   private KeySelector<Object[], Object[]> _leftKeySelector;
@@ -61,14 +64,18 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
 
   public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator, DataSchema leftSchema,
       BaseOperator<TransferableBlock> rightTableOperator, DataSchema rightSchema, DataSchema outputSchema,
-      List<JoinNode.JoinClause> criteria, JoinRelType joinType) {
-    _leftKeySelector = criteria.get(0).getLeftJoinKeySelector();
-    _rightKeySelector = criteria.get(0).getRightJoinKeySelector();
+      JoinNode.JoinKeys joinKeys, List<RexExpression> joinClauses, JoinRelType joinType) {
+    _leftKeySelector = joinKeys.getLeftJoinKeySelector();
+    _rightKeySelector = joinKeys.getRightJoinKeySelector();
     _leftTableOperator = leftTableOperator;
     _rightTableOperator = rightTableOperator;
     _resultSchema = outputSchema;
     _leftTableSchema = leftSchema;
     _rightTableSchema = rightSchema;
+    _joinClauseEvaluators = new ArrayList<>(joinClauses.size());
+    for (RexExpression joinClause : joinClauses) {
+      _joinClauseEvaluators.add(FilterOperand.toFilterOperand(joinClause, _resultSchema));
+    }
     _joinType = joinType;
     _resultRowSize = _resultSchema.size();
     _isHashTableBuilt = false;
@@ -132,7 +139,11 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
         List<Object[]> hashCollection = _broadcastHashTable.getOrDefault(
             _leftKeySelector.computeHash(leftRow), Collections.emptyList());
         for (Object[] rightRow : hashCollection) {
-          rows.add(joinRow(leftRow, rightRow));
+          Object[] resultRow = joinRow(leftRow, rightRow);
+          if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream().allMatch(
+              evaluator -> evaluator.apply(resultRow))) {
+            rows.add(resultRow);
+          }
         }
       }
       return new TransferableBlock(rows, _resultSchema, BaseDataBlock.Type.ROW);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index e3f8f4da51..67b1195a33 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -220,9 +220,11 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
         // Projection pushdown
         new Object[]{"SELECT a.col1, a.col3 + a.col3 FROM a WHERE a.col3 >= 0 AND a.col2 = 'alice'"},
 
-        // Partial filter pushdown
+        // Inequality JOIN & partial filter pushdown
         new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col3 > b.col3"},
 
+        new Object[]{"SELECT * FROM a, b WHERE a.col1 > b.col2 AND a.col3 > b.col3"},
+
         // Aggregation with group by
         new Object[]{"SELECT a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 GROUP BY a.col1"},
 
@@ -269,7 +271,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
 
         // Sub-query with IN clause to SEMI JOIN.
         new Object[]{"SELECT b.col1, b.col2, SUM(b.col3) * 100 / COUNT(b.col3) FROM b WHERE b.col1 IN "
-            + "(SELECT a.col2 FROM a WHERE a.col2 != 'foo') GROUP BY b.col1, b.col2"},
+            + " (SELECT a.col2 FROM a WHERE a.col2 != 'foo') GROUP BY b.col1, b.col2"},
+        new Object[]{"SELECT SUM(b.col3) FROM b WHERE b.col3 > (SELECT AVG(a.col3) FROM a WHERE a.col2 != 'bar')"},
 
         // Aggregate query with HAVING clause, "foo" and "bar" occurred 6/2 times each and "alice" occurred 3/1 times
         // numbers are cycle in (1, 42, 1, 42, 1), and (foo, bar, alice, foo, bar)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org