You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/09/03 00:57:05 UTC

[pinot] branch master updated: [multistage] do not pushdown inequality join rules (#9328)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c3c9e2e84a [multistage] do not pushdown inequality join rules (#9328)
c3c9e2e84a is described below

commit c3c9e2e84a3acbaf5df8c6400d964bb7624d8bd9
Author: Rong Rong <wa...@gmail.com>
AuthorDate: Fri Sep 2 17:56:59 2022 -0700

    [multistage] do not pushdown inequality join rules (#9328)
    
    * rename planner util parseJoinConditions to getJoinKeyFromConditions (returns the join key indexes)
    
    * replace CoreRules.FILTER_INTO_JOIN with PinotFilterIntoJoinRule, which only accepts AND/EQUALS join conditions
    
    * also move the rules into the org.apache.calcite.rel.rules namespace
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../PinotAggregateExchangeNodeInsertRule.java      |  2 +-
 .../calcite/rel/rules/PinotFilterIntoJoinRule.java | 39 ++++++++++++++++++++++
 .../rules/PinotJoinExchangeNodeInsertRule.java     |  4 +--
 .../PinotLogicalSortFetchEliminationRule.java      |  2 +-
 .../rel}/rules/PinotQueryRuleSets.java             |  8 ++---
 .../org/apache/pinot/query/QueryEnvironment.java   |  2 +-
 .../apache/pinot/query/planner/PlannerUtils.java   |  4 +--
 .../query/planner/logical/RelToStageConverter.java |  2 +-
 .../pinot/query/QueryEnvironmentTestBase.java      |  1 +
 .../apache/pinot/query/QueryServerEnclosure.java   |  2 +-
 .../pinot/query/runtime/QueryRunnerTest.java       | 29 ++++++++++------
 11 files changed, 70 insertions(+), 25 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
similarity index 99%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java
rename to pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
index 38c52e6656..fba5f4fc12 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.rules;
+package org.apache.calcite.rel.rules;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterIntoJoinRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterIntoJoinRule.java
new file mode 100644
index 0000000000..b505d3cdca
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterIntoJoinRule.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.sql.SqlKind;
+
+
+/**
+ * Customized {@link FilterJoinRule.FilterIntoJoinRule} for Pinot, which only supports equality JOIN.
+ */
+public class PinotFilterIntoJoinRule extends FilterJoinRule.FilterIntoJoinRule {
+  public static final PinotFilterIntoJoinRule INSTANCE = new PinotFilterIntoJoinRule(); // used by PinotQueryRuleSets
+  protected PinotFilterIntoJoinRule() {
+    super(ImmutableFilterIntoJoinRuleConfig.of((join, joinType, exp) -> // decides what may go in the ON clause
+            exp.getKind() == SqlKind.AND || exp.getKind() == SqlKind.EQUALS) // so e.g. a.col3 > b.col3 is rejected
+        .withOperandSupplier(b0 -> // matches a Filter whose single input is a Join
+            b0.operand(Filter.class).oneInput(b1 ->
+                b1.operand(Join.class).anyInputs()))
+        .withSmart(true)); // NOTE(review): presumably mirrors CoreRules.FILTER_INTO_JOIN defaults; confirm
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
similarity index 96%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java
rename to pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
index f766afb756..072e639733 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.rules;
+package org.apache.calcite.rel.rules;
 
 import com.google.common.collect.ImmutableList;
 import java.util.List;
@@ -78,7 +78,7 @@ public class PinotJoinExchangeNodeInsertRule extends RelOptRule {
     } else { // if (hints.contains(PinotRelationalHints.USE_HASH_DISTRIBUTE)) {
       RexCall joinCondition = (RexCall) join.getCondition();
       int leftNodeOffset = join.getLeft().getRowType().getFieldNames().size();
-      List<List<Integer>> conditions = PlannerUtils.parseJoinConditions(joinCondition, leftNodeOffset);
+      List<List<Integer>> conditions = PlannerUtils.getJoinKeyFromConditions(joinCondition, leftNodeOffset);
       leftExchange = LogicalExchange.create(leftInput,
           RelDistributions.hash(conditions.get(0)));
       rightExchange = LogicalExchange.create(rightInput,
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotLogicalSortFetchEliminationRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotLogicalSortFetchEliminationRule.java
similarity index 98%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotLogicalSortFetchEliminationRule.java
rename to pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotLogicalSortFetchEliminationRule.java
index fadce600e2..a85035b85b 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotLogicalSortFetchEliminationRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotLogicalSortFetchEliminationRule.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.rules;
+package org.apache.calcite.rel.rules;
 
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
similarity index 94%
rename from pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
rename to pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index 060a78cf29..55202805b2 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -16,14 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.rules;
+package org.apache.calcite.rel.rules;
 
 import java.util.Arrays;
 import java.util.Collection;
 import org.apache.calcite.adapter.enumerable.EnumerableRules;
 import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.rel.rules.CoreRules;
-import org.apache.calcite.rel.rules.PruneEmptyRules;
 
 
 /**
@@ -39,8 +37,8 @@ public class PinotQueryRuleSets {
           EnumerableRules.ENUMERABLE_PROJECT_RULE, EnumerableRules.ENUMERABLE_SORT_RULE,
           EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE,
 
-          // push a filter into a join
-          CoreRules.FILTER_INTO_JOIN,
+          // push a filter into a join, replaced CoreRules.FILTER_INTO_JOIN with special config
+          PinotFilterIntoJoinRule.INSTANCE,
           // push filter through an aggregation
           CoreRules.FILTER_AGGREGATE_TRANSPOSE,
           // push filter through set operation
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 9d512369d1..da13d03a35 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -36,6 +36,7 @@ import org.apache.calcite.prepare.Prepare;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.hint.HintStrategyTable;
+import org.apache.calcite.rel.rules.PinotQueryRuleSets;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.sql.SqlExplain;
@@ -55,7 +56,6 @@ import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.logical.LogicalPlanner;
 import org.apache.pinot.query.planner.logical.StagePlanner;
 import org.apache.pinot.query.routing.WorkerManager;
-import org.apache.pinot.query.rules.PinotQueryRuleSets;
 import org.apache.pinot.query.type.TypeFactory;
 import org.apache.pinot.query.validate.Validator;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
index b6ebb214a1..774fdc153c 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/PlannerUtils.java
@@ -44,7 +44,7 @@ public class PlannerUtils {
     // do not instantiate.
   }
 
-  public static List<List<Integer>> parseJoinConditions(RexCall joinCondition, int leftNodeOffset) {
+  public static List<List<Integer>> getJoinKeyFromConditions(RexCall joinCondition, int leftNodeOffset) {
     switch (joinCondition.getOperator().getKind()) {
       case EQUALS:
         RexNode left = joinCondition.getOperands().get(0);
@@ -59,7 +59,7 @@ public class PlannerUtils {
         predicateColumns.add(new ArrayList<>());
         for (RexNode operand : joinCondition.getOperands()) {
           Preconditions.checkState(operand instanceof RexCall);
-          List<List<Integer>> subPredicate = parseJoinConditions((RexCall) operand, leftNodeOffset);
+          List<List<Integer>> subPredicate = getJoinKeyFromConditions((RexCall) operand, leftNodeOffset);
           predicateColumns.get(0).addAll(subPredicate.get(0));
           predicateColumns.get(1).addAll(subPredicate.get(1));
         }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index 2d528b3a74..c7daa877f8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -104,7 +104,7 @@ public final class RelToStageConverter {
 
     // Parse out all equality JOIN conditions
     int leftNodeOffset = node.getLeft().getRowType().getFieldList().size();
-    List<List<Integer>> predicateColumns = PlannerUtils.parseJoinConditions(joinCondition, leftNodeOffset);
+    List<List<Integer>> predicateColumns = PlannerUtils.getJoinKeyFromConditions(joinCondition, leftNodeOffset);
 
     FieldSelectionKeySelector leftFieldSelectionKeySelector = new FieldSelectionKeySelector(predicateColumns.get(0));
     FieldSelectionKeySelector rightFieldSelectionKeySelector = new FieldSelectionKeySelector(predicateColumns.get(1));
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index 4bd1dc3b39..f740709d09 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -46,6 +46,7 @@ public class QueryEnvironmentTestBase {
         new Object[]{"SELECT * FROM a LIMIT 10"},
         new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2"},
         new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0"},
+        new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col3 > b.col3"},
         new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1 AND a.col2 = b.col2"},
         new Object[]{"SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON a.col1 = b.col2 "
             + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0"},
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 2ef4958432..22f4eed5ca 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -73,7 +73,7 @@ public class QueryServerEnclosure {
   private static final int NUM_ROWS = 5;
   private static final int DEFAULT_EXECUTOR_THREAD_NUM = 5;
   private static final String[] STRING_FIELD_LIST = new String[]{"foo", "bar", "alice", "bob", "charlie"};
-  private static final int[] INT_FIELD_LIST = new int[]{1, 2, 42};
+  private static final int[] INT_FIELD_LIST = new int[]{1, 42};
 
   private final ExecutorService _testExecutor;
   private final int _queryRunnerPort;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index f74b7dcffc..3d89315868 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -76,17 +76,19 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
         // thus the final JOIN result will be 15 x 1 = 15.
         // Next join with table C which has (5 on server1 and 10 on server2), since data is identical. each of the row
         // of the A JOIN B will have identical value of col3 as table C.col3 has. Since the values are cycling between
-        // (1, 2, 42, 1, 2). we will have 6 1s, 6 2s, and 3 42s, total result count will be 36 + 36 + 9 = 81
-        new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN c ON a.col3 = c.col3", 81},
+        // (1, 42, 1, 42, 1). we will have 9 1s, and 6 42s, total result count will be 9 * 9 + 6 * 6 = 117
+        new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN c ON a.col3 = c.col3", 117},
 
         // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1),
         // thus the final JOIN result will be 15 x 1 = 15.
         new Object[]{"SELECT * FROM a JOIN b on a.col1 = b.col1", 15},
 
-        // Query with function in JOIN keys, table A and B are both (1, 2, 42, 1, 2), with table A cycling 3 times.
-        // Final result would have 6 x 2 = 12 (6 (1)s on with MOD result 1, on both tables)
-        //     + 9 x 1 = 9 (6 (2)s & 3 (42)s on table A MOD 2 = 0, 1 (42)s on table B MOD 3 = 0): 21 rows in total.
-        new Object[]{"SELECT a.col1, a.col3, b.col3 FROM a JOIN b ON MOD(a.col3, 2) = MOD(b.col3, 3)", 21},
+        // Query with function in JOIN keys, table A and B are both (1, 42, 1, 42, 1), with table A cycling 3 times.
+        // Because:
+        //   - MOD(a.col3, 2) will have 6 (42)s equal to 0 and 9 (1)s equal to 1
+        //   - MOD(b.col3, 3) will have 2 (42)s equal to 0 and 3 (1)s equal to 1;
+        // final results are 6 * 2 + 9 * 3 = 39 rows
+        new Object[]{"SELECT a.col1, a.col3, b.col3 FROM a JOIN b ON MOD(a.col3, 2) = MOD(b.col3, 3)", 39},
 
         // Specifically table A has 15 rows (10 on server1 and 5 on server2) and table B has 5 rows (all on server1),
         // thus the final JOIN result will be 15 x 1 = 15.
@@ -101,6 +103,9 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
         // Projection pushdown
         new Object[]{"SELECT a.col1, a.col3 + a.col3 FROM a WHERE a.col3 >= 0 AND a.col2 = 'alice'", 3},
 
+        // Partial filter pushdown
+        new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col3 > b.col3", 3},
+
         // Aggregation with group by
         new Object[]{"SELECT a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 GROUP BY a.col1", 5},
 
@@ -140,13 +145,15 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
             + "  (SELECT a.col2 AS joinKey, MAX(a.col3) AS maxVal FROM a GROUP BY a.col2) AS i "
             + "  ON b.col1 = i.joinKey", 3},
 
-        // Aggregate query with HAVING clause,
-        // - "foo" and "bar" occurred 6 times each and "alice" occurred 3 times. --> COUNT(*) < 5 matches "alice"
-        // - col2=="foo"<->col3==1, col2=="bar"<->col3==2, col2="alice"<->col3==42, so SUM(col3) >= 10 matches "bar"
+        // Aggregate query with HAVING clause: in table A, "foo" and "bar" occur 6 times each and "alice" 3 times
+        // (2/2/1 per 5 rows): col2 cycles (foo, bar, alice, foo, bar) while col3 cycles (1, 42, 1, 42, 1).
+        // - COUNT(*) < 5 matches "alice" (3 times)
+        // - COUNT(*) > 5 matches "foo" and "bar" (6 times), and both have SUM(a.col3) = (1 + 42) * 3 >= 10
         // - last condition doesn't match anything.
+        // 3 rows in total.
         new Object[]{"SELECT a.col2, COUNT(*), MAX(a.col3), MIN(a.col3), SUM(a.col3) FROM a GROUP BY a.col2 "
-            + "HAVING COUNT(*) < 5 OR (COUNT(*) > 5 AND SUM(a.col3) >= 10) "
-            + "OR (MIN(a.col3) != 20 AND SUM(a.col3) = 100)", 2},
+            + "HAVING COUNT(*) < 5 OR (COUNT(*) > 5 AND SUM(a.col3) >= 10)"
+            + "OR (MIN(a.col3) != 20 AND SUM(a.col3) = 100)", 3},
     };
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org