You are viewing a plain-text version of this content; the canonical link to the original message is not preserved in this rendering.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/09/29 18:36:00 UTC
[pinot] branch master updated: [multistage] support inequality JOIN (#9448)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3057712fdc [multistage] support inequality JOIN (#9448)
3057712fdc is described below
commit 3057712fdc468951d72be9d676c17b5a8f0af850
Author: Rong Rong <ro...@apache.org>
AuthorDate: Thu Sep 29 11:35:53 2022 -0700
[multistage] support inequality JOIN (#9448)
* support inequality JOIN
* also support pure inequality join
* address review comments on the diff and add a SEMI join test case for this as well
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../calcite/rel/rules/PinotFilterIntoJoinRule.java | 39 ----------------------
.../rel/rules/PinotJoinExchangeNodeInsertRule.java | 18 ++++------
.../calcite/rel/rules/PinotQueryRuleSets.java | 4 +--
.../query/planner/logical/RelToStageConverter.java | 7 ++--
.../pinot/query/planner/logical/StagePlanner.java | 6 ++--
.../apache/pinot/query/planner/stage/JoinNode.java | 25 +++++++++-----
.../runtime/executor/WorkerQueryExecutor.java | 4 +--
.../query/runtime/operator/HashJoinOperator.java | 19 ++++++++---
.../pinot/query/runtime/QueryRunnerTest.java | 7 ++--
9 files changed, 54 insertions(+), 75 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterIntoJoinRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterIntoJoinRule.java
deleted file mode 100644
index b505d3cdca..0000000000
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterIntoJoinRule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.calcite.rel.rules;
-
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.sql.SqlKind;
-
-
-/**
- * Customized rule extends {@link FilterJoinRule.FilterIntoJoinRule}, since Pinot only support equality JOIN.
- */
-public class PinotFilterIntoJoinRule extends FilterJoinRule.FilterIntoJoinRule {
- public static final PinotFilterIntoJoinRule INSTANCE = new PinotFilterIntoJoinRule();
- protected PinotFilterIntoJoinRule() {
- super(ImmutableFilterIntoJoinRuleConfig.of((join, joinType, exp) ->
- exp.getKind() == SqlKind.AND || exp.getKind() == SqlKind.EQUALS)
- .withOperandSupplier(b0 ->
- b0.operand(Filter.class).oneInput(b1 ->
- b1.operand(Join.class).anyInputs()))
- .withSmart(true));
- }
-}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
index 3083f1cb1e..03e8d842bd 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
@@ -19,18 +19,15 @@
package org.apache.calcite.rel.rules;
import com.google.common.collect.ImmutableList;
-import java.util.List;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
-import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalExchange;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.tools.RelBuilderFactory;
-import org.apache.pinot.query.planner.hints.PinotRelationalHints;
/**
@@ -58,21 +55,20 @@ public class PinotJoinExchangeNodeInsertRule extends RelOptRule {
@Override
public void onMatch(RelOptRuleCall call) {
- // TODO: this only works for single equality JOIN. add generic condition parser
Join join = call.rel(0);
RelNode leftInput = join.getInput(0);
RelNode rightInput = join.getInput(1);
RelNode leftExchange;
RelNode rightExchange;
- List<RelHint> hints = join.getHints();
- if (hints.contains(PinotRelationalHints.USE_BROADCAST_DISTRIBUTE)) {
- // TODO: determine which side should be the broadcast table based on table metadata
- // TODO: support SINGLETON exchange if the non-broadcast table size is small enough to stay local.
- leftExchange = LogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
+ JoinInfo joinInfo = join.analyzeCondition();
+
+ if (joinInfo.leftKeys.isEmpty()) {
+ // when there's no JOIN key, use broadcast.
+ leftExchange = LogicalExchange.create(leftInput, RelDistributions.SINGLETON);
rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
- } else { // if (hints.contains(PinotRelationalHints.USE_HASH_DISTRIBUTE)) {
- JoinInfo joinInfo = join.analyzeCondition();
+ } else {
+ // when join key exists, use hash distribution.
leftExchange = LogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
rightExchange = LogicalExchange.create(rightInput, RelDistributions.hash(joinInfo.rightKeys));
}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index e5361bc8ec..c9a55e91cf 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -37,8 +37,8 @@ public class PinotQueryRuleSets {
EnumerableRules.ENUMERABLE_PROJECT_RULE, EnumerableRules.ENUMERABLE_SORT_RULE,
EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE,
- // push a filter into a join, replaced CoreRules.FILTER_INTO_JOIN with special config
- PinotFilterIntoJoinRule.INSTANCE,
+ // push a filter into a join
+ CoreRules.FILTER_INTO_JOIN,
// push filter through an aggregation
CoreRules.FILTER_AGGREGATE_TRANSPOSE,
// push filter through set operation
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index 76015f38c8..e45b24944e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -19,7 +19,6 @@
package org.apache.pinot.query.planner.logical;
import com.google.common.base.Preconditions;
-import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelNode;
@@ -126,9 +125,9 @@ public final class RelToStageConverter {
JoinInfo joinInfo = node.analyzeCondition();
FieldSelectionKeySelector leftFieldSelectionKeySelector = new FieldSelectionKeySelector(joinInfo.leftKeys);
FieldSelectionKeySelector rightFieldSelectionKeySelector = new FieldSelectionKeySelector(joinInfo.rightKeys);
- Preconditions.checkState(joinInfo.nonEquiConditions.isEmpty());
- return new JoinNode(currentStageId, toDataSchema(node.getRowType()), joinType, Collections.singletonList(
- new JoinNode.JoinClause(leftFieldSelectionKeySelector, rightFieldSelectionKeySelector)));
+ return new JoinNode(currentStageId, toDataSchema(node.getRowType()), joinType,
+ new JoinNode.JoinKeys(leftFieldSelectionKeySelector, rightFieldSelectionKeySelector),
+ joinInfo.nonEquiConditions.stream().map(RexExpression::toRexExpression).collect(Collectors.toList()));
}
private static DataSchema toDataSchema(RelDataType rowType) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index dd03a1cef8..8c940294ef 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -206,11 +206,11 @@ public class StagePlanner {
int leftDataSchemaSize = node.getInputs().get(0).getDataSchema().size();
Set<Integer> leftPartitionKeys = node.getInputs().get(0).getPartitionKeys();
Set<Integer> rightPartitionKeys = node.getInputs().get(1).getPartitionKeys();
- // TODO: currently JOIN criteria guarantee to only have one FieldSelectionKeySelector. Support more.
+ // Currently, JOIN criteria guarantee to only have one FieldSelectionKeySelector.
FieldSelectionKeySelector leftJoinKeySelector =
- (FieldSelectionKeySelector) ((JoinNode) node).getCriteria().get(0).getLeftJoinKeySelector();
+ (FieldSelectionKeySelector) ((JoinNode) node).getJoinKeys().getLeftJoinKeySelector();
FieldSelectionKeySelector rightJoinKeySelector =
- (FieldSelectionKeySelector) ((JoinNode) node).getCriteria().get(0).getRightJoinKeySelector();
+ (FieldSelectionKeySelector) ((JoinNode) node).getJoinKeys().getRightJoinKeySelector();
Set<Integer> newPartitionKeys = new HashSet<>();
for (int i = 0; i < leftJoinKeySelector.getColumnIndices().size(); i++) {
int leftIndex = leftJoinKeySelector.getColumnIndices().get(i);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
index eeec1b31e9..222d04d9ca 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.planner.stage;
import java.util.List;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.serde.ProtoProperties;
@@ -30,36 +31,44 @@ public class JoinNode extends AbstractStageNode {
@ProtoProperties
private JoinRelType _joinRelType;
@ProtoProperties
- private List<JoinClause> _criteria;
+ private JoinKeys _joinKeys;
+ @ProtoProperties
+ private List<RexExpression> _joinClause;
public JoinNode(int stageId) {
super(stageId);
}
- public JoinNode(int stageId, DataSchema dataSchema, JoinRelType joinRelType, List<JoinClause> criteria) {
+ public JoinNode(int stageId, DataSchema dataSchema, JoinRelType joinRelType, JoinKeys joinKeys,
+ List<RexExpression> joinClause) {
super(stageId, dataSchema);
_joinRelType = joinRelType;
- _criteria = criteria;
+ _joinKeys = joinKeys;
+ _joinClause = joinClause;
}
public JoinRelType getJoinRelType() {
return _joinRelType;
}
- public List<JoinClause> getCriteria() {
- return _criteria;
+ public JoinKeys getJoinKeys() {
+ return _joinKeys;
+ }
+
+ public List<RexExpression> getJoinClauses() {
+ return _joinClause;
}
- public static class JoinClause {
+ public static class JoinKeys {
@ProtoProperties
private KeySelector<Object[], Object[]> _leftJoinKeySelector;
@ProtoProperties
private KeySelector<Object[], Object[]> _rightJoinKeySelector;
- public JoinClause() {
+ public JoinKeys() {
}
- public JoinClause(FieldSelectionKeySelector leftKeySelector, FieldSelectionKeySelector rightKeySelector) {
+ public JoinKeys(FieldSelectionKeySelector leftKeySelector, FieldSelectionKeySelector rightKeySelector) {
_leftJoinKeySelector = leftKeySelector;
_rightJoinKeySelector = rightKeySelector;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 95744c55a0..0bccbc3b36 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -125,8 +125,8 @@ public class WorkerQueryExecutor {
BaseOperator<TransferableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap);
BaseOperator<TransferableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap);
return new HashJoinOperator(leftOperator, joinNode.getInputs().get(0).getDataSchema(), rightOperator,
- joinNode.getInputs().get(1).getDataSchema(), joinNode.getDataSchema(), joinNode.getCriteria(),
- joinNode.getJoinRelType());
+ joinNode.getInputs().get(1).getDataSchema(), joinNode.getDataSchema(), joinNode.getJoinKeys(),
+ joinNode.getJoinClauses(), joinNode.getJoinRelType());
} else if (stageNode instanceof AggregateNode) {
AggregateNode aggregateNode = (AggregateNode) stageNode;
BaseOperator<TransferableBlock> inputOperator =
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index a3b3b215b1..0b2d0e2a7b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -29,10 +29,12 @@ import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
import org.apache.pinot.core.common.datablock.DataBlockUtils;
import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.operands.FilterOperand;
/**
@@ -54,6 +56,7 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
private final DataSchema _leftTableSchema;
private final DataSchema _rightTableSchema;
private final int _resultRowSize;
+ private final List<FilterOperand> _joinClauseEvaluators;
private boolean _isHashTableBuilt;
private TransferableBlock _upstreamErrorBlock;
private KeySelector<Object[], Object[]> _leftKeySelector;
@@ -61,14 +64,18 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator, DataSchema leftSchema,
BaseOperator<TransferableBlock> rightTableOperator, DataSchema rightSchema, DataSchema outputSchema,
- List<JoinNode.JoinClause> criteria, JoinRelType joinType) {
- _leftKeySelector = criteria.get(0).getLeftJoinKeySelector();
- _rightKeySelector = criteria.get(0).getRightJoinKeySelector();
+ JoinNode.JoinKeys joinKeys, List<RexExpression> joinClauses, JoinRelType joinType) {
+ _leftKeySelector = joinKeys.getLeftJoinKeySelector();
+ _rightKeySelector = joinKeys.getRightJoinKeySelector();
_leftTableOperator = leftTableOperator;
_rightTableOperator = rightTableOperator;
_resultSchema = outputSchema;
_leftTableSchema = leftSchema;
_rightTableSchema = rightSchema;
+ _joinClauseEvaluators = new ArrayList<>(joinClauses.size());
+ for (RexExpression joinClause : joinClauses) {
+ _joinClauseEvaluators.add(FilterOperand.toFilterOperand(joinClause, _resultSchema));
+ }
_joinType = joinType;
_resultRowSize = _resultSchema.size();
_isHashTableBuilt = false;
@@ -132,7 +139,11 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
List<Object[]> hashCollection = _broadcastHashTable.getOrDefault(
_leftKeySelector.computeHash(leftRow), Collections.emptyList());
for (Object[] rightRow : hashCollection) {
- rows.add(joinRow(leftRow, rightRow));
+ Object[] resultRow = joinRow(leftRow, rightRow);
+ if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream().allMatch(
+ evaluator -> evaluator.apply(resultRow))) {
+ rows.add(resultRow);
+ }
}
}
return new TransferableBlock(rows, _resultSchema, BaseDataBlock.Type.ROW);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index e3f8f4da51..67b1195a33 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -220,9 +220,11 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
// Projection pushdown
new Object[]{"SELECT a.col1, a.col3 + a.col3 FROM a WHERE a.col3 >= 0 AND a.col2 = 'alice'"},
- // Partial filter pushdown
+ // Inequality JOIN & partial filter pushdown
new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col3 > b.col3"},
+ new Object[]{"SELECT * FROM a, b WHERE a.col1 > b.col2 AND a.col3 > b.col3"},
+
// Aggregation with group by
new Object[]{"SELECT a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 GROUP BY a.col1"},
@@ -269,7 +271,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
// Sub-query with IN clause to SEMI JOIN.
new Object[]{"SELECT b.col1, b.col2, SUM(b.col3) * 100 / COUNT(b.col3) FROM b WHERE b.col1 IN "
- + "(SELECT a.col2 FROM a WHERE a.col2 != 'foo') GROUP BY b.col1, b.col2"},
+ + " (SELECT a.col2 FROM a WHERE a.col2 != 'foo') GROUP BY b.col1, b.col2"},
+ new Object[]{"SELECT SUM(b.col3) FROM b WHERE b.col3 > (SELECT AVG(a.col3) FROM a WHERE a.col2 != 'bar')"},
// Aggregate query with HAVING clause, "foo" and "bar" occurred 6/2 times each and "alice" occurred 3/1 times
// numbers are cycle in (1, 42, 1, 42, 1), and (foo, bar, alice, foo, bar)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org