You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/01 01:22:43 UTC

[pinot] branch master updated: [Multi-stage] Support ordering null values (#10819)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 37f44f08d9 [Multi-stage] Support ordering null values (#10819)
37f44f08d9 is described below

commit 37f44f08d92701104f7bf64494b41c1c3f782e26
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed May 31 18:22:37 2023 -0700

    [Multi-stage] Support ordering null values (#10819)
---
 .../pinot/common/datablock/DataBlockUtils.java     |   2 +-
 .../query/parser/CalciteRexExpressionParser.java   |  59 ++++---
 .../query/planner/plannode/MailboxReceiveNode.java |  29 +++-
 .../pinot/query/planner/plannode/SortNode.java     |  37 +++--
 .../pinot/query/runtime/operator/SortOperator.java |  14 +-
 .../operator/SortedMailboxReceiveOperator.java     |  15 +-
 .../query/runtime/operator/utils/SortUtils.java    |  47 +++---
 .../query/runtime/plan/PhysicalPlanVisitor.java    |   7 +-
 .../runtime/plan/ServerRequestPlanVisitor.java     |  12 +-
 .../query/runtime/operator/SortOperatorTest.java   | 171 +++++++++++++++++----
 .../operator/SortedMailboxReceiveOperatorTest.java |  58 ++++---
 .../src/test/resources/queries/NullHandling.json   |  10 ++
 12 files changed, 328 insertions(+), 133 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
index 8e4f9826a3..6bf6a0e05f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
@@ -49,7 +49,7 @@ public final class DataBlockUtils {
   }
 
   private static String extractErrorMsg(Throwable t) {
-    while (t.getMessage() == null) {
+    while (t.getCause() != null && t.getMessage() == null) {
       t = t.getCause();
     }
     return t.getMessage() + "\n" + QueryException.getTruncatedStackTrace(t);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
index 25a48bec1a..34dc84805f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
@@ -23,7 +23,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.ExpressionType;
@@ -31,6 +32,7 @@ import org.apache.pinot.common.request.Function;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.plannode.SortNode;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.sql.FilterKind;
 import org.apache.pinot.sql.parsers.SqlCompilationException;
@@ -99,31 +101,46 @@ public class CalciteRexExpressionParser {
     return selectExpr;
   }
 
-  public static List<Expression> convertOrderByList(List<RexExpression> rexInputRefs,
-      List<RelFieldCollation.Direction> directions, PinotQuery pinotQuery) {
-    List<Expression> orderByExpr = new ArrayList<>();
-
-    for (int i = 0; i < rexInputRefs.size(); i++) {
-      orderByExpr.add(convertOrderBy(rexInputRefs.get(i), directions.get(i), pinotQuery));
+  public static List<Expression> convertOrderByList(SortNode node, PinotQuery pinotQuery) {
+    List<RexExpression> collationKeys = node.getCollationKeys();
+    List<Direction> collationDirections = node.getCollationDirections();
+    List<NullDirection> collationNullDirections = node.getCollationNullDirections();
+    int numKeys = collationKeys.size();
+    List<Expression> orderByExpressions = new ArrayList<>(numKeys);
+    for (int i = 0; i < numKeys; i++) {
+      orderByExpressions.add(
+          convertOrderBy(collationKeys.get(i), collationDirections.get(i), collationNullDirections.get(i), pinotQuery));
     }
-    return orderByExpr;
+    return orderByExpressions;
   }
 
-  private static Expression convertOrderBy(RexExpression rexNode, RelFieldCollation.Direction direction,
+  private static Expression convertOrderBy(RexExpression rexNode, Direction direction, NullDirection nullDirection,
       PinotQuery pinotQuery) {
-    Expression expression;
-    switch (direction) {
-      case DESCENDING:
-        expression = getFunctionExpression("DESC");
-        expression.getFunctionCall().addToOperands(toExpression(rexNode, pinotQuery));
-        break;
-      case ASCENDING:
-      default:
-        expression = getFunctionExpression("ASC");
-        expression.getFunctionCall().addToOperands(toExpression(rexNode, pinotQuery));
-        break;
+    if (direction == Direction.ASCENDING) {
+      Expression expression = getFunctionExpression("asc");
+      expression.getFunctionCall().addToOperands(toExpression(rexNode, pinotQuery));
+      // NOTE: Add explicit NULL direction only if it is not the default behavior (default behavior treats NULL as the
+      //       largest value)
+      if (nullDirection == NullDirection.FIRST) {
+        Expression nullFirstExpression = getFunctionExpression("nullsfirst");
+        nullFirstExpression.getFunctionCall().addToOperands(expression);
+        return nullFirstExpression;
+      } else {
+        return expression;
+      }
+    } else {
+      Expression expression = getFunctionExpression("desc");
+      expression.getFunctionCall().addToOperands(toExpression(rexNode, pinotQuery));
+      // NOTE: Add explicit NULL direction only if it is not the default behavior (default behavior treats NULL as the
+      //       largest value)
+      if (nullDirection == NullDirection.LAST) {
+        Expression nullLastExpression = getFunctionExpression("nullslast");
+        nullLastExpression.getFunctionCall().addToOperands(expression);
+        return nullLastExpression;
+      } else {
+        return expression;
+      }
     }
-    return expression;
   }
 
   private static Expression convertDistinctAndSelectListToFunctionExpression(RexExpression.FunctionCall rexCall,
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
index 3597651fdd..e67b8343ed 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
@@ -25,6 +25,8 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
@@ -42,7 +44,9 @@ public class MailboxReceiveNode extends AbstractPlanNode {
   @ProtoProperties
   private List<RexExpression> _collationKeys;
   @ProtoProperties
-  private List<RelFieldCollation.Direction> _collationDirections;
+  private List<Direction> _collationDirections;
+  @ProtoProperties
+  private List<NullDirection> _collationNullDirections;
   @ProtoProperties
   private boolean _isSortOnSender;
   @ProtoProperties
@@ -65,15 +69,26 @@ public class MailboxReceiveNode extends AbstractPlanNode {
     _exchangeType = exchangeType;
     _partitionKeySelector = partitionKeySelector;
     if (!CollectionUtils.isEmpty(fieldCollations)) {
-      _collationKeys = new ArrayList<>(fieldCollations.size());
-      _collationDirections = new ArrayList<>(fieldCollations.size());
+      int numCollations = fieldCollations.size();
+      _collationKeys = new ArrayList<>(numCollations);
+      _collationDirections = new ArrayList<>(numCollations);
+      _collationNullDirections = new ArrayList<>(numCollations);
       for (RelFieldCollation fieldCollation : fieldCollations) {
-        _collationDirections.add(fieldCollation.getDirection());
         _collationKeys.add(new RexExpression.InputRef(fieldCollation.getFieldIndex()));
+        Direction direction = fieldCollation.getDirection();
+        Preconditions.checkArgument(direction == Direction.ASCENDING || direction == Direction.DESCENDING,
+            "Unsupported ORDER-BY direction: %s", direction);
+        _collationDirections.add(direction);
+        NullDirection nullDirection = fieldCollation.nullDirection;
+        if (nullDirection == NullDirection.UNSPECIFIED) {
+          nullDirection = direction == Direction.ASCENDING ? NullDirection.LAST : NullDirection.FIRST;
+        }
+        _collationNullDirections.add(nullDirection);
       }
     } else {
       _collationKeys = Collections.emptyList();
       _collationDirections = Collections.emptyList();
+      _collationNullDirections = Collections.emptyList();
     }
     _isSortOnSender = isSortOnSender;
     Preconditions.checkState(!isSortOnSender, "Input shouldn't be sorted as ordering on send is not yet implemented!");
@@ -105,10 +120,14 @@ public class MailboxReceiveNode extends AbstractPlanNode {
     return _collationKeys;
   }
 
-  public List<RelFieldCollation.Direction> getCollationDirections() {
+  public List<Direction> getCollationDirections() {
     return _collationDirections;
   }
 
+  public List<NullDirection> getCollationNullDirections() {
+    return _collationNullDirections;
+  }
+
   public boolean isSortOnSender() {
     return _isSortOnSender;
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SortNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SortNode.java
index 6bc91430f3..293a8f9eda 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SortNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SortNode.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pinot.query.planner.plannode;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.planner.serde.ProtoProperties;
@@ -30,7 +33,9 @@ public class SortNode extends AbstractPlanNode {
   @ProtoProperties
   private List<RexExpression> _collationKeys;
   @ProtoProperties
-  private List<RelFieldCollation.Direction> _collationDirections;
+  private List<Direction> _collationDirections;
+  @ProtoProperties
+  private List<NullDirection> _collationNullDirections;
   @ProtoProperties
   private int _fetch;
   @ProtoProperties
@@ -43,24 +48,38 @@ public class SortNode extends AbstractPlanNode {
   public SortNode(int planFragmentId, List<RelFieldCollation> fieldCollations, int fetch, int offset,
       DataSchema dataSchema) {
     super(planFragmentId, dataSchema);
-    _collationDirections = new ArrayList<>(fieldCollations.size());
-    _collationKeys = new ArrayList<>(fieldCollations.size());
-    _fetch = fetch;
-    _offset = offset;
+    int numCollations = fieldCollations.size();
+    _collationKeys = new ArrayList<>(numCollations);
+    _collationDirections = new ArrayList<>(numCollations);
+    _collationNullDirections = new ArrayList<>(numCollations);
     for (RelFieldCollation fieldCollation : fieldCollations) {
-      _collationDirections.add(fieldCollation.getDirection());
       _collationKeys.add(new RexExpression.InputRef(fieldCollation.getFieldIndex()));
+      Direction direction = fieldCollation.getDirection();
+      Preconditions.checkArgument(direction == Direction.ASCENDING || direction == Direction.DESCENDING,
+          "Unsupported ORDER-BY direction: %s", direction);
+      _collationDirections.add(direction);
+      NullDirection nullDirection = fieldCollation.nullDirection;
+      if (nullDirection == NullDirection.UNSPECIFIED) {
+        nullDirection = direction == Direction.ASCENDING ? NullDirection.LAST : NullDirection.FIRST;
+      }
+      _collationNullDirections.add(nullDirection);
     }
+    _fetch = fetch;
+    _offset = offset;
   }
 
   public List<RexExpression> getCollationKeys() {
     return _collationKeys;
   }
 
-  public List<RelFieldCollation.Direction> getCollationDirections() {
+  public List<Direction> getCollationDirections() {
     return _collationDirections;
   }
 
+  public List<NullDirection> getCollationNullDirections() {
+    return _collationNullDirections;
+  }
+
   public int getFetch() {
     return _fetch;
   }
@@ -71,9 +90,7 @@ public class SortNode extends AbstractPlanNode {
 
   @Override
   public String explain() {
-    return String.format("SORT%s%s",
-        (_fetch > 0) ? " LIMIT " + _fetch : "",
-        (_offset > 0) ? " OFFSET " + _offset : "");
+    return String.format("SORT%s%s", (_fetch > 0) ? " LIMIT " + _fetch : "", (_offset > 0) ? " OFFSET " + _offset : "");
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 908bfb9406..0f3fef5208 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -56,16 +56,18 @@ public class SortOperator extends MultiStageOperator {
   private TransferableBlock _upstreamErrorBlock;
 
   public SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator,
-      List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections, int fetch, int offset,
-      DataSchema dataSchema, boolean isInputSorted) {
-    this(context, upstreamOperator, collationKeys, collationDirections, fetch, offset, dataSchema, isInputSorted,
-        SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY,
+      List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
+      List<RelFieldCollation.NullDirection> collationNullDirections, int fetch, int offset, DataSchema dataSchema,
+      boolean isInputSorted) {
+    this(context, upstreamOperator, collationKeys, collationDirections, collationNullDirections, fetch, offset,
+        dataSchema, isInputSorted, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY,
         CommonConstants.Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
   }
 
   @VisibleForTesting
   SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
-      List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
+      List<RelFieldCollation.Direction> collationDirections,
+      List<RelFieldCollation.NullDirection> collationNullDirections, int fetch, int offset, DataSchema dataSchema,
       boolean isInputSorted, int defaultHolderCapacity, int defaultResponseLimit) {
     super(context);
     _upstreamOperator = upstreamOperator;
@@ -87,7 +89,7 @@ public class SortOperator extends MultiStageOperator {
       // Use the opposite direction as specified by the collation directions since we need the PriorityQueue to decide
       // which elements to keep and which to remove based on the limits.
       _priorityQueue = new PriorityQueue<>(Math.min(defaultHolderCapacity, _numRowsToKeep),
-          new SortUtils.SortComparator(collationKeys, collationDirections, dataSchema, false, true));
+          new SortUtils.SortComparator(collationKeys, collationDirections, collationNullDirections, dataSchema, true));
       _rows = null;
     }
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
index 8faa8368ff..f5d4a8c65e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
@@ -23,7 +23,8 @@ import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.exception.QueryException;
@@ -48,7 +49,8 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator {
 
   private final DataSchema _dataSchema;
   private final List<RexExpression> _collationKeys;
-  private final List<RelFieldCollation.Direction> _collationDirections;
+  private final List<Direction> _collationDirections;
+  private final List<NullDirection> _collationNullDirections;
   private final boolean _isSortOnSender;
   private final List<Object[]> _rows = new ArrayList<>();
 
@@ -56,13 +58,14 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator {
   private boolean _isSortedBlockConstructed;
 
   public SortedMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
-      DataSchema dataSchema, List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
-      boolean isSortOnSender, int senderStageId) {
+      DataSchema dataSchema, List<RexExpression> collationKeys, List<Direction> collationDirections,
+      List<NullDirection> collationNullDirections, boolean isSortOnSender, int senderStageId) {
     super(context, exchangeType, senderStageId);
     Preconditions.checkState(!CollectionUtils.isEmpty(collationKeys), "Collation keys must be set");
     _dataSchema = dataSchema;
     _collationKeys = collationKeys;
     _collationDirections = collationDirections;
+    _collationNullDirections = collationNullDirections;
     _isSortOnSender = isSortOnSender;
   }
 
@@ -110,7 +113,9 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator {
     }
 
     if (!_isSortedBlockConstructed && !_rows.isEmpty()) {
-      _rows.sort(new SortUtils.SortComparator(_collationKeys, _collationDirections, _dataSchema, false, false));
+      _rows.sort(
+          new SortUtils.SortComparator(_collationKeys, _collationDirections, _collationNullDirections, _dataSchema,
+              false));
       _isSortedBlockConstructed = true;
       return new TransferableBlock(_rows, _dataSchema, DataBlock.Type.ROW);
     } else {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
index 1d2c42a193..7827fcc1c0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
@@ -20,13 +20,13 @@ package org.apache.pinot.query.runtime.operator.utils;
 
 import java.util.Comparator;
 import java.util.List;
-import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
 
 
 public class SortUtils {
-
   private SortUtils() {
   }
 
@@ -34,44 +34,32 @@ public class SortUtils {
     private final int _size;
     private final int[] _valueIndices;
     private final int[] _multipliers;
+    private final int[] _nullsMultipliers;
     private final boolean[] _useDoubleComparison;
 
     /**
-     * Sort comparator for use with priority queues
+     * Sort comparator for use with priority queues.
+     *
      * @param collationKeys collation keys to sort on
      * @param collationDirections collation direction for each collation key to sort on
+     * @param collationNullDirections collation direction for NULL values in each collation key to sort on
      * @param dataSchema data schema to use
-     * @param isNullHandlingEnabled 'true' if null handling is enabled. Not supported yet
      * @param switchDirections 'true' if the opposite sort direction should be used as what is specified
      */
-    public SortComparator(List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
-        DataSchema dataSchema, boolean isNullHandlingEnabled, boolean switchDirections) {
+    public SortComparator(List<RexExpression> collationKeys, List<Direction> collationDirections,
+        List<NullDirection> collationNullDirections, DataSchema dataSchema, boolean switchDirections) {
       DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
       _size = collationKeys.size();
       _valueIndices = new int[_size];
       _multipliers = new int[_size];
+      _nullsMultipliers = new int[_size];
       _useDoubleComparison = new boolean[_size];
       for (int i = 0; i < _size; i++) {
         _valueIndices[i] = ((RexExpression.InputRef) collationKeys.get(i)).getIndex();
-        _multipliers[i] = switchDirections ? (collationDirections.get(i).isDescending() ? 1 : -1)
-            : (collationDirections.get(i).isDescending() ? -1 : 1);
-        _useDoubleComparison[i] = columnDataTypes[_valueIndices[i]].isNumber();
-      }
-    }
-
-    /**
-     * Sort comparator for use with priority queues
-     * @param dataSchema data schema to use
-     */
-    public SortComparator(DataSchema dataSchema) {
-      DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
-      _size = columnDataTypes.length;
-      _valueIndices = new int[_size];
-      _multipliers = new int[_size];
-      _useDoubleComparison = new boolean[_size];
-      for (int i = 0; i < _size; i++) {
-        _valueIndices[i] = i;
-        _multipliers[i] = 1;
+        int multiplier = collationDirections.get(i) == Direction.ASCENDING ? 1 : -1;
+        _multipliers[i] = switchDirections ? -multiplier : multiplier;
+        int nullsMultiplier = collationNullDirections.get(i) == NullDirection.LAST ? 1 : -1;
+        _nullsMultipliers[i] = switchDirections ? -nullsMultiplier : nullsMultiplier;
         _useDoubleComparison[i] = columnDataTypes[_valueIndices[i]].isNumber();
       }
     }
@@ -82,6 +70,15 @@ public class SortUtils {
         int index = _valueIndices[i];
         Object v1 = o1[index];
         Object v2 = o2[index];
+        if (v1 == null) {
+          if (v2 == null) {
+            continue;
+          }
+          return _nullsMultipliers[i];
+        }
+        if (v2 == null) {
+          return -_nullsMultipliers[i];
+        }
         int result;
         if (_useDoubleComparison[i]) {
           result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index f8dfe58e26..8b98fa517a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -72,8 +72,8 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
     if (node.isSortOnReceiver()) {
       SortedMailboxReceiveOperator sortedMailboxReceiveOperator =
           new SortedMailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
-              node.getDataSchema(), node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(),
-              node.getSenderStageId());
+              node.getDataSchema(), node.getCollationKeys(), node.getCollationDirections(),
+              node.getCollationNullDirections(), node.isSortOnSender(), node.getSenderStageId());
       context.addReceivingMailboxIds(sortedMailboxReceiveOperator.getMailboxIds());
       return sortedMailboxReceiveOperator;
     } else {
@@ -167,7 +167,8 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
     MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
     boolean isInputSorted = nextOperator instanceof SortedMailboxReceiveOperator;
     return new SortOperator(context.getOpChainExecutionContext(), nextOperator, node.getCollationKeys(),
-        node.getCollationDirections(), node.getFetch(), node.getOffset(), node.getDataSchema(), isInputSorted);
+        node.getCollationDirections(), node.getCollationNullDirections(), node.getFetch(), node.getOffset(),
+        node.getDataSchema(), isInputSorted);
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index d712e524ab..a4bf0450ea 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -108,8 +108,7 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla
     pinotQuery.setExplain(false);
     ServerPlanRequestContext context =
         new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), timeoutMs, deadlineMs,
-            stagePlan.getServer(), stagePlan.getStageMetadata(), pinotQuery, tableType, timeBoundaryInfo,
-            traceEnabled);
+            stagePlan.getServer(), stagePlan.getStageMetadata(), pinotQuery, tableType, timeBoundaryInfo, traceEnabled);
 
     // visit the plan and create query physical plan.
     ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context);
@@ -232,16 +231,15 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla
   @Override
   public Void visitSort(SortNode node, ServerPlanRequestContext context) {
     visitChildren(node, context);
+    PinotQuery pinotQuery = context.getPinotQuery();
     if (node.getCollationKeys().size() > 0) {
-      context.getPinotQuery().setOrderByList(
-          CalciteRexExpressionParser.convertOrderByList(node.getCollationKeys(), node.getCollationDirections(),
-              context.getPinotQuery()));
+      pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(node, pinotQuery));
     }
     if (node.getFetch() > 0) {
-      context.getPinotQuery().setLimit(node.getFetch());
+      pinotQuery.setLimit(node.getFetch());
     }
     if (node.getOffset() > 0) {
-      context.getPinotQuery().setOffset(node.getOffset());
+      pinotQuery.setOffset(node.getOffset());
     }
     return null;
   }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
index ebdb23aa3e..b450d4b481 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.logical.RexExpression;
@@ -68,9 +69,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, false);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")));
@@ -87,9 +90,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
 
@@ -105,9 +110,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
@@ -123,9 +130,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -146,9 +155,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0,
-        schema, true);
+    SortOperator op =
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, true);
 
     // Purposefully setting input as unsorted order for validation but 'isInputSorted' should only be true if actually
     // sorted
@@ -171,9 +182,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(1);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"ignored", "sort"}, new DataSchema.ColumnDataType[]{INT, INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1, 2}, new Object[]{2, 1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -194,9 +207,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{STRING});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{"b"}, new Object[]{"a"}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -217,9 +232,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.DESCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -240,9 +257,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 1, schema, false);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 1,
+            schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -263,9 +282,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 1,
-        schema, true);
+    SortOperator op =
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 1,
+            schema, true);
 
     // Set input rows as sorted since input is expected to be sorted
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}, new Object[]{2}, new Object[]{3}))
@@ -287,9 +308,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 1, 1, schema, false);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 1, 1,
+            schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -309,9 +332,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 1, 1,
-        schema, true);
+    SortOperator op =
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 1, 1,
+            schema, true);
 
     // Set input rows as sorted since input is expected to be sorted
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}, new Object[]{2}, new Object[]{3}))
@@ -332,10 +357,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 0, 0, schema, false, 10,
-            1);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 0, 0,
+            schema, false, 10, 1);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -355,9 +381,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, -1, 0, schema, false);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, -1, 0,
+            schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -376,9 +404,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}))
         .thenReturn(block(schema, new Object[]{1}))
