You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/09/23 04:50:19 UTC
[pinot] branch master updated: [multistage] adding support for range predicate (#9445)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a07f757e40 [multistage] adding support for range predicate (#9445)
a07f757e40 is described below
commit a07f757e409b2c0e0bd4e5377a729c82960c9399
Author: Rong Rong <ro...@apache.org>
AuthorDate: Thu Sep 22 21:50:14 2022 -0700
[multistage] adding support for range predicate (#9445)
* adding a rule to decompose range predicates into OR-joined simple comparisons.
* if the lookup is on points or complemented points, we should still use IN / NOT IN.
* if the search results in a constant value (true or false), it might produce a logical values (literal) node, which is also supported here.
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../rel/rules/PinotFilterExpandSearchRule.java | 75 ++++++++++++++++++++
.../calcite/rel/rules/PinotQueryRuleSets.java | 1 +
.../query/planner/logical/RelToStageConverter.java | 10 ++-
.../query/planner/stage/StageNodeSerDeUtils.java | 2 +
.../pinot/query/planner/stage/ValueNode.java | 54 +++++++++++++++
.../apache/pinot/query/QueryCompilationTest.java | 2 -
.../runtime/executor/WorkerQueryExecutor.java | 4 ++
.../runtime/operator/LiteralValueOperator.java | 79 ++++++++++++++++++++++
.../pinot/query/runtime/QueryRunnerTest.java | 10 +++
9 files changed, 234 insertions(+), 3 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterExpandSearchRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterExpandSearchRule.java
new file mode 100644
index 0000000000..37c2aabc74
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterExpandSearchRule.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+
+/**
+ * Planner rule that replaces a filter whose condition contains a {@code SEARCH} call with a
+ * {@link LogicalFilter} whose condition has been run through {@link RexUtil#expandSearch}.
+ *
+ * <p>Detection in {@link #matches} only descends through {@code AND} / {@code OR} nodes starting at the root of
+ * the condition; once the rule fires, the whole condition is handed to {@code expandSearch}.
+ */
+public class PinotFilterExpandSearchRule extends RelOptRule {
+  public static final PinotFilterExpandSearchRule INSTANCE =
+      new PinotFilterExpandSearchRule(PinotRuleUtils.PINOT_REL_FACTORY);
+
+  /**
+   * Creates the rule, registering a single operand that matches any {@link LogicalFilter}.
+   *
+   * @param factory builder factory forwarded to {@link RelOptRule}
+   */
+  public PinotFilterExpandSearchRule(RelBuilderFactory factory) {
+    super(operand(LogicalFilter.class, any()), factory, null);
+  }
+
+  /**
+   * Accepts the call only when its first rel is a {@link Filter} whose condition contains a {@code SEARCH} call.
+   *
+   * @param call the rule call under consideration
+   * @return {@code true} if the rule should fire for this call
+   */
+  @Override
+  @SuppressWarnings("rawtypes")
+  public boolean matches(RelOptRuleCall call) {
+    if (call.rels.length < 1 || !(call.rel(0) instanceof Filter)) {
+      return false;
+    }
+    Filter candidate = call.rel(0);
+    return containsRangeSearch(candidate.getCondition());
+  }
+
+  /**
+   * Rewrites the matched filter into a new {@link LogicalFilter} over the same input, using the expanded
+   * condition.
+   *
+   * @param call the matched rule call
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Filter matched = call.rel(0);
+    // No program is passed (second argument is null); the condition is expanded as-is.
+    RexNode expanded =
+        RexUtil.expandSearch(matched.getCluster().getRexBuilder(), null, matched.getCondition());
+    call.transformTo(LogicalFilter.create(matched.getInput(), expanded));
+  }
+
+  /**
+   * Reports whether a {@code SEARCH} call is reachable from {@code condition} by walking only through
+   * {@code AND} / {@code OR} nodes. Any other node kind ends the walk with {@code false}.
+   *
+   * @param condition expression to inspect
+   * @return {@code true} if a {@code SEARCH} node was found
+   */
+  private boolean containsRangeSearch(RexNode condition) {
+    switch (condition.getKind()) {
+      case SEARCH:
+        return true;
+      case AND:
+      case OR:
+        // Short-circuits on the first operand that contains a SEARCH.
+        return ((RexCall) condition).getOperands().stream().anyMatch(this::containsRangeSearch);
+      default:
+        return false;
+    }
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index 84865be084..e5361bc8ec 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -90,6 +90,7 @@ public class PinotQueryRuleSets {
PruneEmptyRules.UNION_INSTANCE,
// Pinot specific rules
+ PinotFilterExpandSearchRule.INSTANCE,
PinotJoinExchangeNodeInsertRule.INSTANCE,
PinotAggregateExchangeNodeInsertRule.INSTANCE,
PinotSortExchangeNodeInsertRule.INSTANCE
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index 8dddf7a441..76015f38c8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -31,6 +31,7 @@ import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
@@ -45,6 +46,7 @@ import org.apache.pinot.query.planner.stage.ProjectNode;
import org.apache.pinot.query.planner.stage.SortNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
/**
@@ -77,11 +79,17 @@ public final class RelToStageConverter {
return convertLogicalAggregate((LogicalAggregate) node, currentStageId);
} else if (node instanceof LogicalSort) {
return convertLogicalSort((LogicalSort) node, currentStageId);
+ } else if (node instanceof LogicalValues) {
+ return convertLogicalValues((LogicalValues) node, currentStageId);
} else {
- throw new UnsupportedOperationException("Unsupported logical plan node: " + node);
+ throw new UnsupportedOperationException("Unsupported logical plan node: " + node);
}
}
+  /**
+   * Converts a {@link LogicalValues} node into a {@link ValueNode} for the given stage, passing along the data
+   * schema derived from the node's row type and the node's literal tuples.
+   */
+  private static StageNode convertLogicalValues(LogicalValues node, int currentStageId) {
+    return new ValueNode(
+        currentStageId,
+        toDataSchema(node.getRowType()),
+        node.tuples);
+  }
+
private static StageNode convertLogicalSort(LogicalSort node, int currentStageId) {
int fetch = node.fetch == null ? 0 : ((RexLiteral) node.fetch).getValueAs(Integer.class);
int offset = node.offset == null ? 0 : ((RexLiteral) node.offset).getValueAs(Integer.class);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
index 4e8e1c4e61..69de8bcba6 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
@@ -81,6 +81,8 @@ public final class StageNodeSerDeUtils {
return new MailboxSendNode(stageId);
case "MailboxReceiveNode":
return new MailboxReceiveNode(stageId);
+ case "ValueNode":
+ return new ValueNode(stageId);
default:
throw new IllegalArgumentException("Unknown node name: " + nodeName);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
new file mode 100644
index 0000000000..082a4d3b36
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.serde.ProtoProperties;
+
+
+/**
+ * Stage node that holds rows of literal expressions, converted from tuples of Calcite {@link RexLiteral}s.
+ */
+public class ValueNode extends AbstractStageNode {
+  @ProtoProperties
+  private List<List<RexExpression>> _literalRows;
+
+  /**
+   * Creates a node for the given stage; the literal rows are left unset by this constructor.
+   *
+   * @param stageId id of the stage this node belongs to
+   */
+  public ValueNode(int stageId) {
+    super(stageId);
+  }
+
+  /**
+   * Creates a node whose rows mirror {@code literalTuples}, converting every literal with
+   * {@link RexExpression#toRexExpression}.
+   *
+   * @param currentStageId id of the stage this node belongs to
+   * @param dataSchema schema handed to the parent constructor
+   * @param literalTuples literal rows to convert, in order
+   */
+  public ValueNode(int currentStageId, DataSchema dataSchema,
+      ImmutableList<ImmutableList<RexLiteral>> literalTuples) {
+    super(currentStageId, dataSchema);
+    List<List<RexExpression>> convertedRows = new ArrayList<>(literalTuples.size());
+    for (List<RexLiteral> tuple : literalTuples) {
+      List<RexExpression> convertedRow = new ArrayList<>(tuple.size());
+      for (RexLiteral rexLiteral : tuple) {
+        convertedRow.add(RexExpression.toRexExpression(rexLiteral));
+      }
+      convertedRows.add(convertedRow);
+    }
+    _literalRows = convertedRows;
+  }
+
+  /**
+   * @return the literal rows held by this node
+   */
+  public List<List<RexExpression>> getLiteralRows() {
+    return _literalRows;
+  }
+}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 4223a1717a..0531cfaba8 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -257,8 +257,6 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
new Object[]{"SELECT a.col1, SUM(a.col3) FROM a", "'a.col1' is not being grouped"},
// empty IN clause fails compilation
new Object[]{"SELECT a.col1 FROM a WHERE a.col1 IN ()", "Encountered \"\" at line"},
- // range filter queries are not supported right now
- new Object[]{"SELECT a.col1 FROM a WHERE a.col1 > 'x' AND a.col1 < 'y'", "Range is not implemented yet"}
};
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 58d5371691..95744c55a0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -37,11 +37,13 @@ import org.apache.pinot.query.planner.stage.MailboxSendNode;
import org.apache.pinot.query.planner.stage.ProjectNode;
import org.apache.pinot.query.planner.stage.SortNode;
import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.AggregateOperator;
import org.apache.pinot.query.runtime.operator.FilterOperator;
import org.apache.pinot.query.runtime.operator.HashJoinOperator;
+import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.operator.SortOperator;
@@ -144,6 +146,8 @@ public class WorkerQueryExecutor {
return new SortOperator(getOperator(requestId, sortNode.getInputs().get(0), metadataMap),
sortNode.getCollationKeys(), sortNode.getCollationDirections(), sortNode.getFetch(), sortNode.getOffset(),
sortNode.getDataSchema());
+ } else if (stageNode instanceof ValueNode) {
+ return new LiteralValueOperator(stageNode.getDataSchema(), ((ValueNode) stageNode).getLiteralRows());
} else {
throw new UnsupportedOperationException(
String.format("Stage node type %s is not supported!", stageNode.getClass().getSimpleName()));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
new file mode 100644
index 0000000000..9ded3df8f2
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+
+
+public class LiteralValueOperator extends BaseOperator<TransferableBlock> {
+ private static final String EXPLAIN_NAME = "LITERAL_VALUE_PROVIDER";
+
+ private final DataSchema _dataSchema;
+ private final TransferableBlock _rexLiteralBlock;
+ private boolean _isLiteralBlockReturned;
+
+ public LiteralValueOperator(DataSchema dataSchema, List<List<RexExpression>> rexLiteralRows) {
+ _dataSchema = dataSchema;
+ _rexLiteralBlock = constructBlock(rexLiteralRows);
+ _isLiteralBlockReturned = false;
+ }
+
+ @Override
+ public List<Operator> getChildOperators() {
+ // WorkerExecutor doesn't use getChildOperators, returns null here.
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME;
+ }
+
+ @Override
+ protected TransferableBlock getNextBlock() {
+ if (!_isLiteralBlockReturned) {
+ _isLiteralBlockReturned = true;
+ return _rexLiteralBlock;
+ } else {
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
+ }
+ }
+
+ private TransferableBlock constructBlock(List<List<RexExpression>> rexLiteralRows) {
+ List<Object[]> blockContent = new ArrayList<>();
+ for (List<RexExpression> rexLiteralRow : rexLiteralRows) {
+ Object[] row = new Object[_dataSchema.size()];
+ for (int i = 0; i < _dataSchema.size(); i++) {
+ row[i] = ((RexExpression.Literal) rexLiteralRow.get(i)).getValue();
+ }
+ blockContent.add(row);
+ }
+ return new TransferableBlock(blockContent, _dataSchema, BaseDataBlock.Type.ROW);
+ }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 5bfd3511bd..0aeecd58cf 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -200,6 +200,16 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 "
+ " WHERE a.col1 IN ('foo') AND b.col2 NOT IN ('')"},
+ // Range conditions with continuous and non-continuous range.
+ new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 "
+ + " WHERE a.col3 IN (1, 2, 3) OR (a.col3 > 10 AND a.col3 < 50)"},
+
+ new Object[]{"SELECT col1, SUM(col3) FROM a WHERE a.col3 BETWEEN 23 AND 36 "
+ + " GROUP BY col1 HAVING SUM(col3) > 10.0 AND MIN(col3) <> 123 AND MAX(col3) BETWEEN 10 AND 20"},
+
+ new Object[]{"SELECT col1, SUM(col3) FROM a WHERE (col3 > 0 AND col3 < 45) AND (col3 > 15 AND col3 < 50) "
+ + " GROUP BY col1 HAVING (SUM(col3) > 10 AND SUM(col3) < 20) AND (SUM(col3) > 30 AND SUM(col3) < 40)"},
+
// Projection pushdown
new Object[]{"SELECT a.col1, a.col3 + a.col3 FROM a WHERE a.col3 >= 0 AND a.col2 = 'alice'"},
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org