You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/03/16 21:31:15 UTC
[pinot] branch master updated: Add a PinotLogicalSortExchange to replace usage of LogicalSortExchange as it will be sender and receiver aware (#10408)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bc9aa75585 Add a PinotLogicalSortExchange to replace usage of LogicalSortExchange as it will be sender and receiver aware (#10408)
bc9aa75585 is described below
commit bc9aa755859497aca3fcc0736c1b4db694f44127
Author: Sonam Mandal <so...@linkedin.com>
AuthorDate: Thu Mar 16 14:31:06 2023 -0700
Add a PinotLogicalSortExchange to replace usage of LogicalSortExchange as it will be sender and receiver aware (#10408)
- MailboxSendOperator will be modified later to add sort support.
- Modify SortOperator to avoid sorting if the input is already sorted. It should still apply offset + limit
---
.../rel/logical/PinotLogicalSortExchange.java | 112 ++++++
.../rel/rules/PinotSortExchangeCopyRule.java | 4 +-
.../rel/rules/PinotSortExchangeNodeInsertRule.java | 11 +-
.../rules/PinotWindowExchangeNodeInsertRule.java | 17 +-
.../pinot/query/planner/logical/StagePlanner.java | 34 +-
.../query/planner/stage/MailboxReceiveNode.java | 46 +++
.../pinot/query/planner/stage/MailboxSendNode.java | 42 ++-
.../rel/rules/PinotSortExchangeCopyRuleTest.java | 89 ++++-
.../test/resources/queries/BasicQueryPlans.json | 4 +-
.../src/test/resources/queries/JoinPlans.json | 4 +-
.../src/test/resources/queries/OrderByPlans.json | 16 +-
.../resources/queries/WindowFunctionPlans.json | 176 ++++-----
.../apache/pinot/query/runtime/QueryRunner.java | 3 +-
.../runtime/operator/MailboxReceiveOperator.java | 77 +++-
.../runtime/operator/MailboxSendOperator.java | 20 +-
.../pinot/query/runtime/operator/SortOperator.java | 57 +--
.../query/runtime/operator/utils/SortUtils.java | 73 ++++
.../query/runtime/plan/PhysicalPlanVisitor.java | 10 +-
.../query/service/dispatch/QueryDispatcher.java | 4 +-
.../operator/MailboxReceiveOperatorTest.java | 395 +++++++++++++++++++--
.../runtime/operator/MailboxSendOperatorTest.java | 10 +-
.../query/runtime/operator/SortOperatorTest.java | 153 +++++++-
22 files changed, 1124 insertions(+), 233 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java
new file mode 100644
index 0000000000..e019a680c1
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/logical/PinotLogicalSortExchange.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.SortExchange;
+
+
+/**
+ * Pinot's implementation of {@code SortExchange} which needs information about whether to sort on the sender
+ * and/or receiver side of the exchange. Every {@code Exchange} is broken into a send and a receive node and the
+ * decision on where to sort is made by the planner and this information has to be passed onto the send and receive
+ * nodes for the correct execution.
+ *
+ * Note: This class does not extend {@code LogicalSortExchange} because its constructor which takes the list of
+ * parameters is private.
+ */
+public class PinotLogicalSortExchange extends SortExchange {
+  // Term names shared by explainTerms() and the RelInput constructor so that both sides always agree on them.
+  private static final String SORT_ON_SENDER_TERM = "isSortOnSender";
+  private static final String SORT_ON_RECEIVER_TERM = "isSortOnReceiver";
+
+  protected final boolean _isSortOnSender;
+  protected final boolean _isSortOnReceiver;
+
+  /**
+   * Internal constructor; use {@link #create} to build an instance with canonized traits.
+   */
+  private PinotLogicalSortExchange(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode input, RelDistribution distribution, RelCollation collation,
+      boolean isSortOnSender, boolean isSortOnReceiver) {
+    super(cluster, traitSet, input, distribution, collation);
+    _isSortOnSender = isSortOnSender;
+    _isSortOnReceiver = isSortOnReceiver;
+  }
+
+  /**
+   * Creates a PinotLogicalSortExchange by parsing serialized output.
+   *
+   * The sort flags are read back from the terms written by {@link #explainTerms(RelWriter)}. When a term is absent
+   * the previous hard-coded values are used: no sort on the sender and sort on the receiver.
+   *
+   * @param input serialized representation of the exchange
+   */
+  public PinotLogicalSortExchange(RelInput input) {
+    super(input);
+    _isSortOnSender = input.getBoolean(SORT_ON_SENDER_TERM, false);
+    _isSortOnReceiver = input.getBoolean(SORT_ON_RECEIVER_TERM, true);
+  }
+
+  /**
+   * Creates a PinotLogicalSortExchange.
+   *
+   * @param input Input relational expression
+   * @param distribution Distribution specification
+   * @param collation array of sort specifications
+   * @param isSortOnSender whether to sort on the sender
+   * @param isSortOnReceiver whether to sort on receiver
+   * @return a new exchange whose trait set carries {@code Convention.NONE} and the canonized distribution and
+   *         collation
+   */
+  public static PinotLogicalSortExchange create(
+      RelNode input,
+      RelDistribution distribution,
+      RelCollation collation,
+      boolean isSortOnSender,
+      boolean isSortOnReceiver) {
+    RelOptCluster cluster = input.getCluster();
+    // Canonize both traits before placing them in the trait set and on the node itself.
+    RelCollation canonicalCollation = RelCollationTraitDef.INSTANCE.canonize(collation);
+    RelDistribution canonicalDistribution = RelDistributionTraitDef.INSTANCE.canonize(distribution);
+    RelTraitSet traitSet = input.getTraitSet()
+        .replace(Convention.NONE)
+        .replace(canonicalDistribution)
+        .replace(canonicalCollation);
+    return new PinotLogicalSortExchange(cluster, traitSet, input, canonicalDistribution,
+        canonicalCollation, isSortOnSender, isSortOnReceiver);
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  /**
+   * Copies this exchange with the given traits, input, distribution and collation. The sender/receiver sort flags
+   * of this instance are carried over unchanged.
+   */
+  @Override
+  public SortExchange copy(RelTraitSet traitSet, RelNode newInput,
+      RelDistribution newDistribution, RelCollation newCollation) {
+    return new PinotLogicalSortExchange(this.getCluster(), traitSet, newInput,
+        newDistribution, newCollation, _isSortOnSender, _isSortOnReceiver);
+  }
+
+  /**
+   * Appends the sender/receiver sort flags to the terms emitted by the parent class.
+   */
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw)
+        .item(SORT_ON_SENDER_TERM, _isSortOnSender)
+        .item(SORT_ON_RECEIVER_TERM, _isSortOnReceiver);
+  }
+
+  /**
+   * @return whether the sender side of the exchange should sort
+   */
+  public boolean isSortOnSender() {
+    return _isSortOnSender;
+  }
+
+  /**
+   * @return whether the receiver side of the exchange should sort
+   */
+  public boolean isSortOnReceiver() {
+    return _isSortOnReceiver;
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
index 3a15679fd7..4f93f78efd 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
@@ -26,7 +26,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.SortExchange;
import org.apache.calcite.rel.logical.LogicalSort;
-import org.apache.calcite.rel.logical.LogicalSortExchange;
+import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.calcite.rel.metadata.RelMdUtil;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexBuilder;
@@ -99,7 +99,7 @@ public class PinotSortExchangeCopyRule extends RelRule<RelRule.Config> {
public interface Config extends RelRule.Config {
Config DEFAULT = ImmutableSortExchangeCopyRule.Config.of()
- .withOperandFor(LogicalSort.class, LogicalSortExchange.class);
+ .withOperandFor(LogicalSort.class, PinotLogicalSortExchange.class);
@Override default PinotSortExchangeCopyRule toRule() {
return new PinotSortExchangeCopyRule(this);
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
index f03b78dc3d..28a5018a08 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
@@ -24,7 +24,7 @@ import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.logical.LogicalSort;
-import org.apache.calcite.rel.logical.LogicalSortExchange;
+import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.calcite.tools.RelBuilderFactory;
@@ -62,10 +62,15 @@ public class PinotSortExchangeNodeInsertRule extends RelOptRule {
@Override
public void onMatch(RelOptRuleCall call) {
Sort sort = call.rel(0);
- LogicalSortExchange exchange = LogicalSortExchange.create(
+ // TODO: Assess whether sorting is needed on both sender and receiver side or only receiver side. Potentially add
+ // SqlHint support to determine this. For now setting sort only on receiver side as sender side sorting is
+ // not yet implemented.
+ PinotLogicalSortExchange exchange = PinotLogicalSortExchange.create(
sort.getInput(),
RelDistributions.hash(Collections.emptyList()),
- sort.getCollation());
+ sort.getCollation(),
+ false,
+ true);
call.transformTo(LogicalSort.create(exchange, sort.getCollation(), sort.offset, sort.fetch));
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
index ebae6c0df9..e9283db8f0 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
@@ -29,8 +29,8 @@ import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.logical.LogicalExchange;
-import org.apache.calcite.rel.logical.LogicalSortExchange;
import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.tools.RelBuilderFactory;
@@ -86,8 +86,11 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
} else if (windowGroup.keys.isEmpty() && !windowGroup.orderKeys.getKeys().isEmpty()) {
// Only ORDER BY
// Add a PinotLogicalSortExchange with collation on the order by key(s) and an empty hash partition key
- LogicalSortExchange sortExchange = LogicalSortExchange.create(windowInput,
- RelDistributions.hash(Collections.emptyList()), windowGroup.orderKeys);
+ // TODO: ORDER BY only type queries need to be sorted on both sender and receiver side for better performance.
+ // Sorted input data can use a k-way merge instead of a PriorityQueue for sorting. For now support to
+ // sort on the sender side is not available thus setting this up to only sort on the receiver.
+ PinotLogicalSortExchange sortExchange = PinotLogicalSortExchange.create(windowInput,
+ RelDistributions.hash(Collections.emptyList()), windowGroup.orderKeys, false, true);
call.transformTo(LogicalWindow.create(window.getTraitSet(), sortExchange, window.constants, window.getRowType(),
window.groups));
} else {
@@ -106,8 +109,12 @@ public class PinotWindowExchangeNodeInsertRule extends RelOptRule {
} else {
// PARTITION BY and ORDER BY on different key(s)
// Add a PinotLogicalSortExchange hashed on the partition by keys and collation based on order by keys
- LogicalSortExchange sortExchange = LogicalSortExchange.create(windowInput,
- RelDistributions.hash(windowGroup.keys.toList()), windowGroup.orderKeys);
+ // TODO: PARTITION BY and ORDER BY type queries need to be sorted only on the receiver side unless a hint is
+ // set indicating that the data is already partitioned and sorting can be done on the sender side instead.
+ // This way sorting on the receiver side can be a no-op. Add support for this hint and pass it on. Until
+ // sender side sorting is implemented, setting this hint will throw an error on execution.
+ PinotLogicalSortExchange sortExchange = PinotLogicalSortExchange.create(windowInput,
+ RelDistributions.hash(windowGroup.keys.toList()), windowGroup.orderKeys, false, true);
call.transformTo(LogicalWindow.create(window.getTraitSet(), sortExchange, window.constants, window.getRowType(),
window.groups));
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 1081459dd7..f8859e668f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -20,10 +20,13 @@ package org.apache.pinot.query.planner.logical;
import java.util.List;
import java.util.Map;
+import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.SortExchange;
+import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.QueryPlan;
@@ -74,12 +77,12 @@ public class StagePlanner {
// global root needs to send results back to the ROOT, a.k.a. the client response node. the last stage only has one
// receiver so doesn't matter what the exchange type is. setting it to RANDOM_DISTRIBUTED by default.
StageNode globalSenderNode = new MailboxSendNode(globalStageRoot.getStageId(), globalStageRoot.getDataSchema(),
- 0, RelDistribution.Type.RANDOM_DISTRIBUTED, null);
+ 0, RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false);
globalSenderNode.addInput(globalStageRoot);
StageNode globalReceiverNode =
new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), globalStageRoot.getStageId(),
- RelDistribution.Type.RANDOM_DISTRIBUTED, null, globalSenderNode);
+ RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false, false, globalSenderNode);
QueryPlan queryPlan = StageMetadataVisitor.attachMetadata(relRoot.fields, globalReceiverNode);
@@ -100,7 +103,19 @@ public class StagePlanner {
if (isExchangeNode(node)) {
StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
RelDistribution distribution = ((Exchange) node).getDistribution();
- return createSendReceivePair(nextStageRoot, distribution, currentStageId);
+ RelCollation collation = null;
+ boolean isSortOnSender = false;
+ boolean isSortOnReceiver = false;
+ if (isSortExchangeNode(node)) {
+ collation = ((SortExchange) node).getCollation();
+ if (node instanceof PinotLogicalSortExchange) {
+ // These flags only take meaning if the collation is not null or empty
+ isSortOnSender = ((PinotLogicalSortExchange) node).isSortOnSender();
+ isSortOnReceiver = ((PinotLogicalSortExchange) node).isSortOnReceiver();
+ }
+ }
+ return createSendReceivePair(nextStageRoot, distribution, collation, isSortOnSender, isSortOnReceiver,
+ currentStageId);
} else {
StageNode stageNode = RelToStageConverter.toStageNode(node, currentStageId);
List<RelNode> inputs = node.getInputs();
@@ -118,7 +133,8 @@ public class StagePlanner {
}
}
- private StageNode createSendReceivePair(StageNode nextStageRoot, RelDistribution distribution, int currentStageId) {
+ private StageNode createSendReceivePair(StageNode nextStageRoot, RelDistribution distribution, RelCollation collation,
+ boolean isSortOnSender, boolean isSortOnReceiver, int currentStageId) {
List<Integer> distributionKeys = distribution.getKeys();
RelDistribution.Type exchangeType = distribution.getType();
@@ -129,9 +145,11 @@ public class StagePlanner {
? new FieldSelectionKeySelector(distributionKeys) : null;
StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
- currentStageId, exchangeType, keySelector);
+ currentStageId, exchangeType, keySelector, collation == null ? null : collation.getFieldCollations(),
+ isSortOnSender);
StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
- nextStageRoot.getStageId(), exchangeType, keySelector, mailboxSender);
+ nextStageRoot.getStageId(), exchangeType, keySelector,
+ collation == null ? null : collation.getFieldCollations(), isSortOnSender, isSortOnReceiver, mailboxSender);
mailboxSender.addInput(nextStageRoot);
return mailboxReceiver;
@@ -141,6 +159,10 @@ public class StagePlanner {
return (node instanceof Exchange);
}
+ private boolean isSortExchangeNode(RelNode node) {
+ return (node instanceof SortExchange);
+ }
+
private int getNewStageId() {
return _stageIdCounter++;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
index cf90a0005c..97a019741f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
@@ -18,9 +18,16 @@
*/
package org.apache.pinot.query.planner.stage;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.serde.ProtoProperties;
@@ -32,6 +39,14 @@ public class MailboxReceiveNode extends AbstractStageNode {
private RelDistribution.Type _exchangeType;
@ProtoProperties
private KeySelector<Object[], Object[]> _partitionKeySelector;
+ @ProtoProperties
+ private List<RexExpression> _collationKeys;
+ @ProtoProperties
+ private List<RelFieldCollation.Direction> _collationDirections;
+ @ProtoProperties
+ private boolean _isSortOnSender;
+ @ProtoProperties
+ private boolean _isSortOnReceiver;
// this is only available during planning and should not be relied
// on in any post-serialization code
@@ -43,11 +58,26 @@ public class MailboxReceiveNode extends AbstractStageNode {
public MailboxReceiveNode(int stageId, DataSchema dataSchema, int senderStageId,
RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
+ @Nullable List<RelFieldCollation> fieldCollations, boolean isSortOnSender, boolean isSortOnReceiver,
StageNode sender) {
super(stageId, dataSchema);
_senderStageId = senderStageId;
_exchangeType = exchangeType;
_partitionKeySelector = partitionKeySelector;
+ if (!CollectionUtils.isEmpty(fieldCollations)) {
+ _collationKeys = new ArrayList<>(fieldCollations.size());
+ _collationDirections = new ArrayList<>(fieldCollations.size());
+ for (RelFieldCollation fieldCollation : fieldCollations) {
+ _collationDirections.add(fieldCollation.getDirection());
+ _collationKeys.add(new RexExpression.InputRef(fieldCollation.getFieldIndex()));
+ }
+ } else {
+ _collationKeys = Collections.emptyList();
+ _collationDirections = Collections.emptyList();
+ }
+ _isSortOnSender = isSortOnSender;
+ Preconditions.checkState(!isSortOnSender, "Input shouldn't be sorted as ordering on send is not yet implemented!");
+ _isSortOnReceiver = isSortOnReceiver;
_sender = sender;
}
@@ -67,6 +97,22 @@ public class MailboxReceiveNode extends AbstractStageNode {
return _partitionKeySelector;
}
+ public List<RexExpression> getCollationKeys() {
+ return _collationKeys;
+ }
+
+ public List<RelFieldCollation.Direction> getCollationDirections() {
+ return _collationDirections;
+ }
+
+ public boolean isSortOnSender() {
+ return _isSortOnSender;
+ }
+
+ public boolean isSortOnReceiver() {
+ return _isSortOnReceiver;
+ }
+
public StageNode getSender() {
return _sender;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
index 4219590100..c98b82907a 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
@@ -18,9 +18,16 @@
*/
package org.apache.pinot.query.planner.stage;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.serde.ProtoProperties;
@@ -32,17 +39,38 @@ public class MailboxSendNode extends AbstractStageNode {
private RelDistribution.Type _exchangeType;
@ProtoProperties
private KeySelector<Object[], Object[]> _partitionKeySelector;
+ @ProtoProperties
+ private List<RexExpression> _collationKeys;
+ @ProtoProperties
+ private List<RelFieldCollation.Direction> _collationDirections;
+ @ProtoProperties
+ private boolean _isSortOnSender;
public MailboxSendNode(int stageId) {
super(stageId);
}
public MailboxSendNode(int stageId, DataSchema dataSchema, int receiverStageId,
- RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector) {
+ RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
+ @Nullable List<RelFieldCollation> fieldCollations, boolean isSortOnSender) {
super(stageId, dataSchema);
_receiverStageId = receiverStageId;
_exchangeType = exchangeType;
_partitionKeySelector = partitionKeySelector;
+ // TODO: Support ordering here if the 'fieldCollations' aren't empty and 'sortOnSender' is true
+ Preconditions.checkState(!isSortOnSender, "Ordering is not yet supported on Mailbox Send");
+ if (!CollectionUtils.isEmpty(fieldCollations) && isSortOnSender) {
+ _collationKeys = new ArrayList<>(fieldCollations.size());
+ _collationDirections = new ArrayList<>(fieldCollations.size());
+ for (RelFieldCollation fieldCollation : fieldCollations) {
+ _collationDirections.add(fieldCollation.getDirection());
+ _collationKeys.add(new RexExpression.InputRef(fieldCollation.getFieldIndex()));
+ }
+ } else {
+ _collationKeys = Collections.emptyList();
+ _collationDirections = Collections.emptyList();
+ }
+ _isSortOnSender = isSortOnSender;
}
public int getReceiverStageId() {
@@ -61,6 +89,18 @@ public class MailboxSendNode extends AbstractStageNode {
return _partitionKeySelector;
}
+ public List<RexExpression> getCollationKeys() {
+ return _collationKeys;
+ }
+
+ public List<RelFieldCollation.Direction> getCollationDirections() {
+ return _collationDirections;
+ }
+
+ public boolean isSortOnSender() {
+ return _isSortOnSender;
+ }
+
@Override
public String explain() {
return "MAIL_SEND(" + _exchangeType + ")";
diff --git a/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java b/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
index b92fd79352..443998833c 100644
--- a/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
@@ -29,7 +29,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.SortExchange;
import org.apache.calcite.rel.logical.LogicalSort;
-import org.apache.calcite.rel.logical.LogicalSortExchange;
+import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
@@ -81,7 +81,8 @@ public class PinotSortExchangeCopyRuleTest {
@Test
public void shouldMatchLimitNoOffsetNoSort() {
// Given:
- SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+ SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY,
+ false, false);
Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, null, literal(1));
Mockito.when(_call.rel(0)).thenReturn(sort);
Mockito.when(_call.rel(1)).thenReturn(exchange);
@@ -95,7 +96,7 @@ public class PinotSortExchangeCopyRuleTest {
RelNode sortCopy = sortCopyCapture.getValue();
Assert.assertTrue(sortCopy instanceof LogicalSort);
- Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+ Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
@@ -104,11 +105,68 @@ public class PinotSortExchangeCopyRuleTest {
Assert.assertEquals((innerSort).fetch, literal(1));
}
+ @Test
+ public void shouldMatchLimitNoOffsetYesSortNoSortEnabled() {
+ // Given:
+ RelCollation collation = RelCollations.of(1);
+ SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation,
+ false, false);
+ Sort sort = LogicalSort.create(exchange, collation, null, literal(1));
+ Mockito.when(_call.rel(0)).thenReturn(sort);
+ Mockito.when(_call.rel(1)).thenReturn(exchange);
+
+ // When:
+ PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.onMatch(_call);
+
+ // Then:
+ ArgumentCaptor<RelNode> sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class);
+ Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap());
+
+ RelNode sortCopy = sortCopyCapture.getValue();
+ Assert.assertTrue(sortCopy instanceof LogicalSort);
+ Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
+ Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
+
+ LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
+ Assert.assertEquals(innerSort.getCollation().getKeys().size(), 1);
+ Assert.assertNull((innerSort).offset);
+ Assert.assertEquals((innerSort).fetch, literal(1));
+ }
+
+ @Test
+ public void shouldMatchLimitNoOffsetYesSortOnSender() {
+ // Given:
+ RelCollation collation = RelCollations.of(1);
+ SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation,
+ true, false);
+ Sort sort = LogicalSort.create(exchange, collation, null, literal(1));
+ Mockito.when(_call.rel(0)).thenReturn(sort);
+ Mockito.when(_call.rel(1)).thenReturn(exchange);
+
+ // When:
+ PinotSortExchangeCopyRule.SORT_EXCHANGE_COPY.onMatch(_call);
+
+ // Then:
+ ArgumentCaptor<RelNode> sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class);
+ Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap());
+
+ RelNode sortCopy = sortCopyCapture.getValue();
+ Assert.assertTrue(sortCopy instanceof LogicalSort);
+ Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
+ Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
+
+ LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
+ Assert.assertEquals(innerSort.getCollation().getKeys().size(), 1);
+ Assert.assertNull((innerSort).offset);
+ Assert.assertEquals((innerSort).fetch, literal(1));
+ }
+
@Test
public void shouldMatchLimitNoOffsetYesSort() {
// Given:
RelCollation collation = RelCollations.of(1);
- SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+ SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation, false,
+ true);
Sort sort = LogicalSort.create(exchange, collation, null, literal(1));
Mockito.when(_call.rel(0)).thenReturn(sort);
Mockito.when(_call.rel(1)).thenReturn(exchange);
@@ -122,7 +180,7 @@ public class PinotSortExchangeCopyRuleTest {
RelNode sortCopy = sortCopyCapture.getValue();
Assert.assertTrue(sortCopy instanceof LogicalSort);
- Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+ Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
@@ -134,7 +192,8 @@ public class PinotSortExchangeCopyRuleTest {
@Test
public void shouldMatchNoSortAndPushDownLimitPlusOffset() {
// Given:
- SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+ SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY,
+ false, true);
Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, literal(2), literal(1));
Mockito.when(_call.rel(0)).thenReturn(sort);
Mockito.when(_call.rel(1)).thenReturn(exchange);
@@ -148,7 +207,7 @@ public class PinotSortExchangeCopyRuleTest {
RelNode sortCopy = sortCopyCapture.getValue();
Assert.assertTrue(sortCopy instanceof LogicalSort);
- Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+ Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
@@ -161,7 +220,8 @@ public class PinotSortExchangeCopyRuleTest {
public void shouldMatchSortOnly() {
// Given:
RelCollation collation = RelCollations.of(1);
- SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+ SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation, false,
+ true);
Sort sort = LogicalSort.create(exchange, collation, null, null);
Mockito.when(_call.rel(0)).thenReturn(sort);
Mockito.when(_call.rel(1)).thenReturn(exchange);
@@ -175,7 +235,7 @@ public class PinotSortExchangeCopyRuleTest {
RelNode sortCopy = sortCopyCapture.getValue();
Assert.assertTrue(sortCopy instanceof LogicalSort);
- Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+ Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
@@ -188,7 +248,8 @@ public class PinotSortExchangeCopyRuleTest {
public void shouldMatchLimitOffsetAndSort() {
// Given:
RelCollation collation = RelCollations.of(1);
- SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+ SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation, false,
+ true);
Sort sort = LogicalSort.create(exchange, collation, literal(1), literal(2));
Mockito.when(_call.rel(0)).thenReturn(sort);
Mockito.when(_call.rel(1)).thenReturn(exchange);
@@ -202,7 +263,7 @@ public class PinotSortExchangeCopyRuleTest {
RelNode sortCopy = sortCopyCapture.getValue();
Assert.assertTrue(sortCopy instanceof LogicalSort);
- Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof LogicalSortExchange);
+ Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof PinotLogicalSortExchange);
Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) instanceof LogicalSort);
LogicalSort innerSort = (LogicalSort) ((LogicalSort) sortCopy).getInput().getInput(0);
@@ -215,7 +276,8 @@ public class PinotSortExchangeCopyRuleTest {
public void shouldNotMatchOnlySortAlreadySorted() {
// Given:
RelCollation collation = RelCollations.of(1);
- SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation);
+ SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, collation, false,
+ true);
Sort sort = LogicalSort.create(exchange, collation, null, null);
Mockito.when(_call.rel(0)).thenReturn(sort);
Mockito.when(_call.rel(1)).thenReturn(exchange);
@@ -231,7 +293,8 @@ public class PinotSortExchangeCopyRuleTest {
@Test
public void shouldNotMatchOffsetNoLimitNoSort() {
// Given:
- SortExchange exchange = LogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY);
+ SortExchange exchange = PinotLogicalSortExchange.create(_input, RelDistributions.SINGLETON, RelCollations.EMPTY,
+ false, true);
Sort sort = LogicalSort.create(exchange, RelCollations.EMPTY, literal(1), null);
Mockito.when(_call.rel(0)).thenReturn(sort);
Mockito.when(_call.rel(1)).thenReturn(exchange);
diff --git a/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json b/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json
index 13d505b3cb..749db3748c 100644
--- a/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json
@@ -39,7 +39,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[dateTrunc('DAY', $4)])",
"\n LogicalSort(offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(fetch=[10])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -52,7 +52,7 @@
"Execution Plan",
"\nLogicalProject(day=[dateTrunc('DAY', $4)])",
"\n LogicalSort(offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(fetch=[10])",
"\n LogicalTableScan(table=[[a]])",
"\n"
diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
index ab57467c57..e8e1631c62 100644
--- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
@@ -8,7 +8,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$0], ts=[$1], col3=[$3])",
"\n LogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
"\n LogicalExchange(distribution=[hash[0]])",
@@ -27,7 +27,7 @@
"Execution Plan",
"\nLogicalProject(value1=[$0], ts1=[$1], col3=[$3])",
"\n LogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
"\n LogicalExchange(distribution=[hash[0]])",
diff --git a/pinot-query-planner/src/test/resources/queries/OrderByPlans.json b/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
index 0b7f33e026..e0787faa3e 100644
--- a/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
@@ -8,7 +8,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$3])",
"\n LogicalSort(sort0=[$3], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[3]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[3]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$3], dir0=[ASC])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -21,7 +21,7 @@
"Execution Plan",
"\nLogicalProject(value1=[$3])",
"\n LogicalSort(sort0=[$3], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[3]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[3]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$3], dir0=[ASC])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -33,7 +33,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$3], dir0=[ASC], offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[3]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[3]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$3], dir0=[ASC], fetch=[10])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -45,7 +45,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$3], sort1=[$1], dir0=[ASC], dir1=[DESC], offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[3, 1 DESC]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[3, 1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$3], sort1=[$1], dir0=[ASC], dir1=[DESC], fetch=[10])",
"\n LogicalTableScan(table=[[b]])",
"\n"
@@ -57,7 +57,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
"\n LogicalExchange(distribution=[hash[0]])",
@@ -72,7 +72,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
"\n LogicalExchange(distribution=[hash[0]])",
@@ -87,7 +87,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalAggregate(group=[{0}], sum=[$SUM0($1)])",
"\n LogicalExchange(distribution=[hash[0]])",
@@ -102,7 +102,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalAggregate(group=[{0}], sum=[$SUM0($1)])",
"\n LogicalExchange(distribution=[hash[0]])",
diff --git a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
index 4516ae1aa2..49312c481b 100644
--- a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
@@ -61,7 +61,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$2], dir0=[ASC])",
"\n LogicalProject(col1=[$2], EXPR$1=[$3], col2=[$0])",
"\n LogicalWindow(window#0=[window(aggs [SUM($1)])])",
@@ -77,7 +77,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$1], $1=[$2])",
"\n LogicalWindow(window#0=[window(aggs [SUM($0)])])",
@@ -93,7 +93,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0], fetch=[20])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$2], dir0=[ASC], fetch=[20])",
"\n LogicalProject(col1=[$2], EXPR$1=[$3], col2=[$0])",
"\n LogicalWindow(window#0=[window(aggs [SUM($1)])])",
@@ -211,7 +211,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$2], dir0=[ASC])",
"\n LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], col3=[$0])",
"\n LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
@@ -284,7 +284,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
@@ -300,7 +300,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[100])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(fetch=[100])",
"\n LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
@@ -316,7 +316,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
"\n LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(aggs [SUM($1), COUNT($0)])])",
@@ -434,7 +434,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$3], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[3]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[3]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$3], dir0=[ASC])",
"\n LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$2], col3=[$0])",
"\n LogicalWindow(window#0=[window(aggs [SUM($0), COUNT($0)])])",
@@ -520,7 +520,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$2], dir0=[ASC])",
"\n LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
"\n LogicalWindow(window#0=[window(partition {2} aggs [MIN($1)])])",
@@ -536,7 +536,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1)])])",
@@ -552,7 +552,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
"\n LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
"\n LogicalWindow(window#0=[window(partition {2} aggs [MIN($1)])])",
@@ -683,7 +683,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$2], dir0=[ASC])",
"\n LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], col3=[$0])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($0), COUNT($0)])])",
@@ -769,7 +769,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), MAX($1)])])",
@@ -785,7 +785,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {1} aggs [SUM($0), COUNT($0), MIN($0)])])",
@@ -801,7 +801,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
"\n LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), MAX($1)])])",
@@ -817,7 +817,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(col1=[$2], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), MAX($1)])])",
@@ -833,7 +833,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), COUNT($1)])])",
@@ -849,7 +849,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC])",
"\n LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($4):DOUBLE NOT NULL, $5)])",
"\n LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), SUM($1), COUNT($1)])])",
@@ -865,7 +865,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$3], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[3]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[3]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$3], dir0=[ASC])",
"\n LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$0])",
"\n LogicalWindow(window#0=[window(partition {0, 2} aggs [SUM($1), COUNT($1)])])",
@@ -996,7 +996,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$3], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[3]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[3]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$3], dir0=[ASC])",
"\n LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4], col3=[$0])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($0), COUNT($0), MAX($0)])])",
@@ -1016,7 +1016,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2])",
"\n LogicalWindow(window#0=[window(order by [0] aggs [SUM($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1030,7 +1030,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2])",
"\n LogicalWindow(window#0=[window(order by [0] aggs [SUM($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1043,7 +1043,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1056,7 +1056,7 @@
"Execution Plan",
"\nLogicalProject(value1=[$2], avg=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1069,7 +1069,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2])",
"\n LogicalWindow(window#0=[window(order by [1] aggs [MAX($0)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[1]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1081,11 +1081,11 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$2], dir0=[ASC])",
"\n LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
"\n LogicalWindow(window#0=[window(order by [2 DESC] aggs [MIN($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2 DESC]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1097,11 +1097,11 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1113,11 +1113,11 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
"\n LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
"\n LogicalWindow(window#0=[window(order by [2 DESC] aggs [MIN($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2 DESC]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1130,7 +1130,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2], $1=[$3])",
"\n LogicalWindow(window#0=[window(order by [1] aggs [COUNT($0)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[1]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], $2=[SUBSTR($3, 0, 2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1143,7 +1143,7 @@
"Execution Plan",
"\nLogicalProject(col2=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)])",
"\n LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>($2, 10), <=($2, 500))])",
"\n LogicalTableScan(table=[[a]])",
@@ -1157,7 +1157,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], $2=[CONCAT($3, '-', $1)])",
"\n LogicalFilter(condition=[OR(AND(<>($3, 'bar'), <>($3, 'foo')), >=($2, 42))])",
"\n LogicalTableScan(table=[[a]])",
@@ -1171,7 +1171,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[/(CAST($2):DOUBLE NOT NULL, $3)])",
"\n LogicalWindow(window#0=[window(order by [1] aggs [SUM($0), COUNT($0)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[1]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col3=[$2], $1=[CONCAT($3, '-', $1)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1184,7 +1184,7 @@
"Execution Plan",
"\nLogicalProject($0=[$3], $1=[$4])",
"\n LogicalWindow(window#0=[window(order by [2] aggs [MAX($1), COUNT($0)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1198,7 +1198,7 @@
"Execution Plan",
"\nLogicalProject($0=[$3], $1=[$4])",
"\n LogicalWindow(window#0=[window(order by [2] aggs [MAX($1), COUNT($0)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1211,7 +1211,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(order by [1] aggs [SUM($0), COUNT($0), MIN($0)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[1]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1224,7 +1224,7 @@
"Execution Plan",
"\nLogicalProject(value1=[$1], avg=[/(CAST($2):DOUBLE NOT NULL, $3)], min=[$4])",
"\n LogicalWindow(window#0=[window(order by [1] aggs [SUM($0), COUNT($0), MIN($0)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[1]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1237,7 +1237,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2], $1=[$3])",
"\n LogicalWindow(window#0=[window(order by [0] aggs [COUNT($1), MIN($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1249,11 +1249,11 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[DESC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0 DESC]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[DESC])",
"\n LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(order by [0, 2 DESC] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0, 2 DESC]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0, 2 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1265,11 +1265,11 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(order by [1] aggs [SUM($0), COUNT($0), MIN($0)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[1]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1281,11 +1281,11 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[DESC], offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0 DESC]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[DESC], fetch=[10])",
"\n LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(order by [0, 2 DESC] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0, 2 DESC]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0, 2 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1298,7 +1298,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[REVERSE($2)], EXPR$1=[$3], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), MAX($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1311,7 +1311,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(order by [1] aggs [SUM($0), COUNT($0), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[1]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col3=[$2], col1=[$3])",
"\n LogicalFilter(condition=[AND(>($2, 42), OR(=($3, 'chewbacca':VARCHAR(9)), =($3, 'vader':VARCHAR(9)), =($3, 'yoda':VARCHAR(9))))])",
"\n LogicalTableScan(table=[[a]])",
@@ -1325,7 +1325,7 @@
"Execution Plan",
"\nLogicalProject($0=[$2], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(order by [1] aggs [MIN($0), MAX($0)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[1]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col3=[$2], col1=[$3], $2=[REVERSE(CONCAT($3, ' ', $1))])",
"\n LogicalFilter(condition=[AND(<>($1, 'bar'), <>($1, 'baz'), <>($1, 'foo'))])",
"\n LogicalTableScan(table=[[a]])",
@@ -1339,7 +1339,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$1=[$5])",
"\n LogicalWindow(window#0=[window(order by [2] aggs [SUM($0), COUNT($0), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col3=[$2], col1=[$3], $2=[REVERSE(CONCAT($3, '-', $1))])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1417,7 +1417,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$2], dir0=[ASC])",
"\n LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
"\n LogicalWindow(window#0=[window(partition {2} order by [2] aggs [MIN($1)])])",
@@ -1433,7 +1433,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1)])])",
@@ -1449,7 +1449,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
"\n LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
"\n LogicalWindow(window#0=[window(partition {2} order by [2] aggs [MIN($1)])])",
@@ -1624,7 +1624,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[3, 0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[3, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC])",
"\n LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$0])",
"\n LogicalWindow(window#0=[window(partition {0, 2} order by [0, 2] aggs [SUM($1), COUNT($1)])])",
@@ -1640,7 +1640,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($0), COUNT($0), MIN($0)])])",
@@ -1656,7 +1656,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC], offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[3, 0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[3, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC], fetch=[10])",
"\n LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$0])",
"\n LogicalWindow(window#0=[window(partition {0, 2} order by [0, 2] aggs [SUM($1), COUNT($1)])])",
@@ -1766,7 +1766,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1779,7 +1779,7 @@
"Execution Plan",
"\nLogicalProject(avg=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1792,7 +1792,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1805,7 +1805,7 @@
"Execution Plan",
"\nLogicalProject(value1=[$2], avg=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1818,7 +1818,7 @@
"Execution Plan",
"\nLogicalProject($0=[$3])",
"\n LogicalWindow(window#0=[window(partition {2} order by [0] aggs [MAX($1)])])",
- "\n LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1830,11 +1830,11 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$2], dir0=[ASC])",
"\n LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
"\n LogicalWindow(window#0=[window(partition {2} order by [0] aggs [MIN($1)])])",
- "\n LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1846,11 +1846,11 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1862,11 +1862,11 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
"\n LogicalProject(col2=[$0], EXPR$1=[$3], col1=[$2])",
"\n LogicalWindow(window#0=[window(partition {2} order by [0] aggs [MIN($1)])])",
- "\n LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1879,7 +1879,7 @@
"Execution Plan",
"\nLogicalProject($0=[$3], $1=[$4])",
"\n LogicalWindow(window#0=[window(partition {1} order by [2] aggs [COUNT($0)])])",
- "\n LogicalSortExchange(distribution=[hash[1]], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[1]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3], $3=[SUBSTR($3, 0, 2)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1892,7 +1892,7 @@
"Execution Plan",
"\nLogicalProject(col2=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {2} order by [0] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalFilter(condition=[AND(>($2, 10), <=($2, 500))])",
"\n LogicalTableScan(table=[[a]])",
@@ -1906,7 +1906,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[$3], EXPR$1=[/(CAST($4):DOUBLE NOT NULL, $5)])",
"\n LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3], $3=[CONCAT($3, '-', $1)])",
"\n LogicalFilter(condition=[OR(AND(<>($3, 'bar'), <>($3, 'foo')), >=($2, 42))])",
"\n LogicalTableScan(table=[[a]])",
@@ -1920,7 +1920,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {2} order by [1] aggs [SUM($0), COUNT($0)])])",
- "\n LogicalSortExchange(distribution=[hash[2]], collation=[[1]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[2]], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col3=[$2], $1=[REVERSE($1)], $2=[CONCAT($3, '-', $1)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1933,7 +1933,7 @@
"Execution Plan",
"\nLogicalProject($0=[$3], $1=[$4])",
"\n LogicalWindow(window#0=[window(partition {2} order by [1] aggs [MAX($1), COUNT($0)])])",
- "\n LogicalSortExchange(distribution=[hash[2]], collation=[[1]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[2]], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1947,7 +1947,7 @@
"Execution Plan",
"\nLogicalProject($0=[$3], $1=[$4])",
"\n LogicalWindow(window#0=[window(partition {2} order by [1] aggs [MAX($1), COUNT($0)])])",
- "\n LogicalSortExchange(distribution=[hash[2]], collation=[[1]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[2]], collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1960,7 +1960,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$2=[$5])",
"\n LogicalWindow(window#0=[window(partition {2} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
- "\n LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1973,7 +1973,7 @@
"Execution Plan",
"\nLogicalProject(value1=[$2], avg=[/(CAST($3):DOUBLE NOT NULL, $4)], min=[$5])",
"\n LogicalWindow(window#0=[window(partition {2} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
- "\n LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1986,7 +1986,7 @@
"Execution Plan",
"\nLogicalProject($0=[$3], $1=[$4])",
"\n LogicalWindow(window#0=[window(partition {2} order by [0] aggs [COUNT($1), MIN($1)])])",
- "\n LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -1998,11 +1998,11 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC], offset=[0])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[3, 0 DESC]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[3, 0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC])",
"\n LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$0])",
"\n LogicalWindow(window#0=[window(partition {0, 2} order by [1, 2] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash[0, 2]], collation=[[1, 2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[0, 2]], collation=[[1, 2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2014,11 +2014,11 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$2=[$5])",
"\n LogicalWindow(window#0=[window(partition {2} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
- "\n LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2030,11 +2030,11 @@
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC], offset=[0], fetch=[10])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[3, 0 DESC]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[3, 0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC], fetch=[10])",
"\n LogicalProject(col1=[$2], EXPR$1=[$3], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$0])",
"\n LogicalWindow(window#0=[window(partition {0, 2} order by [1, 2] aggs [SUM($1), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash[0, 2]], collation=[[1, 2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[0, 2]], collation=[[1, 2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2047,7 +2047,7 @@
"Execution Plan",
"\nLogicalProject($0=[$3], $1=[$4], $2=[$5])",
"\n LogicalWindow(window#0=[window(partition {0} order by [2] aggs [SUM($1), MAX($1)])])",
- "\n LogicalSortExchange(distribution=[hash[0]], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[0]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3], $3=[REVERSE($3)])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2060,7 +2060,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$2], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$2=[$5])",
"\n LogicalWindow(window#0=[window(partition {2} order by [0] aggs [SUM($1), COUNT($1), COUNT($2)])])",
- "\n LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3])",
"\n LogicalFilter(condition=[AND(>($2, 42), OR(=($3, 'chewbacca':VARCHAR(9)), =($3, 'vader':VARCHAR(9)), =($3, 'yoda':VARCHAR(9))))])",
"\n LogicalTableScan(table=[[a]])",
@@ -2074,7 +2074,7 @@
"Execution Plan",
"\nLogicalProject($0=[$3], $1=[$4], $2=[$5])",
"\n LogicalWindow(window#0=[window(partition {2} order by [0] aggs [MIN($1), MAX($1)])])",
- "\n LogicalSortExchange(distribution=[hash[2]], collation=[[0]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[2]], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2], col1=[$3], $3=[REVERSE(CONCAT($3, ' ', $1))])",
"\n LogicalFilter(condition=[AND(<>($1, 'bar'), <>($1, 'baz'), <>($1, 'foo'))])",
"\n LogicalTableScan(table=[[a]])",
@@ -2088,7 +2088,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[/(CAST($4):DOUBLE NOT NULL, $5)], EXPR$1=[$6])",
"\n LogicalWindow(window#0=[window(partition {3} order by [2] aggs [SUM($0), COUNT($0), COUNT($1)])])",
- "\n LogicalSortExchange(distribution=[hash[3]], collation=[[2]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[3]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col3=[$2], col1=[$3], $2=[CONCAT($3, '-', $1)], $3=[REVERSE(CONCAT($3, '-', $1))])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -2120,7 +2120,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$1], EXPR$1=[$2], EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(order by [2 DESC, 1] aggs [SUM($0), COUNT($0)])])",
- "\n LogicalSortExchange(distribution=[hash], collation=[[2 DESC, 1]])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[2 DESC, 1]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalAggregate(group=[{0, 1}], EXPR$1=[$SUM0($2)])",
"\n LogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{2, 3}], EXPR$1=[COUNT()])",
@@ -2135,7 +2135,7 @@
"Execution Plan",
"\nLogicalProject(col1=[$1], EXPR$1=[$2], $2=[$3])",
"\n LogicalWindow(window#0=[window(partition {1} order by [2 DESC, 1] aggs [MAX($0)])])",
- "\n LogicalSortExchange(distribution=[hash[1]], collation=[[2 DESC, 1]])",
+ "\n PinotLogicalSortExchange(distribution=[hash[1]], collation=[[2 DESC, 1]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalAggregate(group=[{0, 1}], EXPR$1=[$SUM0($2)])",
"\n LogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalAggregate(group=[{2, 3}], EXPR$1=[COUNT()])",
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 9436ec4bf0..b0d0433f42 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -215,7 +215,8 @@ public class QueryRunner {
deadlineMs, distributedStagePlan.getMetadataMap());
mailboxSendOperator = new MailboxSendOperator(opChainExecutionContext,
new LeafStageTransferableBlockOperator(opChainExecutionContext, serverQueryResults, sendNode.getDataSchema()),
- sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), sendNode.getStageId(),
+ sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(),
+ sendNode.getCollationDirections(), sendNode.isSortOnSender(), sendNode.getStageId(),
sendNode.getReceiverStageId());
int blockCounter = 0;
while (!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 520392985e..fe9782c4c6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -23,19 +23,27 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.PriorityQueue;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.routing.VirtualServer;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.utils.SortUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.service.QueryConfig;
import org.slf4j.Logger;
@@ -50,6 +58,9 @@ import org.slf4j.LoggerFactory;
* We use sendingStageInstance to deduce mailboxId and fetch the content from mailboxService.
* When exchangeType is Singleton, we find the mapping mailbox for the mailboxService. If not found, use empty list.
* When exchangeType is non-Singleton, we pull from each instance in round-robin way to get matched mailbox content.
+ *
+ * TODO: Once sorting on the {@code MailboxSendOperator} is available, modify this to use a k-way merge instead of
+ * re-sorting via the PriorityQueue.
*/
public class MailboxReceiveOperator extends MultiStageOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class);
@@ -62,10 +73,17 @@ public class MailboxReceiveOperator extends MultiStageOperator {
private final MailboxService<TransferableBlock> _mailboxService;
private final RelDistribution.Type _exchangeType;
+ private final List<RexExpression> _collationKeys;
+ private final List<RelFieldCollation.Direction> _collationDirections;
+ private final boolean _isSortOnSender;
+ private final boolean _isSortOnReceiver;
+ private final DataSchema _dataSchema;
private final List<MailboxIdentifier> _sendingMailbox;
private final long _deadlineTimestampNano;
+ private final PriorityQueue<Object[]> _priorityQueue;
private int _serverIdx;
private TransferableBlock _upstreamErrorBlock;
+ private boolean _isSortedBlockConstructed;
private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId,
int receiverStageId, VirtualServerAddress receiver) {
@@ -77,16 +95,20 @@ public class MailboxReceiveOperator extends MultiStageOperator {
receiverStageId);
}
- public MailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType, int senderStageId,
- int receiverStageId) {
- this(context, context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, senderStageId,
+ public MailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
+ List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender,
+ boolean isSortOnReceiver, DataSchema dataSchema, int senderStageId, int receiverStageId) {
+ this(context, context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, collationKeys,
+ collationDirections, isSortOnSender, isSortOnReceiver, dataSchema, senderStageId,
receiverStageId, context.getTimeoutMs());
}
// TODO: Move deadlineInNanoSeconds to OperatorContext.
- //TODO: Remove boxed timeoutMs value from here and use long deadlineMs from context.
+ // TODO: Remove boxed timeoutMs value from here and use long deadlineMs from context.
public MailboxReceiveOperator(OpChainExecutionContext context, List<VirtualServer> sendingStageInstances,
- RelDistribution.Type exchangeType, int senderStageId, int receiverStageId, Long timeoutMs) {
+ RelDistribution.Type exchangeType, List<RexExpression> collationKeys,
+ List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender, boolean isSortOnReceiver,
+ DataSchema dataSchema, int senderStageId, int receiverStageId, Long timeoutMs) {
super(context);
_mailboxService = context.getMailboxService();
VirtualServerAddress receiver = context.getServer();
@@ -121,8 +143,20 @@ public class MailboxReceiveOperator extends MultiStageOperator {
_sendingMailbox.add(toMailboxId(instance, jobId, senderStageId, receiverStageId, receiver));
}
}
+ _collationKeys = collationKeys;
+ _collationDirections = collationDirections;
+ _isSortOnSender = isSortOnSender;
+ _isSortOnReceiver = isSortOnReceiver;
+ _dataSchema = dataSchema;
+ if (CollectionUtils.isEmpty(collationKeys) || !_isSortOnReceiver) {
+ _priorityQueue = null;
+ } else {
+ _priorityQueue = new PriorityQueue<>(new SortUtils.SortComparator(collationKeys, collationDirections,
+ dataSchema, false));
+ }
_upstreamErrorBlock = null;
_serverIdx = 0;
+ _isSortedBlockConstructed = false;
}
public List<MailboxIdentifier> getSendingMailbox() {
@@ -143,6 +177,7 @@ public class MailboxReceiveOperator extends MultiStageOperator {
@Override
protected TransferableBlock getNextBlock() {
if (_upstreamErrorBlock != null) {
+ cleanUpResourcesOnError();
return _upstreamErrorBlock;
} else if (System.nanoTime() >= _deadlineTimestampNano) {
return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
@@ -165,12 +200,20 @@ public class MailboxReceiveOperator extends MultiStageOperator {
// Get null block when pulling times out from mailbox.
if (block != null) {
if (block.isErrorBlock()) {
+ cleanUpResourcesOnError();
_upstreamErrorBlock =
TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
return _upstreamErrorBlock;
}
if (!block.isEndOfStreamBlock()) {
- return block;
+ if (_priorityQueue != null) {
+ // Ordering is enabled, add rows to the PriorityQueue
+ List<Object[]> container = block.getContainer();
+ _priorityQueue.addAll(container);
+ } else {
+ // Ordering is not enabled, return the input block as is
+ return block;
+ }
} else {
if (!block.getResultMetadata().isEmpty()) {
_operatorStatsMap.putAll(block.getResultMetadata());
@@ -185,6 +228,18 @@ public class MailboxReceiveOperator extends MultiStageOperator {
}
}
+ if (((openMailboxCount == 0) || (openMailboxCount <= eosMailboxCount))
+ && (!CollectionUtils.isEmpty(_priorityQueue)) && !_isSortedBlockConstructed) {
+ // Some data is present in the PriorityQueue; drain these rows and return them as a single sorted block
+ LinkedList<Object[]> rows = new LinkedList<>();
+ while (_priorityQueue.size() > 0) {
+ Object[] row = _priorityQueue.poll();
+ rows.addFirst(row);
+ }
+ _isSortedBlockConstructed = true;
+ return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
+ }
+
// there are two conditions in which we should return EOS: (1) there were
// no mailboxes to open (this shouldn't happen because the second condition
// should be hit first, but is defensive) (2) every mailbox that was opened
@@ -196,6 +251,16 @@ public class MailboxReceiveOperator extends MultiStageOperator {
return block;
}
+ private void cleanUpResourcesOnError() {
+ if (_priorityQueue != null) {
+ _priorityQueue.clear();
+ }
+ }
+
+ public boolean hasCollationKeys() {
+ return !CollectionUtils.isEmpty(_collationKeys);
+ }
+
@Override
public void close() {
super.close();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 940e668cdc..3be593a70a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -28,9 +28,11 @@ import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.routing.VirtualServer;
import org.apache.pinot.query.routing.VirtualServerAddress;
@@ -45,6 +47,8 @@ import org.slf4j.LoggerFactory;
/**
* This {@code MailboxSendOperator} is created to send {@link TransferableBlock}s to the receiving end.
+ *
+ * TODO: Add support to sort the data prior to sending if sorting is enabled
*/
public class MailboxSendOperator extends MultiStageOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
@@ -56,6 +60,9 @@ public class MailboxSendOperator extends MultiStageOperator {
private final MultiStageOperator _dataTableBlockBaseOperator;
private final BlockExchange _exchange;
+ private final List<RexExpression> _collationKeys;
+ private final List<RelFieldCollation.Direction> _collationDirections;
+ private final boolean _isSortOnSender;
@VisibleForTesting
interface BlockExchangeFactory {
@@ -70,16 +77,19 @@ public class MailboxSendOperator extends MultiStageOperator {
}
public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator dataTableBlockBaseOperator,
- RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, int senderStageId,
+ RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, List<RexExpression> collationKeys,
+ List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender, int senderStageId,
int receiverStageId) {
- this(context, dataTableBlockBaseOperator, exchangeType, keySelector,
+ this(context, dataTableBlockBaseOperator, exchangeType, keySelector, collationKeys, collationDirections,
+ isSortOnSender,
(server) -> toMailboxId(server, context.getRequestId(), senderStageId, receiverStageId, context.getServer()),
BlockExchange::getExchange, receiverStageId);
}
@VisibleForTesting
MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator dataTableBlockBaseOperator,
- RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector,
+ RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, List<RexExpression> collationKeys,
+ List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender,
MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory, int receiverStageId) {
super(context);
_dataTableBlockBaseOperator = dataTableBlockBaseOperator;
@@ -116,6 +126,10 @@ public class MailboxSendOperator extends MultiStageOperator {
blockExchangeFactory.build(context.getMailboxService(), receivingMailboxes, exchangeType, keySelector, splitter,
context.getDeadlineMs());
+ _collationKeys = collationKeys;
+ _collationDirections = collationDirections;
+ _isSortOnSender = isSortOnSender;
+
Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType),
String.format("Exchange type '%s' is not supported yet", exchangeType));
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 9eeea863a9..7668dc0397 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.runtime.operator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
@@ -33,6 +32,7 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.utils.SortUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,15 +56,15 @@ public class SortOperator extends MultiStageOperator {
public SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator,
List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections, int fetch, int offset,
- DataSchema dataSchema) {
- this(context, upstreamOperator, collationKeys, collationDirections, fetch, offset, dataSchema,
+ DataSchema dataSchema, boolean isInputSorted) {
+ this(context, upstreamOperator, collationKeys, collationDirections, fetch, offset, dataSchema, isInputSorted,
SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY);
}
@VisibleForTesting
SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
- int defaultHolderCapacity) {
+ boolean isInputSorted, int defaultHolderCapacity) {
super(context);
_upstreamOperator = upstreamOperator;
_fetch = fetch;
@@ -73,13 +73,15 @@ public class SortOperator extends MultiStageOperator {
_upstreamErrorBlock = null;
_isSortedBlockConstructed = false;
_numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultHolderCapacity;
- // When there's no collationKeys, the SortOperator is a simple selection with row trim on limit & offset
- if (collationKeys.isEmpty()) {
+ // Under the following circumstances, the SortOperator is a simple selection with row trim on limit & offset:
+ // - There are no collationKeys
+ // - 'isInputSorted' is set to true indicating that the data was already sorted
+ if (collationKeys.isEmpty() || isInputSorted) {
_priorityQueue = null;
_rows = new ArrayList<>();
} else {
_priorityQueue = new PriorityQueue<>(_numRowsToKeep,
- new SortComparator(collationKeys, collationDirections, dataSchema, false));
+ new SortUtils.SortComparator(collationKeys, collationDirections, dataSchema, false));
_rows = null;
}
}
@@ -174,45 +176,4 @@ public class SortOperator extends MultiStageOperator {
}
}
}
-
- private static class SortComparator implements Comparator<Object[]> {
- private final int _size;
- private final int[] _valueIndices;
- private final int[] _multipliers;
- private final boolean[] _useDoubleComparison;
-
- public SortComparator(List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
- DataSchema dataSchema, boolean isNullHandlingEnabled) {
- DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
- _size = collationKeys.size();
- _valueIndices = new int[_size];
- _multipliers = new int[_size];
- _useDoubleComparison = new boolean[_size];
- for (int i = 0; i < _size; i++) {
- _valueIndices[i] = ((RexExpression.InputRef) collationKeys.get(i)).getIndex();
- _multipliers[i] = collationDirections.get(i).isDescending() ? 1 : -1;
- _useDoubleComparison[i] = columnDataTypes[_valueIndices[i]].isNumber();
- }
- }
-
- @Override
- public int compare(Object[] o1, Object[] o2) {
- for (int i = 0; i < _size; i++) {
- int index = _valueIndices[i];
- Object v1 = o1[index];
- Object v2 = o2[index];
- int result;
- if (_useDoubleComparison[i]) {
- result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
- } else {
- //noinspection unchecked
- result = ((Comparable) v1).compareTo(v2);
- }
- if (result != 0) {
- return result * _multipliers[i];
- }
- }
- return 0;
- }
- }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
new file mode 100644
index 0000000000..820402f360
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.utils;
+
+import java.util.Comparator;
+import java.util.List;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.logical.RexExpression;
+
+
+public class SortUtils {
+ /*
+ * Static holder for sorting helpers used by the query runtime operators; the only member visible here is
+ * SortComparator. The private constructor below prevents instantiation.
+ */
+ private SortUtils() {
+ }
+ /**
+ * Comparator over rows represented as {@code Object[]} that compares them by a list of collation keys, in key
+ * order. For a key whose direction reports {@code isDescending()} the natural comparison result is kept; for any
+ * other direction it is negated.
+ *
+ * NOTE(review): used as a plain comparator this orders rows in the reverse of the requested direction.
+ * SortOperator passes it to a PriorityQueue - confirm every caller accounts for this.
+ */
+ public static class SortComparator implements Comparator<Object[]> {
+ private final int _size; // number of collation keys
+ private final int[] _valueIndices; // row index of the column read for each key
+ private final int[] _multipliers; // 1 for descending keys, -1 otherwise
+ private final boolean[] _useDoubleComparison; // true where the key column type isNumber()
+ /**
+ * Precomputes, for each collation key, the row index to read, the sign applied to the comparison result and
+ * whether the values are compared as doubles.
+ *
+ * @param collationKeys keys to compare by, in priority order; each one is cast to RexExpression.InputRef
+ * @param collationDirections direction of each key, read at the same position as the key
+ * @param dataSchema schema whose column data types select numeric or Comparable comparison per key
+ * @param isNullHandlingEnabled not read by this constructor
+ */
+ public SortComparator(List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
+ DataSchema dataSchema, boolean isNullHandlingEnabled) {
+ DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+ _size = collationKeys.size();
+ _valueIndices = new int[_size];
+ _multipliers = new int[_size];
+ _useDoubleComparison = new boolean[_size];
+ for (int i = 0; i < _size; i++) {
+ _valueIndices[i] = ((RexExpression.InputRef) collationKeys.get(i)).getIndex();
+ _multipliers[i] = collationDirections.get(i).isDescending() ? 1 : -1;
+ _useDoubleComparison[i] = columnDataTypes[_valueIndices[i]].isNumber();
+ }
+ }
+ /**
+ * Compares two rows key by key and returns the first non-zero result multiplied by that key's multiplier, or
+ * {@code 0} when every key compares equal. Numeric keys are compared through {@code doubleValue()}, all other
+ * keys through {@code Comparable.compareTo}. Values are dereferenced without null checks.
+ *
+ * NOTE(review): comparing through doubleValue() may not distinguish very large integral values - confirm this
+ * is acceptable for the column types that reach this comparator.
+ */
+ @Override
+ public int compare(Object[] o1, Object[] o2) {
+ for (int i = 0; i < _size; i++) {
+ int index = _valueIndices[i];
+ Object v1 = o1[index];
+ Object v2 = o2[index];
+ int result;
+ if (_useDoubleComparison[i]) {
+ result = Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
+ } else {
+ //noinspection unchecked
+ result = ((Comparable) v1).compareTo(v2);
+ }
+ if (result != 0) {
+ return result * _multipliers[i]; // earlier keys take precedence over later ones
+ }
+ }
+ return 0; // rows are equal on every collation key
+ }
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 4a9892a4dd..ca89173205 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -63,7 +63,8 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PlanRequestContext context) {
MailboxReceiveOperator mailboxReceiveOperator =
new MailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
- node.getSenderStageId(), node.getStageId());
+ node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(), node.isSortOnReceiver(),
+ node.getDataSchema(), node.getSenderStageId(), node.getStageId());
context.addReceivingMailboxes(mailboxReceiveOperator.getSendingMailbox());
return mailboxReceiveOperator;
}
@@ -72,7 +73,8 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
public MultiStageOperator visitMailboxSend(MailboxSendNode node, PlanRequestContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
return new MailboxSendOperator(context.getOpChainExecutionContext(), nextOperator, node.getExchangeType(),
- node.getPartitionKeySelector(), node.getStageId(), node.getReceiverStageId());
+ node.getPartitionKeySelector(), node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(),
+ node.getStageId(), node.getReceiverStageId());
}
@Override
@@ -120,8 +122,10 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
@Override
public MultiStageOperator visitSort(SortNode node, PlanRequestContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
+ boolean isInputSorted =
+ nextOperator instanceof MailboxReceiveOperator && ((MailboxReceiveOperator) nextOperator).hasCollationKeys();
return new SortOperator(context.getOpChainExecutionContext(), nextOperator, node.getCollationKeys(),
- node.getCollationDirections(), node.getFetch(), node.getOffset(), node.getDataSchema());
+ node.getCollationDirections(), node.getFetch(), node.getOffset(), node.getDataSchema(), isInputSorted);
}
@Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index a295ef06f6..4d01c67105 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.service.dispatch;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Deadline;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -282,7 +283,8 @@ public class QueryDispatcher {
new OpChainExecutionContext(mailboxService, jobId, stageId, server, timeoutMs, timeoutMs, stageMetadataMap);
// timeout is set for reduce stage
MailboxReceiveOperator mailboxReceiveOperator =
- new MailboxReceiveOperator(context, RelDistribution.Type.RANDOM_DISTRIBUTED, stageId, reducerStageId);
+ new MailboxReceiveOperator(context, RelDistribution.Type.RANDOM_DISTRIBUTED, Collections.emptyList(),
+ Collections.emptyList(), false, false, dataSchema, stageId, reducerStageId);
return mailboxReceiveOperator;
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index ebd712a0c4..bfbb977e7c 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -20,15 +20,20 @@ package org.apache.pinot.query.runtime.operator;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Random;
import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.routing.VirtualServer;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -43,6 +48,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.STRING;
public class MailboxReceiveOperatorTest {
@@ -66,6 +72,8 @@ public class MailboxReceiveOperatorTest {
private final VirtualServerAddress _testAddr = new VirtualServerAddress("test", 123, 0);
+ private final Random _random = new Random();
+
@BeforeMethod
public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
@@ -80,12 +88,24 @@ public class MailboxReceiveOperatorTest {
@Test
public void shouldTimeoutOnExtraLongSleep()
throws InterruptedException {
+ // Choose whether to exercise the ordering code path or not
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+ boolean sortOnReceiver = false;
+ if (_random.nextBoolean()) {
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+ sortOnReceiver = true;
+ }
+
// shorter timeoutMs should result in error.
+ DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 10L, 10L,
new HashMap<>());
MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, 10L);
+ new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
+ collationDirections, false, sortOnReceiver, inSchema, 456, 789, 10L);
Thread.sleep(200L);
TransferableBlock mailbox = receiveOp.nextBlock();
Assert.assertTrue(mailbox.isErrorBlock());
@@ -95,13 +115,15 @@ public class MailboxReceiveOperatorTest {
// longer timeout or default timeout (10s) doesn't result in error.
context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 2000L, 2000L,
new HashMap<>());
- receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, 2000L);
+ receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
+ collationDirections, false, sortOnReceiver, inSchema, 456, 789, 2000L);
Thread.sleep(200L);
mailbox = receiveOp.nextBlock();
Assert.assertFalse(mailbox.isErrorBlock());
context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, null);
+ receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
+ collationDirections, false, sortOnReceiver, inSchema, 456, 789, null);
Thread.sleep(200L);
mailbox = receiveOp.nextBlock();
Assert.assertFalse(mailbox.isErrorBlock());
@@ -120,12 +142,24 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_server2.getHostname()).thenReturn("singleton");
Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
+ DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+
+ // Choose whether to exercise the ordering code path or not
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+ boolean sortOnReceiver = false;
+ if (_random.nextBoolean()) {
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+ sortOnReceiver = true;
+ }
+
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON, 456,
- 789, null);
+ new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+ collationKeys, collationDirections, false, sortOnReceiver, inSchema, 456, 789, null);
}
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
@@ -139,11 +173,24 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_server2.getHostname()).thenReturn("singleton");
Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
+ DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+
+ // Choose whether to exercise the ordering code path or not
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+ boolean sortOnReceiver = false;
+ if (_random.nextBoolean()) {
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+ sortOnReceiver = true;
+ }
+
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.RANGE_DISTRIBUTED, 456, 789, null);
+ RelDistribution.Type.RANGE_DISTRIBUTED, collationKeys, collationDirections, false, sortOnReceiver, inSchema,
+ 456, 789, null);
}
@Test
@@ -167,13 +214,26 @@ public class MailboxReceiveOperatorTest {
String toHost = "toHost";
VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
+ DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+
+ // Choose whether to exercise the ordering code path or not
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+ boolean sortOnReceiver = false;
+ if (_random.nextBoolean()) {
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+ sortOnReceiver = true;
+ }
+
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+ collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ null);
// Receive end of stream block directly when there is no match.
Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
@@ -205,13 +265,26 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(true);
+ DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+
+ // Choose whether to exercise the ordering code path or not
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+ boolean sortOnReceiver = false;
+ if (_random.nextBoolean()) {
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+ sortOnReceiver = true;
+ }
+
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+ collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ null);
// Receive end of stream block directly when mailbox is close.
Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
@@ -245,13 +318,27 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox.isClosed()).thenReturn(false);
// Receive null mailbox during timeout.
Mockito.when(_mailbox.receive()).thenReturn(null);
+
+ DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+
+ // Choose whether to exercise the ordering code path or not
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+ boolean sortOnReceiver = false;
+ if (_random.nextBoolean()) {
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+ sortOnReceiver = true;
+ }
+
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+ collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ null);
// Receive NoOpBlock.
Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock());
}
@@ -283,13 +370,27 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+
+ // Choose whether to exercise the ordering code path or not
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+ boolean sortOnReceiver = false;
+ if (_random.nextBoolean()) {
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+ sortOnReceiver = true;
+ }
+
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+ collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ null);
// Receive EosBloc.
Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
}
@@ -322,15 +423,32 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Object[] expRow = new Object[]{1, 1};
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
- Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+ Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+
+ // Choose whether to exercise the ordering code path or not
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+ boolean sortOnReceiver = false;
+ if (_random.nextBoolean()) {
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+ sortOnReceiver = true;
+ }
+
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+ collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
+ while (receivedBlock.isNoOpBlock()) {
+ receivedBlock = receiveOp.nextBlock();
+ }
List<Object[]> resultRows = receivedBlock.getContainer();
Assert.assertEquals(resultRows.size(), 1);
Assert.assertEquals(resultRows.get(0), expRow);
@@ -363,14 +481,27 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Exception e = new Exception("errorBlock");
+ DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(e));
+
+ // Choose whether to exercise the ordering code path or not
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+ boolean sortOnReceiver = false;
+ if (_random.nextBoolean()) {
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+ sortOnReceiver = true;
+ }
+
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+ collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
Assert.assertTrue(receivedBlock.isErrorBlock());
MetadataBlock error = (MetadataBlock) receivedBlock.getDataBlock();
@@ -407,15 +538,31 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Object[] expRow = new Object[]{1, 1};
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
- Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+ Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ // Choose whether to exercise the ordering code path or not
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+ boolean sortOnReceiver = false;
+ if (_random.nextBoolean()) {
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+ sortOnReceiver = true;
+ }
+
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+ collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
+ while (receivedBlock.isNoOpBlock()) {
+ receivedBlock = receiveOp.nextBlock();
+ }
List<Object[]> resultRows = receivedBlock.getContainer();
Assert.assertEquals(resultRows.size(), 1);
Assert.assertEquals(resultRows.get(0), expRow);
@@ -444,7 +591,7 @@ public class MailboxReceiveOperatorTest {
new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
- Mockito.when(_mailbox.receive()).thenReturn(null);
+ Mockito.when(_mailbox.receive()).thenReturn(null, TransferableBlockUtils.getEndOfStreamTransferableBlock());
JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
@@ -452,15 +599,31 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Object[] expRow = new Object[]{1, 1};
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
- Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+ Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ // Choose whether to exercise the ordering code path or not
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+ boolean sortOnReceiver = false;
+ if (_random.nextBoolean()) {
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+ sortOnReceiver = true;
+ }
+
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+ collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
+ while (receivedBlock.isNoOpBlock()) {
+ receivedBlock = receiveOp.nextBlock();
+ }
List<Object[]> resultRows = receivedBlock.getContainer();
Assert.assertEquals(resultRows.size(), 1);
Assert.assertEquals(resultRows.get(0), expRow);
@@ -508,7 +671,8 @@ public class MailboxReceiveOperatorTest {
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+ Collections.emptyList(), Collections.emptyList(), false, false, inSchema, stageId,
+ DEFAULT_RECEIVER_STAGE_ID, null);
// Receive first block from first server.
TransferableBlock receivedBlock = receiveOp.nextBlock();
List<Object[]> resultRows = receivedBlock.getContainer();
@@ -560,13 +724,25 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
+
+ // Choose whether to exercise the ordering code path or not
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+ boolean sortOnReceiver = false;
+ if (_random.nextBoolean()) {
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+ sortOnReceiver = true;
+ }
+
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+ collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ null);
// Receive error block from first server.
TransferableBlock receivedBlock = receiveOp.nextBlock();
Assert.assertTrue(receivedBlock.isErrorBlock());
@@ -606,14 +782,191 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
+
+ // Choose whether to exercise the ordering code path or not
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+ boolean sortOnReceiver = false;
+ if (_random.nextBoolean()) {
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+ sortOnReceiver = true;
+ }
+
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- stageId, DEFAULT_RECEIVER_STAGE_ID, null);
+ collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
Assert.assertTrue(receivedBlock.isErrorBlock(), "server-1 should have returned an error-block");
}
+
+ @Test
+ public void shouldReceiveMailboxFromTwoServersWithCollationKey()
+ throws Exception {
+ String server1Host = "hash1";
+ int server1Port = 123;
+ Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+ Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+ String server2Host = "hash2";
+ int server2Port = 456;
+ Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+ Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+ int jobId = 456;
+ int stageId = 0;
+ int toPort = 8888;
+ String toHost = "toHost";
+ VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
+
+ DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+ JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
+ Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
+ Mockito.when(_mailbox.isClosed()).thenReturn(false);
+ Object[] expRow1 = new Object[]{3, 3};
+ Object[] expRow2 = new Object[]{1, 1};
+ Mockito.when(_mailbox.receive())
+ .thenReturn(OperatorTestUtil.block(inSchema, expRow1), OperatorTestUtil.block(inSchema, expRow2),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ Object[] expRow3 = new Object[]{4, 2};
+ Object[] expRow4 = new Object[]{2, 4};
+ Object[] expRow5 = new Object[]{-1, 95};
+ JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
+ Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
+ Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+ Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3),
+ OperatorTestUtil.block(inSchema, expRow4), OperatorTestUtil.block(inSchema, expRow5),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ // Setup the collation key and direction
+ List<RexExpression> collationKeys = new ArrayList<>(Collections.singletonList(new RexExpression.InputRef(0)));
+ RelFieldCollation.Direction direction = _random.nextBoolean() ? RelFieldCollation.Direction.ASCENDING
+ : RelFieldCollation.Direction.DESCENDING;
+ List<RelFieldCollation.Direction> collationDirection = new ArrayList<>(Collections.singletonList(direction));
+
+ OpChainExecutionContext context =
+ new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
+ Long.MAX_VALUE, new HashMap<>());
+
+ MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+ RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection, false, true, inSchema, stageId,
+ DEFAULT_RECEIVER_STAGE_ID, null);
+
+ // Receive a set of no-op blocks and skip over them
+ TransferableBlock receivedBlock = receiveOp.nextBlock();
+ while (receivedBlock.isNoOpBlock()) {
+ receivedBlock = receiveOp.nextBlock();
+ }
+ List<Object[]> resultRows = receivedBlock.getContainer();
+ // All blocks should be returned together since ordering was required
+ Assert.assertEquals(resultRows.size(), 5);
+ if (direction == RelFieldCollation.Direction.ASCENDING) {
+ Assert.assertEquals(resultRows.get(0), expRow5);
+ Assert.assertEquals(resultRows.get(1), expRow2);
+ Assert.assertEquals(resultRows.get(2), expRow4);
+ Assert.assertEquals(resultRows.get(3), expRow1);
+ Assert.assertEquals(resultRows.get(4), expRow3);
+ } else {
+ Assert.assertEquals(resultRows.get(0), expRow3);
+ Assert.assertEquals(resultRows.get(1), expRow1);
+ Assert.assertEquals(resultRows.get(2), expRow4);
+ Assert.assertEquals(resultRows.get(3), expRow2);
+ Assert.assertEquals(resultRows.get(4), expRow5);
+ }
+
+ receivedBlock = receiveOp.nextBlock();
+ Assert.assertTrue(receivedBlock.isEndOfStreamBlock());
+ }
+
+ @Test
+ public void shouldReceiveMailboxFromTwoServersWithCollationKeyTwoColumns()
+ throws Exception {
+ String server1Host = "hash1";
+ int server1Port = 123;
+ Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+ Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+ String server2Host = "hash2";
+ int server2Port = 456;
+ Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+ Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+ int jobId = 456;
+ int stageId = 0;
+ int toPort = 8888;
+ String toHost = "toHost";
+ VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
+
+ DataSchema inSchema = new DataSchema(new String[]{"col1", "col2", "col3"},
+ new DataSchema.ColumnDataType[]{INT, INT, STRING});
+ JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
+ Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
+ Mockito.when(_mailbox.isClosed()).thenReturn(false);
+ Object[] expRow1 = new Object[]{3, 3, "queen"};
+ Object[] expRow2 = new Object[]{1, 1, "pink floyd"};
+ Mockito.when(_mailbox.receive())
+ .thenReturn(OperatorTestUtil.block(inSchema, expRow1), OperatorTestUtil.block(inSchema, expRow2),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ Object[] expRow3 = new Object[]{42, 2, "pink floyd"};
+ Object[] expRow4 = new Object[]{2, 4, "aerosmith"};
+ Object[] expRow5 = new Object[]{-1, 95, "foo fighters"};
+ JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
+ new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
+ Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
+ Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+ Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3),
+ OperatorTestUtil.block(inSchema, expRow4), OperatorTestUtil.block(inSchema, expRow5),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ // Setup the collation key and direction
+ List<RexExpression> collationKeys = new ArrayList<>(Arrays.asList(new RexExpression.InputRef(2),
+ new RexExpression.InputRef(0)));
+ RelFieldCollation.Direction direction1 = _random.nextBoolean() ? RelFieldCollation.Direction.ASCENDING
+ : RelFieldCollation.Direction.DESCENDING;
+ RelFieldCollation.Direction direction2 = RelFieldCollation.Direction.ASCENDING;
+ List<RelFieldCollation.Direction> collationDirection = new ArrayList<>(Arrays.asList(direction1, direction2));
+
+ OpChainExecutionContext context =
+ new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
+ Long.MAX_VALUE, new HashMap<>());
+
+ MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+ RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection, false, true, inSchema, stageId,
+ DEFAULT_RECEIVER_STAGE_ID, null);
+
+ // Receive a set of no-op blocks and skip over them
+ TransferableBlock receivedBlock = receiveOp.nextBlock();
+ while (receivedBlock.isNoOpBlock()) {
+ receivedBlock = receiveOp.nextBlock();
+ }
+ List<Object[]> resultRows = receivedBlock.getContainer();
+ // All blocks should be returned together since ordering was required
+ Assert.assertEquals(resultRows.size(), 5);
+ if (direction1 == RelFieldCollation.Direction.ASCENDING) {
+ Assert.assertEquals(resultRows.get(0), expRow4);
+ Assert.assertEquals(resultRows.get(1), expRow5);
+ Assert.assertEquals(resultRows.get(2), expRow2);
+ Assert.assertEquals(resultRows.get(3), expRow3);
+ Assert.assertEquals(resultRows.get(4), expRow1);
+ } else {
+ Assert.assertEquals(resultRows.get(0), expRow1);
+ Assert.assertEquals(resultRows.get(1), expRow2);
+ Assert.assertEquals(resultRows.get(2), expRow3);
+ Assert.assertEquals(resultRows.get(3), expRow5);
+ Assert.assertEquals(resultRows.get(4), expRow4);
+ }
+
+ receivedBlock = receiveOp.nextBlock();
+ Assert.assertTrue(receivedBlock.isEndOfStreamBlock());
+ }
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 13735ed883..72d5ba66b8 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -90,7 +90,7 @@ public class MailboxSendOperatorTest {
OpChainExecutionContext context = getOpChainContext(deadlineMs);
MailboxSendOperator operator =
- new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+ new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, null, null, false,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID);
@@ -112,7 +112,7 @@ public class MailboxSendOperatorTest {
OpChainExecutionContext context = getOpChainContext(deadlineMs);
MailboxSendOperator operator =
- new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+ new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, null, null, false,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID);
TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"));
@@ -134,7 +134,7 @@ public class MailboxSendOperatorTest {
OpChainExecutionContext context = getOpChainContext(deadlineMs);
MailboxSendOperator operator =
- new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+ new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, null, null, false,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID);
Mockito.when(_input.nextBlock()).thenThrow(new RuntimeException("foo!"));
@@ -157,7 +157,7 @@ public class MailboxSendOperatorTest {
OpChainExecutionContext context = getOpChainContext(deadlineMs);
MailboxSendOperator operator =
- new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+ new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, null, null, false,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID);
@@ -180,7 +180,7 @@ public class MailboxSendOperatorTest {
OpChainExecutionContext context = getOpChainContext(deadlineMs);
MailboxSendOperator operator =
- new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector,
+ new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, null, null, false,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID,
DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID);
TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{}));
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
index 11e525221d..f2edadeae5 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
@@ -70,7 +70,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
Mockito.when(_input.nextBlock())
.thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")));
@@ -89,7 +89,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
@@ -107,7 +107,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -125,7 +125,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -141,6 +141,31 @@ public class SortOperatorTest {
Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
}
+ @Test
+ public void shouldConsumeAndSkipSortInputOneBlockWithTwoRowsInputSorted() {
+ // Given:
+ List<RexExpression> collation = collation(0);
+ List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
+ SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0,
+ schema, true);
+
+ // Purposefully feed the input in unsorted order to verify that sorting is skipped; in real usage 'isInputSorted'
+ // should only be true if the input is actually sorted
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ // When:
+ TransferableBlock block = op.nextBlock(); // construct
+ TransferableBlock block2 = op.nextBlock(); // eos
+
+ // Then:
+ Assert.assertEquals(block.getNumRows(), 2);
+ Assert.assertEquals(block.getContainer().get(0), new Object[]{2});
+ Assert.assertEquals(block.getContainer().get(1), new Object[]{1});
+ Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+ }
+
@Test
public void shouldConsumeAndSortOnNonZeroIdxCollation() {
// Given:
@@ -148,7 +173,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"ignored", "sort"}, new DataSchema.ColumnDataType[]{INT, INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1, 2}, new Object[]{2, 1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -171,7 +196,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{STRING});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{"b"}, new Object[]{"a"}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -194,7 +219,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.DESCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -217,7 +242,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 1, schema);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 1, schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -233,6 +258,30 @@ public class SortOperatorTest {
Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
}
+ @Test
+ public void shouldOffsetSortInputOneBlockWithThreeRowsInputSorted() {
+ // Given:
+ List<RexExpression> collation = collation(0);
+ List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
+ SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 1,
+ schema, true);
+
+ // Set input rows as sorted since input is expected to be sorted
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}, new Object[]{2}, new Object[]{3}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ // When:
+ TransferableBlock block = op.nextBlock(); // construct
+ TransferableBlock block2 = op.nextBlock(); // eos
+
+ // Then:
+ Assert.assertEquals(block.getNumRows(), 2);
+ Assert.assertEquals(block.getContainer().get(0), new Object[]{2});
+ Assert.assertEquals(block.getContainer().get(1), new Object[]{3});
+ Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+ }
+
@Test
public void shouldOffsetLimitSortInputOneBlockWithThreeRows() {
// Given:
@@ -240,7 +289,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 1, 1, schema);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 1, 1, schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -255,6 +304,29 @@ public class SortOperatorTest {
Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
}
+ @Test
+ public void shouldOffsetLimitSortInputOneBlockWithThreeRowsInputSorted() {
+ // Given:
+ List<RexExpression> collation = collation(0);
+ List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
+ SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 1, 1,
+ schema, true);
+
+ // Set input rows as sorted since input is expected to be sorted
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}, new Object[]{2}, new Object[]{3}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ // When:
+ TransferableBlock block = op.nextBlock(); // construct
+ TransferableBlock block2 = op.nextBlock(); // eos
+
+ // Then:
+ Assert.assertEquals(block.getNumRows(), 1);
+ Assert.assertEquals(block.getContainer().get(0), new Object[]{2});
+ Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+ }
+
@Test
public void shouldRespectDefaultLimit() {
// Given:
@@ -262,7 +334,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 0, 0, schema, 1);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 0, 0, schema, false, 1);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -284,7 +356,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, -1, 0, schema);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, -1, 0, schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
.thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
@@ -305,7 +377,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}))
.thenReturn(block(schema, new Object[]{1}))
@@ -322,6 +394,31 @@ public class SortOperatorTest {
Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
}
+ @Test
+ public void shouldConsumeAndSortTwoInputBlocksWithOneRowEachInputSorted() {
+ // Given:
+ List<RexExpression> collation = collation(0);
+ List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
+ SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0,
+ schema, true);
+
+ // Set input rows as sorted since input is expected to be sorted
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}))
+ .thenReturn(block(schema, new Object[]{2}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ // When:
+ TransferableBlock block = op.nextBlock(); // construct
+ TransferableBlock block2 = op.nextBlock(); // eos
+
+ // Then:
+ Assert.assertEquals(block.getNumRows(), 2);
+ Assert.assertEquals(block.getContainer().get(0), new Object[]{1});
+ Assert.assertEquals(block.getContainer().get(1), new Object[]{2});
+ Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+ }
+
@Test
public void shouldBreakTiesUsingSecondCollationKey() {
// Given:
@@ -329,7 +426,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
Mockito.when(_input.nextBlock())
.thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -354,7 +451,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING, Direction.DESCENDING);
DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
Mockito.when(_input.nextBlock())
.thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3}))
@@ -379,7 +476,7 @@ public class SortOperatorTest {
List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
SortOperator op =
- new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema);
+ new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema, false);
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}))
.thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()).thenReturn(block(schema, new Object[]{1}))
@@ -397,6 +494,32 @@ public class SortOperatorTest {
Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
}
+ @Test
+ public void shouldHandleNoOpUpstreamBlockWhileConstructingInputSorted() {
+ // Given:
+ List<RexExpression> collation = collation(0);
+ List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
+ DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
+ SortOperator op = new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0,
+ schema, true);
+
+ // Set input rows as sorted since input is expected to be sorted
+ Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1}))
+ .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()).thenReturn(block(schema, new Object[]{2}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ // When:
+ op.nextBlock(); // consume up until NOOP, create NOOP
+ TransferableBlock block = op.nextBlock(); // construct
+ TransferableBlock block2 = op.nextBlock(); // eos
+
+ // Then:
+ Assert.assertEquals(block.getNumRows(), 2);
+ Assert.assertEquals(block.getContainer().get(0), new Object[]{1});
+ Assert.assertEquals(block.getContainer().get(1), new Object[]{2});
+ Assert.assertTrue(block2.isEndOfStreamBlock(), "expected EOS block to propagate");
+ }
+
private static List<RexExpression> collation(int... indexes) {
return Arrays.stream(indexes).mapToObj(RexExpression.InputRef::new).collect(Collectors.toList());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org