@@ -400,9 +430,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0,
-        schema, true);
+    SortOperator op =
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, true);
 
     // Set input rows as sorted since input is expected to be sorted
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}))
@@ -425,9 +457,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0, 1);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST, NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, false);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -450,9 +484,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0, 1);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.DESCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST, NullDirection.FIRST);
     DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, false);
 
     Mockito.when(_input.nextBlock())
         .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -475,9 +511,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, false);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}))
         .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()).thenReturn(block(schema, new Object[]{1}))
@@ -500,9 +538,11 @@ public class SortOperatorTest {
     // Given:
     List<RexExpression> collation = collation(0);
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
-    SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0,
-        schema, true);
+    SortOperator op =
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, true);
 
     // Set input rows as sorted since input is expected to be sorted
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}))
@@ -521,6 +561,85 @@ public class SortOperatorTest {
     Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
   }
 
+  @Test
+  public void shouldHaveNullAtLast() {
+    // Given:
+    List<RexExpression> collation = collation(0);
+    List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
+    DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
+    SortOperator op =
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, false);
+
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{null}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // When:
+    TransferableBlock block = op.nextBlock(); // construct
+    TransferableBlock block2 = op.nextBlock(); // eos
+
+    // Then:
+    Assert.assertEquals(block.getNumRows(), 3);
+    Assert.assertEquals(block.getContainer().get(0), new Object[]{1});
+    Assert.assertEquals(block.getContainer().get(1), new Object[]{2});
+    Assert.assertEquals(block.getContainer().get(2), new Object[]{null});
+    Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+  }
+
+  @Test
+  public void shouldHaveNullAtFirst() {
+    // Given:
+    List<RexExpression> collation = collation(0);
+    List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.FIRST);
+    DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
+    SortOperator op =
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, false);
+
+    Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{null}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // When:
+    TransferableBlock block = op.nextBlock(); // construct
+    TransferableBlock block2 = op.nextBlock(); // eos
+
+    // Then:
+    Assert.assertEquals(block.getNumRows(), 3);
+    Assert.assertEquals(block.getContainer().get(0), new Object[]{null});
+    Assert.assertEquals(block.getContainer().get(1), new Object[]{1});
+    Assert.assertEquals(block.getContainer().get(2), new Object[]{2});
+    Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+  }
+
+  @Test
+  public void shouldHandleMultipleCollationKeysWithNulls() {
+    // Given:
+    List<RexExpression> collation = collation(0, 1);
+    List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.DESCENDING);
+    List<NullDirection> nullDirections = ImmutableList.of(NullDirection.FIRST, NullDirection.LAST);
+    DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
+    SortOperator op =
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+            schema, false);
+
+    Mockito.when(_input.nextBlock())
+        .thenReturn(block(schema, new Object[]{1, 1}, new Object[]{1, null}, new Object[]{null, 1}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // When:
+    TransferableBlock block = op.nextBlock(); // construct
+    TransferableBlock block2 = op.nextBlock(); // eos
+
+    // Then:
+    Assert.assertEquals(block.getNumRows(), 3);
+    Assert.assertEquals(block.getContainer().get(0), new Object[]{null, 1});
+    Assert.assertEquals(block.getContainer().get(1), new Object[]{1, 1});
+    Assert.assertEquals(block.getContainer().get(2), new Object[]{1, null});
+    Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+  }
+
   private static List<RexExpression> collation(int... indexes) {
     return Arrays.stream(indexes).mapToObj(RexExpression.InputRef::new).collect(Collectors.toList());
   }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 7ed0c2469a..5bd973f4c2 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -26,7 +26,8 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
@@ -60,8 +61,8 @@ public class SortedMailboxReceiveOperatorTest {
   private static final DataSchema DATA_SCHEMA =
       new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
   private static final List<RexExpression> COLLATION_KEYS = Collections.singletonList(new RexExpression.InputRef(0));
-  private static final List<RelFieldCollation.Direction> COLLATION_DIRECTIONS =
-      Collections.singletonList(RelFieldCollation.Direction.ASCENDING);
+  private static final List<Direction> COLLATION_DIRECTIONS = Collections.singletonList(Direction.ASCENDING);
+  private static final List<NullDirection> COLLATION_NULL_DIRECTIONS = Collections.singletonList(NullDirection.LAST);
   private static final String MAILBOX_ID_1 = MailboxIdUtils.toMailboxId(0, 1, 0, 0, 0);
   private static final String MAILBOX_ID_2 = MailboxIdUtils.toMailboxId(0, 1, 1, 0, 0);
 
@@ -129,26 +130,25 @@ public class SortedMailboxReceiveOperatorTest {
   public void shouldThrowSingletonNoMatchMailboxServer() {
     VirtualServerAddress server1 = new VirtualServerAddress("localhost", 456, 0);
     VirtualServerAddress server2 = new VirtualServerAddress("localhost", 789, 1);
-    StageMetadata stageMetadata = new StageMetadata.Builder()
-        .setWorkerMetadataList(Stream.of(server1, server2).map(
-            s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build()).collect(Collectors.toList()))
-        .build();
+    StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList(
+        Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build())
+            .collect(Collectors.toList())).build();
     OpChainExecutionContext context =
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
             stageMetadata, false);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS,
-        COLLATION_DIRECTIONS, false, 1);
+        COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
   }
 
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
   public void shouldThrowRangeDistributionNotSupported() {
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            null, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, null,
+            false);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS,
-        COLLATION_DIRECTIONS, false, 1);
+        COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
   }
 
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "Collation keys.*")
@@ -159,7 +159,7 @@ public class SortedMailboxReceiveOperatorTest {
             _stageMetadata1, false);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, Collections.emptyList(),
