You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/04/07 17:47:15 UTC
[pinot] branch master updated: [multistage] Split MailboxReceiveOperator into sorted and non-sorted versions (#10570)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6283ee7fb8 [multistage] Split MailboxReceiveOperator into sorted and non-sorted versions (#10570)
6283ee7fb8 is described below
commit 6283ee7fb8e583166966f4f5a520310f135c50d3
Author: Sonam Mandal <so...@linkedin.com>
AuthorDate: Fri Apr 7 10:47:08 2023 -0700
[multistage] Split MailboxReceiveOperator into sorted and non-sorted versions (#10570)
* Split MailboxReceiveOperator into sorted and non-sorted versions
- sorted mailbox receiver will keep polling for data from all the mailboxes until it gets only 'null' blocks or gets EOS from all mailboxes.
- non-sorted mailbox receiver will return immediately when data arrives (behavior identical to that before #10408)
* Fix tests - ORDER BY with only limit should set isSortOnReceiver to false
- when there's fetch/offset in SortNode but no collation, skip sorting, thus speeding up performance
---
.../rel/rules/PinotSortExchangeNodeInsertRule.java | 2 +-
.../test/resources/queries/BasicQueryPlans.json | 4 +-
.../resources/queries/WindowFunctionPlans.json | 20 +-
.../operator/BaseMailboxReceiveOperator.java | 136 ++++++++
.../runtime/operator/MailboxReceiveOperator.java | 176 +---------
.../pinot/query/runtime/operator/SortOperator.java | 4 +-
.../operator/SortedMailboxReceiveOperator.java | 200 ++++++++++++
.../query/runtime/operator/utils/SortUtils.java | 13 +-
.../query/runtime/plan/PhysicalPlanVisitor.java | 24 +-
.../query/service/dispatch/QueryDispatcher.java | 4 +-
.../operator/MailboxReceiveOperatorTest.java | 361 +--------------------
....java => SortedMailboxReceiveOperatorTest.java} | 332 ++++++++-----------
12 files changed, 541 insertions(+), 735 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
index 28a5018a08..56d08fd9ed 100644
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeNodeInsertRule.java
@@ -70,7 +70,7 @@ public class PinotSortExchangeNodeInsertRule extends RelOptRule {
RelDistributions.hash(Collections.emptyList()),
sort.getCollation(),
false,
- true);
+ !sort.getCollation().getKeys().isEmpty());
call.transformTo(LogicalSort.create(exchange, sort.getCollation(), sort.offset, sort.fetch));
}
}
diff --git a/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json b/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json
index f90469bc5e..22a6946d00 100644
--- a/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/BasicQueryPlans.json
@@ -39,7 +39,7 @@
"Execution Plan",
"\nLogicalProject(EXPR$0=[dateTrunc('DAY', $4)])",
"\n LogicalSort(offset=[0], fetch=[10])",
- "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalTableScan(table=[[a]])",
"\n"
@@ -52,7 +52,7 @@
"Execution Plan",
"\nLogicalProject(day=[dateTrunc('DAY', $4)])",
"\n LogicalSort(offset=[0], fetch=[10])",
- "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalTableScan(table=[[a]])",
"\n"
diff --git a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
index f2cb699f19..ee277e54b0 100644
--- a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
@@ -77,7 +77,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], $1=[$2])",
"\n LogicalWindow(window#0=[window(aggs [SUM($1)])])",
@@ -300,7 +300,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[100])",
- "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[100])",
"\n LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
"\n LogicalWindow(window#0=[window(aggs [SUM($2), COUNT($1)])])",
@@ -536,7 +536,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} aggs [SUM($2), COUNT($2)])])",
@@ -785,7 +785,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} aggs [SUM($1), COUNT($1), MIN($1)])])",
@@ -1097,7 +1097,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(order by [1] aggs [SUM($2), COUNT($2)])])",
@@ -1265,7 +1265,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
@@ -1433,7 +1433,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [1] aggs [SUM($2), COUNT($2)])])",
@@ -1640,7 +1640,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($2):DOUBLE NOT NULL, $3)], EXPR$2=[$4])",
"\n LogicalWindow(window#0=[window(partition {0} order by [0] aggs [SUM($1), COUNT($1), MIN($1)])])",
@@ -1846,7 +1846,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [0] aggs [SUM($2), COUNT($2)])])",
@@ -2014,7 +2014,7 @@
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
- "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n LogicalProject(col1=[$0], EXPR$1=[/(CAST($3):DOUBLE NOT NULL, $4)], EXPR$2=[$5])",
"\n LogicalWindow(window#0=[window(partition {0} order by [1] aggs [SUM($2), COUNT($2), MIN($2)])])",
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
new file mode 100644
index 0000000000..0237c11393
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.service.QueryConfig;
+
+
+/**
+ * Base class shared by the mailbox receive operators (e.g. the sorted and non-sorted versions). It holds the
+ * common setup and state: the supported exchange types, the sending mailbox ids, the receive deadline, and the
+ * release of the receiving mailboxes on close/cancel.
+ *
+ * The sending mailbox ids are derived from sendingStageInstances. For a SINGLETON exchange only the instance whose
+ * hostname and port match this operator's mailboxService is used; if none matches, the list is left empty. For any
+ * other exchange type there is one mailbox id per sending instance, which subclasses poll in round-robin order.
+ */
+public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
+
+ // TODO: Unify SUPPORTED_EXCHANGE_TYPES with MailboxSendOperator.
+ protected static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPES =
+ ImmutableSet.of(RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED,
+ RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED);
+
+ protected final MailboxService<TransferableBlock> _mailboxService;
+ protected final RelDistribution.Type _exchangeType;
+ protected final List<MailboxIdentifier> _sendingMailbox; // one id per sender; at most one for SINGLETON
+ protected final long _deadlineTimestampNano; // absolute System.nanoTime() deadline for receiving
+ protected int _serverIdx; // NOTE(review): presumably the next mailbox to poll; starts at 0
+ protected TransferableBlock _upstreamErrorBlock; // null until a subclass records an upstream error block
+
+ protected static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId,
+ int receiverStageId, VirtualServerAddress receiver) {
+ return new JsonMailboxIdentifier( // id "<jobId>_<senderStageId>" for the sender -> receiver channel
+ String.format("%s_%s", jobId, senderStageId),
+ new VirtualServerAddress(sender),
+ receiver,
+ senderStageId,
+ receiverStageId);
+ }
+
+ public BaseMailboxReceiveOperator(OpChainExecutionContext context, List<VirtualServer> sendingStageInstances,
+ RelDistribution.Type exchangeType, int senderStageId, int receiverStageId, Long timeoutMs) {
+ super(context);
+ _mailboxService = context.getMailboxService();
+ VirtualServerAddress receiver = context.getServer();
+ long jobId = context.getRequestId();
+ Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
+ "Exchange/Distribution type: " + exchangeType + " is not supported!");
+ long timeoutNano = (timeoutMs != null ? timeoutMs : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS) * 1_000_000L;
+ _deadlineTimestampNano = timeoutNano + System.nanoTime();
+
+ _exchangeType = exchangeType;
+ if (_exchangeType == RelDistribution.Type.SINGLETON) {
+ VirtualServer singletonInstance = null; // the sender co-located with this mailbox service, if any
+ for (VirtualServer serverInstance : sendingStageInstances) {
+ if (serverInstance.getHostname().equals(_mailboxService.getHostname())
+ && serverInstance.getQueryMailboxPort() == _mailboxService.getMailboxPort()) {
+ Preconditions.checkState(singletonInstance == null, "multiple instance found for singleton exchange type!");
+ singletonInstance = serverInstance;
+ }
+ }
+
+ if (singletonInstance == null) {
+ // TODO: fix WorkerManager assignment, this should not happen if we properly assign workers.
+ // see: https://github.com/apache/pinot/issues/9611
+ _sendingMailbox = Collections.emptyList();
+ } else {
+ _sendingMailbox =
+ Collections.singletonList(toMailboxId(singletonInstance, jobId, senderStageId, receiverStageId, receiver));
+ }
+ } else {
+ _sendingMailbox = new ArrayList<>(sendingStageInstances.size());
+ for (VirtualServer instance : sendingStageInstances) {
+ _sendingMailbox.add(toMailboxId(instance, jobId, senderStageId, receiverStageId, receiver));
+ }
+ }
+ _upstreamErrorBlock = null;
+ _serverIdx = 0;
+ }
+
+ public List<MailboxIdentifier> getSendingMailbox() { // NOTE(review): returns the internal list, not a copy
+ return _sendingMailbox;
+ }
+
+ @Override
+ public List<MultiStageOperator> getChildOperators() {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public void close() { // releases every receiving mailbox this operator reads from
+ super.close();
+ for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
+ _mailboxService.releaseReceivingMailbox(sendingMailbox);
+ }
+ }
+
+ @Override
+ public void cancel(Throwable t) { // same mailbox release as close(), after the base cancel
+ super.cancel(t);
+ for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
+ _mailboxService.releaseReceivingMailbox(sendingMailbox);
+ }
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index f65f9b8d15..b2e70c1dcf 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -18,35 +18,17 @@
*/
package org.apache.pinot.query.runtime.operator;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelFieldCollation;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
-import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
-import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.routing.VirtualServer;
-import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.apache.pinot.query.runtime.operator.utils.SortUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.query.service.QueryConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,119 +36,22 @@ import org.slf4j.LoggerFactory;
/**
* This {@code MailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serve it out from the
* {@link MultiStageOperator#getNextBlock()}()} API.
- *
- * MailboxReceiveOperator receives mailbox from mailboxService from sendingStageInstances.
- * We use sendingStageInstance to deduce mailboxId and fetch the content from mailboxService.
- * When exchangeType is Singleton, we find the mapping mailbox for the mailboxService. If not found, use empty list.
- * When exchangeType is non-Singleton, we pull from each instance in round-robin way to get matched mailbox content.
- *
- * TODO: Once sorting on the {@code MailboxSendOperator} is available, modify this to use a k-way merge instead of
- * resorting via the PriorityQueue.
*/
-public class MailboxReceiveOperator extends MultiStageOperator {
+public class MailboxReceiveOperator extends BaseMailboxReceiveOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class);
private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE";
- // TODO: Unify SUPPORTED_EXCHANGE_TYPES with MailboxSendOperator.
- private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPES =
- ImmutableSet.of(RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED,
- RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED);
-
- private final MailboxService<TransferableBlock> _mailboxService;
- private final RelDistribution.Type _exchangeType;
- private final List<RexExpression> _collationKeys;
- private final List<RelFieldCollation.Direction> _collationDirections;
- private final boolean _isSortOnSender;
- private final boolean _isSortOnReceiver;
- private final DataSchema _dataSchema;
- private final List<MailboxIdentifier> _sendingMailbox;
- private final long _deadlineTimestampNano;
- private final PriorityQueue<Object[]> _priorityQueue;
- private int _serverIdx;
- private TransferableBlock _upstreamErrorBlock;
- private boolean _isSortedBlockConstructed;
-
- private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, int senderStageId,
- int receiverStageId, VirtualServerAddress receiver) {
- return new JsonMailboxIdentifier(
- String.format("%s_%s", jobId, senderStageId),
- new VirtualServerAddress(sender),
- receiver,
- senderStageId,
- receiverStageId);
- }
-
- public MailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
- List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender,
- boolean isSortOnReceiver, DataSchema dataSchema, int senderStageId, int receiverStageId) {
- this(context, context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, collationKeys,
- collationDirections, isSortOnSender, isSortOnReceiver, dataSchema, senderStageId,
+ public MailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType, int senderStageId,
+ int receiverStageId) {
+ this(context, context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, senderStageId,
receiverStageId, context.getTimeoutMs());
}
// TODO: Move deadlineInNanoSeconds to OperatorContext.
// TODO: Remove boxed timeoutMs value from here and use long deadlineMs from context.
public MailboxReceiveOperator(OpChainExecutionContext context, List<VirtualServer> sendingStageInstances,
- RelDistribution.Type exchangeType, List<RexExpression> collationKeys,
- List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender, boolean isSortOnReceiver,
- DataSchema dataSchema, int senderStageId, int receiverStageId, Long timeoutMs) {
- super(context);
- _mailboxService = context.getMailboxService();
- VirtualServerAddress receiver = context.getServer();
- long jobId = context.getRequestId();
- Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
- "Exchange/Distribution type: " + exchangeType + " is not supported!");
- long timeoutNano = (timeoutMs != null ? timeoutMs : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS) * 1_000_000L;
- _deadlineTimestampNano = timeoutNano + System.nanoTime();
-
- _exchangeType = exchangeType;
- if (_exchangeType == RelDistribution.Type.SINGLETON) {
- VirtualServer singletonInstance = null;
- for (VirtualServer serverInstance : sendingStageInstances) {
- if (serverInstance.getHostname().equals(_mailboxService.getHostname())
- && serverInstance.getQueryMailboxPort() == _mailboxService.getMailboxPort()) {
- Preconditions.checkState(singletonInstance == null, "multiple instance found for singleton exchange type!");
- singletonInstance = serverInstance;
- }
- }
-
- if (singletonInstance == null) {
- // TODO: fix WorkerManager assignment, this should not happen if we properly assign workers.
- // see: https://github.com/apache/pinot/issues/9611
- _sendingMailbox = Collections.emptyList();
- } else {
- _sendingMailbox =
- Collections.singletonList(toMailboxId(singletonInstance, jobId, senderStageId, receiverStageId, receiver));
- }
- } else {
- _sendingMailbox = new ArrayList<>(sendingStageInstances.size());
- for (VirtualServer instance : sendingStageInstances) {
- _sendingMailbox.add(toMailboxId(instance, jobId, senderStageId, receiverStageId, receiver));
- }
- }
- _collationKeys = collationKeys;
- _collationDirections = collationDirections;
- _isSortOnSender = isSortOnSender;
- _isSortOnReceiver = isSortOnReceiver;
- _dataSchema = dataSchema;
- if (CollectionUtils.isEmpty(collationKeys) || !_isSortOnReceiver) {
- _priorityQueue = null;
- } else {
- _priorityQueue = new PriorityQueue<>(new SortUtils.SortComparator(collationKeys, collationDirections,
- dataSchema, false));
- }
- _upstreamErrorBlock = null;
- _serverIdx = 0;
- _isSortedBlockConstructed = false;
- }
-
- public List<MailboxIdentifier> getSendingMailbox() {
- return _sendingMailbox;
- }
-
- @Override
- public List<MultiStageOperator> getChildOperators() {
- return ImmutableList.of();
+ RelDistribution.Type exchangeType, int senderStageId, int receiverStageId, Long timeoutMs) {
+ super(context, sendingStageInstances, exchangeType, senderStageId, receiverStageId, timeoutMs);
}
@Nullable
@@ -178,7 +63,6 @@ public class MailboxReceiveOperator extends MultiStageOperator {
@Override
protected TransferableBlock getNextBlock() {
if (_upstreamErrorBlock != null) {
- cleanUpResourcesOnError();
return _upstreamErrorBlock;
} else if (System.nanoTime() >= _deadlineTimestampNano) {
return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
@@ -201,20 +85,12 @@ public class MailboxReceiveOperator extends MultiStageOperator {
// Get null block when pulling times out from mailbox.
if (block != null) {
if (block.isErrorBlock()) {
- cleanUpResourcesOnError();
_upstreamErrorBlock =
TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
return _upstreamErrorBlock;
}
if (!block.isEndOfStreamBlock()) {
- if (_priorityQueue != null) {
- // Ordering is enabled, add rows to the PriorityQueue
- List<Object[]> container = block.getContainer();
- _priorityQueue.addAll(container);
- } else {
- // Ordering is not enabled, return the input block as is
- return block;
- }
+ return block;
} else {
if (_opChainStats != null && !block.getResultMetadata().isEmpty()) {
for (Map.Entry<String, OperatorStats> entry : block.getResultMetadata().entrySet()) {
@@ -231,18 +107,6 @@ public class MailboxReceiveOperator extends MultiStageOperator {
}
}
- if (((openMailboxCount == 0) || (openMailboxCount <= eosMailboxCount))
- && (!CollectionUtils.isEmpty(_priorityQueue)) && !_isSortedBlockConstructed) {
- // Some data is present in the PriorityQueue, these need to be sent upstream
- LinkedList<Object[]> rows = new LinkedList<>();
- while (_priorityQueue.size() > 0) {
- Object[] row = _priorityQueue.poll();
- rows.addFirst(row);
- }
- _isSortedBlockConstructed = true;
- return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
- }
-
// there are two conditions in which we should return EOS: (1) there were
// no mailboxes to open (this shouldn't happen because the second condition
// should be hit first, but is defensive) (2) every mailbox that was opened
@@ -253,30 +117,4 @@ public class MailboxReceiveOperator extends MultiStageOperator {
: TransferableBlockUtils.getEndOfStreamTransferableBlock();
return block;
}
-
- private void cleanUpResourcesOnError() {
- if (_priorityQueue != null) {
- _priorityQueue.clear();
- }
- }
-
- public boolean hasCollationKeys() {
- return !CollectionUtils.isEmpty(_collationKeys);
- }
-
- @Override
- public void close() {
- super.close();
- for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
- _mailboxService.releaseReceivingMailbox(sendingMailbox);
- }
- }
-
- @Override
- public void cancel(Throwable t) {
- super.cancel(t);
- for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
- _mailboxService.releaseReceivingMailbox(sendingMailbox);
- }
- }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 7668dc0397..1350242508 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -80,8 +80,10 @@ public class SortOperator extends MultiStageOperator {
_priorityQueue = null;
_rows = new ArrayList<>();
} else {
+ // Use the opposite direction as specified by the collation directions since we need the PriorityQueue to decide
+ // which elements to keep and which to remove based on the limits.
_priorityQueue = new PriorityQueue<>(_numRowsToKeep,
- new SortUtils.SortComparator(collationKeys, collationDirections, dataSchema, false));
+ new SortUtils.SortComparator(collationKeys, collationDirections, dataSchema, false, true));
_rows = null;
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
new file mode 100644
index 0000000000..b565ed2d4c
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperator.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import javax.annotation.Nullable;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.routing.VirtualServer;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.utils.SortUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This {@code SortedMailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serves it out from the
+ * {@link MultiStageOperator#getNextBlock()} API in a sorted manner.
+ *
+ * <p>Rows from all sending mailboxes are buffered in a {@link PriorityQueue} ordered by the collation keys and
+ * directions. Only once every opened mailbox has delivered an end-of-stream block are the buffered rows drained, in
+ * order, into a single data block; until then {@link #getNextBlock()} returns no-op (or error) blocks.
+ *
+ * TODO: Once sorting on the {@code MailboxSendOperator} is available, modify this to use a k-way merge instead of
+ * resorting via the PriorityQueue.
+ */
+public class SortedMailboxReceiveOperator extends BaseMailboxReceiveOperator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SortedMailboxReceiveOperator.class);
+  private static final String EXPLAIN_NAME = "SORTED_MAILBOX_RECEIVE";
+
+  private final List<RexExpression> _collationKeys;
+  private final List<RelFieldCollation.Direction> _collationDirections;
+  private final boolean _isSortOnSender;
+  private final boolean _isSortOnReceiver;
+  private final DataSchema _dataSchema;
+  private final PriorityQueue<Object[]> _priorityQueue;
+  // Set once the buffered rows have been emitted, so the sorted data block is produced at most once.
+  private boolean _isSortedBlockConstructed;
+
+  public SortedMailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType,
+      List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender,
+      boolean isSortOnReceiver, DataSchema dataSchema, int senderStageId, int receiverStageId) {
+    this(context, context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, collationKeys,
+        collationDirections, isSortOnSender, isSortOnReceiver, dataSchema, senderStageId,
+        receiverStageId, context.getTimeoutMs());
+  }
+
+  // TODO: Move deadlineInNanoSeconds to OperatorContext.
+  // TODO: Remove boxed timeoutMs value from here and use long deadlineMs from context.
+  public SortedMailboxReceiveOperator(OpChainExecutionContext context, List<VirtualServer> sendingStageInstances,
+      RelDistribution.Type exchangeType, List<RexExpression> collationKeys,
+      List<RelFieldCollation.Direction> collationDirections, boolean isSortOnSender, boolean isSortOnReceiver,
+      DataSchema dataSchema, int senderStageId, int receiverStageId, Long timeoutMs) {
+    super(context, sendingStageInstances, exchangeType, senderStageId, receiverStageId, timeoutMs);
+    _collationKeys = collationKeys;
+    _collationDirections = collationDirections;
+    _isSortOnSender = isSortOnSender;
+    _isSortOnReceiver = isSortOnReceiver;
+    _dataSchema = dataSchema;
+    Preconditions.checkState(!CollectionUtils.isEmpty(collationKeys) && isSortOnReceiver,
+        "Collation keys should exist and sorting must be enabled otherwise use non-sorted MailboxReceiveOperator");
+    // No need to switch the direction since all rows will be stored in the priority queue without applying limits
+    _priorityQueue = new PriorityQueue<>(new SortUtils.SortComparator(collationKeys, collationDirections,
+        dataSchema, false, false));
+    _isSortedBlockConstructed = false;
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    if (_upstreamErrorBlock != null) {
+      return _upstreamErrorBlock;
+    } else if (System.nanoTime() >= _deadlineTimestampNano) {
+      return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
+    }
+
+    int startingIdx = _serverIdx;
+    int openMailboxCount = 0;
+    int eosMailboxCount = 0;
+    boolean foundNonNullTransferableBlock;
+    // For all non-singleton distribution, we poll from every instance to check mailbox content.
+    // Since this operator needs to wait for all incoming data before it can send the data to the
+    // upstream operators, this operator will keep trying to poll from all mailboxes until it only
+    // receives null blocks or EOS for all mailboxes. This operator does not need to yield itself
+    // for backpressure at the moment but once support is added for k-way merge when input data is
+    // sorted this operator will have to return some data blocks without waiting for all the data.
+    // TODO: Fix wasted CPU cycles on waiting for servers that are not supposed to give content.
+    do {
+      // Reset the following for each loop
+      foundNonNullTransferableBlock = false;
+      openMailboxCount = 0;
+      eosMailboxCount = 0;
+      for (int i = 0; i < _sendingMailbox.size(); i++) {
+        // this implements a round-robin mailbox iterator, so we don't starve any mailboxes
+        _serverIdx = (startingIdx + i) % _sendingMailbox.size();
+        MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx);
+        try {
+          ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(mailboxId);
+          if (!mailbox.isClosed()) {
+            openMailboxCount++;
+            TransferableBlock block = mailbox.receive();
+            // Get null block when pulling times out from mailbox.
+            if (block != null) {
+              foundNonNullTransferableBlock = true;
+              if (block.isErrorBlock()) {
+                _upstreamErrorBlock =
+                    TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
+                return _upstreamErrorBlock;
+              }
+              if (!block.isEndOfStreamBlock()) {
+                // Add rows to the PriorityQueue to order them
+                List<Object[]> container = block.getContainer();
+                _priorityQueue.addAll(container);
+              } else {
+                if (_opChainStats != null && !block.getResultMetadata().isEmpty()) {
+                  for (Map.Entry<String, OperatorStats> entry : block.getResultMetadata().entrySet()) {
+                    _opChainStats.getOperatorStatsMap().compute(entry.getKey(), (_key, _value) -> entry.getValue());
+                  }
+                }
+                eosMailboxCount++;
+              }
+            }
+          }
+        } catch (Exception e) {
+          // Make the failure sticky, like an upstream error block above, so a later call does not resume polling
+          // and emit a partially sorted result after the error has already been reported.
+          _upstreamErrorBlock = TransferableBlockUtils.getErrorTransferableBlock(
+              new RuntimeException(String.format("Error polling mailbox=%s", mailboxId), e));
+          return _upstreamErrorBlock;
+        }
+      }
+      // Keep polling while blocks are still arriving and some opened mailbox has not delivered EOS in this pass.
+      // eosMailboxCount never exceeds openMailboxCount, so this also implies openMailboxCount > 0.
+    } while (foundNonNullTransferableBlock && openMailboxCount > eosMailboxCount);
+
+    // True when no mailbox was open, or every opened mailbox delivered EOS during the last polling pass.
+    boolean allMailboxesFinished = openMailboxCount == 0 || openMailboxCount == eosMailboxCount;
+    if (allMailboxesFinished && !_isSortedBlockConstructed && !_priorityQueue.isEmpty()) {
+      // All input has been received: drain the PriorityQueue in collation order into a single data block.
+      List<Object[]> rows = new ArrayList<>(_priorityQueue.size());
+      while (!_priorityQueue.isEmpty()) {
+        rows.add(_priorityQueue.poll());
+      }
+      _isSortedBlockConstructed = true;
+      return new TransferableBlock(rows, _dataSchema, DataBlock.Type.ROW);
+    }
+
+    // there are two conditions in which we should return EOS: (1) there were
+    // no mailboxes to open (this shouldn't happen because the second condition
+    // should be hit first, but is defensive) (2) every mailbox that was opened
+    // returned an EOS block. in every other scenario, there are mailboxes that
+    // are not yet exhausted and we should wait for more data to be available
+    return allMailboxesFinished ? TransferableBlockUtils.getEndOfStreamTransferableBlock()
+        : TransferableBlockUtils.getNoOpTransferableBlock();
+  }
+
+  private void cleanUpResources() {
+    if (_priorityQueue != null) {
+      _priorityQueue.clear();
+    }
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    cleanUpResources();
+  }
+
+  @Override
+  public void cancel(Throwable t) {
+    super.cancel(t);
+    cleanUpResources();
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
index 820402f360..adc00d9c78 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/SortUtils.java
@@ -36,8 +36,16 @@ public class SortUtils {
private final int[] _multipliers;
private final boolean[] _useDoubleComparison;
+ /**
+ * Sort comparator for use with priority queues
+ * @param collationKeys collation keys to sort on
+ * @param collationDirections collation direction for each collation key to sort on
+ * @param dataSchema data schema to use
+ * @param isNullHandlingEnabled 'true' if null handling is enabled. Not supported yet
+ * @param switchDirections 'true' to use the opposite of the specified sort direction for each collation key
+ */
public SortComparator(List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections,
- DataSchema dataSchema, boolean isNullHandlingEnabled) {
+ DataSchema dataSchema, boolean isNullHandlingEnabled, boolean switchDirections) {
DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
_size = collationKeys.size();
_valueIndices = new int[_size];
@@ -45,7 +53,8 @@ public class SortUtils {
_useDoubleComparison = new boolean[_size];
for (int i = 0; i < _size; i++) {
_valueIndices[i] = ((RexExpression.InputRef) collationKeys.get(i)).getIndex();
- _multipliers[i] = collationDirections.get(i).isDescending() ? 1 : -1;
+ _multipliers[i] = switchDirections ? (collationDirections.get(i).isDescending() ? 1 : -1)
+ : (collationDirections.get(i).isDescending() ? -1 : 1);
_useDoubleComparison[i] = columnDataTypes[_valueIndices[i]].isNumber();
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index ca89173205..1890f1306c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -39,6 +39,7 @@ import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.operator.SortOperator;
+import org.apache.pinot.query.runtime.operator.SortedMailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.TransformOperator;
import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
@@ -61,12 +62,20 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
@Override
public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PlanRequestContext context) {
- MailboxReceiveOperator mailboxReceiveOperator =
- new MailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
- node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(), node.isSortOnReceiver(),
- node.getDataSchema(), node.getSenderStageId(), node.getStageId());
- context.addReceivingMailboxes(mailboxReceiveOperator.getSendingMailbox());
- return mailboxReceiveOperator;
+ if (node.isSortOnReceiver()) {
+ SortedMailboxReceiveOperator sortedMailboxReceiveOperator =
+ new SortedMailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
+ node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(), node.isSortOnReceiver(),
+ node.getDataSchema(), node.getSenderStageId(), node.getStageId());
+ context.addReceivingMailboxes(sortedMailboxReceiveOperator.getSendingMailbox());
+ return sortedMailboxReceiveOperator;
+ } else {
+ MailboxReceiveOperator mailboxReceiveOperator =
+ new MailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
+ node.getSenderStageId(), node.getStageId());
+ context.addReceivingMailboxes(mailboxReceiveOperator.getSendingMailbox());
+ return mailboxReceiveOperator;
+ }
}
@Override
@@ -122,8 +131,7 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<MultiStageOperator,
@Override
public MultiStageOperator visitSort(SortNode node, PlanRequestContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
- boolean isInputSorted =
- nextOperator instanceof MailboxReceiveOperator && ((MailboxReceiveOperator) nextOperator).hasCollationKeys();
+ boolean isInputSorted = nextOperator instanceof SortedMailboxReceiveOperator;
return new SortOperator(context.getOpChainExecutionContext(), nextOperator, node.getCollationKeys(),
node.getCollationDirections(), node.getFetch(), node.getOffset(), node.getDataSchema(), isInputSorted);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 43345b3b08..011fed8472 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.service.dispatch;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Deadline;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -291,8 +290,7 @@ public class QueryDispatcher {
DataSchema dataSchema, OpChainExecutionContext context) {
// timeout is set for reduce stage
MailboxReceiveOperator mailboxReceiveOperator =
- new MailboxReceiveOperator(context, RelDistribution.Type.RANDOM_DISTRIBUTED, Collections.emptyList(),
- Collections.emptyList(), false, false, dataSchema, stageId, reducerStageId);
+ new MailboxReceiveOperator(context, RelDistribution.Type.RANDOM_DISTRIBUTED, stageId, reducerStageId);
return mailboxReceiveOperator;
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index bfbb977e7c..7e932067e1 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -20,20 +20,16 @@ package org.apache.pinot.query.runtime.operator;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelFieldCollation;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
-import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.routing.VirtualServer;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
@@ -48,7 +44,6 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
-import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.STRING;
public class MailboxReceiveOperatorTest {
@@ -88,24 +83,12 @@ public class MailboxReceiveOperatorTest {
@Test
public void shouldTimeoutOnExtraLongSleep()
throws InterruptedException {
- // Choose whether to exercise the ordering code path or not
- List<RexExpression> collationKeys = new ArrayList<>();
- List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
-
// shorter timeoutMs should result in error.
- DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 10L, 10L,
new HashMap<>());
MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
- collationDirections, false, sortOnReceiver, inSchema, 456, 789, 10L);
+ new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, 10L);
Thread.sleep(200L);
TransferableBlock mailbox = receiveOp.nextBlock();
Assert.assertTrue(mailbox.isErrorBlock());
@@ -115,15 +98,13 @@ public class MailboxReceiveOperatorTest {
// longer timeout or default timeout (10s) doesn't result in error.
context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 2000L, 2000L,
new HashMap<>());
- receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
- collationDirections, false, sortOnReceiver, inSchema, 456, 789, 2000L);
+ receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, 2000L);
Thread.sleep(200L);
mailbox = receiveOp.nextBlock();
Assert.assertFalse(mailbox.isErrorBlock());
context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
- collationDirections, false, sortOnReceiver, inSchema, 456, 789, null);
+ receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, null);
Thread.sleep(200L);
mailbox = receiveOp.nextBlock();
Assert.assertFalse(mailbox.isErrorBlock());
@@ -142,24 +123,12 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_server2.getHostname()).thenReturn("singleton");
Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
- DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-
- // Choose whether to exercise the ordering code path or not
- List<RexExpression> collationKeys = new ArrayList<>();
- List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
-
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, 456, 789, null);
+ 456, 789, null);
}
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
@@ -173,24 +142,11 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_server2.getHostname()).thenReturn("singleton");
Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
- DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-
- // Choose whether to exercise the ordering code path or not
- List<RexExpression> collationKeys = new ArrayList<>();
- List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
-
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.RANGE_DISTRIBUTED, collationKeys, collationDirections, false, sortOnReceiver, inSchema,
- 456, 789, null);
+ RelDistribution.Type.RANGE_DISTRIBUTED, 456, 789, null);
}
@Test
@@ -214,26 +170,13 @@ public class MailboxReceiveOperatorTest {
String toHost = "toHost";
VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
- DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-
- // Choose whether to exercise the ordering code path or not
- List<RexExpression> collationKeys = new ArrayList<>();
- List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
-
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
- null);
+ stageId, DEFAULT_RECEIVER_STAGE_ID, null);
// Receive end of stream block directly when there is no match.
Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
@@ -265,26 +208,13 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(true);
- DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-
- // Choose whether to exercise the ordering code path or not
- List<RexExpression> collationKeys = new ArrayList<>();
- List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
-
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
- null);
+ stageId, DEFAULT_RECEIVER_STAGE_ID, null);
// Receive end of stream block directly when mailbox is close.
Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
@@ -319,26 +249,13 @@ public class MailboxReceiveOperatorTest {
// Receive null mailbox during timeout.
Mockito.when(_mailbox.receive()).thenReturn(null);
- DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-
- // Choose whether to exercise the ordering code path or not
- List<RexExpression> collationKeys = new ArrayList<>();
- List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
-
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
- null);
+ stageId, DEFAULT_RECEIVER_STAGE_ID, null);
// Receive NoOpBlock.
Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock());
}
@@ -371,26 +288,13 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox.isClosed()).thenReturn(false);
Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
- DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
-
- // Choose whether to exercise the ordering code path or not
- List<RexExpression> collationKeys = new ArrayList<>();
- List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
-
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
- null);
+ stageId, DEFAULT_RECEIVER_STAGE_ID, null);
// Receive EosBloc.
Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
}
@@ -426,25 +330,13 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
-
- // Choose whether to exercise the ordering code path or not
- List<RexExpression> collationKeys = new ArrayList<>();
- List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
-
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
- null);
+ stageId, DEFAULT_RECEIVER_STAGE_ID, null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
while (receivedBlock.isNoOpBlock()) {
receivedBlock = receiveOp.nextBlock();
@@ -484,24 +376,13 @@ public class MailboxReceiveOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(e));
- // Choose whether to exercise the ordering code path or not
- List<RexExpression> collationKeys = new ArrayList<>();
- List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
-
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
- null);
+ stageId, DEFAULT_RECEIVER_STAGE_ID, null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
Assert.assertTrue(receivedBlock.isErrorBlock());
MetadataBlock error = (MetadataBlock) receivedBlock.getDataBlock();
@@ -541,24 +422,13 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
- // Choose whether to exercise the ordering code path or not
- List<RexExpression> collationKeys = new ArrayList<>();
- List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
-
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
- null);
+ stageId, DEFAULT_RECEIVER_STAGE_ID, null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
while (receivedBlock.isNoOpBlock()) {
receivedBlock = receiveOp.nextBlock();
@@ -602,24 +472,13 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
- // Choose whether to exercise the ordering code path or not
- List<RexExpression> collationKeys = new ArrayList<>();
- List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
-
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
- null);
+ stageId, DEFAULT_RECEIVER_STAGE_ID, null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
while (receivedBlock.isNoOpBlock()) {
receivedBlock = receiveOp.nextBlock();
@@ -671,8 +530,7 @@ public class MailboxReceiveOperatorTest {
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- Collections.emptyList(), Collections.emptyList(), false, false, inSchema, stageId,
- DEFAULT_RECEIVER_STAGE_ID, null);
+ stageId, DEFAULT_RECEIVER_STAGE_ID, null);
// Receive first block from first server.
TransferableBlock receivedBlock = receiveOp.nextBlock();
List<Object[]> resultRows = receivedBlock.getContainer();
@@ -725,24 +583,13 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
- // Choose whether to exercise the ordering code path or not
- List<RexExpression> collationKeys = new ArrayList<>();
- List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
-
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
- null);
+ stageId, DEFAULT_RECEIVER_STAGE_ID, null);
// Receive error block from first server.
TransferableBlock receivedBlock = receiveOp.nextBlock();
Assert.assertTrue(receivedBlock.isErrorBlock());
@@ -783,190 +630,14 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
- // Choose whether to exercise the ordering code path or not
- List<RexExpression> collationKeys = new ArrayList<>();
- List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
-
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
MailboxReceiveOperator receiveOp =
new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
- null);
+ stageId, DEFAULT_RECEIVER_STAGE_ID, null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
Assert.assertTrue(receivedBlock.isErrorBlock(), "server-1 should have returned an error-block");
}
-
- @Test
- public void shouldReceiveMailboxFromTwoServersWithCollationKey()
- throws Exception {
- String server1Host = "hash1";
- int server1Port = 123;
- Mockito.when(_server1.getHostname()).thenReturn(server1Host);
- Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
-
- String server2Host = "hash2";
- int server2Port = 456;
- Mockito.when(_server2.getHostname()).thenReturn(server2Host);
- Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
-
- int jobId = 456;
- int stageId = 0;
- int toPort = 8888;
- String toHost = "toHost";
- VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
-
- DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
- JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
- Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
- Mockito.when(_mailbox.isClosed()).thenReturn(false);
- Object[] expRow1 = new Object[]{3, 3};
- Object[] expRow2 = new Object[]{1, 1};
- Mockito.when(_mailbox.receive())
- .thenReturn(OperatorTestUtil.block(inSchema, expRow1), OperatorTestUtil.block(inSchema, expRow2),
- TransferableBlockUtils.getEndOfStreamTransferableBlock());
-
- Object[] expRow3 = new Object[]{4, 2};
- Object[] expRow4 = new Object[]{2, 4};
- Object[] expRow5 = new Object[]{-1, 95};
- JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
- Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
- Mockito.when(_mailbox2.isClosed()).thenReturn(false);
- Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3),
- OperatorTestUtil.block(inSchema, expRow4), OperatorTestUtil.block(inSchema, expRow5),
- TransferableBlockUtils.getEndOfStreamTransferableBlock());
-
- // Setup the collation key and direction
- List<RexExpression> collationKeys = new ArrayList<>(Collections.singletonList(new RexExpression.InputRef(0)));
- RelFieldCollation.Direction direction = _random.nextBoolean() ? RelFieldCollation.Direction.ASCENDING
- : RelFieldCollation.Direction.DESCENDING;
- List<RelFieldCollation.Direction> collationDirection = new ArrayList<>(Collections.singletonList(direction));
-
- OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
-
- MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection, false, true, inSchema, stageId,
- DEFAULT_RECEIVER_STAGE_ID, null);
-
- // Receive a set of no-op blocks and skip over them
- TransferableBlock receivedBlock = receiveOp.nextBlock();
- while (receivedBlock.isNoOpBlock()) {
- receivedBlock = receiveOp.nextBlock();
- }
- List<Object[]> resultRows = receivedBlock.getContainer();
- // All blocks should be returned together since ordering was required
- Assert.assertEquals(resultRows.size(), 5);
- if (direction == RelFieldCollation.Direction.ASCENDING) {
- Assert.assertEquals(resultRows.get(0), expRow5);
- Assert.assertEquals(resultRows.get(1), expRow2);
- Assert.assertEquals(resultRows.get(2), expRow4);
- Assert.assertEquals(resultRows.get(3), expRow1);
- Assert.assertEquals(resultRows.get(4), expRow3);
- } else {
- Assert.assertEquals(resultRows.get(0), expRow3);
- Assert.assertEquals(resultRows.get(1), expRow1);
- Assert.assertEquals(resultRows.get(2), expRow4);
- Assert.assertEquals(resultRows.get(3), expRow2);
- Assert.assertEquals(resultRows.get(4), expRow5);
- }
-
- receivedBlock = receiveOp.nextBlock();
- Assert.assertTrue(receivedBlock.isEndOfStreamBlock());
- }
-
- @Test
- public void shouldReceiveMailboxFromTwoServersWithCollationKeyTwoColumns()
- throws Exception {
- String server1Host = "hash1";
- int server1Port = 123;
- Mockito.when(_server1.getHostname()).thenReturn(server1Host);
- Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
-
- String server2Host = "hash2";
- int server2Port = 456;
- Mockito.when(_server2.getHostname()).thenReturn(server2Host);
- Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
-
- int jobId = 456;
- int stageId = 0;
- int toPort = 8888;
- String toHost = "toHost";
- VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
-
- DataSchema inSchema = new DataSchema(new String[]{"col1", "col2", "col3"},
- new DataSchema.ColumnDataType[]{INT, INT, STRING});
- JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
- Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
- Mockito.when(_mailbox.isClosed()).thenReturn(false);
- Object[] expRow1 = new Object[]{3, 3, "queen"};
- Object[] expRow2 = new Object[]{1, 1, "pink floyd"};
- Mockito.when(_mailbox.receive())
- .thenReturn(OperatorTestUtil.block(inSchema, expRow1), OperatorTestUtil.block(inSchema, expRow2),
- TransferableBlockUtils.getEndOfStreamTransferableBlock());
-
- Object[] expRow3 = new Object[]{42, 2, "pink floyd"};
- Object[] expRow4 = new Object[]{2, 4, "aerosmith"};
- Object[] expRow5 = new Object[]{-1, 95, "foo fighters"};
- JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
- Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
- Mockito.when(_mailbox2.isClosed()).thenReturn(false);
- Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3),
- OperatorTestUtil.block(inSchema, expRow4), OperatorTestUtil.block(inSchema, expRow5),
- TransferableBlockUtils.getEndOfStreamTransferableBlock());
-
- // Setup the collation key and direction
- List<RexExpression> collationKeys = new ArrayList<>(Arrays.asList(new RexExpression.InputRef(2),
- new RexExpression.InputRef(0)));
- RelFieldCollation.Direction direction1 = _random.nextBoolean() ? RelFieldCollation.Direction.ASCENDING
- : RelFieldCollation.Direction.DESCENDING;
- RelFieldCollation.Direction direction2 = RelFieldCollation.Direction.ASCENDING;
- List<RelFieldCollation.Direction> collationDirection = new ArrayList<>(Arrays.asList(direction1, direction2));
-
- OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
- Long.MAX_VALUE, new HashMap<>());
-
- MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection, false, true, inSchema, stageId,
- DEFAULT_RECEIVER_STAGE_ID, null);
-
- // Receive a set of no-op blocks and skip over them
- TransferableBlock receivedBlock = receiveOp.nextBlock();
- while (receivedBlock.isNoOpBlock()) {
- receivedBlock = receiveOp.nextBlock();
- }
- List<Object[]> resultRows = receivedBlock.getContainer();
- // All blocks should be returned together since ordering was required
- Assert.assertEquals(resultRows.size(), 5);
- if (direction1 == RelFieldCollation.Direction.ASCENDING) {
- Assert.assertEquals(resultRows.get(0), expRow4);
- Assert.assertEquals(resultRows.get(1), expRow5);
- Assert.assertEquals(resultRows.get(2), expRow2);
- Assert.assertEquals(resultRows.get(3), expRow3);
- Assert.assertEquals(resultRows.get(4), expRow1);
- } else {
- Assert.assertEquals(resultRows.get(0), expRow1);
- Assert.assertEquals(resultRows.get(1), expRow2);
- Assert.assertEquals(resultRows.get(2), expRow3);
- Assert.assertEquals(resultRows.get(3), expRow5);
- Assert.assertEquals(resultRows.get(4), expRow4);
- }
-
- receivedBlock = receiveOp.nextBlock();
- Assert.assertTrue(receivedBlock.isEndOfStreamBlock());
- }
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
similarity index 77%
copy from pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
copy to pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index bfbb977e7c..659f7e8860 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -51,7 +51,7 @@ import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.STRING;
-public class MailboxReceiveOperatorTest {
+public class SortedMailboxReceiveOperatorTest {
private static final int DEFAULT_RECEIVER_STAGE_ID = 10;
@@ -88,24 +88,19 @@ public class MailboxReceiveOperatorTest {
@Test
public void shouldTimeoutOnExtraLongSleep()
throws InterruptedException {
- // Choose whether to exercise the ordering code path or not
List<RexExpression> collationKeys = new ArrayList<>();
List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
// shorter timeoutMs should result in error.
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 10L, 10L,
new HashMap<>());
- MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
- collationDirections, false, sortOnReceiver, inSchema, 456, 789, 10L);
+ SortedMailboxReceiveOperator receiveOp =
+ new SortedMailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
+ collationDirections, false, true, inSchema, 456, 789, 10L);
Thread.sleep(200L);
TransferableBlock mailbox = receiveOp.nextBlock();
Assert.assertTrue(mailbox.isErrorBlock());
@@ -115,15 +110,15 @@ public class MailboxReceiveOperatorTest {
// longer timeout or default timeout (10s) doesn't result in error.
context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 2000L, 2000L,
new HashMap<>());
- receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
- collationDirections, false, sortOnReceiver, inSchema, 456, 789, 2000L);
+ receiveOp = new SortedMailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON,
+ collationKeys, collationDirections, false, true, inSchema, 456, 789, 2000L);
Thread.sleep(200L);
mailbox = receiveOp.nextBlock();
Assert.assertFalse(mailbox.isErrorBlock());
context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, collationKeys,
- collationDirections, false, sortOnReceiver, inSchema, 456, 789, null);
+ receiveOp = new SortedMailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON,
+ collationKeys, collationDirections, false, true, inSchema, 456, 789, null);
Thread.sleep(200L);
mailbox = receiveOp.nextBlock();
Assert.assertFalse(mailbox.isErrorBlock());
@@ -144,22 +139,17 @@ public class MailboxReceiveOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
- // Choose whether to exercise the ordering code path or not
List<RexExpression> collationKeys = new ArrayList<>();
List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, 456, 789, null);
+ SortedMailboxReceiveOperator receiveOp =
+ new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+ collationKeys, collationDirections, false, true, inSchema, 456, 789, null);
}
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
@@ -175,22 +165,17 @@ public class MailboxReceiveOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
- // Choose whether to exercise the ordering code path or not
List<RexExpression> collationKeys = new ArrayList<>();
List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.RANGE_DISTRIBUTED, collationKeys, collationDirections, false, sortOnReceiver, inSchema,
- 456, 789, null);
+ SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
+ ImmutableList.of(_server1, _server2), RelDistribution.Type.RANGE_DISTRIBUTED, collationKeys,
+ collationDirections, false, true, inSchema, 456, 789, null);
}
@Test
@@ -216,23 +201,18 @@ public class MailboxReceiveOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
- // Choose whether to exercise the ordering code path or not
List<RexExpression> collationKeys = new ArrayList<>();
List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ SortedMailboxReceiveOperator receiveOp =
+ new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+ collationKeys, collationDirections, false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
null);
// Receive end of stream block directly when there is no match.
@@ -267,23 +247,18 @@ public class MailboxReceiveOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
- // Choose whether to exercise the ordering code path or not
List<RexExpression> collationKeys = new ArrayList<>();
List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ SortedMailboxReceiveOperator receiveOp =
+ new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+ collationKeys, collationDirections, false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
null);
// Receive end of stream block directly when mailbox is close.
@@ -321,23 +296,18 @@ public class MailboxReceiveOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
- // Choose whether to exercise the ordering code path or not
List<RexExpression> collationKeys = new ArrayList<>();
List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ SortedMailboxReceiveOperator receiveOp =
+ new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+ collationKeys, collationDirections, false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
null);
// Receive NoOpBlock.
Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock());
@@ -373,23 +343,18 @@ public class MailboxReceiveOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
- // Choose whether to exercise the ordering code path or not
List<RexExpression> collationKeys = new ArrayList<>();
List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ SortedMailboxReceiveOperator receiveOp =
+ new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+ collationKeys, collationDirections, false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
null);
// Receive EosBloc.
Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
@@ -426,24 +391,18 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
-
- // Choose whether to exercise the ordering code path or not
List<RexExpression> collationKeys = new ArrayList<>();
List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ SortedMailboxReceiveOperator receiveOp =
+ new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+ collationKeys, collationDirections, false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
while (receivedBlock.isNoOpBlock()) {
@@ -484,23 +443,18 @@ public class MailboxReceiveOperatorTest {
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(e));
- // Choose whether to exercise the ordering code path or not
List<RexExpression> collationKeys = new ArrayList<>();
List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
+ SortedMailboxReceiveOperator receiveOp =
+ new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON,
+ collationKeys, collationDirections, false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
Assert.assertTrue(receivedBlock.isErrorBlock());
@@ -508,8 +462,9 @@ public class MailboxReceiveOperatorTest {
Assert.assertTrue(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("errorBlock"));
}
- @Test
- public void shouldReceiveMailboxFromTwoServersOneClose()
+ @Test(expectedExceptions = IllegalStateException.class,
+ expectedExceptionsMessageRegExp = ".*Collation keys should exist.*")
+ public void shouldThrowOnEmptyCollationKey()
throws Exception {
String server1Host = "hash1";
int server1Port = 123;
@@ -527,49 +482,60 @@ public class MailboxReceiveOperatorTest {
String toHost = "toHost";
VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
- JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
- Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
- Mockito.when(_mailbox.isClosed()).thenReturn(true);
+ DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+
+ OpChainExecutionContext context =
+ new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
+ Long.MAX_VALUE, new HashMap<>());
+
+ SortedMailboxReceiveOperator receiveOp =
+ new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+ RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirections, false, true, inSchema, stageId,
+ DEFAULT_RECEIVER_STAGE_ID, null);
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
+ expectedExceptionsMessageRegExp = ".*sorting must be enabled.*")
+ public void shouldThrowOnShouldSortOnReceiverFalse()
+ throws Exception {
+ String server1Host = "hash1";
+ int server1Port = 123;
+ Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+ Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+ String server2Host = "hash2";
+ int server2Port = 456;
+ Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+ Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+ int jobId = 456;
+ int stageId = 0;
+ int toPort = 8888;
+ String toHost = "toHost";
+ VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
- JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
- new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
- Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
- Mockito.when(_mailbox2.isClosed()).thenReturn(false);
- Object[] expRow = new Object[]{1, 1};
DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
- Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
- TransferableBlockUtils.getEndOfStreamTransferableBlock());
- // Choose whether to exercise the ordering code path or not
List<RexExpression> collationKeys = new ArrayList<>();
List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
- null);
- TransferableBlock receivedBlock = receiveOp.nextBlock();
- while (receivedBlock.isNoOpBlock()) {
- receivedBlock = receiveOp.nextBlock();
- }
- List<Object[]> resultRows = receivedBlock.getContainer();
- Assert.assertEquals(resultRows.size(), 1);
- Assert.assertEquals(resultRows.get(0), expRow);
+ SortedMailboxReceiveOperator receiveOp =
+ new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+ RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirections, false, false, inSchema, stageId,
+ DEFAULT_RECEIVER_STAGE_ID, null);
}
@Test
- public void shouldReceiveMailboxFromTwoServersOneNull()
+ public void shouldReceiveMailboxFromTwoServersOneClose()
throws Exception {
String server1Host = "hash1";
int server1Port = 123;
@@ -590,8 +556,7 @@ public class MailboxReceiveOperatorTest {
JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
- Mockito.when(_mailbox.isClosed()).thenReturn(false);
- Mockito.when(_mailbox.receive()).thenReturn(null, TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_mailbox.isClosed()).thenReturn(true);
JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
@@ -602,24 +567,19 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
- // Choose whether to exercise the ordering code path or not
List<RexExpression> collationKeys = new ArrayList<>();
List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
- null);
+ SortedMailboxReceiveOperator receiveOp =
+ new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+ RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirections, false, true, inSchema, stageId,
+ DEFAULT_RECEIVER_STAGE_ID, null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
while (receivedBlock.isNoOpBlock()) {
receivedBlock = receiveOp.nextBlock();
@@ -630,7 +590,7 @@ public class MailboxReceiveOperatorTest {
}
@Test
- public void shouldReceiveMailboxFromTwoServers()
+ public void shouldReceiveMailboxFromTwoServersOneNull()
throws Exception {
String server1Host = "hash1";
int server1Port = 123;
@@ -648,47 +608,41 @@ public class MailboxReceiveOperatorTest {
String toHost = "toHost";
VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0);
- DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
Mockito.when(_mailbox.isClosed()).thenReturn(false);
- Object[] expRow1 = new Object[]{1, 1};
- Object[] expRow2 = new Object[]{2, 2};
- Mockito.when(_mailbox.receive())
- .thenReturn(OperatorTestUtil.block(inSchema, expRow1), OperatorTestUtil.block(inSchema, expRow2),
- TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ Mockito.when(_mailbox.receive()).thenReturn(null, TransferableBlockUtils.getEndOfStreamTransferableBlock());
- Object[] expRow3 = new Object[]{3, 3};
JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId),
new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID);
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
- Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
+ Object[] expRow = new Object[]{1, 1};
+ DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+ Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ List<RexExpression> collationKeys = new ArrayList<>();
+ List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
+
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- Collections.emptyList(), Collections.emptyList(), false, false, inSchema, stageId,
+ SortedMailboxReceiveOperator receiveOp =
+ new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+ RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirections, false, true, inSchema, stageId,
DEFAULT_RECEIVER_STAGE_ID, null);
- // Receive first block from first server.
TransferableBlock receivedBlock = receiveOp.nextBlock();
+ while (receivedBlock.isNoOpBlock()) {
+ receivedBlock = receiveOp.nextBlock();
+ }
List<Object[]> resultRows = receivedBlock.getContainer();
Assert.assertEquals(resultRows.size(), 1);
- Assert.assertEquals(resultRows.get(0), expRow1);
- // Receive second block from first server.
- receivedBlock = receiveOp.nextBlock();
- resultRows = receivedBlock.getContainer();
- Assert.assertEquals(resultRows.size(), 1);
- Assert.assertEquals(resultRows.get(0), expRow2);
-
- // Receive from second server.
- receivedBlock = receiveOp.nextBlock();
- resultRows = receivedBlock.getContainer();
- Assert.assertEquals(resultRows.size(), 1);
- Assert.assertEquals(resultRows.get(0), expRow3);
+ Assert.assertEquals(resultRows.get(0), expRow);
}
@Test
@@ -725,24 +679,19 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
- // Choose whether to exercise the ordering code path or not
List<RexExpression> collationKeys = new ArrayList<>();
List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
- null);
+ SortedMailboxReceiveOperator receiveOp =
+ new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+ RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirections, false, true, inSchema, stageId,
+ DEFAULT_RECEIVER_STAGE_ID, null);
// Receive error block from first server.
TransferableBlock receivedBlock = receiveOp.nextBlock();
Assert.assertTrue(receivedBlock.isErrorBlock());
@@ -783,24 +732,19 @@ public class MailboxReceiveOperatorTest {
Mockito.when(_mailbox2.isClosed()).thenReturn(false);
Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
- // Choose whether to exercise the ordering code path or not
List<RexExpression> collationKeys = new ArrayList<>();
List<RelFieldCollation.Direction> collationDirections = new ArrayList<>();
- boolean sortOnReceiver = false;
- if (_random.nextBoolean()) {
- collationKeys.add(new RexExpression.InputRef(0));
- collationDirections.add(RelFieldCollation.Direction.ASCENDING);
- sortOnReceiver = true;
- }
+ collationKeys.add(new RexExpression.InputRef(0));
+ collationDirections.add(RelFieldCollation.Direction.ASCENDING);
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp =
- new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED,
- collationKeys, collationDirections, false, sortOnReceiver, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID,
- null);
+ SortedMailboxReceiveOperator receiveOp =
+ new SortedMailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
+ RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirections, false, true, inSchema, stageId,
+ DEFAULT_RECEIVER_STAGE_ID, null);
TransferableBlock receivedBlock = receiveOp.nextBlock();
Assert.assertTrue(receivedBlock.isErrorBlock(), "server-1 should have returned an error-block");
}
@@ -856,9 +800,9 @@ public class MailboxReceiveOperatorTest {
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection, false, true, inSchema, stageId,
- DEFAULT_RECEIVER_STAGE_ID, null);
+ SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
+ ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection,
+ false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID, null);
// Receive a set of no-op blocks and skip over them
TransferableBlock receivedBlock = receiveOp.nextBlock();
@@ -940,9 +884,9 @@ public class MailboxReceiveOperatorTest {
new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE,
Long.MAX_VALUE, new HashMap<>());
- MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2),
- RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection, false, true, inSchema, stageId,
- DEFAULT_RECEIVER_STAGE_ID, null);
+ SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
+ ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED, collationKeys, collationDirection,
+ false, true, inSchema, stageId, DEFAULT_RECEIVER_STAGE_ID, null);
// Receive a set of no-op blocks and skip over them
TransferableBlock receivedBlock = receiveOp.nextBlock();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org