You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/09/08 19:55:39 UTC
[pinot] branch master updated: [multistage] support ORDER BY (#9279)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 43f8903de0 [multistage] support ORDER BY (#9279)
43f8903de0 is described below
commit 43f8903de0287cec93b627c8b325804bee95d3e3
Author: Rong Rong <ro...@apache.org>
AuthorDate: Thu Sep 8 12:55:32 2022 -0700
[multistage] support ORDER BY (#9279)
* unset limit
* initial support for order by
* adding tests
- this is not ready, since we don't do a final-stage reduce sort
* add exchange before sort
* fix test failure
* unset unset limit
* fix rebase
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../query/selection/SelectionOperatorService.java | 85 +-----------
.../query/selection/SelectionOperatorUtils.java | 81 ++++++++++++
.../rel/rules/PinotJoinExchangeNodeInsertRule.java | 12 +-
.../calcite/rel/rules/PinotQueryRuleSets.java | 2 +-
.../apache/calcite/rel/rules/PinotRuleUtils.java | 39 ++++++
.../rel/rules/PinotSortExchangeNodeInsertRule.java | 73 +++++++++++
.../query/parser/CalciteRexExpressionParser.java | 23 ++--
.../query/planner/logical/RelToStageConverter.java | 14 +-
.../apache/pinot/query/planner/stage/SortNode.java | 70 ++++++++++
.../query/planner/stage/StageNodeSerDeUtils.java | 2 +
.../pinot/query/QueryEnvironmentTestBase.java | 3 +-
.../runtime/executor/WorkerQueryExecutor.java | 7 +
.../pinot/query/runtime/operator/SortOperator.java | 142 +++++++++++++++++++++
.../query/runtime/utils/ServerRequestUtils.java | 14 +-
.../pinot/query/runtime/QueryRunnerTest.java | 7 +-
15 files changed, 461 insertions(+), 113 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index 441e20dc99..16ca57d91f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -18,13 +18,10 @@
*/
package org.apache.pinot.core.query.selection;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
-import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -82,86 +79,8 @@ public class SelectionOperatorService {
_numRowsToKeep = _offset + queryContext.getLimit();
assert queryContext.getOrderByExpressions() != null;
_rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
- getTypeCompatibleComparator(queryContext.getOrderByExpressions()));
- }
-
- /**
- * Helper method to get the type-compatible {@link Comparator} for selection rows. (Inter segment)
- * <p>Type-compatible comparator allows compatible types to compare with each other.
- *
- * @return flexible {@link Comparator} for selection rows.
- */
- private Comparator<Object[]> getTypeCompatibleComparator(List<OrderByExpressionContext> orderByExpressions) {
- ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
-
- // Compare all single-value columns
- int numOrderByExpressions = orderByExpressions.size();
- List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
- for (int i = 0; i < numOrderByExpressions; i++) {
- if (!columnDataTypes[i].isArray()) {
- valueIndexList.add(i);
- }
- }
-
- int numValuesToCompare = valueIndexList.size();
- int[] valueIndices = new int[numValuesToCompare];
- boolean[] useDoubleComparison = new boolean[numValuesToCompare];
- // Use multiplier -1 or 1 to control ascending/descending order
- int[] multipliers = new int[numValuesToCompare];
- for (int i = 0; i < numValuesToCompare; i++) {
- int valueIndex = valueIndexList.get(i);
- valueIndices[i] = valueIndex;
- if (columnDataTypes[valueIndex].isNumber()) {
- useDoubleComparison[i] = true;
- }
- multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
- }
-
- if (_queryContext.isNullHandlingEnabled()) {
- return (o1, o2) -> {
- for (int i = 0; i < numValuesToCompare; i++) {
- int index = valueIndices[i];
- Object v1 = o1[index];
- Object v2 = o2[index];
- if (v1 == null) {
- // The default null ordering is: 'NULLS LAST'.
- return v2 == null ? 0 : -multipliers[i];
- } else if (v2 == null) {
- return multipliers[i];
- }
- int result;
- if (useDoubleComparison[i]) {
- result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
- } else {
- //noinspection unchecked
- result = ((Comparable) v1).compareTo(v2);
- }
- if (result != 0) {
- return result * multipliers[i];
- }
- }
- return 0;
- };
- } else {
- return (o1, o2) -> {
- for (int i = 0; i < numValuesToCompare; i++) {
- int index = valueIndices[i];
- Object v1 = o1[index];
- Object v2 = o2[index];
- int result;
- if (useDoubleComparison[i]) {
- result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
- } else {
- //noinspection unchecked
- result = ((Comparable) v1).compareTo(v2);
- }
- if (result != 0) {
- return result * multipliers[i];
- }
- }
- return 0;
- };
- }
+ SelectionOperatorUtils.getTypeCompatibleComparator(queryContext.getOrderByExpressions(), _dataSchema,
+ _queryContext.isNullHandlingEnabled()));
}
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index ac0f8d5b91..851608330d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -542,6 +543,86 @@ public class SelectionOperatorUtils {
return columnToIndexMap;
}
+ /**
+ * Helper method to get the type-compatible {@link Comparator} for selection rows. (Inter segment)
+ * <p>Type-compatible comparator allows compatible types to compare with each other.
+ *
+ * @return flexible {@link Comparator} for selection rows.
+ */
+ public static Comparator<Object[]> getTypeCompatibleComparator(List<OrderByExpressionContext> orderByExpressions,
+ DataSchema dataSchema, boolean isNullHandlingEnabled) {
+ ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+
+ // Compare all single-value columns
+ int numOrderByExpressions = orderByExpressions.size();
+ List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
+ for (int i = 0; i < numOrderByExpressions; i++) {
+ if (!columnDataTypes[i].isArray()) {
+ valueIndexList.add(i);
+ }
+ }
+
+ int numValuesToCompare = valueIndexList.size();
+ int[] valueIndices = new int[numValuesToCompare];
+ boolean[] useDoubleComparison = new boolean[numValuesToCompare];
+ // Use multiplier -1 or 1 to control ascending/descending order
+ int[] multipliers = new int[numValuesToCompare];
+ for (int i = 0; i < numValuesToCompare; i++) {
+ int valueIndex = valueIndexList.get(i);
+ valueIndices[i] = valueIndex;
+ if (columnDataTypes[valueIndex].isNumber()) {
+ useDoubleComparison[i] = true;
+ }
+ multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
+ }
+
+ if (isNullHandlingEnabled) {
+ return (o1, o2) -> {
+ for (int i = 0; i < numValuesToCompare; i++) {
+ int index = valueIndices[i];
+ Object v1 = o1[index];
+ Object v2 = o2[index];
+ if (v1 == null) {
+ // The default null ordering is: 'NULLS LAST'.
+ return v2 == null ? 0 : -multipliers[i];
+ } else if (v2 == null) {
+ return multipliers[i];
+ }
+ int result;
+ if (useDoubleComparison[i]) {
+ result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
+ } else {
+ //noinspection unchecked
+ result = ((Comparable) v1).compareTo(v2);
+ }
+ if (result != 0) {
+ return result * multipliers[i];
+ }
+ }
+ return 0;
+ };
+ } else {
+ return (o1, o2) -> {
+ for (int i = 0; i < numValuesToCompare; i++) {
+ int index = valueIndices[i];
+ Object v1 = o1[index];
+ Object v2 = o2[index];
+ int result;
+ if (useDoubleComparison[i]) {
+ result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
+ } else {
+ //noinspection unchecked
+ result = ((Comparable) v1).compareTo(v2);
+ }
+ if (result != 0) {
+ return result * multipliers[i];
+ }
+ }
+ return 0;
+ };
+ }
+ }
+
/**
* Helper method to add a value to a {@link PriorityQueue}.
*
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
index 072e639733..39af45cff7 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
@@ -22,10 +22,8 @@ import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.hint.RelHint;
@@ -55,7 +53,7 @@ public class PinotJoinExchangeNodeInsertRule extends RelOptRule {
}
if (call.rel(0) instanceof Join) {
Join join = call.rel(0);
- return !isExchange(join.getLeft()) && !isExchange(join.getRight());
+ return !PinotRuleUtils.isExchange(join.getLeft()) && !PinotRuleUtils.isExchange(join.getRight());
}
return false;
}
@@ -92,12 +90,4 @@ public class PinotJoinExchangeNodeInsertRule extends RelOptRule {
call.transformTo(newJoinNode);
}
-
- private static boolean isExchange(RelNode rel) {
- RelNode reference = rel;
- if (reference instanceof HepRelVertex) {
- reference = ((HepRelVertex) reference).getCurrentRel();
- }
- return reference instanceof Exchange;
- }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index 55202805b2..fcba253d20 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -89,6 +89,6 @@ public class PinotQueryRuleSets {
// Pinot specific rules
PinotJoinExchangeNodeInsertRule.INSTANCE,
PinotAggregateExchangeNodeInsertRule.INSTANCE,
- PinotLogicalSortFetchEliminationRule.INSTANCE
+ PinotSortExchangeNodeInsertRule.INSTANCE
);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java
new file mode 100644
index 0000000000..97b1074b7f
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
+
+
+public class PinotRuleUtils {
+
+ private PinotRuleUtils() {
+ // do not instantiate.
+ }
+
+ public static boolean isExchange(RelNode rel) {
+ RelNode reference = rel;
+ if (reference instanceof HepRelVertex) {
+ reference = ((HepRelVertex) reference).getCurrentRel();
+ }
+ return reference instanceof Exchange;
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
new file mode 100644
index 0000000000..c590f2b652
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import java.util.Collections;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+
+/**
+ * Special rule for Pinot: inserts an exchange between a SORT node with a non-empty collation and its input, unless that input is already an exchange.
+ */
+public class PinotSortExchangeNodeInsertRule extends RelOptRule {
+ public static final PinotSortExchangeNodeInsertRule INSTANCE =
+ new PinotSortExchangeNodeInsertRule(RelFactories.LOGICAL_BUILDER);
+
+ public PinotSortExchangeNodeInsertRule(RelBuilderFactory factory) {
+ super(operand(LogicalSort.class, any()), factory, null);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ if (call.rels.length < 1) {
+ return false;
+ }
+ if (call.rel(0) instanceof Sort) {
+ Sort sort = call.rel(0);
+ return sort.getCollation().getFieldCollations().size() > 0 && !PinotRuleUtils.isExchange(sort.getInput());
+ }
+ return false;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ Sort sort = call.rel(0);
+ // TODO: this is a single value
+ LogicalExchange exchange = LogicalExchange.create(sort.getInput(), RelDistributions.hash(Collections.emptyList()));
+ call.transformTo(LogicalSort.create(exchange, sort.getCollation(), sort.offset, sort.fetch));
+ }
+
+ private static boolean isExchange(RelNode rel) {
+ RelNode reference = rel;
+ if (reference instanceof HepRelVertex) {
+ reference = ((HepRelVertex) reference).getCurrentRel();
+ }
+ return reference instanceof Exchange;
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
index 70ee68b21c..804825cff1 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
@@ -18,10 +18,10 @@
*/
package org.apache.pinot.query.parser;
-import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.sql.SqlKind;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.ExpressionType;
@@ -88,28 +88,25 @@ public class CalciteRexExpressionParser {
return selectExpr;
}
- private static List<Expression> convertOrderByList(RexExpression.FunctionCall rexCall, PinotQuery pinotQuery) {
- Preconditions.checkState(rexCall.getKind() == SqlKind.ORDER_BY);
+ public static List<Expression> convertOrderByList(List<RexExpression> rexInputRefs,
+ List<RelFieldCollation.Direction> directions, PinotQuery pinotQuery) {
List<Expression> orderByExpr = new ArrayList<>();
- final Iterator<RexExpression> iterator = rexCall.getFunctionOperands().iterator();
- while (iterator.hasNext()) {
- final RexExpression next = iterator.next();
- orderByExpr.add(convertOrderBy(next, pinotQuery));
+ for (int i = 0; i < rexInputRefs.size(); i++) {
+ orderByExpr.add(convertOrderBy(rexInputRefs.get(i), directions.get(i), pinotQuery));
}
return orderByExpr;
}
- private static Expression convertOrderBy(RexExpression rexNode, PinotQuery pinotQuery) {
- final SqlKind kind = rexNode.getKind();
+ private static Expression convertOrderBy(RexExpression rexNode, RelFieldCollation.Direction direction,
+ PinotQuery pinotQuery) {
Expression expression;
- switch (kind) {
+ switch (direction) {
case DESCENDING:
- RexExpression.FunctionCall rexCall = (RexExpression.FunctionCall) rexNode;
expression = RequestUtils.getFunctionExpression("DESC");
- expression.getFunctionCall().addToOperands(toExpression(rexCall.getFunctionOperands().get(0), pinotQuery));
+ expression.getFunctionCall().addToOperands(toExpression(rexNode, pinotQuery));
break;
- case IDENTIFIER:
+ case ASCENDING:
default:
expression = RequestUtils.getFunctionExpression("ASC");
expression.getFunctionCall().addToOperands(toExpression(rexNode, pinotQuery));
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index c7daa877f8..0729ad1ab9 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -28,11 +28,13 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.PlannerUtils;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
@@ -40,6 +42,7 @@ import org.apache.pinot.query.planner.stage.AggregateNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.TableScanNode;
@@ -72,11 +75,20 @@ public final class RelToStageConverter {
return convertLogicalFilter((LogicalFilter) node, currentStageId);
} else if (node instanceof LogicalAggregate) {
return convertLogicalAggregate((LogicalAggregate) node, currentStageId);
+ } else if (node instanceof LogicalSort) {
+ return convertLogicalSort((LogicalSort) node, currentStageId);
} else {
- throw new UnsupportedOperationException("Unsupported logical plan node: " + node);
+ throw new UnsupportedOperationException("Unsupported logical plan node: " + node);
}
}
+ private static StageNode convertLogicalSort(LogicalSort node, int currentStageId) {
+ int fetch = node.fetch == null ? -1 : ((RexLiteral) node.fetch).getValueAs(Integer.class);
+ int offset = node.offset == null ? -1 : ((RexLiteral) node.offset).getValueAs(Integer.class);
+ return new SortNode(currentStageId, node.getCollation().getFieldCollations(), fetch, offset,
+ toDataSchema(node.getRowType()));
+ }
+
private static StageNode convertLogicalAggregate(LogicalAggregate node, int currentStageId) {
return new AggregateNode(currentStageId, toDataSchema(node.getRowType()), node.getAggCallList(),
node.getGroupSet());
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
new file mode 100644
index 0000000000..6dc27ce12e
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.serde.ProtoProperties;
+
+
+public class SortNode extends AbstractStageNode {
+ @ProtoProperties
+ private List<RexExpression> _collationKeys;
+ @ProtoProperties
+ private List<RelFieldCollation.Direction> _collationDirections;
+ @ProtoProperties
+ private int _fetch;
+ @ProtoProperties
+ private int _offset;
+
+ public SortNode(int stageId) {
+ super(stageId);
+ }
+
+ public SortNode(int stageId, List<RelFieldCollation> fieldCollations, int fetch, int offset, DataSchema dataSchema) {
+ super(stageId, dataSchema);
+ _collationDirections = new ArrayList<>(fieldCollations.size());
+ _collationKeys = new ArrayList<>(fieldCollations.size());
+ _fetch = fetch;
+ _offset = offset;
+ for (RelFieldCollation fieldCollation : fieldCollations) {
+ _collationDirections.add(fieldCollation.getDirection());
+ _collationKeys.add(new RexExpression.InputRef(fieldCollation.getFieldIndex()));
+ }
+ }
+
+ public List<RexExpression> getCollationKeys() {
+ return _collationKeys;
+ }
+
+ public List<RelFieldCollation.Direction> getCollationDirections() {
+ return _collationDirections;
+ }
+
+ public int getFetch() {
+ return _fetch;
+ }
+
+ public int getOffset() {
+ return _offset;
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
index c76792371f..4e8e1c4e61 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
@@ -75,6 +75,8 @@ public final class StageNodeSerDeUtils {
return new FilterNode(stageId);
case "AggregateNode":
return new AggregateNode(stageId);
+ case "SortNode":
+ return new SortNode(stageId);
case "MailboxSendNode":
return new MailboxSendNode(stageId);
case "MailboxReceiveNode":
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index f740709d09..f83eea0fb3 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -43,7 +43,8 @@ public class QueryEnvironmentTestBase {
@DataProvider(name = "testQueryDataProvider")
protected Object[][] provideQueries() {
return new Object[][] {
- new Object[]{"SELECT * FROM a LIMIT 10"},
+ new Object[]{"SELECT * FROM a ORDER BY col1 LIMIT 10"},
+ new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 10"},
new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2"},
new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0"},
new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col3 > b.col3"},
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index f77b1a0286..018b3dc82c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -35,6 +35,7 @@ import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -43,6 +44,7 @@ import org.apache.pinot.query.runtime.operator.FilterOperator;
import org.apache.pinot.query.runtime.operator.HashJoinOperator;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.SortOperator;
import org.apache.pinot.query.runtime.operator.TransformOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -136,6 +138,11 @@ public class WorkerQueryExecutor {
ProjectNode projectNode = (ProjectNode) stageNode;
return new TransformOperator(getOperator(requestId, projectNode.getInputs().get(0), metadataMap),
projectNode.getDataSchema(), projectNode.getProjects(), projectNode.getInputs().get(0).getDataSchema());
+ } else if (stageNode instanceof SortNode) {
+ SortNode sortNode = (SortNode) stageNode;
+ return new SortOperator(getOperator(requestId, sortNode.getInputs().get(0), metadataMap),
+ sortNode.getCollationKeys(), sortNode.getCollationDirections(), sortNode.getFetch(), sortNode.getOffset(),
+ sortNode.getDataSchema());
} else {
throw new UnsupportedOperationException(
String.format("Stage node type %s is not supported!", stageNode.getClass().getSimpleName()));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
new file mode 100644
index 0000000000..70a4536fde
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import javax.annotation.Nullable;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+
+
+public class SortOperator extends BaseOperator<TransferableBlock> {
+ private static final String EXPLAIN_NAME = "SORT";
+ private final BaseOperator<TransferableBlock> _upstreamOperator;
+ private final int _fetch;
+ private final int _offset;
+ private final DataSchema _dataSchema;
+ private final PriorityQueue<Object[]> _rows;
+ private final int _numRowsToKeep;
+
+ private boolean _isSortedBlockConstructed;
+ private TransferableBlock _upstreamErrorBlock;
+
+ public SortOperator(BaseOperator<TransferableBlock> upstreamOperator, List<RexExpression> collationKeys,
+ List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema) {
+ _upstreamOperator = upstreamOperator;
+ _fetch = fetch;
+ _offset = offset;
+ _dataSchema = dataSchema;
+ _upstreamErrorBlock = null;
+ _isSortedBlockConstructed = false;
+ _numRowsToKeep = _fetch > 0 ? Math.min(SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY,
+ _fetch + (Math.max(_offset, 0))) : SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY;
+ _rows = new PriorityQueue<>(_numRowsToKeep, SelectionOperatorUtils.getTypeCompatibleComparator(
+ toOrderByExpressions(collationKeys, collationDirections), dataSchema, false));
+ }
+
+ @Override
+ public List<Operator> getChildOperators() {
+ // WorkerExecutor doesn't use getChildOperators, returns null here.
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME;
+ }
+
+ @Override
+ protected TransferableBlock getNextBlock() {
+ try {
+ consumeInputBlocks();
+ return produceSortedBlock();
+ } catch (Exception e) {
+ return TransferableBlockUtils.getErrorTransferableBlock(e);
+ }
+ }
+
+ private TransferableBlock produceSortedBlock()
+ throws IOException {
+ if (_upstreamErrorBlock != null) {
+ return _upstreamErrorBlock;
+ }
+ if (!_isSortedBlockConstructed) {
+ int currentOffset = 0;
+ List<Object[]> rows = new ArrayList<>(_rows.size());
+ while (_rows.size() > 0) {
+ Object[] row = _rows.poll();
+ if (currentOffset > _offset) {
+ rows.add(row);
+ }
+ currentOffset++;
+ }
+ _isSortedBlockConstructed = true;
+ if (rows.size() == 0) {
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
+ } else {
+ return new TransferableBlock(rows, _dataSchema, BaseDataBlock.Type.ROW);
+ }
+ } else {
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
+ }
+ }
+
+ private void consumeInputBlocks() {
+ if (!_isSortedBlockConstructed) {
+ TransferableBlock block = _upstreamOperator.nextBlock();
+ while (!TransferableBlockUtils.isEndOfStream(block)) {
+ BaseDataBlock dataBlock = block.getDataBlock();
+ int numRows = dataBlock.getNumberOfRows();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+ SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
+ }
+ block = _upstreamOperator.nextBlock();
+ }
+ // setting upstream error block
+ if (block.isErrorBlock()) {
+ _upstreamErrorBlock = block;
+ }
+ }
+ }
+
+ private List<OrderByExpressionContext> toOrderByExpressions(List<RexExpression> collationKeys,
+ List<RelFieldCollation.Direction> collationDirections) {
+ List<OrderByExpressionContext> orderByExpressionContextList = new ArrayList<>(collationKeys.size());
+ for (int i = 0; i < collationKeys.size(); i++) {
+ orderByExpressionContextList.add(new OrderByExpressionContext(ExpressionContext.forIdentifier(
+ _dataSchema.getColumnName(((RexExpression.InputRef) collationKeys.get(i)).getIndex())),
+ !collationDirections.get(i).isDescending()));
+ }
+ return orderByExpressionContextList;
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
index 738992fe52..07e02bdb8b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
@@ -33,6 +33,7 @@ import org.apache.pinot.query.planner.stage.AggregateNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -46,7 +47,7 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils;
* conversion step is needed so that the V2 query plan can be converted into a compatible format to run V1 executor.
*/
public class ServerRequestUtils {
- private static final int DEFAULT_LEAF_NODE_LIMIT = 1_000_000;
+ private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000;
private ServerRequestUtils() {
// do not instantiate.
@@ -116,6 +117,17 @@ public class ServerRequestUtils {
// set group-by list
pinotQuery.setGroupByList(CalciteRexExpressionParser.convertGroupByList(
((AggregateNode) node).getGroupSet(), pinotQuery));
+ } else if (node instanceof SortNode) {
+ if (((SortNode) node).getCollationKeys().size() > 0) {
+ pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(((SortNode) node).getCollationKeys(),
+ ((SortNode) node).getCollationDirections(), pinotQuery));
+ }
+ if (((SortNode) node).getFetch() > 0) {
+ pinotQuery.setLimit(((SortNode) node).getFetch());
+ }
+ if (((SortNode) node).getOffset() > 0) {
+ pinotQuery.setOffset(((SortNode) node).getOffset());
+ }
} else if (node instanceof MailboxSendNode) {
// TODO: MailboxSendNode should be the root of the leaf stage. but ignore for now since it is handle seperately
// in QueryRunner as a single step sender.
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 462c5da0c3..89da8574ea 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -66,8 +66,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
@DataProvider(name = "testDataWithSqlToFinalRowCount")
private Object[][] provideTestSqlAndRowCount() {
return new Object[][] {
- new Object[]{"SELECT * FROM b", 5},
- new Object[]{"SELECT * FROM a", 15},
+ new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 3", 3},
+ new Object[]{"SELECT * FROM a ORDER BY col1 LIMIT 20", 15},
// No match filter
new Object[]{"SELECT * FROM b WHERE col3 < 0", 0},
@@ -154,6 +154,9 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
new Object[]{"SELECT a.col2, COUNT(*), MAX(a.col3), MIN(a.col3), SUM(a.col3) FROM a GROUP BY a.col2 "
+ "HAVING COUNT(*) < 5 OR (COUNT(*) > 5 AND SUM(a.col3) >= 10)"
+ "OR (MIN(a.col3) != 20 AND SUM(a.col3) = 100)", 3},
+
+ // Order-by
+ new Object[]{"SELECT a.col1, a.col3, b.col3 FROM a JOIN b ON a.col1 = b.col1 ORDER BY a.col3, b.col3 DESC", 15},
};
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org