-        Collections.emptyList(), false, 1);
+        Collections.emptyList(), Collections.emptyList(), false, 1);
   }
 
   @Test
@@ -171,7 +171,8 @@ public class SortedMailboxReceiveOperatorTest {
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L,
             _stageMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
-        RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+        RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
+        false, 1)) {
       Thread.sleep(100L);
       TransferableBlock mailbox = receiveOp.nextBlock();
       assertTrue(mailbox.isErrorBlock());
@@ -183,7 +184,8 @@ public class SortedMailboxReceiveOperatorTest {
     context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10_000L,
         System.currentTimeMillis() + 10_000L, _stageMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
-        RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+        RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
+        false, 1)) {
       Thread.sleep(100L);
       TransferableBlock mailbox = receiveOp.nextBlock();
       assertFalse(mailbox.isErrorBlock());
@@ -197,7 +199,8 @@ public class SortedMailboxReceiveOperatorTest {
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
             _stageMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
-        RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+        RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
+        false, 1)) {
       assertTrue(receiveOp.nextBlock().isNoOpBlock());
     }
   }
@@ -210,7 +213,8 @@ public class SortedMailboxReceiveOperatorTest {
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
             _stageMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
-        RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+        RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
+        false, 1)) {
       assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
     }
   }
@@ -225,7 +229,8 @@ public class SortedMailboxReceiveOperatorTest {
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
             _stageMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
-        RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+        RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
+        false, 1)) {
       List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
       assertEquals(actualRows.size(), 1);
       assertEquals(actualRows.get(0), row);
@@ -243,7 +248,8 @@ public class SortedMailboxReceiveOperatorTest {
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
             _stageMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
-        RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+        RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
+        false, 1)) {
       TransferableBlock block = receiveOp.nextBlock();
       assertTrue(block.isErrorBlock());
       assertTrue(block.getDataBlock().getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains(errorMessage));
@@ -262,7 +268,8 @@ public class SortedMailboxReceiveOperatorTest {
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
             _stageMetadataBoth, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
-        RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+        RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
+        COLLATION_NULL_DIRECTIONS, false, 1)) {
       assertTrue(receiveOp.nextBlock().isNoOpBlock());
       List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
       assertEquals(actualRows.size(), 1);
@@ -285,7 +292,8 @@ public class SortedMailboxReceiveOperatorTest {
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
             _stageMetadataBoth, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
-        RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+        RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
+        COLLATION_NULL_DIRECTIONS, false, 1)) {
       TransferableBlock block = receiveOp.nextBlock();
       assertTrue(block.isErrorBlock());
       assertTrue(block.getDataBlock().getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains(errorMessage));
@@ -310,7 +318,8 @@ public class SortedMailboxReceiveOperatorTest {
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
             _stageMetadataBoth, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
-        RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+        RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
+        COLLATION_NULL_DIRECTIONS, false, 1)) {
       assertEquals(receiveOp.nextBlock().getContainer(), Arrays.asList(row5, row2, row4, row1, row3));
       assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
     }
