You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/11/30 18:25:40 UTC

[pinot] branch master updated: [multistage] support sort push-down (#9832)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 041865a80f [multistage] support sort push-down (#9832)
041865a80f is described below

commit 041865a80f7a2359270571a2343049bb1f294fe5
Author: Almog Gavra <al...@gmail.com>
AuthorDate: Wed Nov 30 10:25:34 2022 -0800

    [multistage] support sort push-down (#9832)
---
 .../query/selection/SelectionOperatorService.java  |  17 +-
 .../query/selection/SelectionOperatorUtils.java    |  15 +
 .../rel/rules/ImmutableSortExchangeCopyRule.java   | 414 +++++++++++++++++++++
 .../calcite/rel/rules/PinotQueryRuleSets.java      |   9 +-
 .../rel/rules/PinotSortExchangeCopyRule.java       | 114 ++++++
 .../rel/rules/PinotSortExchangeNodeInsertRule.java |  18 +-
 .../query/planner/logical/RelToStageConverter.java |   5 +-
 .../query/planner/logical/RexExpressionUtils.java  |  10 +
 .../pinot/query/planner/logical/StagePlanner.java  |   6 +-
 .../apache/pinot/query/planner/stage/SortNode.java |   4 +-
 .../rel/rules/PinotSortExchangeCopyRuleTest.java   | 249 +++++++++++++
 .../apache/pinot/query/runtime/QueryRunner.java    |  46 ++-
 .../pinot/query/runtime/operator/SortOperator.java |   2 +-
 .../src/test/resources/queries/OrderBy.json        | 232 ++++++++++++
 14 files changed, 1111 insertions(+), 30 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index f77c936a21..525ab7af9e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -25,7 +25,6 @@ import java.util.PriorityQueue;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.spi.utils.LoopUtils;
 import org.roaringbitmap.RoaringBitmap;
@@ -134,18 +133,7 @@ public class SelectionOperatorService {
   public ResultTable renderResultTableWithOrdering() {
     int[] columnIndices = SelectionOperatorUtils.getColumnIndices(_selectionColumns, _dataSchema);
     int numColumns = columnIndices.length;
-
-    // Construct the result data schema
-    String[] columnNames = _dataSchema.getColumnNames();
-    ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
-    String[] resultColumnNames = new String[numColumns];
-    ColumnDataType[] resultColumnDataTypes = new ColumnDataType[numColumns];
-    for (int i = 0; i < numColumns; i++) {
-      int columnIndex = columnIndices[i];
-      resultColumnNames[i] = columnNames[columnIndex];
-      resultColumnDataTypes[i] = columnDataTypes[columnIndex];
-    }
-    DataSchema resultDataSchema = new DataSchema(resultColumnNames, resultColumnDataTypes);
+    DataSchema resultDataSchema = SelectionOperatorUtils.getSchemaForProjection(_dataSchema, columnIndices);
 
     // Extract the result rows
     LinkedList<Object[]> rowsInSelectionResults = new LinkedList<>();
@@ -156,9 +144,10 @@ public class SelectionOperatorService {
       for (int i = 0; i < numColumns; i++) {
         Object value = row[columnIndices[i]];
         if (value != null) {
-          extractedRow[i] = resultColumnDataTypes[i].convertAndFormat(value);
+          extractedRow[i] = resultDataSchema.getColumnDataType(i).convertAndFormat(value);
         }
       }
+
       rowsInSelectionResults.addFirst(extractedRow);
     }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index e53b4f2982..bda3efab0b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -656,4 +656,19 @@ public class SelectionOperatorUtils {
       queue.offer(value);
     }
   }
+
+  public static DataSchema getSchemaForProjection(DataSchema dataSchema, int[] columnIndices) {
+    int numColumns = columnIndices.length;
+
+    String[] columnNames = dataSchema.getColumnNames();
+    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+    String[] resultColumnNames = new String[numColumns];
+    ColumnDataType[] resultColumnDataTypes = new ColumnDataType[numColumns];
+    for (int i = 0; i < numColumns; i++) {
+      int columnIndex = columnIndices[i];
+      resultColumnNames[i] = columnNames[columnIndex];
+      resultColumnDataTypes[i] = columnDataTypes[columnIndex];
+    }
+    return new DataSchema(resultColumnNames, resultColumnDataTypes);
+  }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/ImmutableSortExchangeCopyRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/ImmutableSortExchangeCopyRule.java
new file mode 100644
index 0000000000..eca7fc6110
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/ImmutableSortExchangeCopyRule.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+// NOTE: this file was produced by Calcite's code generator. Rather than pulling in all of the
+// codegen dependencies, the generated output is checked in directly. If active development on
+// this needs to happen, re-generate the file using Calcite's generator.
+
+// CHECKSTYLE:OFF
+
+import com.google.common.base.MoreObjects;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+
+/**
+ * {@code ImmutableSortExchangeCopyRule} contains immutable implementation classes generated from
+ * abstract value types defined as nested inside {@link PinotSortExchangeCopyRule}.
+ * @see ImmutableSortExchangeCopyRule.Config
+ */
+@SuppressWarnings({"all"})
+final class ImmutableSortExchangeCopyRule {
+  private ImmutableSortExchangeCopyRule() {
+  }
+
+  /**
+   * Immutable implementation of {@link PinotSortExchangeCopyRule.Config}.
+   * <p>
+   * Use the builder to create immutable instances:
+   * {@code ImmutableSortExchangeCopyRule.Config.builder()}.
+   * Use the static factory method to get the default singleton instance:
+   * {@code ImmutableSortExchangeCopyRule.Config.of()}.
+   */
+  static final class Config implements PinotSortExchangeCopyRule.Config {
+    private final RelBuilderFactory relBuilderFactory;
+    private final @Nullable String description;
+    private final RelRule.OperandTransform operandSupplier;
+
+    private Config() {
+      this.description = null;
+      this.relBuilderFactory = initShim.relBuilderFactory();
+      this.operandSupplier = initShim.operandSupplier();
+      this.initShim = null;
+    }
+
+    private Config(ImmutableSortExchangeCopyRule.Config.Builder builder) {
+      this.description = builder.description;
+      if (builder.relBuilderFactory != null) {
+        initShim.withRelBuilderFactory(builder.relBuilderFactory);
+      }
+      if (builder.operandSupplier != null) {
+        initShim.withOperandSupplier(builder.operandSupplier);
+      }
+      this.relBuilderFactory = initShim.relBuilderFactory();
+      this.operandSupplier = initShim.operandSupplier();
+      this.initShim = null;
+    }
+
+    private Config(RelBuilderFactory relBuilderFactory,
+        @Nullable String description,
+        RelRule.OperandTransform operandSupplier) {
+      this.relBuilderFactory = relBuilderFactory;
+      this.description = description;
+      this.operandSupplier = operandSupplier;
+      this.initShim = null;
+    }
+
+    private static final byte STAGE_INITIALIZING = -1;
+    private static final byte STAGE_UNINITIALIZED = 0;
+    private static final byte STAGE_INITIALIZED = 1;
+    @SuppressWarnings("Immutable")
+    private transient volatile InitShim initShim = new InitShim();
+
+    private final class InitShim {
+      private byte relBuilderFactoryBuildStage = STAGE_UNINITIALIZED;
+      private RelBuilderFactory relBuilderFactory;
+
+      RelBuilderFactory relBuilderFactory() {
+        if (relBuilderFactoryBuildStage == STAGE_INITIALIZING) {
+          throw new IllegalStateException(formatInitCycleMessage());
+        }
+        if (relBuilderFactoryBuildStage == STAGE_UNINITIALIZED) {
+          relBuilderFactoryBuildStage = STAGE_INITIALIZING;
+          this.relBuilderFactory = Objects.requireNonNull(relBuilderFactoryInitialize(), "relBuilderFactory");
+          relBuilderFactoryBuildStage = STAGE_INITIALIZED;
+        }
+        return this.relBuilderFactory;
+      }
+
+      void withRelBuilderFactory(RelBuilderFactory relBuilderFactory) {
+        this.relBuilderFactory = relBuilderFactory;
+        relBuilderFactoryBuildStage = STAGE_INITIALIZED;
+      }
+
+      private byte operandSupplierBuildStage = STAGE_UNINITIALIZED;
+      private RelRule.OperandTransform operandSupplier;
+
+      RelRule.OperandTransform operandSupplier() {
+        if (operandSupplierBuildStage == STAGE_INITIALIZING) {
+          throw new IllegalStateException(formatInitCycleMessage());
+        }
+        if (operandSupplierBuildStage == STAGE_UNINITIALIZED) {
+          operandSupplierBuildStage = STAGE_INITIALIZING;
+          this.operandSupplier = Objects.requireNonNull(operandSupplierInitialize(), "operandSupplier");
+          operandSupplierBuildStage = STAGE_INITIALIZED;
+        }
+        return this.operandSupplier;
+      }
+
+      void withOperandSupplier(RelRule.OperandTransform operandSupplier) {
+        this.operandSupplier = operandSupplier;
+        operandSupplierBuildStage = STAGE_INITIALIZED;
+      }
+
+      private String formatInitCycleMessage() {
+        List<String> attributes = new ArrayList<>();
+        if (relBuilderFactoryBuildStage == STAGE_INITIALIZING) {
+          attributes.add("relBuilderFactory");
+        }
+        if (operandSupplierBuildStage == STAGE_INITIALIZING) {
+          attributes.add("operandSupplier");
+        }
+        return "Cannot build Config, attribute initializers form cycle " + attributes;
+      }
+    }
+
+    private RelBuilderFactory relBuilderFactoryInitialize() {
+      return PinotSortExchangeCopyRule.Config.super.relBuilderFactory();
+    }
+
+    private RelRule.OperandTransform operandSupplierInitialize() {
+      return PinotSortExchangeCopyRule.Config.super.operandSupplier();
+    }
+
+    /**
+     * @return The value of the {@code relBuilderFactory} attribute
+     */
+    @Override
+    public RelBuilderFactory relBuilderFactory() {
+      InitShim shim = this.initShim;
+      return shim != null ? shim.relBuilderFactory() : this.relBuilderFactory;
+    }
+
+    /**
+     * @return The value of the {@code description} attribute
+     */
+    @Override
+    public @Nullable String description() {
+      return description;
+    }
+
+    /**
+     * @return The value of the {@code operandSupplier} attribute
+     */
+    @Override
+    public RelRule.OperandTransform operandSupplier() {
+      InitShim shim = this.initShim;
+      return shim != null ? shim.operandSupplier() : this.operandSupplier;
+    }
+
+    /**
+     * Copy the current immutable object by setting a value for the
+     * {@link PinotSortExchangeCopyRule.Config#relBuilderFactory() relBuilderFactory} attribute.
+     * A shallow reference equality check is used to prevent copying of the same value by returning {@code this}.
+     * @param value A new value for relBuilderFactory
+     * @return A modified copy of the {@code this} object
+     */
+    public final ImmutableSortExchangeCopyRule.Config withRelBuilderFactory(RelBuilderFactory value) {
+      if (this.relBuilderFactory == value) {
+        return this;
+      }
+      RelBuilderFactory newValue = Objects.requireNonNull(value, "relBuilderFactory");
+      return validate(new ImmutableSortExchangeCopyRule.Config(newValue, this.description, this.operandSupplier));
+    }
+
+    /**
+     * Copy the current immutable object by setting a value for the
+     * {@link PinotSortExchangeCopyRule.Config#description() description} attribute.
+     * An equals check is used to prevent copying of the same value by returning {@code this}.
+     * @param value A new value for description (can be {@code null})
+     * @return A modified copy of the {@code this} object
+     */
+    public final ImmutableSortExchangeCopyRule.Config withDescription(
+        @Nullable String value) {
+      if (Objects.equals(this.description, value)) {
+        return this;
+      }
+      return validate(new ImmutableSortExchangeCopyRule.Config(this.relBuilderFactory, value, this.operandSupplier));
+    }
+
+    /**
+     * Copy the current immutable object by setting a value for the
+     * {@link PinotSortExchangeCopyRule.Config#operandSupplier() operandSupplier} attribute.
+     * A shallow reference equality check is used to prevent copying of the same value by returning {@code this}.
+     * @param value A new value for operandSupplier
+     * @return A modified copy of the {@code this} object
+     */
+    public final ImmutableSortExchangeCopyRule.Config withOperandSupplier(RelRule.OperandTransform value) {
+      if (this.operandSupplier == value) {
+        return this;
+      }
+      RelRule.OperandTransform newValue = Objects.requireNonNull(value, "operandSupplier");
+      return validate(new ImmutableSortExchangeCopyRule.Config(this.relBuilderFactory, this.description, newValue));
+    }
+
+    /**
+     * This instance is equal to all instances of {@code Config} that have equal attribute values.
+     * @return {@code true} if {@code this} is equal to {@code another} instance
+     */
+    @Override
+    public boolean equals(@Nullable Object another) {
+      if (this == another) {
+        return true;
+      }
+      return another instanceof ImmutableSortExchangeCopyRule.Config && equalTo(
+          (ImmutableSortExchangeCopyRule.Config) another);
+    }
+
+    private boolean equalTo(ImmutableSortExchangeCopyRule.Config another) {
+      return relBuilderFactory.equals(another.relBuilderFactory) && Objects.equals(description, another.description)
+          && operandSupplier.equals(another.operandSupplier);
+    }
+
+    /**
+     * Computes a hash code from attributes: {@code relBuilderFactory}, {@code description}, {@code operandSupplier}.
+     * @return hashCode value
+     */
+    @Override
+    public int hashCode() {
+      int h = 5381;
+      h += (h << 5) + relBuilderFactory.hashCode();
+      h += (h << 5) + Objects.hashCode(description);
+      h += (h << 5) + operandSupplier.hashCode();
+      return h;
+    }
+
+    /**
+     * Prints the immutable value {@code Config} with attribute values.
+     * @return A string representation of the value
+     */
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper("Config").omitNullValues().add("relBuilderFactory", relBuilderFactory)
+          .add("description", description).add("operandSupplier", operandSupplier).toString();
+    }
+
+    private static final ImmutableSortExchangeCopyRule.Config INSTANCE =
+        validate(new ImmutableSortExchangeCopyRule.Config());
+
+    /**
+     * Returns the default immutable singleton value of {@code Config}
+     * @return An immutable instance of Config
+     */
+    public static ImmutableSortExchangeCopyRule.Config of() {
+      return INSTANCE;
+    }
+
+    private static ImmutableSortExchangeCopyRule.Config validate(ImmutableSortExchangeCopyRule.Config instance) {
+      return INSTANCE != null && INSTANCE.equalTo(instance) ? INSTANCE : instance;
+    }
+
+    /**
+     * Creates an immutable copy of a {@link PinotSortExchangeCopyRule.Config} value.
+     * Uses accessors to get values to initialize the new immutable instance.
+     * If an instance is already immutable, it is returned as is.
+     * @param instance The instance to copy
+     * @return A copied immutable Config instance
+     */
+    public static ImmutableSortExchangeCopyRule.Config copyOf(PinotSortExchangeCopyRule.Config instance) {
+      if (instance instanceof ImmutableSortExchangeCopyRule.Config) {
+        return (ImmutableSortExchangeCopyRule.Config) instance;
+      }
+      return ImmutableSortExchangeCopyRule.Config.builder().from(instance).build();
+    }
+
+    /**
+     * Creates a builder for {@link ImmutableSortExchangeCopyRule.Config Config}.
+     * <pre>
+     * ImmutableSortExchangeCopyRule.Config.builder()
+     *    .withRelBuilderFactory(org.apache.calcite.tools.RelBuilderFactory)
+     *    // optional {@link SortExchangeCopyRule.Config#relBuilderFactory() relBuilderFactory}
+     *    .withDescription(@org.checkerframework.checker.nullness.qual.Nullable String | null)
+     *    // nullable {@link SortExchangeCopyRule.Config#description() description}
+     *    .withOperandSupplier(org.apache.calcite.plan.RelRule.OperandTransform)
+     *    // optional {@link SortExchangeCopyRule.Config#operandSupplier() operandSupplier}
+     *    .build();
+     * </pre>
+     * @return A new Config builder
+     */
+    public static ImmutableSortExchangeCopyRule.Config.Builder builder() {
+      return new ImmutableSortExchangeCopyRule.Config.Builder();
+    }
+
+    /**
+     * Builds instances of type {@link ImmutableSortExchangeCopyRule.Config Config}.
+     * Initialize attributes and then invoke the {@link #build()} method to create an
+     * immutable instance.
+     * <p><em>{@code Builder} is not thread-safe and generally should not be stored in a field or collection,
+     * but instead used immediately to create instances.</em>
+     */
+    @NotThreadSafe
+    public static final class Builder {
+      private @Nullable RelBuilderFactory relBuilderFactory;
+      private @Nullable String description;
+      private @Nullable RelRule.OperandTransform operandSupplier;
+
+      private Builder() {
+      }
+
+      /**
+       * Fill a builder with attribute values from the provided {@code org.apache.calcite.plan.RelRule.Config} instance.
+       * @param instance The instance from which to copy values
+       * @return {@code this} builder for use in a chained invocation
+       */
+      public final Builder from(RelRule.Config instance) {
+        Objects.requireNonNull(instance, "instance");
+        from((Object) instance);
+        return this;
+      }
+
+      /**
+       * Fill a builder with attribute values from the provided
+       * {@code org.apache.calcite.rel.rules.PinotSortExchangeCopyRule.Config} instance.
+       * @param instance The instance from which to copy values
+       * @return {@code this} builder for use in a chained invocation
+       */
+      public final Builder from(PinotSortExchangeCopyRule.Config instance) {
+        Objects.requireNonNull(instance, "instance");
+        from((Object) instance);
+        return this;
+      }
+
+      private void from(Object object) {
+        if (object instanceof RelRule.Config) {
+          RelRule.Config instance = (RelRule.Config) object;
+          withRelBuilderFactory(instance.relBuilderFactory());
+          withOperandSupplier(instance.operandSupplier());
+          @Nullable
+          String descriptionValue =
+              instance.description();
+          if (descriptionValue != null) {
+            withDescription(descriptionValue);
+          }
+        }
+      }
+
+      /**
+       * Initializes the value for the
+       * {@link PinotSortExchangeCopyRule.Config#relBuilderFactory() relBuilderFactory} attribute.
+       * <p><em>If not set, this attribute will have a default value as returned by the initializer of
+       * {@link PinotSortExchangeCopyRule.Config#relBuilderFactory() relBuilderFactory}.</em>
+       * @param relBuilderFactory The value for relBuilderFactory
+       * @return {@code this} builder for use in a chained invocation
+       */
+      public final Builder withRelBuilderFactory(RelBuilderFactory relBuilderFactory) {
+        this.relBuilderFactory = Objects.requireNonNull(relBuilderFactory, "relBuilderFactory");
+        return this;
+      }
+
+      /**
+       * Initializes the value for the {@link PinotSortExchangeCopyRule.Config#description() description} attribute.
+       * @param description The value for description (can be {@code null})
+       * @return {@code this} builder for use in a chained invocation
+       */
+      public final Builder withDescription(
+          @Nullable String description) {
+        this.description = description;
+        return this;
+      }
+
+      /**
+       * Initializes the value for the {@link PinotSortExchangeCopyRule.Config#operandSupplier() operandSupplier} attribute.
+       * <p><em>If not set, this attribute will have a default value as returned by the initializer of
+       * {@link PinotSortExchangeCopyRule.Config#operandSupplier() operandSupplier}.</em>
+       * @param operandSupplier The value for operandSupplier
+       * @return {@code this} builder for use in a chained invocation
+       */
+      public final Builder withOperandSupplier(RelRule.OperandTransform operandSupplier) {
+        this.operandSupplier = Objects.requireNonNull(operandSupplier, "operandSupplier");
+        return this;
+      }
+
+      /**
+       * Builds a new {@link ImmutableSortExchangeCopyRule.Config Config}.
+       * @return An immutable instance of Config
+       * @throws java.lang.IllegalStateException if any required attributes are missing
+       */
+      public ImmutableSortExchangeCopyRule.Config build() {
+        return ImmutableSortExchangeCopyRule.Config.validate(new ImmutableSortExchangeCopyRule.Config(this));
+      }
+    }
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index 4ed6be0a23..1828abd0e8 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -59,9 +59,15 @@ public class PinotQueryRuleSets {
           CoreRules.PROJECT_MERGE,
           // remove identity project
           CoreRules.PROJECT_REMOVE,
+          // add an extra exchange for sort
+          PinotSortExchangeNodeInsertRule.INSTANCE,
+          // copy the sort below its sort exchange (sort push-down)
+          PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY,
           // reorder sort and projection
           CoreRules.SORT_PROJECT_TRANSPOSE,
 
+          // TODO: evaluate the SORT_JOIN_TRANSPOSE and SORT_JOIN_COPY rules
+
           // join rules
           CoreRules.JOIN_PUSH_EXPRESSIONS,
 
@@ -89,7 +95,6 @@ public class PinotQueryRuleSets {
           // Pinot specific rules
           PinotFilterExpandSearchRule.INSTANCE,
           PinotJoinExchangeNodeInsertRule.INSTANCE,
-          PinotAggregateExchangeNodeInsertRule.INSTANCE,
-          PinotSortExchangeNodeInsertRule.INSTANCE
+          PinotAggregateExchangeNodeInsertRule.INSTANCE
       );
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
new file mode 100644
index 0000000000..7f163ca99b
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import com.google.common.base.Preconditions;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.SortExchange;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalSortExchange;
+import org.apache.calcite.rel.metadata.RelMdUtil;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.pinot.query.planner.logical.RexExpressionUtils;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.query.type.TypeSystem;
+
+
+public class PinotSortExchangeCopyRule extends RelRule<RelRule.Config> {
+
+  public static final PinotSortExchangeCopyRule SORT_EXCHANGE_COPY =
+      PinotSortExchangeCopyRule.Config.DEFAULT.toRule();
+  private static final TypeFactory TYPE_FACTORY = new TypeFactory(new TypeSystem());
+
+  /**
+   * Creates a PinotSortExchangeCopyRule.
+   */
+  protected PinotSortExchangeCopyRule(Config config) {
+    super(config);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Sort sort = call.rel(0);
+    final SortExchange exchange = call.rel(1);
+    final RelMetadataQuery metadataQuery = call.getMetadataQuery();
+
+    if (RelMdUtil.checkInputForCollationAndLimit(
+        metadataQuery,
+        exchange.getInput(),
+        sort.getCollation(),
+        sort.offset,
+        sort.fetch)) {
+      // Don't rewrite anything if the input is already sorted AND the
+      // input node would already return fewer than sort.offset + sort.fetch
+      // rows (e.g. there is already an inner limit applied)
+      return;
+    }
+
+    RelCollation collation = sort.getCollation();
+    Preconditions.checkArgument(
+        collation.equals(exchange.getCollation()),
+        "Expected collation on exchange and sort to be the same"
+    );
+
+    final RexNode fetch;
+    if (sort.fetch == null) {
+      fetch = null;
+    } else if (sort.offset == null) {
+      fetch = sort.fetch;
+    } else {
+      RexBuilder rexBuilder = new RexBuilder(TYPE_FACTORY);
+      int total = RexExpressionUtils.getValueAsInt(sort.fetch) + RexExpressionUtils.getValueAsInt(sort.offset);
+      fetch = rexBuilder.makeLiteral(total, TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER));
+    }
+
+    final RelNode newExchangeInput = sort.copy(sort.getTraitSet(), exchange.getInput(), collation, null, fetch);
+    final RelNode exchangeCopy = exchange.copy(exchange.getTraitSet(), newExchangeInput, exchange.getDistribution());
+    final RelNode sortCopy = sort.copy(sort.getTraitSet(), exchangeCopy, collation, sort.offset, sort.fetch);
+
+    call.transformTo(sortCopy);
+  }
+
+  public interface Config extends RelRule.Config {
+
+    Config DEFAULT = ImmutableSortExchangeCopyRule.Config.of()
+        .withOperandFor(LogicalSort.class, LogicalSortExchange.class);
+
+    @Override default PinotSortExchangeCopyRule toRule() {
+      return new PinotSortExchangeCopyRule(this);
+    }
+
+    /** Defines an operand tree for the given classes. */
+
+    default Config withOperandFor(Class<? extends Sort> sortClass,
+        Class<? extends SortExchange> exchangeClass) {
+      return withOperandSupplier(b0 ->
+          b0.operand(sortClass).oneInput(b1 ->
+              b1.operand(exchangeClass).anyInputs()))
+          .as(Config.class);
+    }
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
index fe57992074..f03b78dc3d 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
@@ -23,13 +23,21 @@ import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.logical.LogicalExchange;
 import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalSortExchange;
 import org.apache.calcite.tools.RelBuilderFactory;
 
 
 /**
- * Special rule for Pinot, this rule is fixed to always insert exchange after SORT node.
+ * Rewrites any sort into a sort on top of a sort exchange that carries the same collation,
+ * so the collation can be pushed down to the rest of the tree. This is done for two reasons:
+ * <ol>
+ *   <li>Sort needs to be a distributed operation: if multiple nodes are scanning data,
+ *   the sort ordering must be applied globally.</li>
+ *   <li>It is ideal to push down the sort ordering as far as possible. If upstream nodes
+ *   can send data in sorted order, then we can apply an N-way merge sort and terminate early
+ *   once all nodes have sent data that is no longer in the top OFFSET+LIMIT.</li>
+ * </ol>
  */
 public class PinotSortExchangeNodeInsertRule extends RelOptRule {
   public static final PinotSortExchangeNodeInsertRule INSTANCE =
@@ -54,8 +62,10 @@ public class PinotSortExchangeNodeInsertRule extends RelOptRule {
   @Override
   public void onMatch(RelOptRuleCall call) {
     Sort sort = call.rel(0);
-    // TODO: this is a single value
-    LogicalExchange exchange = LogicalExchange.create(sort.getInput(), RelDistributions.hash(Collections.emptyList()));
+    LogicalSortExchange exchange = LogicalSortExchange.create(
+        sort.getInput(),
+        RelDistributions.hash(Collections.emptyList()),
+        sort.getCollation());
     call.transformTo(LogicalSort.create(exchange, sort.getCollation(), sort.offset, sort.fetch));
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index de6a87ca88..cd29edac30 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -33,7 +33,6 @@ import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelRecordType;
-import org.apache.calcite.rex.RexLiteral;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.stage.AggregateNode;
@@ -88,8 +87,8 @@ public final class RelToStageConverter {
   }
 
   private static StageNode convertLogicalSort(LogicalSort node, int currentStageId) {
-    int fetch = node.fetch == null ? 0 : ((RexLiteral) node.fetch).getValueAs(Integer.class);
-    int offset = node.offset == null ? 0 : ((RexLiteral) node.offset).getValueAs(Integer.class);
+    int fetch = RexExpressionUtils.getValueAsInt(node.fetch);
+    int offset = RexExpressionUtils.getValueAsInt(node.offset);
     return new SortNode(currentStageId, node.getCollation().getFieldCollations(), fetch, offset,
         toDataSchema(node.getRowType()));
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
index cc66047651..314b0baa54 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
@@ -86,4 +86,14 @@ public class RexExpressionUtils {
     }
     return result;
   }
+
+  public static Integer getValueAsInt(RexNode in) {
+    if (in == null) {
+      return 0;
+    }
+
+    Preconditions.checkArgument(in instanceof RexLiteral, "expected literal, got " + in);
+    RexLiteral literal = (RexLiteral) in;
+    return literal.getValueAs(Integer.class);
+  }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 5f46b23d26..736794c283 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.core.Exchange;
 import org.apache.pinot.query.context.PlannerContext;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.StageMetadata;
@@ -92,7 +92,7 @@ public class StagePlanner {
   private StageNode walkRelPlan(RelNode node, int currentStageId) {
     if (isExchangeNode(node)) {
       StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
-      RelDistribution distribution = ((LogicalExchange) node).getDistribution();
+      RelDistribution distribution = ((Exchange) node).getDistribution();
       return createSendReceivePair(nextStageRoot, distribution, currentStageId);
     } else {
       StageNode stageNode = RelToStageConverter.toStageNode(node, currentStageId);
@@ -125,7 +125,7 @@ public class StagePlanner {
   }
 
   private boolean isExchangeNode(RelNode node) {
-    return (node instanceof LogicalExchange);
+    return (node instanceof Exchange);
   }
 
   private int getNewStageId() {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
index 38b2da6c56..4ffe901b20 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
@@ -70,7 +70,9 @@ public class SortNode extends AbstractStageNode {
 
   @Override
   public String explain() {
-    return "SORT" + (_fetch > 0 ? " (LIMIT " + _fetch + ")" : "");
+    return String.format("SORT%s%s",
+        (_fetch > 0) ? " LIMIT " + _fetch : "",
+        (_offset > 0) ? " OFFSET " + _offset : "");
   }
 
   @Override
diff --git a/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java b/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
new file mode 100644
index 0000000000..b92fd79352
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.SortExchange;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalSortExchange;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.pinot.query.type.TypeFactory;
+import org.apache.pinot.query.type.TypeSystem;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class PinotSortExchangeCopyRuleTest {
+
+  public static final TypeFactory TYPE_FACTORY = new TypeFactory(new TypeSystem());
+  private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY);
+
+  private AutoCloseable _mocks;
+
+  @Mock
+  private RelOptRuleCall _call;
+  @Mock
+  private RelNode _input;
+  @Mock
+  private RelOptCluster _cluster;
+  @Mock
+  private RelMetadataQuery _query;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    RelTraitSet traits = RelTraitSet.createEmpty();
+    Mockito.when(_input.getTraitSet()).thenReturn(traits);
+    Mockito.when(_input.getCluster()).thenReturn(_cluster);
+    Mockito.when(_call.getMetadataQuery()).thenReturn(_query);
+    Mockito.when(_query.getMaxRowCount(Mockito.any())).thenReturn(null);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void shouldMatchLimitNoOffsetNoSort() {
+    // Given:
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+    Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, null, literal(1));
+    Mockito.when(_call.rel(0)).thenReturn(sort);
+    Mockito.when(_call.rel(1)).thenReturn(exchange);
+
+    // When:
+    PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.onMatch(_call);
+
+    // Then:
+    ArgumentCaptor<RelNode> sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class);
+    Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap());
+
+    RelNode sortCopy = sortCopyCapture.getValue();
+    Assert.assertTrue(sortCopy instanceof LogicalSort);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
+
+    LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
+    Assert.assertEquals(innerSort.getCollation().getKeys().size(), 0);
+    Assert.assertNull((innerSort).offset);
+    Assert.assertEquals((innerSort).fetch, literal(1));
+  }
+
+  @Test
+  public void shouldMatchLimitNoOffsetYesSort() {
+    // Given:
+    RelCollation collation = RelCollations.of(1);
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    Sort sort = LogicalSort.create(exchange, collation, null, literal(1));
+    Mockito.when(_call.rel(0)).thenReturn(sort);
+    Mockito.when(_call.rel(1)).thenReturn(exchange);
+
+    // When:
+    PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.onMatch(_call);
+
+    // Then:
+    ArgumentCaptor<RelNode> sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class);
+    Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap());
+
+    RelNode sortCopy = sortCopyCapture.getValue();
+    Assert.assertTrue(sortCopy instanceof LogicalSort);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
+
+    LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
+    Assert.assertEquals(innerSort.getCollation(), collation);
+    Assert.assertNull((innerSort).offset);
+    Assert.assertEquals((innerSort).fetch, literal(1));
+  }
+
+  @Test
+  public void shouldMatchNoSortAndPushDownLimitPlusOffset() {
+    // Given:
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+    Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, literal(2), literal(1));
+    Mockito.when(_call.rel(0)).thenReturn(sort);
+    Mockito.when(_call.rel(1)).thenReturn(exchange);
+
+    // When:
+    PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.onMatch(_call);
+
+    // Then:
+    ArgumentCaptor<RelNode> sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class);
+    Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap());
+
+    RelNode sortCopy = sortCopyCapture.getValue();
+    Assert.assertTrue(sortCopy instanceof LogicalSort);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
+
+    LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
+    Assert.assertEquals(innerSort.getCollation().getKeys().size(), 0);
+    Assert.assertNull((innerSort).offset);
+    Assert.assertEquals((innerSort).fetch, literal(3));
+  }
+
+  @Test
+  public void shouldMatchSortOnly() {
+    // Given:
+    RelCollation collation = RelCollations.of(1);
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    Sort sort = LogicalSort.create(exchange, collation, null, null);
+    Mockito.when(_call.rel(0)).thenReturn(sort);
+    Mockito.when(_call.rel(1)).thenReturn(exchange);
+
+    // When:
+    PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.onMatch(_call);
+
+    // Then:
+    ArgumentCaptor<RelNode> sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class);
+    Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap());
+
+    RelNode sortCopy = sortCopyCapture.getValue();
+    Assert.assertTrue(sortCopy instanceof LogicalSort);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
+
+    LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
+    Assert.assertEquals(innerSort.getCollation(), collation);
+    Assert.assertNull((innerSort).offset);
+    Assert.assertNull((innerSort).fetch);
+  }
+
+  @Test
+  public void shouldMatchLimitOffsetAndSort() {
+    // Given:
+    RelCollation collation = RelCollations.of(1);
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    Sort sort = LogicalSort.create(exchange, collation, literal(1), literal(2));
+    Mockito.when(_call.rel(0)).thenReturn(sort);
+    Mockito.when(_call.rel(1)).thenReturn(exchange);
+
+    // When:
+    PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.onMatch(_call);
+
+    // Then:
+    ArgumentCaptor<RelNode> sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class);
+    Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap());
+
+    RelNode sortCopy = sortCopyCapture.getValue();
+    Assert.assertTrue(sortCopy instanceof LogicalSort);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+    Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
+
+    LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
+    Assert.assertEquals(innerSort.getCollation(), collation);
+    Assert.assertNull((innerSort).offset);
+    Assert.assertEquals((innerSort).fetch, literal(3));
+  }
+
+  @Test
+  public void shouldNotMatchOnlySortAlreadySorted() {
+    // Given:
+    RelCollation collation = RelCollations.of(1);
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+    Sort sort = LogicalSort.create(exchange, collation, null, null);
+    Mockito.when(_call.rel(0)).thenReturn(sort);
+    Mockito.when(_call.rel(1)).thenReturn(exchange);
+    Mockito.when(_query.collations(_input)).thenReturn(ImmutableList.of(collation));
+
+    // When:
+    PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.onMatch(_call);
+
+    // Then:
+    Mockito.verify(_call, Mockito.never()).transformTo(Mockito.any(), Mockito.anyMap());
+  }
+
+  @Test
+  public void shouldNotMatchOffsetNoLimitNoSort() {
+    // Given:
+    SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+    Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, literal(1), null);
+    Mockito.when(_call.rel(0)).thenReturn(sort);
+    Mockito.when(_call.rel(1)).thenReturn(exchange);
+
+    // When:
+    PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.onMatch(_call);
+
+    // Then:
+    Mockito.verify(_call, Mockito.never()).transformTo(Mockito.any(), Mockito.anyMap());
+  }
+
+  private static RexNode literal(int i) {
+    return REX_BUILDER.makeLiteral(i, TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER));
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index eaf119880b..32dcc2c0f6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -20,8 +20,10 @@ package org.apache.pinot.query.runtime;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -29,10 +31,13 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.mailbox.MultiplexingMailboxService;
@@ -183,10 +188,47 @@ public class QueryRunner {
   private InstanceResponseBlock processServerQuery(ServerQueryRequest serverQueryRequest,
       ExecutorService executorService) {
     try {
-      return _serverExecutor.execute(serverQueryRequest, executorService);
+      InstanceResponseBlock result = _serverExecutor.execute(serverQueryRequest, executorService);
+
+      if (result.getRows() != null && serverQueryRequest.getQueryContext().getOrderByExpressions() != null) {
+        // we only re-arrange columns to match the projection in the case of order by - this is to ensure
+        // that V1 results match the expected projection schema in the calcite logical operator; if
+        // we realize that there are other situations where we need to post-process V1 results to adhere to
+        // the expected results, we should factor this out and also apply the canonicalization of the data
+        // types during this post-process step (also see LeafStageTransferableBlockOperator#canonicalizeRow)
+        DataSchema dataSchema = result.getDataSchema();
+        List<String> selectionColumns =
+            SelectionOperatorUtils.getSelectionColumns(serverQueryRequest.getQueryContext(), dataSchema);
+
+        int[] columnIndices = SelectionOperatorUtils.getColumnIndices(selectionColumns, dataSchema);
+        int numColumns = columnIndices.length;
+
+        DataSchema resultDataSchema = SelectionOperatorUtils.getSchemaForProjection(dataSchema, columnIndices);
+
+        // Extract the result rows
+        LinkedList<Object[]> rowsInSelectionResults = new LinkedList<>();
+        for (Object[] row : result.getRows()) {
+          assert row != null;
+          Object[] extractedRow = new Object[numColumns];
+          for (int i = 0; i < numColumns; i++) {
+            Object value = row[columnIndices[i]];
+            if (value != null) {
+              extractedRow[i] = value;
+            }
+          }
+
+          rowsInSelectionResults.addFirst(extractedRow);
+        }
+
+        return new InstanceResponseBlock(
+            new SelectionResultsBlock(resultDataSchema, rowsInSelectionResults),
+            serverQueryRequest.getQueryContext());
+      } else {
+        return result;
+      }
     } catch (Exception e) {
       InstanceResponseBlock errorResponse = new InstanceResponseBlock();
-      errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE, e.getMessage());
+      errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE, Objects.toString(e.getMessage()));
       return errorResponse;
     }
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 64a3ed6810..79dcd1f6a6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -106,7 +106,7 @@ public class SortOperator extends BaseOperator<TransferableBlock> {
       LinkedList<Object[]> rows = new LinkedList<>();
       while (_rows.size() > _offset) {
         Object[] row = _rows.poll();
-          rows.addFirst(row);
+        rows.addFirst(row);
       }
       _isSortedBlockConstructed = true;
       if (rows.size() == 0) {
diff --git a/pinot-query-runtime/src/test/resources/queries/OrderBy.json b/pinot-query-runtime/src/test/resources/queries/OrderBy.json
new file mode 100644
index 0000000000..2c7708743c
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/OrderBy.json
@@ -0,0 +1,232 @@
+{
+  "basic_order_by": {
+    "tables": {
+      "basic": {
+        "schema": [
+          {"name": "col0", "type": "INT"},
+          {"name": "col1", "type": "INT"},
+          {"name": "col2", "type": "STRING"}
+        ],
+        "inputs": [
+          [1, 2, "a"],
+          [2, 3, "b"],
+          [3, 1, "c"],
+          [4, 4, "d"],
+          [5, 5, "e"],
+          [6, 6, "f"]
+        ]
+      }
+    },
+    "queries": [
+      {"sql": "SELECT * FROM {basic} ORDER BY col1 LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT * FROM {basic} ORDER BY col0 LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT * FROM {basic} ORDER BY col2 LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT * FROM {basic} ORDER BY col2 LIMIT 2 OFFSET 0"},
+      {"sql": "SELECT * FROM {basic} ORDER BY col2, col1 LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT * FROM {basic} ORDER BY col1, col2 LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT * FROM {basic} ORDER BY col1 LIMIT 2"},
+      {"sql": "SELECT * FROM {basic} ORDER BY col1 LIMIT 100"},
+      {"sql": "SELECT * FROM {basic} ORDER BY col1 OFFSET 100"},
+      {"sql": "SELECT * FROM {basic} ORDER BY col1"},
+      {"sql": "SELECT * FROM {basic} ORDER BY col1 DESC"},
+      {"sql": "SELECT * FROM {basic} ORDER BY col1 DESC LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT * FROM {basic} ORDER BY col1 ASC"},
+      {"sql": "SELECT * FROM {basic} ORDER BY col1 ASC LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT * FROM {basic} ORDER BY power(col1, 2)"},
+      {"sql": "SELECT * FROM {basic} WHERE col1 > 3 ORDER BY col1 LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT * FROM {basic} WHERE col0 > 3 ORDER BY col1 LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT * FROM {basic} WHERE col1 > 3 AND col1 < 5 ORDER BY col1 LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT col1 FROM {basic} ORDER BY col1 LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT col2 FROM {basic} ORDER BY col1 LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT col1, col2 FROM {basic} ORDER BY col1 LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT col1, col0, col2 FROM {basic} ORDER BY col1 LIMIT 2 OFFSET 1"},
+      {
+        "ignored": true,
+        "comment": "we don't support ALL statement",
+        "sql": "SELECT * FROM {basic} ORDER BY col2 LIMIT ALL"
+      },
+      {
+        "ignored": true,
+        "comment": "we don't support LIMIT NULL",
+        "sql": "SELECT * FROM {basic} ORDER BY col2 LIMIT NULL"
+      }
+    ]
+  },
+  "order_by_agg": {
+    "tables": {
+      "agg": {
+        "schema": [
+          {"name": "val", "type": "INT"},
+          {"name": "g", "type": "STRING"}
+        ],
+        "inputs": [
+          [1, "a"],
+          [2, "a"],
+          [3, "b"],
+          [4, "b"],
+          [5, "c"]
+        ]
+      }
+    },
+    "queries": [
+      {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY sum"},
+      {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY sum LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g HAVING SUM(val) > 3 ORDER BY sum LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY g"},
+      {"sql": "SELECT g, SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY SUM(val)"},
+      {"sql": "SELECT SUM(val) AS sum FROM {agg} GROUP BY g ORDER BY g"}
+    ]
+  },
+  "order_by_join": {
+    "tables": {
+      "l": {
+        "schema": [
+          {"name": "key", "type": "STRING"},
+          {"name": "lval", "type": "INT"}
+        ],
+        "inputs": [
+          ["foo", 1],
+          ["foo", 3],
+          ["foo", 5],
+          ["bar", 2],
+          ["bar", 4],
+          ["bar", 6]
+        ]
+      },
+      "r": {
+        "schema": [
+          {"name": "key", "type": "STRING"},
+          {"name": "rval", "type": "INT"}
+        ],
+        "inputs": [
+          ["foo", 1],
+          ["foo", 3],
+          ["foo", 7],
+          ["bar", 2],
+          ["bar", 4],
+          ["bar", 8]
+        ]
+      }
+    },
+    "queries": [
+      {"sql": "SELECT * FROM {l} JOIN {r} ON {l}.key = {r}.key ORDER BY lval, rval LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT lval FROM {l} JOIN {r} ON {l}.key = {r}.key ORDER BY lval, rval LIMIT 2 OFFSET 1"},
+      {"sql": "SELECT * FROM {l} JOIN {r} ON {l}.key = {r}.key ORDER BY {l}.key"},
+      {"sql": "SELECT * FROM {l} JOIN {r} ON {l}.key = {r}.key ORDER BY {r}.key"},
+      {"sql": "SELECT {l}.key, SUM(lval), SUM(rval) FROM {l} JOIN {r} ON {l}.key = {r}.key GROUP BY {l}.key ORDER BY {l}.key"}
+    ]
+  },
+  "order_by_subqueries": {
+    "tables": {
+      "tbl": {
+        "schema": [
+          {"name": "col0", "type": "INT"},
+          {"name": "col1", "type": "INT"},
+          {"name": "col2", "type": "STRING"}
+        ],
+        "inputs": [
+          [1, 2, "a"],
+          [2, 3, "b"],
+          [3, 1, "c"],
+          [4, 4, "d"],
+          [5, 5, "e"],
+          [6, 6, "f"]
+        ]
+      }
+    },
+    "queries": [
+      {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col0)"},
+      {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2)"},
+      {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2 OFFSET 1)"},
+      {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2 OFFSET 1) ORDER BY col1 LIMIT 3"},
+      {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2 OFFSET 1) ORDER BY col2 LIMIT 3 OFFSET 1"},
+      {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 5 OFFSET 1) ORDER BY col1 LIMIT 3 OFFSET 1"},
+      {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 5 OFFSET 1) ORDER BY col1 OFFSET 1"},
+      {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1 LIMIT 2) LIMIT 1"},
+      {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1) ORDER BY col0"},
+      {"sql": "SELECT * FROM (SELECT * FROM {tbl} ORDER BY col1) ORDER BY col0 LIMIT 1"}
+    ]
+  },
+  "order_by_float": {
+    "tables": {
+      "floats": {
+        "schema": [
+          {"name": "col0", "type": "FLOAT"},
+          {"name": "col1", "type": "FLOAT"}
+        ],
+        "inputs": [
+          [0.0, 2.2],
+          [3.3, -3.3],
+          [4.4, 1.1],
+          [-5.5, 4.4],
+          [6.6, 5.0],
+          [7.7, -6.6]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "sql": "SELECT col1 FROM {floats} ORDER BY col1 LIMIT 2 OFFSET 1",
+        "description": "Basic test for ordering by a float column with a limit and offset"
+      },
+      {
+        "sql": "SELECT col0 FROM {floats} ORDER BY col0 * 2 LIMIT 2 OFFSET 1",
+        "description": "Testing order by with an expression involving floats"
+      }
+    ]
+  },
+  "order_by_double": {
+    "tables": {
+      "doubles": {
+        "schema": [
+          {"name": "col0", "type": "DOUBLE"},
+          {"name": "col1", "type": "DOUBLE"}
+        ],
+        "inputs": [
+          [0.0, 2.2],
+          [3.3, -3.3],
+          [4.4, 1.1],
+          [-5.5, 4.4],
+          [6.6, -5.0],
+          [7.7, 6.6]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "sql": "SELECT col1 FROM {doubles} ORDER BY col1 LIMIT 2 OFFSET 1",
+        "description": "Basic test for ordering by a double column with a limit and offset"
+      },
+      {
+        "sql": "SELECT col0 FROM {doubles} ORDER BY col0 * 2 LIMIT 2 OFFSET 1",
+        "description": "Testing order by with an expression involving doubles"
+      }
+    ]
+  },
+  "order_by_boolean": {
+    "ignored": true,
+    "comment": "fails when we try to canonicalizeRow with ClassCastException - value is already in memory as boolean but DataSchema expects it to be an int",
+    "tables": {
+      "bools": {
+        "schema": [
+          {"name": "col0", "type": "BOOLEAN"},
+          {"name": "col1", "type": "INT"}
+        ],
+        "inputs": [
+          [true, 2],
+          [false, 3],
+          [false, 1],
+          [true, 4],
+          [true, 5],
+          [false, 6]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "sql": "SELECT col1 FROM {bools} ORDER BY col0, col1 LIMIT 2 OFFSET 1",
+        "description": "Basic test for ordering by a boolean column with a limit and offset"
+      }
+    ]
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org