You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/09/23 04:50:19 UTC

[pinot] branch master updated: [multistage] adding support for range predicate (#9445)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a07f757e40 [multistage] adding support for range predicate (#9445)
a07f757e40 is described below

commit a07f757e409b2c0e0bd4e5377a729c82960c9399
Author: Rong Rong <ro...@apache.org>
AuthorDate: Thu Sep 22 21:50:14 2022 -0700

    [multistage] adding support for range predicate (#9445)
    
    * Add a rule that decomposes range predicates (SEARCH) into OR-joined simple comparisons.
    * If the lookup is on points or complemented points, IN / NOT IN should still be used.
    * If the search simplifies to a constant value (true or false), it may produce a literal values node (LogicalValues), which is now supported as well.
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../rel/rules/PinotFilterExpandSearchRule.java     | 75 ++++++++++++++++++++
 .../calcite/rel/rules/PinotQueryRuleSets.java      |  1 +
 .../query/planner/logical/RelToStageConverter.java | 10 ++-
 .../query/planner/stage/StageNodeSerDeUtils.java   |  2 +
 .../pinot/query/planner/stage/ValueNode.java       | 54 +++++++++++++++
 .../apache/pinot/query/QueryCompilationTest.java   |  2 -
 .../runtime/executor/WorkerQueryExecutor.java      |  4 ++
 .../runtime/operator/LiteralValueOperator.java     | 79 ++++++++++++++++++++++
 .../pinot/query/runtime/QueryRunnerTest.java       | 10 +++
 9 files changed, 234 insertions(+), 3 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterExpandSearchRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterExpandSearchRule.java
new file mode 100644
index 0000000000..37c2aabc74
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotFilterExpandSearchRule.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+
+/**
+ * Rewrites every SEARCH (Sarg) call in a LogicalFilter condition into plain comparisons via RexUtil.expandSearch.
+ */
+public class PinotFilterExpandSearchRule extends RelOptRule {
+  public static final PinotFilterExpandSearchRule INSTANCE =
+      new PinotFilterExpandSearchRule(PinotRuleUtils.PINOT_REL_FACTORY);
+
+  public PinotFilterExpandSearchRule(RelBuilderFactory factory) {
+    super(operand(LogicalFilter.class, any()), factory, null);
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    Filter filter = call.rel(0);
+    return containsRangeSearch(filter.getCondition());
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Filter filter = call.rel(0);
+    RexNode newCondition = RexUtil.expandSearch(filter.getCluster().getRexBuilder(), null, filter.getCondition());
+    // copy() keeps the filter's traits and correlation variables; LogicalFilter.create(input, cond) drops them.
+    call.transformTo(filter.copy(filter.getTraitSet(), filter.getInput(), newCondition));
+  }
+
+  /**
+   * Returns true if a SEARCH call occurs anywhere in the expression tree, not only under AND/OR, since
+   * RexUtil.expandSearch also rewrites SEARCH calls nested under e.g. NOT, IS TRUE or CASE.
+   */
+  private boolean containsRangeSearch(RexNode condition) {
+    switch (condition.getKind()) {
+      case SEARCH:
+        return true;
+      default:
+        if (condition instanceof RexCall) {
+          for (RexNode operand : ((RexCall) condition).getOperands()) {
+            if (containsRangeSearch(operand)) {
+              return true;
+            }
+          }
+        }
+        return false;
+    }
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index 84865be084..e5361bc8ec 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -90,6 +90,7 @@ public class PinotQueryRuleSets {
           PruneEmptyRules.UNION_INSTANCE,
 
           // Pinot specific rules
+          PinotFilterExpandSearchRule.INSTANCE,
           PinotJoinExchangeNodeInsertRule.INSTANCE,
           PinotAggregateExchangeNodeInsertRule.INSTANCE,
           PinotSortExchangeNodeInsertRule.INSTANCE
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index 8dddf7a441..76015f38c8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -31,6 +31,7 @@ import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelRecordType;
@@ -45,6 +46,7 @@ import org.apache.pinot.query.planner.stage.ProjectNode;
 import org.apache.pinot.query.planner.stage.SortNode;
 import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
 
 
 /**
@@ -77,11 +79,17 @@ public final class RelToStageConverter {
       return convertLogicalAggregate((LogicalAggregate) node, currentStageId);
     } else if (node instanceof LogicalSort) {
       return convertLogicalSort((LogicalSort) node, currentStageId);
+    } else if (node instanceof LogicalValues) {
+      return convertLogicalValues((LogicalValues) node, currentStageId);
     } else {
-        throw new UnsupportedOperationException("Unsupported logical plan node: " + node);
+      throw new UnsupportedOperationException("Unsupported logical plan node: " + node);
     }
   }
 
+  private static StageNode convertLogicalValues(LogicalValues valuesNode, int stageId) {
+    return new ValueNode(stageId, toDataSchema(valuesNode.getRowType()), valuesNode.getTuples());
+  }
+
   private static StageNode convertLogicalSort(LogicalSort node, int currentStageId) {
     int fetch = node.fetch == null ? 0 : ((RexLiteral) node.fetch).getValueAs(Integer.class);
     int offset = node.offset == null ? 0 : ((RexLiteral) node.offset).getValueAs(Integer.class);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
index 4e8e1c4e61..69de8bcba6 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
@@ -81,6 +81,8 @@ public final class StageNodeSerDeUtils {
         return new MailboxSendNode(stageId);
       case "MailboxReceiveNode":
         return new MailboxReceiveNode(stageId);
+      case "ValueNode":
+        return new ValueNode(stageId);
       default:
         throw new IllegalArgumentException("Unknown node name: " + nodeName);
     }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
new file mode 100644
index 0000000000..082a4d3b36
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.serde.ProtoProperties;
+
+
+public class ValueNode extends AbstractStageNode {
+  // One entry per output row; each row holds one RexExpression per column.
+  @ProtoProperties
+  private List<List<RexExpression>> _literalRows;
+
+  // Used by StageNodeSerDeUtils to instantiate the node during deserialization.
+  public ValueNode(int stageId) {
+    super(stageId);
+  }
+
+  public ValueNode(int currentStageId, DataSchema dataSchema,
+      ImmutableList<ImmutableList<RexLiteral>> literalTuples) {
+    super(currentStageId, dataSchema);
+    _literalRows = new ArrayList<>(literalTuples.size());
+    for (ImmutableList<RexLiteral> tuple : literalTuples) {
+      List<RexExpression> convertedRow = new ArrayList<>(tuple.size());
+      tuple.forEach(literal -> convertedRow.add(RexExpression.toRexExpression(literal)));
+      _literalRows.add(convertedRow);
+    }
+  }
+
+  public List<List<RexExpression>> getLiteralRows() {
+    return _literalRows;
+  }
+}
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 4223a1717a..0531cfaba8 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -257,8 +257,6 @@ public class QueryCompilationTest extends QueryEnvironmentTestBase {
         new Object[]{"SELECT a.col1, SUM(a.col3) FROM a", "'a.col1' is not being grouped"},
         // empty IN clause fails compilation
         new Object[]{"SELECT a.col1 FROM a WHERE a.col1 IN ()", "Encountered \"\" at line"},
-        // range filter queries are not supported right now
-        new Object[]{"SELECT a.col1 FROM a WHERE a.col1 > 'x' AND a.col1 < 'y'", "Range is not implemented yet"}
     };
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 58d5371691..95744c55a0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -37,11 +37,13 @@ import org.apache.pinot.query.planner.stage.MailboxSendNode;
 import org.apache.pinot.query.planner.stage.ProjectNode;
 import org.apache.pinot.query.planner.stage.SortNode;
 import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.AggregateOperator;
 import org.apache.pinot.query.runtime.operator.FilterOperator;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
+import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.query.runtime.operator.SortOperator;
@@ -144,6 +146,8 @@ public class WorkerQueryExecutor {
       return new SortOperator(getOperator(requestId, sortNode.getInputs().get(0), metadataMap),
           sortNode.getCollationKeys(), sortNode.getCollationDirections(), sortNode.getFetch(), sortNode.getOffset(),
           sortNode.getDataSchema());
+    } else if (stageNode instanceof ValueNode) {
+      return new LiteralValueOperator(stageNode.getDataSchema(), ((ValueNode) stageNode).getLiteralRows());
     } else {
       throw new UnsupportedOperationException(
           String.format("Stage node type %s is not supported!", stageNode.getClass().getSimpleName()));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
new file mode 100644
index 0000000000..9ded3df8f2
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+
+
+/** Leaf operator that emits the literal rows of a ValueNode as one ROW block, then end-of-stream. */
+public class LiteralValueOperator extends BaseOperator<TransferableBlock> {
+  private static final String EXPLAIN_NAME = "LITERAL_VALUE_PROVIDER";
+
+  private final DataSchema _dataSchema;
+  private final TransferableBlock _literalBlock;
+  private boolean _literalBlockEmitted = false;
+
+  public LiteralValueOperator(DataSchema dataSchema, List<List<RexExpression>> rexLiteralRows) {
+    _dataSchema = dataSchema;
+    _literalBlock = buildRowBlock(dataSchema, rexLiteralRows);
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    // Not consulted by the worker executor, so no child list is exposed.
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    // The first call yields the literal rows; every later call yields end-of-stream.
+    if (_literalBlockEmitted) {
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
+    }
+    _literalBlockEmitted = true;
+    return _literalBlock;
+  }
+
+  private static TransferableBlock buildRowBlock(DataSchema schema, List<List<RexExpression>> literalRows) {
+    List<Object[]> rows = new ArrayList<>();
+    for (List<RexExpression> literalRow : literalRows) {
+      Object[] values = new Object[schema.size()];
+      for (int col = 0; col < schema.size(); col++) {
+        values[col] = ((RexExpression.Literal) literalRow.get(col)).getValue();
+      }
+      rows.add(values);
+    }
+    return new TransferableBlock(rows, schema, BaseDataBlock.Type.ROW);
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 5bfd3511bd..0aeecd58cf 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -200,6 +200,16 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
         new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 "
             + " WHERE a.col1 IN ('foo') AND b.col2 NOT IN ('')"},
 
+        // Range conditions with continuous and non-continuous range.
+        new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 "
+            + " WHERE a.col3 IN (1, 2, 3) OR (a.col3 > 10 AND a.col3 < 50)"},
+
+        new Object[]{"SELECT col1, SUM(col3) FROM a WHERE a.col3 BETWEEN 23 AND 36 "
+            + " GROUP BY col1 HAVING SUM(col3) > 10.0 AND MIN(col3) <> 123 AND MAX(col3) BETWEEN 10 AND 20"},
+
+        new Object[]{"SELECT col1, SUM(col3) FROM a WHERE (col3 > 0 AND col3 < 45) AND (col3 > 15 AND col3 < 50) "
+            + " GROUP BY col1 HAVING (SUM(col3) > 10 AND SUM(col3) < 20) AND (SUM(col3) > 30 AND SUM(col3) < 40)"},
+
         // Projection pushdown
         new Object[]{"SELECT a.col1, a.col3 + a.col3 FROM a WHERE a.col3 >= 0 AND a.col2 = 'alice'"},
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org