@@ -321,8 +330,8 @@ public class SortedMailboxReceiveOperatorTest {
     DataSchema dataSchema =
         new DataSchema(new String[]{"col1", "col2", "col3"}, new DataSchema.ColumnDataType[]{INT, INT, STRING});
     List<RexExpression> collationKeys = Arrays.asList(new RexExpression.InputRef(2), new RexExpression.InputRef(0));
-    List<RelFieldCollation.Direction> collationDirection =
-        Arrays.asList(RelFieldCollation.Direction.DESCENDING, RelFieldCollation.Direction.ASCENDING);
+    List<Direction> collationDirections = Arrays.asList(Direction.DESCENDING, Direction.ASCENDING);
+    List<NullDirection> collationNullDirections = Arrays.asList(NullDirection.FIRST, NullDirection.LAST);
 
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
     Object[] row1 = new Object[]{3, 3, "queen"};
@@ -341,7 +350,8 @@ public class SortedMailboxReceiveOperatorTest {
         new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
             _stageMetadataBoth, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
-        RelDistribution.Type.HASH_DISTRIBUTED, dataSchema, collationKeys, collationDirection, false, 1)) {
+        RelDistribution.Type.HASH_DISTRIBUTED, dataSchema, collationKeys, collationDirections, collationNullDirections,
+        false, 1)) {
       assertEquals(receiveOp.nextBlock().getContainer(), Arrays.asList(row1, row2, row3, row5, row4));
       assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
     }
