You are viewing a plain text version of this content. (The canonical link to it was not preserved in this copy.)
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/11/30 18:25:40 UTC
[pinot] branch master updated: [multistage] support sort push-down (#9832)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 041865a80f [multistage] support sort push-down (#9832)
041865a80f is described below
commit 041865a80f7a2359270571a2343049bb1f294fe5
Author: Almog Gavra <al...@gmail.com>
AuthorDate: Wed Nov 30 10:25:34 2022 -0800
[multistage] support sort push-down (#9832)
---
.../query/selection/SelectionOperatorService.java | 17 +-
.../query/selection/SelectionOperatorUtils.java | 15 +
.../rel/rules/ImmutableSortExchangeCopyRule.java | 414 +++++++++++++++++++++
.../calcite/rel/rules/PinotQueryRuleSets.java | 9 +-
.../rel/rules/PinotSortExchangeCopyRule.java | 114 ++++++
.../rel/rules/PinotSortExchangeNodeInsertRule.java | 18 +-
.../query/planner/logical/RelToStageConverter.java | 5 +-
.../query/planner/logical/RexExpressionUtils.java | 10 +
.../pinot/query/planner/logical/StagePlanner.java | 6 +-
.../apache/pinot/query/planner/stage/SortNode.java | 4 +-
.../rel/rules/PinotSortExchangeCopyRuleTest.java | 249 +++++++++++++
.../apache/pinot/query/runtime/QueryRunner.java | 46 ++-
.../pinot/query/runtime/operator/SortOperator.java | 2 +-
.../src/test/resources/queries/OrderBy.json | 232 ++++++++++++
14 files changed, 1111 insertions(+), 30 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index f77c936a21..525ab7af9e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -25,7 +25,6 @@ import java.util.PriorityQueue;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.spi.utils.LoopUtils;
import org.roaringbitmap.RoaringBitmap;
@@ -134,18 +133,7 @@ public class SelectionOperatorService {
public ResultTable renderResultTableWithOrdering() {
int[] columnIndices = SelectionOperatorUtils.getColumnIndices(_selectionColumns, _dataSchema);
int numColumns = columnIndices.length;
-
- // Construct the result data schema
- String[] columnNames = _dataSchema.getColumnNames();
- ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
- String[] resultColumnNames = new String[numColumns];
- ColumnDataType[] resultColumnDataTypes = new ColumnDataType[numColumns];
- for (int i = 0; i < numColumns; i++) {
- int columnIndex = columnIndices[i];
- resultColumnNames[i] = columnNames[columnIndex];
- resultColumnDataTypes[i] = columnDataTypes[columnIndex];
- }
- DataSchema resultDataSchema = new DataSchema(resultColumnNames, resultColumnDataTypes);
+ DataSchema resultDataSchema = SelectionOperatorUtils.getSchemaForProjection(_dataSchema, columnIndices);
// Extract the result rows
LinkedList<Object[]> rowsInSelectionResults = new LinkedList<>();
@@ -156,9 +144,10 @@ public class SelectionOperatorService {
for (int i = 0; i < numColumns; i++) {
Object value = row[columnIndices[i]];
if (value != null) {
- extractedRow[i] = resultColumnDataTypes[i].convertAndFormat(value);
+ extractedRow[i] = resultDataSchema.getColumnDataType(i).convertAndFormat(value);
}
}
+
rowsInSelectionResults.addFirst(extractedRow);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index e53b4f2982..bda3efab0b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -656,4 +656,19 @@ public class SelectionOperatorUtils {
queue.offer(value);
}
}
+
+ public static DataSchema getSchemaForProjection(DataSchema dataSchema, int[] columnIndices) {
+ int numColumns = columnIndices.length;
+
+ String[] columnNames = dataSchema.getColumnNames();
+ ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+ String[] resultColumnNames = new String[numColumns];
+ ColumnDataType[] resultColumnDataTypes = new ColumnDataType[numColumns];
+ for (int i = 0; i < numColumns; i++) {
+ int columnIndex = columnIndices[i];
+ resultColumnNames[i] = columnNames[columnIndex];
+ resultColumnDataTypes[i] = columnDataTypes[columnIndex];
+ }
+ return new DataSchema(resultColumnNames, resultColumnDataTypes);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/ImmutableSortExchangeCopyRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/ImmutableSortExchangeCopyRule.java
new file mode 100644
index 0000000000..eca7fc6110
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/ImmutableSortExchangeCopyRule.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+// NOTE: this file was generated with Calcite's code generator; rather than pulling in all
+// the codegen dependencies, the output was generated once and checked in. If active development
+// on this needs to happen, re-generate it using Calcite's generator.
+
+// CHECKSTYLE:OFF
+
+import com.google.common.base.MoreObjects;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+
+/**
+ * {@code ImmutableSortExchangeCopyRule} contains immutable implementation classes generated from
+ * abstract value types defined as nested inside {@link PinotSortExchangeCopyRule}.
+ * @see ImmutableSortExchangeCopyRule.Config
+ */
+@SuppressWarnings({"all"})
+final class ImmutableSortExchangeCopyRule {
+ private ImmutableSortExchangeCopyRule() {
+ }
+
+ /**
+ * Immutable implementation of {@link PinotSortExchangeCopyRule.Config}.
+ * <p>
+ * Use the builder to create immutable instances:
+ * {@code ImmutableSortExchangeCopyRule.Config.builder()}.
+ * Use the static factory method to get the default singleton instance:
+ * {@code ImmutableSortExchangeCopyRule.Config.of()}.
+ */
+ static final class Config implements PinotSortExchangeCopyRule.Config {
+ private final RelBuilderFactory relBuilderFactory;
+ private final @Nullable String description;
+ private final RelRule.OperandTransform operandSupplier;
+
+ private Config() {
+ this.description = null;
+ this.relBuilderFactory = initShim.relBuilderFactory();
+ this.operandSupplier = initShim.operandSupplier();
+ this.initShim = null;
+ }
+
+ private Config(ImmutableSortExchangeCopyRule.Config.Builder builder) {
+ this.description = builder.description;
+ if (builder.relBuilderFactory != null) {
+ initShim.withRelBuilderFactory(builder.relBuilderFactory);
+ }
+ if (builder.operandSupplier != null) {
+ initShim.withOperandSupplier(builder.operandSupplier);
+ }
+ this.relBuilderFactory = initShim.relBuilderFactory();
+ this.operandSupplier = initShim.operandSupplier();
+ this.initShim = null;
+ }
+
+ private Config(RelBuilderFactory relBuilderFactory,
+ @Nullable String description,
+ RelRule.OperandTransform operandSupplier) {
+ this.relBuilderFactory = relBuilderFactory;
+ this.description = description;
+ this.operandSupplier = operandSupplier;
+ this.initShim = null;
+ }
+
+ private static final byte STAGE_INITIALIZING = -1;
+ private static final byte STAGE_UNINITIALIZED = 0;
+ private static final byte STAGE_INITIALIZED = 1;
+ @SuppressWarnings("Immutable")
+ private transient volatile InitShim initShim = new InitShim();
+
+ private final class InitShim {
+ private byte relBuilderFactoryBuildStage = STAGE_UNINITIALIZED;
+ private RelBuilderFactory relBuilderFactory;
+
+ RelBuilderFactory relBuilderFactory() {
+ if (relBuilderFactoryBuildStage == STAGE_INITIALIZING) {
+ throw new IllegalStateException(formatInitCycleMessage());
+ }
+ if (relBuilderFactoryBuildStage == STAGE_UNINITIALIZED) {
+ relBuilderFactoryBuildStage = STAGE_INITIALIZING;
+ this.relBuilderFactory = Objects.requireNonNull(relBuilderFactoryInitialize(), "relBuilderFactory");
+ relBuilderFactoryBuildStage = STAGE_INITIALIZED;
+ }
+ return this.relBuilderFactory;
+ }
+
+ void withRelBuilderFactory(RelBuilderFactory relBuilderFactory) {
+ this.relBuilderFactory = relBuilderFactory;
+ relBuilderFactoryBuildStage = STAGE_INITIALIZED;
+ }
+
+ private byte operandSupplierBuildStage = STAGE_UNINITIALIZED;
+ private RelRule.OperandTransform operandSupplier;
+
+ RelRule.OperandTransform operandSupplier() {
+ if (operandSupplierBuildStage == STAGE_INITIALIZING) {
+ throw new IllegalStateException(formatInitCycleMessage());
+ }
+ if (operandSupplierBuildStage == STAGE_UNINITIALIZED) {
+ operandSupplierBuildStage = STAGE_INITIALIZING;
+ this.operandSupplier = Objects.requireNonNull(operandSupplierInitialize(), "operandSupplier");
+ operandSupplierBuildStage = STAGE_INITIALIZED;
+ }
+ return this.operandSupplier;
+ }
+
+ void withOperandSupplier(RelRule.OperandTransform operandSupplier) {
+ this.operandSupplier = operandSupplier;
+ operandSupplierBuildStage = STAGE_INITIALIZED;
+ }
+
+ private String formatInitCycleMessage() {
+ List<String> attributes = new ArrayList<>();
+ if (relBuilderFactoryBuildStage == STAGE_INITIALIZING) {
+ attributes.add("relBuilderFactory");
+ }
+ if (operandSupplierBuildStage == STAGE_INITIALIZING) {
+ attributes.add("operandSupplier");
+ }
+ return "Cannot build Config, attribute initializers form cycle " + attributes;
+ }
+ }
+
+ private RelBuilderFactory relBuilderFactoryInitialize() {
+ return PinotSortExchangeCopyRule.Config.super.relBuilderFactory();
+ }
+
+ private RelRule.OperandTransform operandSupplierInitialize() {
+ return PinotSortExchangeCopyRule.Config.super.operandSupplier();
+ }
+
+ /**
+ * @return The value of the {@code relBuilderFactory} attribute
+ */
+ @Override
+ public RelBuilderFactory relBuilderFactory() {
+ InitShim shim = this.initShim;
+ return shim != null ? shim.relBuilderFactory() : this.relBuilderFactory;
+ }
+
+ /**
+ * @return The value of the {@code description} attribute
+ */
+ @Override
+ public @Nullable String description() {
+ return description;
+ }
+
+ /**
+ * @return The value of the {@code operandSupplier} attribute
+ */
+ @Override
+ public RelRule.OperandTransform operandSupplier() {
+ InitShim shim = this.initShim;
+ return shim != null ? shim.operandSupplier() : this.operandSupplier;
+ }
+
+ /**
+ * Copy the current immutable object by setting a value for the
+ * {@link PinotSortExchangeCopyRule.Config#relBuilderFactory() relBuilderFactory} attribute.
+ * A shallow reference equality check is used to prevent copying of the same value by returning {@code this}.
+ * @param value A new value for relBuilderFactory
+ * @return A modified copy of the {@code this} object
+ */
+ public final ImmutableSortExchangeCopyRule.Config withRelBuilderFactory(RelBuilderFactory value) {
+ if (this.relBuilderFactory == value) {
+ return this;
+ }
+ RelBuilderFactory newValue = Objects.requireNonNull(value, "relBuilderFactory");
+ return validate(new ImmutableSortExchangeCopyRule.Config(newValue, this.description, this.operandSupplier));
+ }
+
+ /**
+ * Copy the current immutable object by setting a value for the {@link PinotSortExchangeCopyRule.Config#description()
+ * description} attribute.
+ * An equals check used to prevent copying of the same value by returning {@code this}.
+ * @param value A new value for description (can be {@code null})
+ * @return A modified copy of the {@code this} object
+ */
+ public final ImmutableSortExchangeCopyRule.Config withDescription(
+ @Nullable String value) {
+ if (Objects.equals(this.description, value)) {
+ return this;
+ }
+ return validate(new ImmutableSortExchangeCopyRule.Config(this.relBuilderFactory, value, this.operandSupplier));
+ }
+
+ /**
+ * Copy the current immutable object by setting a value for the
+ * {@link PinotSortExchangeCopyRule.Config#operandSupplier() operandSupplier} attribute.
+ * A shallow reference equality check is used to prevent copying of the same value by returning {@code this}.
+ * @param value A new value for operandSupplier
+ * @return A modified copy of the {@code this} object
+ */
+ public final ImmutableSortExchangeCopyRule.Config withOperandSupplier(RelRule.OperandTransform value) {
+ if (this.operandSupplier == value) {
+ return this;
+ }
+ RelRule.OperandTransform newValue = Objects.requireNonNull(value, "operandSupplier");
+ return validate(new ImmutableSortExchangeCopyRule.Config(this.relBuilderFactory, this.description, newValue));
+ }
+
+ /**
+ * This instance is equal to all instances of {@code Config} that have equal attribute values.
+ * @return {@code true} if {@code this} is equal to {@code another} instance
+ */
+ @Override
+ public boolean equals(@Nullable Object another) {
+ if (this == another) {
+ return true;
+ }
+ return another instanceof ImmutableSortExchangeCopyRule.Config && equalTo(
+ (ImmutableSortExchangeCopyRule.Config) another);
+ }
+
+ private boolean equalTo(ImmutableSortExchangeCopyRule.Config another) {
+ return relBuilderFactory.equals(another.relBuilderFactory) && Objects.equals(description, another.description)
+ && operandSupplier.equals(another.operandSupplier);
+ }
+
+ /**
+ * Computes a hash code from attributes: {@code relBuilderFactory}, {@code description}, {@code operandSupplier}.
+ * @return hashCode value
+ */
+ @Override
+ public int hashCode() {
+ int h = 5381;
+ h += (h << 5) + relBuilderFactory.hashCode();
+ h += (h << 5) + Objects.hashCode(description);
+ h += (h << 5) + operandSupplier.hashCode();
+ return h;
+ }
+
+ /**
+ * Prints the immutable value {@code Config} with attribute values.
+ * @return A string representation of the value
+ */
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper("Config").omitNullValues().add("relBuilderFactory", relBuilderFactory)
+ .add("description", description).add("operandSupplier", operandSupplier).toString();
+ }
+
+ private static final ImmutableSortExchangeCopyRule.Config INSTANCE =
+ validate(new ImmutableSortExchangeCopyRule.Config());
+
+ /**
+ * Returns the default immutable singleton value of {@code Config}
+ * @return An immutable instance of Config
+ */
+ public static ImmutableSortExchangeCopyRule.Config of() {
+ return INSTANCE;
+ }
+
+ private static ImmutableSortExchangeCopyRule.Config validate(ImmutableSortExchangeCopyRule.Config instance) {
+ return INSTANCE != null && INSTANCE.equalTo(instance) ? INSTANCE : instance;
+ }
+
+ /**
+ * Creates an immutable copy of a {@link PinotSortExchangeCopyRule.Config} value.
+ * Uses accessors to get values to initialize the new immutable instance.
+ * If an instance is already immutable, it is returned as is.
+ * @param instance The instance to copy
+ * @return A copied immutable Config instance
+ */
+ public static ImmutableSortExchangeCopyRule.Config copyOf(PinotSortExchangeCopyRule.Config instance) {
+ if (instance instanceof ImmutableSortExchangeCopyRule.Config) {
+ return (ImmutableSortExchangeCopyRule.Config) instance;
+ }
+ return ImmutableSortExchangeCopyRule.Config.builder().from(instance).build();
+ }
+
+ /**
+ * Creates a builder for {@link ImmutableSortExchangeCopyRule.Config Config}.
+ * <pre>
+ * ImmutableSortExchangeCopyRule.Config.builder()
+ * .withRelBuilderFactory(org.apache.calcite.tools.RelBuilderFactory)
+ * // optional {@link PinotSortExchangeCopyRule.Config#relBuilderFactory() relBuilderFactory}
+ * .withDescription(@org.checkerframework.checker.nullness.qual.Nullable String | null)
+ * // nullable {@link PinotSortExchangeCopyRule.Config#description() description}
+ * .withOperandSupplier(org.apache.calcite.plan.RelRule.OperandTransform)
+ * // optional {@link PinotSortExchangeCopyRule.Config#operandSupplier() operandSupplier}
+ * .build();
+ * </pre>
+ * @return A new Config builder
+ */
+ public static ImmutableSortExchangeCopyRule.Config.Builder builder() {
+ return new ImmutableSortExchangeCopyRule.Config.Builder();
+ }
+
+ /**
+ * Builds instances of type {@link ImmutableSortExchangeCopyRule.Config Config}.
+ * Initialize attributes and then invoke the {@link #build()} method to create an
+ * immutable instance.
+ * <p><em>{@code Builder} is not thread-safe and generally should not be stored in a field or collection,
+ * but instead used immediately to create instances.</em>
+ */
+ @NotThreadSafe
+ public static final class Builder {
+ private @Nullable RelBuilderFactory relBuilderFactory;
+ private @Nullable String description;
+ private @Nullable RelRule.OperandTransform operandSupplier;
+
+ private Builder() {
+ }
+
+ /**
+ * Fill a builder with attribute values from the provided {@code org.apache.calcite.plan.RelRule.Config} instance.
+ * @param instance The instance from which to copy values
+ * @return {@code this} builder for use in a chained invocation
+ */
+ public final Builder from(RelRule.Config instance) {
+ Objects.requireNonNull(instance, "instance");
+ from((Object) instance);
+ return this;
+ }
+
+ /**
+ * Fill a builder with attribute values from the provided {@code org.apache.calcite.rel.rules
+ * .PinotSortExchangeCopyRule.Config} instance.
+ * @param instance The instance from which to copy values
+ * @return {@code this} builder for use in a chained invocation
+ */
+ public final Builder from(PinotSortExchangeCopyRule.Config instance) {
+ Objects.requireNonNull(instance, "instance");
+ from((Object) instance);
+ return this;
+ }
+
+ private void from(Object object) {
+ if (object instanceof RelRule.Config) {
+ RelRule.Config instance = (RelRule.Config) object;
+ withRelBuilderFactory(instance.relBuilderFactory());
+ withOperandSupplier(instance.operandSupplier());
+ @Nullable
+ String descriptionValue =
+ instance.description();
+ if (descriptionValue != null) {
+ withDescription(descriptionValue);
+ }
+ }
+ }
+
+ /**
+ * Initializes the value for the {@link PinotSortExchangeCopyRule.Config#relBuilderFactory() relBuilderFactory}
+ * attribute.
+ * <p><em>If not set, this attribute will have a default value as returned by the initializer of
+ * {@link PinotSortExchangeCopyRule.Config#relBuilderFactory() relBuilderFactory}.</em>
+ * @param relBuilderFactory The value for relBuilderFactory
+ * @return {@code this} builder for use in a chained invocation
+ */
+ public final Builder withRelBuilderFactory(RelBuilderFactory relBuilderFactory) {
+ this.relBuilderFactory = Objects.requireNonNull(relBuilderFactory, "relBuilderFactory");
+ return this;
+ }
+
+ /**
+ * Initializes the value for the {@link PinotSortExchangeCopyRule.Config#description() description} attribute.
+ * @param description The value for description (can be {@code null})
+ * @return {@code this} builder for use in a chained invocation
+ */
+ public final Builder withDescription(
+ @Nullable String description) {
+ this.description = description;
+ return this;
+ }
+
+ /**
+ * Initializes the value for the {@link PinotSortExchangeCopyRule.Config#operandSupplier() operandSupplier} attribute.
+ * <p><em>If not set, this attribute will have a default value as returned by the initializer of
+ * {@link PinotSortExchangeCopyRule.Config#operandSupplier() operandSupplier}.</em>
+ * @param operandSupplier The value for operandSupplier
+ * @return {@code this} builder for use in a chained invocation
+ */
+ public final Builder withOperandSupplier(RelRule.OperandTransform operandSupplier) {
+ this.operandSupplier = Objects.requireNonNull(operandSupplier, "operandSupplier");
+ return this;
+ }
+
+ /**
+ * Builds a new {@link ImmutableSortExchangeCopyRule.Config Config}.
+ * @return An immutable instance of Config
+ * @throws java.lang.IllegalStateException if any required attributes are missing
+ */
+ public ImmutableSortExchangeCopyRule.Config build() {
+ return ImmutableSortExchangeCopyRule.Config.validate(new ImmutableSortExchangeCopyRule.Config(this));
+ }
+ }
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index 4ed6be0a23..1828abd0e8 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -59,9 +59,15 @@ public class PinotQueryRuleSets {
CoreRules.PROJECT_MERGE,
// remove identity project
CoreRules.PROJECT_REMOVE,
+ // add an extra exchange for sort
+ PinotSortExchangeNodeInsertRule.INSTANCE,
+ // copy the sort (limited to offset + fetch rows) below the sort exchange
+ PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY,
// reorder sort and projection
CoreRules.SORT_PROJECT_TRANSPOSE,
+ // TODO: evaluate the SORT_JOIN_TRANSPOSE and SORT_JOIN_COPY rules
+
// join rules
CoreRules.JOIN_PUSH_EXPRESSIONS,
@@ -89,7 +95,6 @@ public class PinotQueryRuleSets {
// Pinot specific rules
PinotFilterExpandSearchRule.INSTANCE,
PinotJoinExchangeNodeInsertRule.INSTANCE,
- PinotAggregateExchangeNodeInsertRule.INSTANCE,
- PinotSortExchangeNodeInsertRule.INSTANCE
+ PinotAggregateExchangeNodeInsertRule.INSTANCE
);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
new file mode 100644
index 0000000000..7f163ca99b
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.SortExchange;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalSortExchange;
+import org.apache.calcite.rel.metadata.RelMdUtil;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.pinot.query.planner.logical.RexExpressionUtils;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.query.type.TypeSystem;
+
+
+public class PinotSortExchangeCopyRule extends RelRule<RelRule.Config> {
+
+ public static final PinotSortExchangeCopyRule SORT_EXCHANGE_COPY =
+ PinotSortExchangeCopyRule.Config.DEFAULT.toRule();
+ private static final TypeFactory TYPE_FACTORY = new TypeFactory(new TypeSystem());
+
+ /**
+ * Creates a PinotSortExchangeCopyRule.
+ */
+ protected PinotSortExchangeCopyRule(Config config) {
+ super(config);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final Sort sort = call.rel(0);
+ final SortExchange exchange = call.rel(1);
+ final RelMetadataQuery metadataQuery = call.getMetadataQuery();
+
+ if (RelMdUtil.checkInputForCollationAndLimit(
+ metadataQuery,
+ exchange.getInput(),
+ sort.getCollation(),
+ sort.offset,
+ sort.fetch)) {
+ // Don't rewrite anything if the input is already sorted AND the
+ // input node would already return fewer than sort.offset + sort.fetch
+ // rows (e.g. there is already an inner limit applied)
+ return;
+ }
+
+ RelCollation collation = sort.getCollation();
+ Preconditions.checkArgument(
+ collation.equals(exchange.getCollation()),
+ "Expected collation on exchange and sort to be the same"
+ );
+
+ final RexNode fetch;
+ if (sort.fetch == null) {
+ fetch = null;
+ } else if (sort.offset == null) {
+ fetch = sort.fetch;
+ } else {
+ RexBuilder rexBuilder = new RexBuilder(TYPE_FACTORY);
+ int total = RexExpressionUtils.getValueAsInt(sort.fetch) + RexExpressionUtils.getValueAsInt(sort.offset);
+ fetch = rexBuilder.makeLiteral(total, TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER));
+ }
+
+ final RelNode newExchangeInput = sort.copy(sort.getTraitSet(), exchange.getInput(), collation, null, fetch);
+ final RelNode exchangeCopy = exchange.copy(exchange.getTraitSet(), newExchangeInput, exchange.getDistribution());
+ final RelNode sortCopy = sort.copy(sort.getTraitSet(), exchangeCopy, collation, sort.offset, sort.fetch);
+
+ call.transformTo(sortCopy);
+ }
+
+ public interface Config extends RelRule.Config {
+
+ Config DEFAULT = ImmutableSortExchangeCopyRule.Config.of()
+ .withOperandFor(LogicalSort.class, LogicalSortExchange.class);
+
+ @Override default PinotSortExchangeCopyRule toRule() {
+ return new PinotSortExchangeCopyRule(this);
+ }
+
+ /** Defines an operand tree for the given classes. */
+
+ default Config withOperandFor(Class<? extends Sort> sortClass,
+ Class<? extends SortExchange> exchangeClass) {
+ return withOperandSupplier(b0 ->
+ b0.operand(sortClass).oneInput(b1 ->
+ b1.operand(exchangeClass).anyInputs()))
+ .as(Config.class);
+ }
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
index fe57992074..f03b78dc3d 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
@@ -23,13 +23,21 @@ import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.logical.LogicalExchange;
import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalSortExchange;
import org.apache.calcite.tools.RelBuilderFactory;
/**
- * Special rule for Pinot, this rule is fixed to always insert exchange after SORT node.
+ * Rewrite any sort into a relation that adds an exchange and pushes down the collation
+ * to the rest of the tree. This happens for two reasons:
+ * <ol>
+ * <li>Sort needs to be a distributed operation: if there are multiple nodes that are
+ * scanning data the sort ordering must be applied globally.</li>
+ * <li>It is ideal to push down the sort ordering as far as possible. If upstream nodes
+ * can send data in sorted order, then we can apply N-way merge sort and early terminate
+ * once all nodes have sent data that is no longer in the top OFFSET+LIMIT.</li>
+ * </ol>
*/
public class PinotSortExchangeNodeInsertRule extends RelOptRule {
public static final PinotSortExchangeNodeInsertRule INSTANCE =
@@ -54,8 +62,10 @@ public class PinotSortExchangeNodeInsertRule extends RelOptRule {
@Override
public void onMatch(RelOptRuleCall call) {
Sort sort = call.rel(0);
- // TODO: this is a single value
- LogicalExchange exchange = LogicalExchange.create(sort.getInput(), RelDistributions.hash(Collections.emptyList()));
+ LogicalSortExchange exchange = LogicalSortExchange.create(
+ sort.getInput(),
+ RelDistributions.hash(Collections.emptyList()),
+ sort.getCollation());
call.transformTo(LogicalSort.create(exchange, sort.getCollation(), sort.offset, sort.fetch));
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index de6a87ca88..cd29edac30 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -33,7 +33,6 @@ import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
-import org.apache.calcite.rex.RexLiteral;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.stage.AggregateNode;
@@ -88,8 +87,8 @@ public final class RelToStageConverter {
}
private static StageNode convertLogicalSort(LogicalSort node, int currentStageId) {
- int fetch = node.fetch == null ? 0 : ((RexLiteral) node.fetch).getValueAs(Integer.class);
- int offset = node.offset == null ? 0 : ((RexLiteral) node.offset).getValueAs(Integer.class);
+ int fetch = RexExpressionUtils.getValueAsInt(node.fetch);
+ int offset = RexExpressionUtils.getValueAsInt(node.offset);
return new SortNode(currentStageId, node.getCollation().getFieldCollations(), fetch, offset,
toDataSchema(node.getRowType()));
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
index cc66047651..314b0baa54 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
@@ -86,4 +86,14 @@ public class RexExpressionUtils {
}
return result;
}
+
+ public static Integer getValueAsInt(RexNode in) {
+ if (in == null) {
+ return 0;
+ }
+
+ Preconditions.checkArgument(in instanceof RexLiteral, "expected literal, got " + in);
+ RexLiteral literal = (RexLiteral) in;
+ return literal.getValueAs(Integer.class);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 5f46b23d26..736794c283 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -23,7 +23,7 @@ import java.util.Map;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.core.Exchange;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
@@ -92,7 +92,7 @@ public class StagePlanner {
private StageNode walkRelPlan(RelNode node, int currentStageId) {
if (isExchangeNode(node)) {
StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
- RelDistribution distribution = ((LogicalExchange) node).getDistribution();
+ RelDistribution distribution = ((Exchange) node).getDistribution();
return createSendReceivePair(nextStageRoot, distribution, currentStageId);
} else {
StageNode stageNode = RelToStageConverter.toStageNode(node, currentStageId);
@@ -125,7 +125,7 @@ public class StagePlanner {
}
private boolean isExchangeNode(RelNode node) {
- return (node instanceof LogicalExchange);
+ return (node instanceof Exchange); // any Calcite Exchange, incl. LogicalExchange and sort exchanges
}
private int getNewStageId() {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
index 38b2da6c56..4ffe901b20 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
@@ -70,7 +70,9 @@ public class SortNode extends AbstractStageNode {
@Override
public String explain() {
- return "SORT" + (_fetch > 0 ? " (LIMIT " + _fetch + ")" : "");
+ return String.format("SORT%s%s", // e.g. "SORT LIMIT 10 OFFSET 5"; each clause is omitted unless > 0
+ (_fetch > 0) ? " LIMIT " + _fetch : "",
+ (_offset > 0) ? " OFFSET " + _offset : "");
}
@Override
diff --git a/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java b/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
new file mode 100644
index 0000000000..b92fd79352
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.SortExchange;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalSortExchange;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.query.type.TypeSystem;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link PinotSortExchangeCopyRule#SORT_EXCHANGE_COPY}, driving {@code onMatch} directly with a
+ * mocked {@link RelOptRuleCall} whose operands are a {@link Sort} on top of a {@link SortExchange}.
+ */
+public class PinotSortExchangeCopyRuleTest {
+
+  public static final TypeFactory TYPE_FACTORY = new TypeFactory(new TypeSystem());
+  private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY);
+
+  private AutoCloseable _mocks;
+
+  @Mock
+  private RelOptRuleCall _call;
+  @Mock
+  private RelNode _input;
+  @Mock
+  private RelOptCluster _cluster;
+  @Mock
+  private RelMetadataQuery _query;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    RelTraitSet traits = RelTraitSet.createEmpty();
+    Mockito.when(_input.getTraitSet()).thenReturn(traits);
+    Mockito.when(_input.getCluster()).thenReturn(_cluster);
+    Mockito.when(_call.getMetadataQuery()).thenReturn(_query);
+    Mockito.when(_query.getMaxRowCount(Mockito.any())).thenReturn(null);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void shouldMatchLimitNoOffsetNoSort() {
+    // Given:
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+    Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, null, literal(1));
+
+    // When:
+    LogicalSort innerSort = applyRuleAndGetInnerSort(sort, exchange);
+
+    // Then:
+    Assert.assertEquals(innerSort.getCollation().getKeys().size(), 0);
+    Assert.assertNull(innerSort.offset);
+    Assert.assertEquals(innerSort.fetch, literal(1));
+  }
+
+  @Test
+  public void shouldMatchLimitNoOffsetYesSort() {
+    // Given:
+    RelCollation collation = RelCollations.of(1);
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    Sort sort = LogicalSort.create(exchange, collation, null, literal(1));
+
+    // When:
+    LogicalSort innerSort = applyRuleAndGetInnerSort(sort, exchange);
+
+    // Then:
+    Assert.assertEquals(innerSort.getCollation(), collation);
+    Assert.assertNull(innerSort.offset);
+    Assert.assertEquals(innerSort.fetch, literal(1));
+  }
+
+  @Test
+  public void shouldMatchNoSortAndPushDownLimitPlusOffset() {
+    // Given: OFFSET 2 LIMIT 1
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+    Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, literal(2), literal(1));
+
+    // When:
+    LogicalSort innerSort = applyRuleAndGetInnerSort(sort, exchange);
+
+    // Then: the pushed-down sort keeps offset + fetch rows and applies no offset itself
+    Assert.assertEquals(innerSort.getCollation().getKeys().size(), 0);
+    Assert.assertNull(innerSort.offset);
+    Assert.assertEquals(innerSort.fetch, literal(3));
+  }
+
+  @Test
+  public void shouldMatchSortOnly() {
+    // Given:
+    RelCollation collation = RelCollations.of(1);
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    Sort sort = LogicalSort.create(exchange, collation, null, null);
+
+    // When:
+    LogicalSort innerSort = applyRuleAndGetInnerSort(sort, exchange);
+
+    // Then:
+    Assert.assertEquals(innerSort.getCollation(), collation);
+    Assert.assertNull(innerSort.offset);
+    Assert.assertNull(innerSort.fetch);
+  }
+
+  @Test
+  public void shouldMatchLimitOffsetAndSort() {
+    // Given: OFFSET 1 LIMIT 2
+    RelCollation collation = RelCollations.of(1);
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    Sort sort = LogicalSort.create(exchange, collation, literal(1), literal(2));
+
+    // When:
+    LogicalSort innerSort = applyRuleAndGetInnerSort(sort, exchange);
+
+    // Then:
+    Assert.assertEquals(innerSort.getCollation(), collation);
+    Assert.assertNull(innerSort.offset);
+    Assert.assertEquals(innerSort.fetch, literal(3));
+  }
+
+  @Test
+  public void shouldNotMatchOnlySortAlreadySorted() {
+    // Given: the input already provides the requested collation
+    RelCollation collation = RelCollations.of(1);
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    Sort sort = LogicalSort.create(exchange, collation, null, null);
+    Mockito.when(_query.collations(_input)).thenReturn(ImmutableList.of(collation));
+
+    // When:
+    applyRule(sort, exchange);
+
+    // Then:
+    Mockito.verify(_call, Mockito.never()).transformTo(Mockito.any(), Mockito.anyMap());
+  }
+
+  @Test
+  public void shouldNotMatchOffsetNoLimitNoSort() {
+    // Given: OFFSET 1 with neither LIMIT nor ORDER BY
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+    Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, literal(1), null);
+
+    // When:
+    applyRule(sort, exchange);
+
+    // Then:
+    Mockito.verify(_call, Mockito.never()).transformTo(Mockito.any(), Mockito.anyMap());
+  }
+
+  /**
+   * Makes {@code sort} and {@code exchange} the first and second operands of {@code _call}, then invokes
+   * {@link PinotSortExchangeCopyRule#SORT_EXCHANGE_COPY} on it.
+   */
+  private void applyRule(Sort sort, SortExchange exchange) {
+    Mockito.when(_call.rel(0)).thenReturn(sort);
+    Mockito.when(_call.rel(1)).thenReturn(exchange);
+    PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.onMatch(_call);
+  }
+
+  /**
+   * Invokes the rule and asserts it produced exactly one replacement shaped
+   * {@code LogicalSort -> LogicalSortExchange -> LogicalSort}.
+   *
+   * @return the inner {@link LogicalSort} placed below the exchange
+   */
+  private LogicalSort applyRuleAndGetInnerSort(Sort sort, SortExchange exchange) {
+    applyRule(sort, exchange);
+
+    ArgumentCaptor<RelNode> sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class);
+    Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap());
+
+    RelNode sortCopy = sortCopyCapture.getValue();
+    Assert.assertTrue(sortCopy instanceof LogicalSort);
+    RelNode exchangeCopy = ((LogicalSort) sortCopy).getInput();
+    Assert.assertTrue(exchangeCopy instanceof LogicalSortExchange);
+    RelNode innerSort = exchangeCopy.getInput(0);
+    Assert.assertTrue(innerSort instanceof LogicalSort);
+    return (LogicalSort) innerSort;
+  }
+
+  private static RexNode literal(int i) {
+    return REX_BUILDER.makeLiteral(i, TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER));
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index eaf119880b..32dcc2c0f6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -20,8 +20,10 @@ package org.apache.pinot.query.runtime;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ExecutorService;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -29,10 +31,13 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.MultiplexingMailboxService;
@@ -183,10 +188,38 @@ public class QueryRunner {
private InstanceResponseBlock processServerQuery(ServerQueryRequest serverQueryRequest,
ExecutorService executorService) {
try {
- return _serverExecutor.execute(serverQueryRequest, executorService);
+      InstanceResponseBlock result = _serverExecutor.execute(serverQueryRequest, executorService);
+      if (result.getRows() == null || serverQueryRequest.getQueryContext().getOrderByExpressions() == null) {
+        return result;
+      }
+
+      // Only ORDER BY results are post-processed: their columns are re-arranged to match the projection schema
+      // expected by the Calcite logical operator. If other cases turn out to need V1 results post-processed,
+      // factor this out and also canonicalize the data types in the same step (see
+      // LeafStageTransferableBlockOperator#canonicalizeRow).
+      DataSchema dataSchema = result.getDataSchema();
+      List<String> selectionColumns =
+          SelectionOperatorUtils.getSelectionColumns(serverQueryRequest.getQueryContext(), dataSchema);
+      int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, dataSchema);
+      DataSchema resultDataSchema = SelectionOperatorUtils.getSchemaForProjection(dataSchema, columnIndices);
+
+      // Rows are prepended, so the output reverses the iteration order of result.getRows().
+      // NOTE(review): presumably mirrors SortOperator's poll()/addFirst drain; confirm the V1 order relied on.
+      LinkedList<Object[]> projectedRows = new LinkedList<>();
+      for (Object[] row : result.getRows()) {
+        assert row != null;
+        Object[] projectedRow = new Object[columnIndices.length];
+        for (int i = 0; i < columnIndices.length; i++) {
+          projectedRow[i] = row[columnIndices[i]];
+        }
+        projectedRows.addFirst(projectedRow);
+      }
+
+      return new InstanceResponseBlock(new SelectionResultsBlock(resultDataSchema, projectedRows),
+          serverQueryRequest.getQueryContext());
} catch (Exception e) {
InstanceResponseBlock errorResponse = new InstanceResponseBlock();
- errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE, e.getMessage());
+      errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE, Objects.toString(e.getMessage()));
return errorResponse;
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 64a3ed6810..79dcd1f6a6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -106,7 +106,7 @@ public class SortOperator extends BaseOperator<TransferableBlock> {
LinkedList<Object[]> rows = new LinkedList<>();
while (_rows.size() > _offset) {
Object[] row = _rows.poll();
- rows.addFirst(row);
+ rows.addFirst(row);
}
_isSortedBlockConstructed = true;
if (rows.size() == 0) {
diff --git a/pinot-query-runtime/src/test/resources/queries/OrderBy.json b/pinot-query-runtime/src/test/resources/queries/OrderBy.json
new file mode 100644
index 0000000000..2c7708743c
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/OrderBy.json
@@ -0,0 +1,232 @@
+{
+ "basic_order_by": {
+ "tables": {
+ "basic": {
+ "schema": [
+ {"name": "col0", "type": "INT"},
+ {"name": "col1", "type": "INT"},
+ {"name": "col2", "type": "STRING"}
+ ],
+ "inputs": [
+ [1, 2, "a"],
+ [2, 3, "b"],
+ [3, 1, "c"],
+ [4, 4, "d"],
+ [5, 5, "e"],
+ [6, 6, "f"]
+ ]
+ }
+ },
+ "queries": [
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col0 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col2 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col2 LIMIT 2 OFFSET 0"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col2, col1 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1, col2 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 LIMIT 2"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 LIMIT 100"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 OFFSET 100"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 DESC"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 DESC LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 ASC"},
+ {"sql": "SELECT * FROM {basic} ORDER BY col1 ASC LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} ORDER BY power(col1, 2)"},
+ {"sql": "SELECT * FROM {basic} WHERE col1 > 3 ORDER BY col1 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} WHERE col0 > 3 ORDER BY col1 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {basic} WHERE col1 > 3 AND col1 < 5 ORDER BY col1 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT col1 FROM {basic} ORDER BY col1 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT col2 FROM {basic} ORDER BY col1 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT col1, col2 FROM {basic} ORDER BY col1 LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT col1, col0, col2 FROM {basic} ORDER BY col1 LIMIT 2 OFFSET 1"},
+ {
+ "ignored": true,
+ "comment": "we don't support ALL statement",
+ "sql": "SELECT * FROM {basic} ORDER BY col2 LIMIT ALL"
+ },
+ {
+ "ignored": true,
+ "comment": "we don't support LIMIT NULL",
+ "sql": "SELECT * FROM {basic} ORDER BY col2 LIMIT NULL"
+ }
+ ]
+ },
+ "order_by_agg": {
+ "tables": {
+ "agg": {
+ "schema": [
+ {"name": "val", "type": "INT"},
+ {"name": "g", "type": "STRING"}
+ ],
+ "inputs": [
+ [1, "a"],
+ [2, "a"],
+ [3, "b"],
+ [4, "b"],
+ [5, "c"]
+ ]
+ }
+ },
+ "queries": [
+ {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY sum"},
+ {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY sum LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g HAVING SUM(val) > 3 ORDER BY sum LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY g"},
+ {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY SUM(val)"},
+ {"sql": "SELECT SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY g"}
+ ]
+ },
+ "order_by_join": {
+ "tables": {
+ "l": {
+ "schema": [
+ {"name": "key", "type": "STRING"},
+ {"name": "lval", "type": "INT"}
+ ],
+ "inputs": [
+ ["foo", 1],
+ ["foo", 3],
+ ["foo", 5],
+ ["bar", 2],
+ ["bar", 4],
+ ["bar", 6]
+ ]
+ },
+ "r": {
+ "schema": [
+ {"name": "key", "type": "STRING"},
+ {"name": "rval", "type": "INT"}
+ ],
+ "inputs": [
+ ["foo", 1],
+ ["foo", 3],
+ ["foo", 7],
+ ["bar", 2],
+ ["bar", 4],
+ ["bar", 8]
+ ]
+ }
+ },
+ "queries": [
+ {"sql": "SELECT * FROM {l} JOIN {r} ON {l}.key = {r}.key ORDER BY lval, rval LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT lval FROM {l} JOIN {r} ON {l}.key = {r}.key ORDER BY lval, rval LIMIT 2 OFFSET 1"},
+ {"sql": "SELECT * FROM {l} JOIN {r} ON {l}.key = {r}.key ORDER BY {l}.key"},
+ {"sql": "SELECT * FROM {l} JOIN {r} ON {l}.key = {r}.key ORDER BY {r}.key"},
+ {"sql": "SELECT {l}.key, SUM(lval), SUM(rval) FROM {l} JOIN {r} ON {l}.key = {r}.key GROUP BY {l}.key ORDER BY {l}.key"}
+ ]
+ },
+ "order_by_subqueries": {
+ "tables": {
+ "tbl": {
+ "schema": [
+ {"name": "col0", "type": "INT"},
+ {"name": "col1", "type": "INT"},
+ {"name": "col2", "type": "STRING"}
+ ],
+ "inputs": [
+ [1, 2, "a"],
+ [2, 3, "b"],
+ [3, 1, "c"],
+ [4, 4, "d"],
+ [5, 5, "e"],
+ [6, 6, "f"]
+ ]
+ }
+ },
+ "queries": [
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col0)"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2)"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2 OFFSET 1)"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2 OFFSET 1) ORDER BY col1 LIMIT 3"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2 OFFSET 1) ORDER BY col2 LIMIT 3 OFFSET 1"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 5 OFFSET 1) ORDER BY col1 LIMIT 3 OFFSET 1"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 5 OFFSET 1) ORDER BY col1 OFFSET 1"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2) LIMIT 1"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1) ORDER BY col0"},
+ {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1) ORDER BY col0 LIMIT 1"}
+ ]
+ },
+ "order_by_float": {
+ "tables": {
+ "floats": {
+ "schema": [
+ {"name": "col0", "type": "FLOAT"},
+ {"name": "col1", "type": "FLOAT"}
+ ],
+ "inputs": [
+ [0.0, 2.2],
+ [3.3, -3.3],
+ [4.4, 1.1],
+ [-5.5, 4.4],
+ [6.6, 5.0],
+ [7.7, -6.6]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "sql": "SELECT col1 FROM {floats} ORDER BY col1 LIMIT 2 OFFSET 1",
+ "description": "Basic test for ordering by a float column with a limit and offset"
+ },
+ {
+ "sql": "SELECT col0 FROM {floats} ORDER BY col0 * 2 LIMIT 2 OFFSET 1",
+ "description": "Testing order by with an expression involving floats"
+ }
+ ]
+ },
+ "order_by_double": {
+ "tables": {
+ "doubles": {
+ "schema": [
+ {"name": "col0", "type": "DOUBLE"},
+ {"name": "col1", "type": "DOUBLE"}
+ ],
+ "inputs": [
+ [0.0, 2.2],
+ [3.3, -3.3],
+ [4.4, 1.1],
+ [-5.5, 4.4],
+ [6.6, -5.0],
+ [7.7, 6.6]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "sql": "SELECT col1 FROM {doubles} ORDER BY col1 LIMIT 2 OFFSET 1",
+ "description": "Basic test for ordering by a double column with a limit and offset"
+ },
+ {
+ "sql": "SELECT col0 FROM {doubles} ORDER BY col0 * 2 LIMIT 2 OFFSET 1",
+ "description": "Testing order by with an expression involving doubles"
+ }
+ ]
+ },
+ "order_by_boolean": {
+ "ignored": true,
+ "comment": "fails when we try to canonicalizeRow with ClassCastException - value is already in memory as boolean but DataSchema expects it to be an int",
+ "tables": {
+ "bools": {
+ "schema": [
+ {"name": "col0", "type": "BOOLEAN"},
+ {"name": "col1", "type": "INT"}
+ ],
+ "inputs": [
+ [true, 2],
+ [false, 3],
+ [false, 1],
+ [true, 4],
+ [true, 5],
+ [false, 6]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "sql": "SELECT col1 FROM {bools} ORDER BY col0, col1 LIMIT 2 OFFSET 1",
+ "description": "Basic test for ordering by a boolean column with a limit and offset"
+ }
+ ]
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org