You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/01 01:22:43 UTC
[pinot] branch master updated: [Multi-stage] Support ordering null values (#10819)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 37f44f08d9 [Multi-stage] Support ordering null values (#10819)
37f44f08d9 is described below
commit 37f44f08d92701104f7bf64494b41c1c3f782e26
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed May 31 18:22:37 2023 -0700
[Multi-stage] Support ordering null values (#10819)
---
.../pinot/common/datablock/DataBlockUtils.java | 2 +-
.../query/parser/CalciteRexExpressionParser.java | 59 ++++---
.../query/planner/plannode/MailboxReceiveNode.java | 29 +++-
.../pinot/query/planner/plannode/SortNode.java | 37 +++--
.../pinot/query/runtime/operator/SortOperator.java | 14 +-
.../operator/SortedMailboxReceiveOperator.java | 15 +-
.../query/runtime/operator/utils/SortUtils.java | 47 +++---
.../query/runtime/plan/PhysicalPlanVisitor.java | 7 +-
.../runtime/plan/ServerRequestPlanVisitor.java | 12 +-
.../query/runtime/operator/SortOperatorTest.java | 171 +++++++++++++++++----
.../operator/SortedMailboxReceiveOperatorTest.java | 58 ++++---
.../src/test/resources/queries/NullHandling.json | 10 ++
12 files changed, 328 insertions(+), 133 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
index 8e4f9826a3..6bf6a0e05f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
@@ -49,7 +49,7 @@ public final class DataBlockUtils {
}
private static String extractErrorMsg(Throwable t) {
- while (t.getMessage() == null) {
+ while (t.getCause() != null && t.getMessage() == null) {
t = t.getCause();
}
return t.getMessage() + "\n" + QueryException.getTruncatedStackTrace(t);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
index 25a48bec1a..34dc84805f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
@@ -23,7 +23,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.calcite.sql.SqlKind;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.ExpressionType;
@@ -31,6 +32,7 @@ import org.apache.pinot.common.request.Function;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.plannode.SortNode;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.sql.FilterKind;
import org.apache.pinot.sql.parsers.SqlCompilationException;
@@ -99,31 +101,46 @@ public class CalciteRexExpressionParser {
return selectExpr;
}
- public static List<Expression> convertOrderByList(List<RexExpression> rexInputRefs,
- List<RelFieldCollation.Direction> directions, PinotQuery pinotQuery) {
- List<Expression> orderByExpr = new ArrayList<>();
-
- for (int i = 0; i < rexInputRefs.size(); i++) {
- orderByExpr.add(convertOrderBy(rexInputRefs.get(i), directions.get(i), pinotQuery));
+ public static List<Expression> convertOrderByList(SortNode node, PinotQuery pinotQuery) {
+ List<RexExpression> collationKeys = node.getCollationKeys();
+ List<Direction> collationDirections = node.getCollationDirections();
+ List<NullDirection> collationNullDirections = node.getCollationNullDirections();
+ int numKeys = collationKeys.size();
+ List<Expression> orderByExpressions = new ArrayList<>(numKeys);
+ for (int i = 0; i < numKeys; i++) {
+ orderByExpressions.add(
+ convertOrderBy(collationKeys.get(i), collationDirections.get(i), collationNullDirections.get(i), pinotQuery));
}
- return orderByExpr;
+ return orderByExpressions;
}
- private static Expression convertOrderBy(RexExpression rexNode, RelFieldCollation.Direction direction,
+ private static Expression convertOrderBy(RexExpression rexNode, Direction direction, NullDirection nullDirection,
PinotQuery pinotQuery) {
- Expression expression;
- switch (direction) {
- case DESCENDING:
- expression = getFunctionExpression("DESC");
- expression.getFunctionCall().addToOperands(toExpression(rexNode, pinotQuery));
- break;
- case ASCENDING:
- default:
- expression = getFunctionExpression("ASC");
- expression.getFunctionCall().addToOperands(toExpression(rexNode, pinotQuery));
- break;
+ if (direction == Direction.ASCENDING) {
+ Expression expression = getFunctionExpression("asc");
+ expression.getFunctionCall().addToOperands(toExpression(rexNode, pinotQuery));
+ // NOTE: Add explicit NULL direction only if it is not the default behavior (default behavior treats NULL as the
+ // largest value)
+ if (nullDirection == NullDirection.FIRST) {
+ Expression nullFirstExpression = getFunctionExpression("nullsfirst");
+ nullFirstExpression.getFunctionCall().addToOperands(expression);
+ return nullFirstExpression;
+ } else {
+ return expression;
+ }
+ } else {
+ Expression expression = getFunctionExpression("desc");
+ expression.getFunctionCall().addToOperands(toExpression(rexNode, pinotQuery));
+ // NOTE: Add explicit NULL direction only if it is not the default behavior (default behavior treats NULL as the
+ // largest value)
+ if (nullDirection == NullDirection.LAST) {
+ Expression nullLastExpression = getFunctionExpression("nullslast");
+ nullLastExpression.getFunctionCall().addToOperands(expression);
+ return nullLastExpression;
+ } else {
+ return expression;
+ }
}
- return expression;
}
private static Expression convertDistinctAndSelectListToFunctionExpression(RexExpression.FunctionCall rexCall,
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
index 3597651fdd..e67b8343ed 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/MailboxReceiveNode.java
@@ -25,6 +25,8 @@ import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
@@ -42,7 +44,9 @@ public class MailboxReceiveNode extends AbstractPlanNode {
@ProtoProperties
private List<RexExpression> _collationKeys;
@ProtoProperties
- private List<RelFieldCollation.Direction> _collationDirections;
+ private List<Direction> _collationDirections;
+ @ProtoProperties
+ private List<NullDirection> _collationNullDirections;
@ProtoProperties
private boolean _isSortOnSender;
@ProtoProperties
@@ -65,15 +69,26 @@ public class MailboxReceiveNode extends AbstractPlanNode {
_exchangeType = exchangeType;
_partitionKeySelector = partitionKeySelector;
if (!CollectionUtils.isEmpty(fieldCollations)) {
- _collationKeys = new ArrayList<>(fieldCollations.size());
- _collationDirections = new ArrayList<>(fieldCollations.size());
+ int numCollations = fieldCollations.size();
+ _collationKeys = new ArrayList<>(numCollations);
+ _collationDirections = new ArrayList<>(numCollations);
+ _collationNullDirections = new ArrayList<>(numCollations);
for (RelFieldCollation fieldCollation : fieldCollations) {
- _collationDirections.add(fieldCollation.getDirection());
_collationKeys.add(new RexExpression.InputRef(fieldCollation.getFieldIndex()));
+ Direction direction = fieldCollation.getDirection();
+ Preconditions.checkArgument(direction == Direction.ASCENDING || direction == Direction.DESCENDING,
+ "Unsupported ORDER-BY direction: %s", direction);
+ _collationDirections.add(direction);
+ NullDirection nullDirection = fieldCollation.nullDirection;
+ if (nullDirection == NullDirection.UNSPECIFIED) {
+ nullDirection = direction == Direction.ASCENDING ? NullDirection.LAST : NullDirection.FIRST;
+ }
+ _collationNullDirections.add(nullDirection);
}
} else {
_collationKeys = Collections.emptyList();
_collationDirections = Collections.emptyList();
+ _collationNullDirections = Collections.emptyList();
}
_isSortOnSender = isSortOnSender;
Preconditions.checkState(!isSortOnSender, "Input shouldn't be sorted as ordering on send is not yet implemented!");
@@ -105,10 +120,14 @@ public class MailboxReceiveNode extends AbstractPlanNode {
return _collationKeys;
}
- public List<RelFieldCollation.Direction> getCollationDirections() {
+ public List<Direction> getCollationDirections() {
return _collationDirections;
}
+ public List<NullDirection> getCollationNullDirections() {
+ return _collationNullDirections;
+ }
+
public boolean isSortOnSender() {
return _isSortOnSender;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SortNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SortNode.java
index 6bc91430f3..293a8f9eda 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SortNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/SortNode.java
@@ -18,9 +18,12 @@
*/
package org.apache.pinot.query.planner.plannode;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.serde.ProtoProperties;
@@ -30,7 +33,9 @@ public class SortNode extends AbstractPlanNode {
@ProtoProperties
private List<RexExpression> _collationKeys;
@ProtoProperties
- private List<RelFieldCollation.Direction> _collationDirections;
+ private List<Direction> _collationDirections;
+ @ProtoProperties
+ private List<NullDirection> _collationNullDirections;
@ProtoProperties
private int _fetch;
@ProtoProperties
@@ -43,24 +48,38 @@ public class SortNode extends AbstractPlanNode {
public SortNode(int planFragmentId, List<RelFieldCollation> fieldCollations, int fetch, int offset,
DataSchema dataSchema) {
super(planFragmentId, dataSchema);
- _collationDirections = new ArrayList<>(fieldCollations.size());
- _collationKeys = new ArrayList<>(fieldCollations.size());
- _fetch = fetch;
- _offset = offset;
+ int numCollations = fieldCollations.size();
+ _collationKeys = new ArrayList<>(numCollations);
+ _collationDirections = new ArrayList<>(numCollations);
+ _collationNullDirections = new ArrayList<>(numCollations);
for (RelFieldCollation fieldCollation : fieldCollations) {
- _collationDirections.add(fieldCollation.getDirection());
_collationKeys.add(new RexExpression.InputRef(fieldCollation.getFieldIndex()));
+ Direction direction = fieldCollation.getDirection();
+ Preconditions.checkArgument(direction == Direction.ASCENDING || direction == Direction.DESCENDING,
+ "Unsupported ORDER-BY direction: %s", direction);
+ _collationDirections.add(direction);
+ NullDirection nullDirection = fieldCollation.nullDirection;
+ if (nullDirection == NullDirection.UNSPECIFIED) {
+ nullDirection = direction == Direction.ASCENDING ? NullDirection.LAST : NullDirection.FIRST;
+ }
+ _collationNullDirections.add(nullDirection);
}
+ _fetch = fetch;
+ _offset = offset;
}
public List<RexExpression> getCollationKeys() {
return _collationKeys;
}
- public List<RelFieldCollation.Direction> getCollationDirections() {
+ public List<Direction> getCollationDirections() {
return _collationDirections;
}
+ public List<NullDirection> getCollationNullDirections() {
+ return _collationNullDirections;
+ }
+
public int getFetch() {
return _fetch;
}
@@ -71,9 +90,7 @@ public class SortNode extends AbstractPlanNode {
@Override
public String explain() {
- return String.format("SORT%s%s",
- (_fetch > 0) ? " LIMIT " + _fetch : "",
- (_offset > 0) ? " OFFSET " + _offset : "");
+ return String.format("SORT%s%s", (_fetch > 0) ? " LIMIT " + _fetch : "", (_offset > 0) ? " OFFSET " + _offset : "");
}
@Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 908bfb9406..0f3fef5208 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -56,16 +56,18 @@ public class SortOperator extends MultiStageOperator {
private TransferableBlock _upstreamErrorBlock;
public SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator,
- List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections, int fetch, int offset,
- DataSchema dataSchema, boolean isInputSorted) {
- this(context, upstreamOperator, collationKeys, collationDirections, fetch, offset, dataSchema, isInputSorted,
- SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY,
+ List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
+ List<RelFieldCollation.NullDirection> collationNullDirections, int fetch, int offset, DataSchema dataSchema,
+ boolean isInputSorted) {
+ this(context, upstreamOperator, collationKeys, collationDirections, collationNullDirections, fetch, offset,
+ dataSchema, isInputSorted, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY,
CommonConstants.Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
}
@VisibleForTesting
SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
- List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
+ List<RelFieldCollation.Direction> collationDirections,
+ List<RelFieldCollation.NullDirection> collationNullDirections, int fetch, int offset, DataSchema dataSchema,
boolean isInputSorted, int defaultHolderCapacity, int defaultResponseLimit) {
super(context);
_upstreamOperator = upstreamOperator;
@@ -87,7 +89,7 @@ public class SortOperator extends MultiStageOperator {
// Use the opposite direction as specified by the collation directions since we need the PriorityQueue to decide
// which elements to keep and which to remove based on the limits.
_priorityQueue = new PriorityQueue<>(Math.min(defaultHolderCapacity, _numRowsToKeep),
- new SortUtils.SortComparator(collationKeys, collationDirections, dataSchema, false, true));
+ new SortUtils.SortComparator(collationKeys, collationDirections, collationNullDirections, dataSchema, true));
_rows = null;
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
index 8faa8368ff..f5d4a8c65e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
@@ -23,7 +23,8 @@ import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.exception.QueryException;
@@ -48,7 +49,8 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator {
private final DataSchema _dataSchema;
private final List<RexExpression> _collationKeys;
- private final List<RelFieldCollation.Direction> _collationDirections;
+ private final List<Direction> _collationDirections;
+ private final List<NullDirection> _collationNullDirections;
private final boolean _isSortOnSender;
private final List<Object[]> _rows = new ArrayList<>();
@@ -56,13 +58,14 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator {
private boolean _isSortedBlockConstructed;
public SortedMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
- DataSchema dataSchema, List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
- boolean isSortOnSender, int senderStageId) {
+ DataSchema dataSchema, List<RexExpression> collationKeys, List<Direction> collationDirections,
+ List<NullDirection> collationNullDirections, boolean isSortOnSender, int senderStageId) {
super(context, exchangeType, senderStageId);
Preconditions.checkState(!CollectionUtils.isEmpty(collationKeys), "Collation keys must be set");
_dataSchema = dataSchema;
_collationKeys = collationKeys;
_collationDirections = collationDirections;
+ _collationNullDirections = collationNullDirections;
_isSortOnSender = isSortOnSender;
}
@@ -110,7 +113,9 @@ public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator {
}
if (!_isSortedBlockConstructed && !_rows.isEmpty()) {
- _rows.sort(new SortUtils.SortComparator(_collationKeys, _collationDirections, _dataSchema, false, false));
+ _rows.sort(
+ new SortUtils.SortComparator(_collationKeys, _collationDirections, _collationNullDirections, _dataSchema,
+ false));
_isSortedBlockConstructed = true;
return new TransferableBlock(_rows, _dataSchema, DataBlock.Type.ROW);
} else {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
index 1d2c42a193..7827fcc1c0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
@@ -20,13 +20,13 @@ package org.apache.pinot.query.runtime.operator.utils;
import java.util.Comparator;
import java.util.List;
-import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
public class SortUtils {
-
private SortUtils() {
}
@@ -34,44 +34,32 @@ public class SortUtils {
private final int _size;
private final int[] _valueIndices;
private final int[] _multipliers;
+ private final int[] _nullsMultipliers;
private final boolean[] _useDoubleComparison;
/**
- * Sort comparator for use with priority queues
+ * Sort comparator for use with priority queues.
+ *
* @param collationKeys collation keys to sort on
* @param collationDirections collation direction for each collation key to sort on
+ * @param collationNullDirections collation direction for NULL values in each collation key to sort on
* @param dataSchema data schema to use
- * @param isNullHandlingEnabled 'true' if null handling is enabled. Not supported yet
* @param switchDirections 'true' if the opposite sort direction should be used as what is specified
*/
- public SortComparator(List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
- DataSchema dataSchema, boolean isNullHandlingEnabled, boolean switchDirections) {
+ public SortComparator(List<RexExpression> collationKeys, List<Direction> collationDirections,
+ List<NullDirection> collationNullDirections, DataSchema dataSchema, boolean switchDirections) {
DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
_size = collationKeys.size();
_valueIndices = new int[_size];
_multipliers = new int[_size];
+ _nullsMultipliers = new int[_size];
_useDoubleComparison = new boolean[_size];
for (int i = 0; i < _size; i++) {
_valueIndices[i] = ((RexExpression.InputRef) collationKeys.get(i)).getIndex();
- _multipliers[i] = switchDirections ? (collationDirections.get(i).isDescending() ? 1 : -1)
- : (collationDirections.get(i).isDescending() ? -1 : 1);
- _useDoubleComparison[i] = columnDataTypes[_valueIndices[i]].isNumber();
- }
- }
-
- /**
- * Sort comparator for use with priority queues
- * @param dataSchema data schema to use
- */
- public SortComparator(DataSchema dataSchema) {
- DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
- _size = columnDataTypes.length;
- _valueIndices = new int[_size];
- _multipliers = new int[_size];
- _useDoubleComparison = new boolean[_size];
- for (int i = 0; i < _size; i++) {
- _valueIndices[i] = i;
- _multipliers[i] = 1;
+ int multiplier = collationDirections.get(i) == Direction.ASCENDING ? 1 : -1;
+ _multipliers[i] = switchDirections ? -multiplier : multiplier;
+ int nullsMultiplier = collationNullDirections.get(i) == NullDirection.LAST ? 1 : -1;
+ _nullsMultipliers[i] = switchDirections ? -nullsMultiplier : nullsMultiplier;
_useDoubleComparison[i] = columnDataTypes[_valueIndices[i]].isNumber();
}
}
@@ -82,6 +70,15 @@ public class SortUtils {
int index = _valueIndices[i];
Object v1 = o1[index];
Object v2 = o2[index];
+ if (v1 == null) {
+ if (v2 == null) {
+ continue;
+ }
+ return _nullsMultipliers[i];
+ }
+ if (v2 == null) {
+ return -_nullsMultipliers[i];
+ }
int result;
if (_useDoubleComparison[i]) {
result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index f8dfe58e26..8b98fa517a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -72,8 +72,8 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
if (node.isSortOnReceiver()) {
SortedMailboxReceiveOperator sortedMailboxReceiveOperator =
new SortedMailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
- node.getDataSchema(), node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(),
- node.getSenderStageId());
+ node.getDataSchema(), node.getCollationKeys(), node.getCollationDirections(),
+ node.getCollationNullDirections(), node.isSortOnSender(), node.getSenderStageId());
context.addReceivingMailboxIds(sortedMailboxReceiveOperator.getMailboxIds());
return sortedMailboxReceiveOperator;
} else {
@@ -167,7 +167,8 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
boolean isInputSorted = nextOperator instanceof SortedMailboxReceiveOperator;
return new SortOperator(context.getOpChainExecutionContext(), nextOperator, node.getCollationKeys(),
- node.getCollationDirections(), node.getFetch(), node.getOffset(), node.getDataSchema(), isInputSorted);
+ node.getCollationDirections(), node.getCollationNullDirections(), node.getFetch(), node.getOffset(),
+ node.getDataSchema(), isInputSorted);
}
@Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index d712e524ab..a4bf0450ea 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -108,8 +108,7 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla
pinotQuery.setExplain(false);
ServerPlanRequestContext context =
new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), timeoutMs, deadlineMs,
- stagePlan.getServer(), stagePlan.getStageMetadata(), pinotQuery, tableType, timeBoundaryInfo,
- traceEnabled);
+ stagePlan.getServer(), stagePlan.getStageMetadata(), pinotQuery, tableType, timeBoundaryInfo, traceEnabled);
// visit the plan and create query physical plan.
ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context);
@@ -232,16 +231,15 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla
@Override
public Void visitSort(SortNode node, ServerPlanRequestContext context) {
visitChildren(node, context);
+ PinotQuery pinotQuery = context.getPinotQuery();
if (node.getCollationKeys().size() > 0) {
- context.getPinotQuery().setOrderByList(
- CalciteRexExpressionParser.convertOrderByList(node.getCollationKeys(), node.getCollationDirections(),
- context.getPinotQuery()));
+ pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(node, pinotQuery));
}
if (node.getFetch() > 0) {
- context.getPinotQuery().setLimit(node.getFetch());
+ pinotQuery.setLimit(node.getFetch());
}
if (node.getOffset() > 0) {
- context.getPinotQuery().setOffset(node.getOffset());
+ pinotQuery.setOffset(node.getOffset());
}
return null;
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
index ebdb23aa3e..b450d4b481 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
@@ -68,9 +69,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, false);
Mockito.when(_input.nextBlock())
.thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")));
@@ -87,9 +90,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, false);
Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
@@ -105,9 +110,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, false);
Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -123,9 +130,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -146,9 +155,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0,
- schema, true);
+ SortOperator op =
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, true);
// Purposefully setting input as unsorted order for validation but 'isInputSorted' should only be true if actually
// sorted
@@ -171,9 +182,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(1);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"ignored", "sort"}, new DataSchema.ColumnDataType[]{INT, INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1, 2}, new Object[]{2, 1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -194,9 +207,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{STRING});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{"b"}, new Object[]{"a"}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -217,9 +232,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.DESCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -240,9 +257,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 1, schema, false);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 1,
+ schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -263,9 +282,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 1,
- schema, true);
+ SortOperator op =
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 1,
+ schema, true);
// Set input rows as sorted since input is expected to be sorted
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}, new Object[]{2}, new Object[]{3}))
@@ -287,9 +308,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 1, 1, schema, false);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 1, 1,
+ schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -309,9 +332,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 1, 1,
- schema, true);
+ SortOperator op =
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 1, 1,
+ schema, true);
// Set input rows as sorted since input is expected to be sorted
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}, new Object[]{2}, new Object[]{3}))
@@ -332,10 +357,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 0, 0, schema, false, 10,
- 1);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 0, 0,
+ schema, false, 10, 1);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -355,9 +381,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, -1, 0, schema, false);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, -1, 0,
+ schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -376,9 +404,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}))
.thenReturn(block(schema, new Object[]{1}))
@@ -400,9 +430,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0,
- schema, true);
+ SortOperator op =
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, true);
// Set input rows as sorted since input is expected to be sorted
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}))
@@ -425,9 +457,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0, 1);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST, NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, false);
Mockito.when(_input.nextBlock())
.thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -450,9 +484,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0, 1);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.DESCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST, NullDirection.FIRST);
DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, false);
Mockito.when(_input.nextBlock())
.thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -475,9 +511,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}))
.thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()).thenReturn(block(schema, new Object[]{1}))
@@ -500,9 +538,11 @@ public class SortOperatorTest {
// Given:
List<RexExpression> collation = collation(0);
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
- SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0,
- schema, true);
+ SortOperator op =
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, true);
// Set input rows as sorted since input is expected to be sorted
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}))
@@ -521,6 +561,85 @@ public class SortOperatorTest {
Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
}
+ @Test
+ public void shouldHaveNullAtLast() {
+ // Given:
+ List<RexExpression> collation = collation(0);
+ List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.LAST);
+ DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
+ SortOperator op =
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, false);
+
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{null}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ // When:
+ TransferableBlock block = op.nextBlock(); // construct
+ TransferableBlock block2 = op.nextBlock(); // eos
+
+ // Then:
+ Assert.assertEquals(block.getNumRows(), 3);
+ Assert.assertEquals(block.getContainer().get(0), new Object[]{1});
+ Assert.assertEquals(block.getContainer().get(1), new Object[]{2});
+ Assert.assertEquals(block.getContainer().get(2), new Object[]{null});
+ Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+ }
+
+ @Test
+ public void shouldHaveNullAtFirst() {
+ // Given:
+ List<RexExpression> collation = collation(0);
+ List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.FIRST);
+ DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
+ SortOperator op =
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, false);
+
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{null}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ // When:
+ TransferableBlock block = op.nextBlock(); // construct
+ TransferableBlock block2 = op.nextBlock(); // eos
+
+ // Then:
+ Assert.assertEquals(block.getNumRows(), 3);
+ Assert.assertEquals(block.getContainer().get(0), new Object[]{null});
+ Assert.assertEquals(block.getContainer().get(1), new Object[]{1});
+ Assert.assertEquals(block.getContainer().get(2), new Object[]{2});
+ Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+ }
+
+ @Test
+ public void shouldHandleMultipleCollationKeysWithNulls() {
+ // Given:
+ List<RexExpression> collation = collation(0, 1);
+ List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.DESCENDING);
+ List<NullDirection> nullDirections = ImmutableList.of(NullDirection.FIRST, NullDirection.LAST);
+ DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
+ SortOperator op =
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, nullDirections, 10, 0,
+ schema, false);
+
+ Mockito.when(_input.nextBlock())
+ .thenReturn(block(schema, new Object[]{1, 1}, new Object[]{1, null}, new Object[]{null, 1}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ // When:
+ TransferableBlock block = op.nextBlock(); // construct
+ TransferableBlock block2 = op.nextBlock(); // eos
+
+ // Then:
+ Assert.assertEquals(block.getNumRows(), 3);
+ Assert.assertEquals(block.getContainer().get(0), new Object[]{null, 1});
+ Assert.assertEquals(block.getContainer().get(1), new Object[]{1, 1});
+ Assert.assertEquals(block.getContainer().get(2), new Object[]{1, null});
+ Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+ }
+
private static List<RexExpression> collation(int... indexes) {
return Arrays.stream(indexes).mapToObj(RexExpression.InputRef::new).collect(Collectors.toList());
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 7ed0c2469a..5bd973f4c2 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -26,7 +26,8 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
@@ -60,8 +61,8 @@ public class SortedMailboxReceiveOperatorTest {
private static final DataSchema DATA_SCHEMA =
new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
private static final List<RexExpression> COLLATION_KEYS = Collections.singletonList(new RexExpression.InputRef(0));
- private static final List<RelFieldCollation.Direction> COLLATION_DIRECTIONS =
- Collections.singletonList(RelFieldCollation.Direction.ASCENDING);
+ private static final List<Direction> COLLATION_DIRECTIONS = Collections.singletonList(Direction.ASCENDING);
+ private static final List<NullDirection> COLLATION_NULL_DIRECTIONS = Collections.singletonList(NullDirection.LAST);
private static final String MAILBOX_ID_1 = MailboxIdUtils.toMailboxId(0, 1, 0, 0, 0);
private static final String MAILBOX_ID_2 = MailboxIdUtils.toMailboxId(0, 1, 1, 0, 0);
@@ -129,26 +130,25 @@ public class SortedMailboxReceiveOperatorTest {
public void shouldThrowSingletonNoMatchMailboxServer() {
VirtualServerAddress server1 = new VirtualServerAddress("localhost", 456, 0);
VirtualServerAddress server2 = new VirtualServerAddress("localhost", 789, 1);
- StageMetadata stageMetadata = new StageMetadata.Builder()
- .setWorkerMetadataList(Stream.of(server1, server2).map(
- s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build()).collect(Collectors.toList()))
- .build();
+ StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList(
+ Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build())
+ .collect(Collectors.toList())).build();
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
stageMetadata, false);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS,
- COLLATION_DIRECTIONS, false, 1);
+ COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
}
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
public void shouldThrowRangeDistributionNotSupported() {
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- null, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, null,
+ false);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS,
- COLLATION_DIRECTIONS, false, 1);
+ COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
}
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "Collation keys.*")
@@ -159,7 +159,7 @@ public class SortedMailboxReceiveOperatorTest {
_stageMetadata1, false);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, Collections.emptyList(),
- Collections.emptyList(), false, 1);
+ Collections.emptyList(), Collections.emptyList(), false, 1);
}
@Test
@@ -171,7 +171,8 @@ public class SortedMailboxReceiveOperatorTest {
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L,
_stageMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
- RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+ RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
+ false, 1)) {
Thread.sleep(100L);
TransferableBlock mailbox = receiveOp.nextBlock();
assertTrue(mailbox.isErrorBlock());
@@ -183,7 +184,8 @@ public class SortedMailboxReceiveOperatorTest {
context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10_000L,
System.currentTimeMillis() + 10_000L, _stageMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
- RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+ RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
+ false, 1)) {
Thread.sleep(100L);
TransferableBlock mailbox = receiveOp.nextBlock();
assertFalse(mailbox.isErrorBlock());
@@ -197,7 +199,8 @@ public class SortedMailboxReceiveOperatorTest {
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
_stageMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
- RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+ RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
+ false, 1)) {
assertTrue(receiveOp.nextBlock().isNoOpBlock());
}
}
@@ -210,7 +213,8 @@ public class SortedMailboxReceiveOperatorTest {
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
_stageMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
- RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+ RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
+ false, 1)) {
assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
}
}
@@ -225,7 +229,8 @@ public class SortedMailboxReceiveOperatorTest {
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
_stageMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
- RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+ RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
+ false, 1)) {
List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
assertEquals(actualRows.size(), 1);
assertEquals(actualRows.get(0), row);
@@ -243,7 +248,8 @@ public class SortedMailboxReceiveOperatorTest {
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
_stageMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
- RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+ RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
+ false, 1)) {
TransferableBlock block = receiveOp.nextBlock();
assertTrue(block.isErrorBlock());
assertTrue(block.getDataBlock().getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains(errorMessage));
@@ -262,7 +268,8 @@ public class SortedMailboxReceiveOperatorTest {
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
_stageMetadataBoth, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
- RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+ RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
+ COLLATION_NULL_DIRECTIONS, false, 1)) {
assertTrue(receiveOp.nextBlock().isNoOpBlock());
List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
assertEquals(actualRows.size(), 1);
@@ -285,7 +292,8 @@ public class SortedMailboxReceiveOperatorTest {
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
_stageMetadataBoth, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
- RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+ RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
+ COLLATION_NULL_DIRECTIONS, false, 1)) {
TransferableBlock block = receiveOp.nextBlock();
assertTrue(block.isErrorBlock());
assertTrue(block.getDataBlock().getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains(errorMessage));
@@ -310,7 +318,8 @@ public class SortedMailboxReceiveOperatorTest {
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
_stageMetadataBoth, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
- RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, false, 1)) {
+ RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
+ COLLATION_NULL_DIRECTIONS, false, 1)) {
assertEquals(receiveOp.nextBlock().getContainer(), Arrays.asList(row5, row2, row4, row1, row3));
assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
}
@@ -321,8 +330,8 @@ public class SortedMailboxReceiveOperatorTest {
DataSchema dataSchema =
new DataSchema(new String[]{"col1", "col2", "col3"}, new DataSchema.ColumnDataType[]{INT, INT, STRING});
List<RexExpression> collationKeys = Arrays.asList(new RexExpression.InputRef(2), new RexExpression.InputRef(0));
- List<RelFieldCollation.Direction> collationDirection =
- Arrays.asList(RelFieldCollation.Direction.DESCENDING, RelFieldCollation.Direction.ASCENDING);
+ List<Direction> collationDirections = Arrays.asList(Direction.DESCENDING, Direction.ASCENDING);
+ List<NullDirection> collationNullDirections = Arrays.asList(NullDirection.FIRST, NullDirection.LAST);
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
Object[] row1 = new Object[]{3, 3, "queen"};
@@ -341,7 +350,8 @@ public class SortedMailboxReceiveOperatorTest {
new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
_stageMetadataBoth, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
- RelDistribution.Type.HASH_DISTRIBUTED, dataSchema, collationKeys, collationDirection, false, 1)) {
+ RelDistribution.Type.HASH_DISTRIBUTED, dataSchema, collationKeys, collationDirections, collationNullDirections,
+ false, 1)) {
assertEquals(receiveOp.nextBlock().getContainer(), Arrays.asList(row1, row2, row3, row5, row4));
assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
}
diff --git a/pinot-query-runtime/src/test/resources/queries/NullHandling.json b/pinot-query-runtime/src/test/resources/queries/NullHandling.json
index f51701317c..36fad242e6 100644
--- a/pinot-query-runtime/src/test/resources/queries/NullHandling.json
+++ b/pinot-query-runtime/src/test/resources/queries/NullHandling.json
@@ -45,6 +45,16 @@
{
"description": "LEFT JOIN and GROUP BY with AGGREGATE",
"sql": "SELECT {tbl1}.strCol2, COUNT({tbl2}.intCol1), MIN({tbl2}.intCol1), MAX({tbl2}.doubleCol1), SUM({tbl2}.doubleCol1) FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.strCol1 = {tbl2}.strCol1 GROUP BY {tbl1}.strCol2"
+ },
+ {
+      "description": "LEFT JOIN and SORT (H2 treats null as the smallest value by default, unlike Postgres, so the default null ordering is not tested; NULLS FIRST is specified explicitly)",
+ "sql": "SELECT {tbl2}.doubleCol1 AS col FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.strCol1 = {tbl2}.strCol1 ORDER BY col NULLS FIRST",
+ "keepOutputRowOrder": true
+ },
+ {
+      "description": "LEFT JOIN and GROUP BY with AGGREGATE and SORT",
+ "sql": "SELECT {tbl1}.strCol2, COUNT({tbl2}.intCol1), MIN({tbl2}.intCol1) AS minCol, MAX({tbl2}.doubleCol1) AS maxCol, SUM({tbl2}.doubleCol1) FROM {tbl1} LEFT OUTER JOIN {tbl2} ON {tbl1}.strCol1 = {tbl2}.strCol1 GROUP BY {tbl1}.strCol2 ORDER BY minCol DESC NULLS LAST, maxCol ASC NULLS LAST",
+ "keepOutputRowOrder": true
}
]
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org