diff --git a/pinot-query-runtime/src/test/resources/queries/NullHandling.json b/pinot-query-runtime/src/test/resources/queries/NullHandling.json
index f51701317c..36fad242e6 100644
--- a/pinot-query-runtime/src/test/resources/queries/NullHandling.json
+++ b/pinot-query-runtime/src/test/resources/queries/NullHandling.json
@@ -45,6 +45,16 @@
       {
         "description": "LEFT JOIN and GROUP BY with AGGREGATE",
         "sql": "SELECT {tbl1}.strCol2, COUNT({tbl2}.intCol1), MIN({tbl2}.intCol1), MAX({tbl2}.doubleCol1), SUM({tbl2}.doubleCol1) FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.strCol1 = {tbl2}.strCol1 GROUP BY {tbl1}.strCol2"
+      },
+      {
+        "description": "LEFT JOIN and SORT (by default, H2 treats null as the smallest value, which is different from Postgres, thus we don't test the default ordering)",
+        "sql": "SELECT {tbl2}.doubleCol1 AS col FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.strCol1 = {tbl2}.strCol1 ORDER BY col NULLS FIRST",
+        "keepOutputRowOrder": true
+      },
+      {
+        "description": "LEFT JOIN and GROUP BY with AGGREGATE AND SORT",
+        "sql": "SELECT {tbl1}.strCol2, COUNT({tbl2}.intCol1), MIN({tbl2}.intCol1) AS minCol, MAX({tbl2}.doubleCol1) AS maxCol, SUM({tbl2}.doubleCol1) FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.strCol1 = {tbl2}.strCol1 GROUP BY {tbl1}.strCol2 ORDER BY minCol DESC NULLS LAST, maxCol ASC NULLS LAST",
+        "keepOutputRowOrder": true
       }
     ]
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org