You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/09/08 19:55:39 UTC

[pinot] branch master updated: [multistage] support ORDER BY (#9279)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 43f8903de0 [multistage] support ORDER BY (#9279)
43f8903de0 is described below

commit 43f8903de0287cec93b627c8b325804bee95d3e3
Author: Rong Rong <ro...@apache.org>
AuthorDate: Thu Sep 8 12:55:32 2022 -0700

    [multistage] support ORDER BY (#9279)
    
    * unset limit
    
    * initial support for order by
    
    * adding tests
    
    - this is not ready, since we don't do a final-stage reduce sort yet
    
    * add exchange before sort
    
    * fix test failure
    
    * unset unset limit
    
    * fix rebase
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../query/selection/SelectionOperatorService.java  |  85 +-----------
 .../query/selection/SelectionOperatorUtils.java    |  81 ++++++++++++
 .../rel/rules/PinotJoinExchangeNodeInsertRule.java |  12 +-
 .../calcite/rel/rules/PinotQueryRuleSets.java      |   2 +-
 .../apache/calcite/rel/rules/PinotRuleUtils.java   |  39 ++++++
 .../rel/rules/PinotSortExchangeNodeInsertRule.java |  73 +++++++++++
 .../query/parser/CalciteRexExpressionParser.java   |  23 ++--
 .../query/planner/logical/RelToStageConverter.java |  14 +-
 .../apache/pinot/query/planner/stage/SortNode.java |  70 ++++++++++
 .../query/planner/stage/StageNodeSerDeUtils.java   |   2 +
 .../pinot/query/QueryEnvironmentTestBase.java      |   3 +-
 .../runtime/executor/WorkerQueryExecutor.java      |   7 +
 .../pinot/query/runtime/operator/SortOperator.java | 142 +++++++++++++++++++++
 .../query/runtime/utils/ServerRequestUtils.java    |  14 +-
 .../pinot/query/runtime/QueryRunnerTest.java       |   7 +-
 15 files changed, 461 insertions(+), 113 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index 441e20dc99..16ca57d91f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -18,13 +18,10 @@
  */
 package org.apache.pinot.core.query.selection;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
-import org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -82,86 +79,8 @@ public class SelectionOperatorService {
     _numRowsToKeep = _offset + queryContext.getLimit();
     assert queryContext.getOrderByExpressions() != null;
     _rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
-        getTypeCompatibleComparator(queryContext.getOrderByExpressions()));
-  }
-
-  /**
-   * Helper method to get the type-compatible {@link Comparator} for selection rows. (Inter segment)
-   * <p>Type-compatible comparator allows compatible types to compare with each other.
-   *
-   * @return flexible {@link Comparator} for selection rows.
-   */
-  private Comparator<Object[]> getTypeCompatibleComparator(List<OrderByExpressionContext> orderByExpressions) {
-    ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
-
-    // Compare all single-value columns
-    int numOrderByExpressions = orderByExpressions.size();
-    List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
-    for (int i = 0; i < numOrderByExpressions; i++) {
-      if (!columnDataTypes[i].isArray()) {
-        valueIndexList.add(i);
-      }
-    }
-
-    int numValuesToCompare = valueIndexList.size();
-    int[] valueIndices = new int[numValuesToCompare];
-    boolean[] useDoubleComparison = new boolean[numValuesToCompare];
-    // Use multiplier -1 or 1 to control ascending/descending order
-    int[] multipliers = new int[numValuesToCompare];
-    for (int i = 0; i < numValuesToCompare; i++) {
-      int valueIndex = valueIndexList.get(i);
-      valueIndices[i] = valueIndex;
-      if (columnDataTypes[valueIndex].isNumber()) {
-        useDoubleComparison[i] = true;
-      }
-      multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
-    }
-
-    if (_queryContext.isNullHandlingEnabled()) {
-      return (o1, o2) -> {
-        for (int i = 0; i < numValuesToCompare; i++) {
-          int index = valueIndices[i];
-          Object v1 = o1[index];
-          Object v2 = o2[index];
-          if (v1 == null) {
-            // The default null ordering is: 'NULLS LAST'.
-            return v2 == null ? 0 : -multipliers[i];
-          } else if (v2 == null) {
-            return multipliers[i];
-          }
-          int result;
-          if (useDoubleComparison[i]) {
-            result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
-          } else {
-            //noinspection unchecked
-            result = ((Comparable) v1).compareTo(v2);
-          }
-          if (result != 0) {
-            return result * multipliers[i];
-          }
-        }
-        return 0;
-      };
-    } else {
-      return (o1, o2) -> {
-        for (int i = 0; i < numValuesToCompare; i++) {
-          int index = valueIndices[i];
-          Object v1 = o1[index];
-          Object v2 = o2[index];
-          int result;
-          if (useDoubleComparison[i]) {
-            result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
-          } else {
-            //noinspection unchecked
-            result = ((Comparable) v1).compareTo(v2);
-          }
-          if (result != 0) {
-            return result * multipliers[i];
-          }
-        }
-        return 0;
-      };
-    }
+        SelectionOperatorUtils.getTypeCompatibleComparator(queryContext.getOrderByExpressions(), _dataSchema,
+            _queryContext.isNullHandlingEnabled()));
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index ac0f8d5b91..851608330d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -542,6 +543,86 @@ public class SelectionOperatorUtils {
     return columnToIndexMap;
   }
 
+  /**
+   * Helper method to get the type-compatible {@link Comparator} for selection rows. (Inter segment)
+   * <p>Type-compatible comparator allows compatible types to compare with each other.
+   *
+   * @return flexible {@link Comparator} for selection rows.
+   */
+  public static Comparator<Object[]> getTypeCompatibleComparator(List<OrderByExpressionContext> orderByExpressions,
+      DataSchema dataSchema, boolean isNullHandlingEnabled) {
+    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+
+    // Compare all single-value columns
+    int numOrderByExpressions = orderByExpressions.size();
+    List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
+    for (int i = 0; i < numOrderByExpressions; i++) {
+      if (!columnDataTypes[i].isArray()) {
+        valueIndexList.add(i);
+      }
+    }
+
+    int numValuesToCompare = valueIndexList.size();
+    int[] valueIndices = new int[numValuesToCompare];
+    boolean[] useDoubleComparison = new boolean[numValuesToCompare];
+    // Use multiplier -1 or 1 to control ascending/descending order
+    int[] multipliers = new int[numValuesToCompare];
+    for (int i = 0; i < numValuesToCompare; i++) {
+      int valueIndex = valueIndexList.get(i);
+      valueIndices[i] = valueIndex;
+      if (columnDataTypes[valueIndex].isNumber()) {
+        useDoubleComparison[i] = true;
+      }
+      multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
+    }
+
+    if (isNullHandlingEnabled) {
+      return (o1, o2) -> {
+        for (int i = 0; i < numValuesToCompare; i++) {
+          int index = valueIndices[i];
+          Object v1 = o1[index];
+          Object v2 = o2[index];
+          if (v1 == null) {
+            // The default null ordering is: 'NULLS LAST'.
+            return v2 == null ? 0 : -multipliers[i];
+          } else if (v2 == null) {
+            return multipliers[i];
+          }
+          int result;
+          if (useDoubleComparison[i]) {
+            result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
+          } else {
+            //noinspection unchecked
+            result = ((Comparable) v1).compareTo(v2);
+          }
+          if (result != 0) {
+            return result * multipliers[i];
+          }
+        }
+        return 0;
+      };
+    } else {
+      return (o1, o2) -> {
+        for (int i = 0; i < numValuesToCompare; i++) {
+          int index = valueIndices[i];
+          Object v1 = o1[index];
+          Object v2 = o2[index];
+          int result;
+          if (useDoubleComparison[i]) {
+            result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
+          } else {
+            //noinspection unchecked
+            result = ((Comparable) v1).compareTo(v2);
+          }
+          if (result != 0) {
+            return result * multipliers[i];
+          }
+        }
+        return 0;
+      };
+    }
+  }
+
   /**
    * Helper method to add a value to a {@link PriorityQueue}.
    *
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
index 072e639733..39af45cff7 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
@@ -22,10 +22,8 @@ import com.google.common.collect.ImmutableList;
 import java.util.List;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.hep.HepRelVertex;
 import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.hint.RelHint;
@@ -55,7 +53,7 @@ public class PinotJoinExchangeNodeInsertRule extends RelOptRule {
     }
     if (call.rel(0) instanceof Join) {
       Join join = call.rel(0);
-      return !isExchange(join.getLeft()) && !isExchange(join.getRight());
+      return !PinotRuleUtils.isExchange(join.getLeft()) && !PinotRuleUtils.isExchange(join.getRight());
     }
     return false;
   }
@@ -92,12 +90,4 @@ public class PinotJoinExchangeNodeInsertRule extends RelOptRule {
 
     call.transformTo(newJoinNode);
   }
-
-  private static boolean isExchange(RelNode rel) {
-    RelNode reference = rel;
-    if (reference instanceof HepRelVertex) {
-      reference = ((HepRelVertex) reference).getCurrentRel();
-    }
-    return reference instanceof Exchange;
-  }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
index 55202805b2..fcba253d20 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotQueryRuleSets.java
@@ -89,6 +89,6 @@ public class PinotQueryRuleSets {
           // Pinot specific rules
           PinotJoinExchangeNodeInsertRule.INSTANCE,
           PinotAggregateExchangeNodeInsertRule.INSTANCE,
-          PinotLogicalSortFetchEliminationRule.INSTANCE
+          PinotSortExchangeNodeInsertRule.INSTANCE
       );
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java
new file mode 100644
index 0000000000..97b1074b7f
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotRuleUtils.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
+
+
+public class PinotRuleUtils {
+
+  private PinotRuleUtils() {
+    // do not instantiate.
+  }
+
+  public static boolean isExchange(RelNode rel) {
+    RelNode reference = rel;
+    if (reference instanceof HepRelVertex) {
+      reference = ((HepRelVertex) reference).getCurrentRel();
+    }
+    return reference instanceof Exchange;
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
new file mode 100644
index 0000000000..c590f2b652
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import java.util.Collections;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+
+/**
+ * Special rule for Pinot: always inserts an exchange node as the input of a SORT node that has collation keys.
+ */
+public class PinotSortExchangeNodeInsertRule extends RelOptRule {
+  public static final PinotSortExchangeNodeInsertRule INSTANCE =
+      new PinotSortExchangeNodeInsertRule(RelFactories.LOGICAL_BUILDER);
+
+  public PinotSortExchangeNodeInsertRule(RelBuilderFactory factory) {
+    super(operand(LogicalSort.class, any()), factory, null);
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    if (call.rels.length < 1) {
+      return false;
+    }
+    if (call.rel(0) instanceof Sort) {
+      Sort sort = call.rel(0);
+      return sort.getCollation().getFieldCollations().size() > 0 && !PinotRuleUtils.isExchange(sort.getInput());
+    }
+    return false;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Sort sort = call.rel(0);
+    // TODO: this is a single value
+    LogicalExchange exchange = LogicalExchange.create(sort.getInput(), RelDistributions.hash(Collections.emptyList()));
+    call.transformTo(LogicalSort.create(exchange, sort.getCollation(), sort.offset, sort.fetch));
+  }
+
+  private static boolean isExchange(RelNode rel) {
+    RelNode reference = rel;
+    if (reference instanceof HepRelVertex) {
+      reference = ((HepRelVertex) reference).getCurrentRel();
+    }
+    return reference instanceof Exchange;
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
index 70ee68b21c..804825cff1 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pinot.query.parser;
 
-import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.ExpressionType;
@@ -88,28 +88,25 @@ public class CalciteRexExpressionParser {
     return selectExpr;
   }
 
-  private static List<Expression> convertOrderByList(RexExpression.FunctionCall rexCall, PinotQuery pinotQuery) {
-    Preconditions.checkState(rexCall.getKind() == SqlKind.ORDER_BY);
+  public static List<Expression> convertOrderByList(List<RexExpression> rexInputRefs,
+      List<RelFieldCollation.Direction> directions, PinotQuery pinotQuery) {
     List<Expression> orderByExpr = new ArrayList<>();
 
-    final Iterator<RexExpression> iterator = rexCall.getFunctionOperands().iterator();
-    while (iterator.hasNext()) {
-      final RexExpression next = iterator.next();
-      orderByExpr.add(convertOrderBy(next, pinotQuery));
+    for (int i = 0; i < rexInputRefs.size(); i++) {
+      orderByExpr.add(convertOrderBy(rexInputRefs.get(i), directions.get(i), pinotQuery));
     }
     return orderByExpr;
   }
 
-  private static Expression convertOrderBy(RexExpression rexNode, PinotQuery pinotQuery) {
-    final SqlKind kind = rexNode.getKind();
+  private static Expression convertOrderBy(RexExpression rexNode, RelFieldCollation.Direction direction,
+      PinotQuery pinotQuery) {
     Expression expression;
-    switch (kind) {
+    switch (direction) {
       case DESCENDING:
-        RexExpression.FunctionCall rexCall = (RexExpression.FunctionCall) rexNode;
         expression = RequestUtils.getFunctionExpression("DESC");
-        expression.getFunctionCall().addToOperands(toExpression(rexCall.getFunctionOperands().get(0), pinotQuery));
+        expression.getFunctionCall().addToOperands(toExpression(rexNode, pinotQuery));
         break;
-      case IDENTIFIER:
+      case ASCENDING:
       default:
         expression = RequestUtils.getFunctionExpression("ASC");
         expression.getFunctionCall().addToOperands(toExpression(rexNode, pinotQuery));
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index c7daa877f8..0729ad1ab9 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -28,11 +28,13 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelRecordType;
 import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.PlannerUtils;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
@@ -40,6 +42,7 @@ import org.apache.pinot.query.planner.stage.AggregateNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
 import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 
@@ -72,11 +75,20 @@ public final class RelToStageConverter {
       return convertLogicalFilter((LogicalFilter) node, currentStageId);
     } else if (node instanceof LogicalAggregate) {
       return convertLogicalAggregate((LogicalAggregate) node, currentStageId);
+    } else if (node instanceof LogicalSort) {
+      return convertLogicalSort((LogicalSort) node, currentStageId);
     } else {
-      throw new UnsupportedOperationException("Unsupported logical plan node: " + node);
+        throw new UnsupportedOperationException("Unsupported logical plan node: " + node);
     }
   }
 
+  private static StageNode convertLogicalSort(LogicalSort node, int currentStageId) {
+    int fetch = node.fetch == null ? -1 : ((RexLiteral) node.fetch).getValueAs(Integer.class);
+    int offset = node.offset == null ? -1 : ((RexLiteral) node.offset).getValueAs(Integer.class);
+    return new SortNode(currentStageId, node.getCollation().getFieldCollations(), fetch, offset,
+        toDataSchema(node.getRowType()));
+  }
+
   private static StageNode convertLogicalAggregate(LogicalAggregate node, int currentStageId) {
     return new AggregateNode(currentStageId, toDataSchema(node.getRowType()), node.getAggCallList(),
         node.getGroupSet());
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
new file mode 100644
index 0000000000..6dc27ce12e
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.serde.ProtoProperties;
+
+
+public class SortNode extends AbstractStageNode {
+  @ProtoProperties
+  private List<RexExpression> _collationKeys;
+  @ProtoProperties
+  private List<RelFieldCollation.Direction> _collationDirections;
+  @ProtoProperties
+  private int _fetch;
+  @ProtoProperties
+  private int _offset;
+
+  public SortNode(int stageId) {
+    super(stageId);
+  }
+
+  public SortNode(int stageId, List<RelFieldCollation> fieldCollations, int fetch, int offset, DataSchema dataSchema) {
+    super(stageId, dataSchema);
+    _collationDirections = new ArrayList<>(fieldCollations.size());
+    _collationKeys = new ArrayList<>(fieldCollations.size());
+    _fetch = fetch;
+    _offset = offset;
+    for (RelFieldCollation fieldCollation : fieldCollations) {
+      _collationDirections.add(fieldCollation.getDirection());
+      _collationKeys.add(new RexExpression.InputRef(fieldCollation.getFieldIndex()));
+    }
+  }
+
+  public List<RexExpression> getCollationKeys() {
+    return _collationKeys;
+  }
+
+  public List<RelFieldCollation.Direction> getCollationDirections() {
+    return _collationDirections;
+  }
+
+  public int getFetch() {
+    return _fetch;
+  }
+
+  public int getOffset() {
+    return _offset;
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
index c76792371f..4e8e1c4e61 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeSerDeUtils.java
@@ -75,6 +75,8 @@ public final class StageNodeSerDeUtils {
         return new FilterNode(stageId);
       case "AggregateNode":
         return new AggregateNode(stageId);
+      case "SortNode":
+        return new SortNode(stageId);
       case "MailboxSendNode":
         return new MailboxSendNode(stageId);
       case "MailboxReceiveNode":
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index f740709d09..f83eea0fb3 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -43,7 +43,8 @@ public class QueryEnvironmentTestBase {
   @DataProvider(name = "testQueryDataProvider")
   protected Object[][] provideQueries() {
     return new Object[][] {
-        new Object[]{"SELECT * FROM a LIMIT 10"},
+        new Object[]{"SELECT * FROM a ORDER BY col1 LIMIT 10"},
+        new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 10"},
         new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2"},
         new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0"},
         new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col3 > b.col3"},
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index f77b1a0286..018b3dc82c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -35,6 +35,7 @@ import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
 import org.apache.pinot.query.planner.stage.MailboxSendNode;
 import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
 import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
@@ -43,6 +44,7 @@ import org.apache.pinot.query.runtime.operator.FilterOperator;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
 import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.SortOperator;
 import org.apache.pinot.query.runtime.operator.TransformOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -136,6 +138,11 @@ public class WorkerQueryExecutor {
       ProjectNode projectNode = (ProjectNode) stageNode;
       return new TransformOperator(getOperator(requestId, projectNode.getInputs().get(0), metadataMap),
           projectNode.getDataSchema(), projectNode.getProjects(), projectNode.getInputs().get(0).getDataSchema());
+    } else if (stageNode instanceof SortNode) {
+      SortNode sortNode = (SortNode) stageNode;
+      return new SortOperator(getOperator(requestId, sortNode.getInputs().get(0), metadataMap),
+          sortNode.getCollationKeys(), sortNode.getCollationDirections(), sortNode.getFetch(), sortNode.getOffset(),
+          sortNode.getDataSchema());
     } else {
       throw new UnsupportedOperationException(
           String.format("Stage node type %s is not supported!", stageNode.getClass().getSimpleName()));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
new file mode 100644
index 0000000000..70a4536fde
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import javax.annotation.Nullable;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.datablock.BaseDataBlock;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+
+
+/**
+ * Blocking operator that orders the rows of its upstream operator by the given collation and applies the
+ * OFFSET / FETCH (LIMIT) of the sort.
+ *
+ * <p>While the sorted block has not been built, {@link #getNextBlock()} drains the upstream operator into a
+ * {@link PriorityQueue} and then emits every retained row in a single ROW block. Once that block has been built,
+ * each further call returns an end-of-stream block. If the upstream stream ends with an error block, that error
+ * block is returned instead of any data.
+ *
+ * <p>A non-positive {@code fetch} means "no fetch limit" and a negative {@code offset} means "no offset".
+ */
+public class SortOperator extends BaseOperator<TransferableBlock> {
+  private static final String EXPLAIN_NAME = "SORT";
+  private final BaseOperator<TransferableBlock> _upstreamOperator;
+  private final int _fetch;
+  private final int _offset;
+  private final DataSchema _dataSchema;
+  // Rows retained so far, ordered by the comparator built from the collation.
+  private final PriorityQueue<Object[]> _rows;
+  // Capacity handed to SelectionOperatorUtils.addToPriorityQueue for every inserted row.
+  private final int _numRowsToKeep;
+
+  // True once the queue has been drained into the output block; later calls only return end-of-stream.
+  private boolean _isSortedBlockConstructed;
+  // Error block that terminated the upstream stream, or null if no error has been seen.
+  private TransferableBlock _upstreamErrorBlock;
+
+  /**
+   * @param upstreamOperator operator whose blocks are consumed and sorted
+   * @param collationKeys sort keys; each one is cast to {@link RexExpression.InputRef} and resolved to a column
+   *                      name of {@code dataSchema}
+   * @param collationDirections sort direction of the key at the same index in {@code collationKeys}
+   * @param fetch maximum number of rows to return; values {@code <= 0} mean no fetch limit
+   * @param offset number of leading rows to skip; negative values mean no offset
+   * @param dataSchema schema of the rows, used for the comparator and for the produced blocks
+   */
+  public SortOperator(BaseOperator<TransferableBlock> upstreamOperator, List<RexExpression> collationKeys,
+      List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema) {
+    _upstreamOperator = upstreamOperator;
+    _fetch = fetch;
+    _offset = offset;
+    _dataSchema = dataSchema;
+    _upstreamErrorBlock = null;
+    _isSortedBlockConstructed = false;
+    // Keep enough rows for both the skipped prefix and the fetched rows, capped at MAX_ROW_HOLDER_INITIAL_CAPACITY.
+    _numRowsToKeep = _fetch > 0 ? Math.min(SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY,
+        _fetch + (Math.max(_offset, 0))) : SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY;
+    _rows = new PriorityQueue<>(_numRowsToKeep, SelectionOperatorUtils.getTypeCompatibleComparator(
+        toOrderByExpressions(collationKeys, collationDirections), dataSchema, false));
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    // WorkerExecutor doesn't use getChildOperators, returns null here.
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  /**
+   * Drains the upstream operator if needed and returns the sorted block, the upstream error block or an
+   * end-of-stream block. Any exception thrown while doing so is converted into an error block.
+   */
+  @Override
+  protected TransferableBlock getNextBlock() {
+    try {
+      consumeInputBlocks();
+      return produceSortedBlock();
+    } catch (Exception e) {
+      return TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+  }
+
+  /**
+   * Builds the output block from the retained rows on the first call and returns end-of-stream afterwards.
+   *
+   * <p>NOTE(review): rows are emitted in {@link PriorityQueue#poll()} order; whether that equals the requested
+   * ORDER BY order depends on SelectionOperatorUtils.getTypeCompatibleComparator - TODO confirm.
+   */
+  private TransferableBlock produceSortedBlock()
+      throws IOException {
+    if (_upstreamErrorBlock != null) {
+      return _upstreamErrorBlock;
+    }
+    if (_isSortedBlockConstructed) {
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
+    }
+    // Skip exactly max(_offset, 0) polled rows, consistent with the "_fetch + max(_offset, 0)" rows kept by the
+    // constructor. A negative (unset) offset skips nothing.
+    int numRowsToSkip = Math.max(_offset, 0);
+    int numRowsPolled = 0;
+    List<Object[]> rows = new ArrayList<>(_rows.size());
+    while (!_rows.isEmpty()) {
+      Object[] row = _rows.poll();
+      if (numRowsPolled >= numRowsToSkip) {
+        rows.add(row);
+      }
+      numRowsPolled++;
+    }
+    _isSortedBlockConstructed = true;
+    if (rows.isEmpty()) {
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
+    }
+    return new TransferableBlock(rows, _dataSchema, BaseDataBlock.Type.ROW);
+  }
+
+  /**
+   * Reads upstream blocks until an end-of-stream block is seen, adding every row to the priority queue. If the
+   * terminating block is an error block it is remembered so that it can be returned to the caller.
+   */
+  private void consumeInputBlocks() {
+    if (!_isSortedBlockConstructed) {
+      TransferableBlock block = _upstreamOperator.nextBlock();
+      while (!TransferableBlockUtils.isEndOfStream(block)) {
+        BaseDataBlock dataBlock = block.getDataBlock();
+        int numRows = dataBlock.getNumberOfRows();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId);
+          SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
+        }
+        block = _upstreamOperator.nextBlock();
+      }
+      // setting upstream error block
+      if (block.isErrorBlock()) {
+        _upstreamErrorBlock = block;
+      }
+    }
+  }
+
+  /**
+   * Converts the collation into order-by expressions on the column names of {@link #_dataSchema}. Every key is
+   * cast to {@link RexExpression.InputRef}; the boolean passed along is the negation of {@code isDescending()}.
+   */
+  private List<OrderByExpressionContext> toOrderByExpressions(List<RexExpression> collationKeys,
+      List<RelFieldCollation.Direction> collationDirections) {
+    List<OrderByExpressionContext> orderByExpressionContextList = new ArrayList<>(collationKeys.size());
+    for (int i = 0; i < collationKeys.size(); i++) {
+      orderByExpressionContextList.add(new OrderByExpressionContext(ExpressionContext.forIdentifier(
+          _dataSchema.getColumnName(((RexExpression.InputRef) collationKeys.get(i)).getIndex())),
+          !collationDirections.get(i).isDescending()));
+    }
+    return orderByExpressionContextList;
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
index 738992fe52..07e02bdb8b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
@@ -33,6 +33,7 @@ import org.apache.pinot.query.planner.stage.AggregateNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.MailboxSendNode;
 import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
 import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -46,7 +47,7 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils;
  * conversion step is needed so that the V2 query plan can be converted into a compatible format to run V1 executor.
  */
 public class ServerRequestUtils {
-  private static final int DEFAULT_LEAF_NODE_LIMIT = 1_000_000;
+  private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000;
 
   private ServerRequestUtils() {
     // do not instantiate.
@@ -116,6 +117,17 @@ public class ServerRequestUtils {
       // set group-by list
       pinotQuery.setGroupByList(CalciteRexExpressionParser.convertGroupByList(
           ((AggregateNode) node).getGroupSet(), pinotQuery));
+    } else if (node instanceof SortNode) {
+      if (((SortNode) node).getCollationKeys().size() > 0) {
+        pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(((SortNode) node).getCollationKeys(),
+            ((SortNode) node).getCollationDirections(), pinotQuery));
+      }
+      if (((SortNode) node).getFetch() > 0) {
+        pinotQuery.setLimit(((SortNode) node).getFetch());
+      }
+      if (((SortNode) node).getOffset() > 0) {
+        pinotQuery.setOffset(((SortNode) node).getOffset());
+      }
     } else if (node instanceof MailboxSendNode) {
       // TODO: MailboxSendNode should be the root of the leaf stage. but ignore for now since it is handle seperately
       // in QueryRunner as a single step sender.
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 462c5da0c3..89da8574ea 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -66,8 +66,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
   @DataProvider(name = "testDataWithSqlToFinalRowCount")
   private Object[][] provideTestSqlAndRowCount() {
     return new Object[][] {
-        new Object[]{"SELECT * FROM b", 5},
-        new Object[]{"SELECT * FROM a", 15},
+        new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 3", 3},
+        new Object[]{"SELECT * FROM a ORDER BY col1 LIMIT 20", 15},
 
         // No match filter
         new Object[]{"SELECT * FROM b WHERE col3 < 0", 0},
@@ -154,6 +154,9 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
         new Object[]{"SELECT a.col2, COUNT(*), MAX(a.col3), MIN(a.col3), SUM(a.col3) FROM a GROUP BY a.col2 "
             + "HAVING COUNT(*) < 5 OR (COUNT(*) > 5 AND SUM(a.col3) >= 10)"
             + "OR (MIN(a.col3) != 20 AND SUM(a.col3) = 100)", 3},
+
+        // Order-by
+        new Object[]{"SELECT a.col1, a.col3, b.col3 FROM a JOIN b ON a.col1 = b.col1 ORDER BY a.col3, b.col3 DESC", 15},
     };
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org