You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/21 04:28:11 UTC
[pinot] branch master updated: [multistage][cleanup] clean up PhysicalPlanContext usage (#10950)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 73830ae3ce [multistage][cleanup] clean up PhysicalPlanContext usage (#10950)
73830ae3ce is described below
commit 73830ae3ce94428b7e9991d1bd160bd62a14b7c1
Author: Rong Rong <ro...@apache.org>
AuthorDate: Tue Jun 20 21:28:05 2023 -0700
[multistage][cleanup] clean up PhysicalPlanContext usage (#10950)
* [clean up] server plan should not extend physical plan, it should encapsulate
* [clean up] reformat opChainExecutionContext, constructor to only take PhysicalPlanContext directly
* [clean up] remove timeoutMs as argument and member var from plan context
---
.../apache/pinot/query/runtime/QueryRunner.java | 31 ++++----
.../runtime/plan/OpChainExecutionContext.java | 12 +--
.../query/runtime/plan/PhysicalPlanContext.java | 13 +---
.../plan/pipeline/PipelineBreakerExecutor.java | 4 +-
.../plan/server/ServerPlanRequestContext.java | 28 +++----
.../plan/server/ServerPlanRequestUtils.java | 21 ++----
.../plan/server/ServerPlanRequestVisitor.java | 6 +-
.../query/service/dispatch/QueryDispatcher.java | 14 ++--
.../executor/OpChainSchedulerServiceTest.java | 2 +-
.../runtime/executor/RoundRobinSchedulerTest.java | 2 +-
.../operator/MailboxReceiveOperatorTest.java | 87 ++++++++--------------
.../runtime/operator/MailboxSendOperatorTest.java | 2 +-
.../pinot/query/runtime/operator/OpChainTest.java | 41 ++++------
.../query/runtime/operator/OperatorTestUtil.java | 10 +--
.../operator/SortedMailboxReceiveOperatorTest.java | 85 +++++++--------------
15 files changed, 134 insertions(+), 224 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 914c6c2ed0..aa16f0b220 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -151,7 +151,7 @@ public class QueryRunner {
PipelineBreakerResult pipelineBreakerResult;
try {
pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService,
- distributedStagePlan, timeoutMs, deadlineMs, requestId, isTraceEnabled);
+ distributedStagePlan, deadlineMs, requestId, isTraceEnabled);
} catch (Exception e) {
LOGGER.error("Error executing pre-stage pipeline breaker for: {}:{}", requestId,
distributedStagePlan.getStageId(), e);
@@ -174,7 +174,7 @@ public class QueryRunner {
try {
PlanNode stageRoot = distributedStagePlan.getStageRoot();
OpChain rootOperator = PhysicalPlanVisitor.walkPlanNode(stageRoot,
- new PhysicalPlanContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
+ new PhysicalPlanContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), deadlineMs,
distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled));
_scheduler.register(rootOperator);
} catch (Exception e) {
@@ -203,19 +203,18 @@ public class QueryRunner {
PipelineBreakerResult pipelineBreakerResult, long timeoutMs, long deadlineMs, long requestId) {
boolean isTraceEnabled =
Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
+ PhysicalPlanContext planContext = new PhysicalPlanContext(_mailboxService, requestId,
+ distributedStagePlan.getStageId(), deadlineMs, distributedStagePlan.getServer(),
+ distributedStagePlan.getStageMetadata(), pipelineBreakerResult, isTraceEnabled);
List<ServerPlanRequestContext> serverPlanRequestContexts =
- constructServerQueryRequests(distributedStagePlan, requestMetadataMap, pipelineBreakerResult,
- _helixPropertyStore, _mailboxService, deadlineMs);
+ constructServerQueryRequests(planContext, distributedStagePlan, requestMetadataMap, _helixPropertyStore);
List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(serverPlanRequestContexts.size());
for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(),
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis()));
}
MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
- OpChainExecutionContext opChainExecutionContext =
- new OpChainExecutionContext(_mailboxService, requestId, sendNode.getPlanFragmentId(),
- distributedStagePlan.getServer(), timeoutMs, deadlineMs, distributedStagePlan.getStageMetadata(),
- isTraceEnabled);
+ OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext(planContext);
MultiStageOperator leafStageOperator =
new LeafStageTransferableBlockOperator(opChainExecutionContext, this::processServerQueryRequest,
serverQueryRequests, sendNode.getDataSchema());
@@ -226,9 +225,9 @@ public class QueryRunner {
return new OpChain(opChainExecutionContext, mailboxSendOperator, Collections.emptyList());
}
- private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
- Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult,
- ZkHelixPropertyStore<ZNRecord> helixPropertyStore, MailboxService mailboxService, long deadlineMs) {
+ private static List<ServerPlanRequestContext> constructServerQueryRequests(PhysicalPlanContext planContext,
+ DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
+ ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
String rawTableName = StageMetadata.getTableName(stageMetadata);
@@ -244,17 +243,15 @@ public class QueryRunner {
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
- requests.add(ServerPlanRequestUtils.build(mailboxService, distributedStagePlan, requestMetadataMap,
- pipelineBreakerResult, tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata),
- TableType.OFFLINE, tableEntry.getValue(), deadlineMs));
+ requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig,
+ schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, tableEntry.getValue()));
} else if (TableType.REALTIME.name().equals(tableType)) {
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
- requests.add(ServerPlanRequestUtils.build(mailboxService, distributedStagePlan, requestMetadataMap,
- pipelineBreakerResult, tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata),
- TableType.REALTIME, tableEntry.getValue(), deadlineMs));
+ requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig,
+ schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, tableEntry.getValue()));
} else {
throw new IllegalArgumentException("Unsupported table type key: " + tableType);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 5da20c4e70..b23f41400a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.runtime.plan;
+import com.google.common.annotations.VisibleForTesting;
import java.util.function.Consumer;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.routing.VirtualServerAddress;
@@ -35,21 +36,20 @@ public class OpChainExecutionContext {
private final long _requestId;
private final int _stageId;
private final VirtualServerAddress _server;
- private final long _timeoutMs;
private final long _deadlineMs;
private final StageMetadata _stageMetadata;
private final OpChainId _id;
private final OpChainStats _stats;
private final boolean _traceEnabled;
+ @VisibleForTesting
public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId,
- VirtualServerAddress server, long timeoutMs, long deadlineMs, StageMetadata stageMetadata,
+ VirtualServerAddress server, long deadlineMs, StageMetadata stageMetadata,
boolean traceEnabled) {
_mailboxService = mailboxService;
_requestId = requestId;
_stageId = stageId;
_server = server;
- _timeoutMs = timeoutMs;
_deadlineMs = deadlineMs;
_stageMetadata = stageMetadata;
_id = new OpChainId(requestId, server.workerId(), stageId);
@@ -59,7 +59,7 @@ public class OpChainExecutionContext {
public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) {
this(physicalPlanContext.getMailboxService(), physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(),
- physicalPlanContext.getServer(), physicalPlanContext.getTimeoutMs(), physicalPlanContext.getDeadlineMs(),
+ physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(),
physicalPlanContext.getStageMetadata(), physicalPlanContext.isTraceEnabled());
}
@@ -83,10 +83,6 @@ public class OpChainExecutionContext {
return _server;
}
- public long getTimeoutMs() {
- return _timeoutMs;
- }
-
public long getDeadlineMs() {
return _deadlineMs;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
index 2da5772930..a2fb6f4643 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
@@ -29,8 +29,6 @@ public class PhysicalPlanContext {
protected final MailboxService _mailboxService;
protected final long _requestId;
protected final int _stageId;
- // TODO: Timeout is not needed since deadline is already present.
- private final long _timeoutMs;
private final long _deadlineMs;
protected final VirtualServerAddress _server;
protected final StageMetadata _stageMetadata;
@@ -39,13 +37,12 @@ public class PhysicalPlanContext {
private final OpChainExecutionContext _opChainExecutionContext;
private final boolean _traceEnabled;
- public PhysicalPlanContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs,
- long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata,
- PipelineBreakerResult pipelineBreakerResult, boolean traceEnabled) {
+ public PhysicalPlanContext(MailboxService mailboxService, long requestId, int stageId, long deadlineMs,
+ VirtualServerAddress server, StageMetadata stageMetadata, PipelineBreakerResult pipelineBreakerResult,
+ boolean traceEnabled) {
_mailboxService = mailboxService;
_requestId = requestId;
_stageId = stageId;
- _timeoutMs = timeoutMs;
_deadlineMs = deadlineMs;
_server = server;
_stageMetadata = stageMetadata;
@@ -62,10 +59,6 @@ public class PhysicalPlanContext {
return _stageId;
}
- public long getTimeoutMs() {
- return _timeoutMs;
- }
-
public long getDeadlineMs() {
return _deadlineMs;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index a59a310fed..0202127314 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -53,7 +53,7 @@ public class PipelineBreakerExecutor {
* Currently, pipeline breaker executor can only execute mailbox receive pipeline breaker.
*/
public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler,
- MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long timeoutMs, long deadlineMs,
+ MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long deadlineMs,
long requestId, boolean isTraceEnabled)
throws Exception {
PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext(
@@ -65,7 +65,7 @@ public class PipelineBreakerExecutor {
// receive-mail callbacks.
// see also: MailboxIdUtils TODOs, de-couple mailbox id from query information
PhysicalPlanContext physicalPlanContext =
- new PhysicalPlanContext(mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
+ new PhysicalPlanContext(mailboxService, requestId, stageRoot.getPlanFragmentId(), deadlineMs,
distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled);
Map<Integer, List<TransferableBlock>> resultMap =
PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 5c7bc12503..fb1c3af0c1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -20,12 +20,7 @@ package org.apache.pinot.query.runtime.plan.server;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.core.routing.TimeBoundaryInfo;
-import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.spi.config.table.TableType;
@@ -33,22 +28,21 @@ import org.apache.pinot.spi.config.table.TableType;
* Context class for converting a {@link org.apache.pinot.query.runtime.plan.DistributedStagePlan} into
* {@link PinotQuery} to execute on server.
*/
-public class ServerPlanRequestContext extends PhysicalPlanContext {
- protected TableType _tableType;
- protected TimeBoundaryInfo _timeBoundaryInfo;
+public class ServerPlanRequestContext {
+ private final PhysicalPlanContext _planContext;
+ private final TableType _tableType;
- protected PinotQuery _pinotQuery;
- protected InstanceRequest _instanceRequest;
+ private PinotQuery _pinotQuery;
+ private InstanceRequest _instanceRequest;
- public ServerPlanRequestContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs,
- long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata,
- PipelineBreakerResult pipelineBreakerResult, PinotQuery pinotQuery,
- TableType tableType, TimeBoundaryInfo timeBoundaryInfo, boolean traceEnabled) {
- super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, stageMetadata, pipelineBreakerResult,
- traceEnabled);
+ public ServerPlanRequestContext(PhysicalPlanContext planContext, PinotQuery pinotQuery, TableType tableType) {
+ _planContext = planContext;
_pinotQuery = pinotQuery;
_tableType = tableType;
- _timeBoundaryInfo = timeBoundaryInfo;
+ }
+
+ public PhysicalPlanContext getPlanContext() {
+ return _planContext;
}
public TableType getTableType() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 7a0454798b..13659ae810 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -36,11 +36,10 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.optimizer.QueryOptimizer;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
-import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
-import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -70,10 +69,9 @@ public class ServerPlanRequestUtils {
// do not instantiate.
}
- public static ServerPlanRequestContext build(MailboxService mailboxService, DistributedStagePlan stagePlan,
- Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult,
- TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo, TableType tableType,
- List<String> segmentList, long deadlineMs) {
+ public static ServerPlanRequestContext build(PhysicalPlanContext planContext, DistributedStagePlan stagePlan,
+ Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo,
+ TableType tableType, List<String> segmentList) {
// Before-visit: construct the ServerPlanRequestContext baseline
// Making a unique requestId for leaf stages otherwise it causes problem on stats/metrics/tracing.
long requestId = (Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)) << 16) + (
@@ -89,13 +87,10 @@ public class ServerPlanRequestUtils {
}
LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit);
pinotQuery.setExplain(false);
- ServerPlanRequestContext context =
- new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), timeoutMs, deadlineMs,
- stagePlan.getServer(), stagePlan.getStageMetadata(), pipelineBreakerResult, pinotQuery, tableType,
- timeBoundaryInfo, traceEnabled);
+ ServerPlanRequestContext serverContext = new ServerPlanRequestContext(planContext, pinotQuery, tableType);
// visit the plan and create query physical plan.
- ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), context);
+ ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), serverContext);
// Post-visit: finalize context.
// 1. global rewrite/optimize
@@ -128,8 +123,8 @@ public class ServerPlanRequestUtils {
instanceRequest.setSearchSegments(segmentList);
instanceRequest.setQuery(brokerRequest);
- context.setInstanceRequest(instanceRequest);
- return context;
+ serverContext.setInstanceRequest(instanceRequest);
+ return serverContext;
}
/**
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
index 7cda76bfa2..8fa6df2756 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
@@ -43,6 +43,7 @@ import org.apache.pinot.query.planner.plannode.TableScanNode;
import org.apache.pinot.query.planner.plannode.ValueNode;
import org.apache.pinot.query.planner.plannode.WindowNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -109,8 +110,9 @@ public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPla
staticSide = node.getInputs().get(1);
}
staticSide.visit(this, context);
- int resultMapId = context.getPipelineBreakerResult().getNodeIdMap().get(dynamicSide);
- List<TransferableBlock> transferableBlocks = context.getPipelineBreakerResult().getResultMap().getOrDefault(
+ PipelineBreakerResult pipelineBreakerResult = context.getPlanContext().getPipelineBreakerResult();
+ int resultMapId = pipelineBreakerResult.getNodeIdMap().get(dynamicSide);
+ List<TransferableBlock> transferableBlocks = pipelineBreakerResult.getResultMap().getOrDefault(
resultMapId, Collections.emptyList());
List<Object[]> resultDataContainer = new ArrayList<>();
DataSchema dataSchema = dynamicSide.getDataSchema();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index d7eb25a944..fb1dc05619 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -58,6 +58,7 @@ import org.apache.pinot.query.runtime.operator.OpChainStats;
import org.apache.pinot.query.runtime.operator.OperatorStats;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.apache.pinot.query.service.QueryConfig;
@@ -193,12 +194,13 @@ public class QueryDispatcher {
DispatchablePlanFragment reduceStagePlanFragment = dispatchableSubPlan.getQueryStageList().get(reduceStageId);
MailboxReceiveNode reduceNode = (MailboxReceiveNode) reduceStagePlanFragment.getPlanFragment().getFragmentRoot();
VirtualServerAddress server = new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getPort(), 0);
- OpChainExecutionContext context =
- new OpChainExecutionContext(mailboxService, requestId, reduceStageId, server, timeoutMs,
- System.currentTimeMillis() + timeoutMs,
- new StageMetadata.Builder().setWorkerMetadataList(reduceStagePlanFragment.getWorkerMetadataList())
- .addCustomProperties(reduceStagePlanFragment.getCustomProperties()).build(),
- traceEnabled);
+ StageMetadata brokerStageMetadata = new StageMetadata.Builder()
+ .setWorkerMetadataList(reduceStagePlanFragment.getWorkerMetadataList())
+ .addCustomProperties(reduceStagePlanFragment.getCustomProperties())
+ .build();
+ PhysicalPlanContext planContext = new PhysicalPlanContext(mailboxService, requestId, reduceStageId,
+ System.currentTimeMillis() + timeoutMs, server, brokerStageMetadata, null, traceEnabled);
+ OpChainExecutionContext context = new OpChainExecutionContext(planContext);
MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(context, reduceNode.getSenderStageId());
List<DataBlock> resultDataBlocks =
reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, statsAggregatorMap, dispatchableSubPlan,
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index fbbab9e4d9..332ceaaaab 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -74,7 +74,7 @@ public class OpChainSchedulerServiceTest {
private OpChain getChain(MultiStageOperator operator) {
VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 1);
- OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, 0, null, true);
+ OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, null, true);
return new OpChain(context, operator, ImmutableList.of());
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 6589fe079f..798eb534ba 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -180,6 +180,6 @@ public class RoundRobinSchedulerTest {
private OpChainExecutionContext getOpChainExecutionContext(long requestId, int stageId, int virtualServerId) {
return new OpChainExecutionContext(null, requestId, stageId,
- new VirtualServerAddress("localhost", 1234, virtualServerId), 0, 0, null, true);
+ new VirtualServerAddress("localhost", 1234, virtualServerId), 0, null, true);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index bcec439843..fed5ce422e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -74,40 +74,21 @@ public class MailboxReceiveOperatorTest {
when(_mailboxService.getPort()).thenReturn(123);
VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
- _stageMetadataBoth = new StageMetadata.Builder()
- .setWorkerMetadataList(Stream.of(server1, server2).map(
- s -> new WorkerMetadata.Builder()
- .setVirtualServerAddress(s)
- .addMailBoxInfoMap(0,
- new MailboxMetadata(
- ImmutableList.of(
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2), ImmutableMap.of()))
- .addMailBoxInfoMap(1,
- new MailboxMetadata(
- ImmutableList.of(
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2), ImmutableMap.of()))
- .build())
- .collect(Collectors.toList()))
- .build();
+ _stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map(
+ s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+ org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+ ImmutableList.of(server1, server2), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+ org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+ ImmutableList.of(server1, server2), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
// sending stage is 0, receiving stage is 1
- _stageMetadata1 = new StageMetadata.Builder()
- .setWorkerMetadataList(Stream.of(server1).map(
- s -> new WorkerMetadata.Builder()
- .setVirtualServerAddress(s)
- .addMailBoxInfoMap(0, new MailboxMetadata(
- ImmutableList.of(
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
- ImmutableList.of(server1), ImmutableMap.of()))
- .addMailBoxInfoMap(1, new MailboxMetadata(
- ImmutableList.of(
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
- ImmutableList.of(server1), ImmutableMap.of()))
- .build()).collect(Collectors.toList()))
- .build();
+ _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map(
+ s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+ ImmutableList.of(server1), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+ ImmutableList.of(server1), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
}
@AfterMethod
@@ -120,13 +101,11 @@ public class MailboxReceiveOperatorTest {
public void shouldThrowSingletonNoMatchMailboxServer() {
VirtualServerAddress server1 = new VirtualServerAddress("localhost", 456, 0);
VirtualServerAddress server2 = new VirtualServerAddress("localhost", 789, 1);
- StageMetadata stageMetadata = new StageMetadata.Builder()
- .setWorkerMetadataList(Stream.of(server1, server2).map(
- s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build()).collect(Collectors.toList()))
- .build();
+ StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList(
+ Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build())
+ .collect(Collectors.toList())).build();
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- stageMetadata, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata, false);
//noinspection resource
new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1);
}
@@ -134,8 +113,7 @@ public class MailboxReceiveOperatorTest {
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
public void shouldThrowRangeDistributionNotSupported() {
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- null, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, null, false);
//noinspection resource
new MailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, 1);
}
@@ -147,7 +125,7 @@ public class MailboxReceiveOperatorTest {
// Short timeoutMs should result in timeout
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L,
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
_stageMetadata1, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
Thread.sleep(100L);
@@ -158,8 +136,8 @@ public class MailboxReceiveOperatorTest {
}
// Longer timeout or default timeout (10s) doesn't result in timeout
- context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10_000L,
- System.currentTimeMillis() + 10_000L, _stageMetadata1, false);
+ context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L,
+ _stageMetadata1, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
Thread.sleep(100L);
TransferableBlock mailbox = receiveOp.nextBlock();
@@ -172,8 +150,7 @@ public class MailboxReceiveOperatorTest {
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
assertTrue(receiveOp.nextBlock().isNoOpBlock());
}
@@ -185,8 +162,7 @@ public class MailboxReceiveOperatorTest {
when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
}
@@ -200,8 +176,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
assertEquals(actualRows.size(), 1);
@@ -218,8 +193,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage)));
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
TransferableBlock block = receiveOp.nextBlock();
assertTrue(block.isErrorBlock());
@@ -237,8 +211,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
1)) {
List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
@@ -261,8 +234,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
1)) {
// Receive first block from server1
@@ -287,8 +259,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
1)) {
TransferableBlock block = receiveOp.nextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index ade517f1a1..2b74bb727a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -183,7 +183,7 @@ public class MailboxSendOperatorTest {
.setWorkerMetadataList(Collections.singletonList(
new WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build();
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE, Long.MAX_VALUE,
+ new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE,
stageMetadata, false);
return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false);
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 4af33067fa..910cc38523 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -91,21 +91,14 @@ public class OpChainTest {
public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
_serverAddress = new VirtualServerAddress("localhost", 123, 0);
- _receivingStageMetadata = new StageMetadata.Builder()
- .setWorkerMetadataList(Stream.of(_serverAddress).map(
- s -> new WorkerMetadata.Builder()
- .setVirtualServerAddress(s)
- .addMailBoxInfoMap(0, new MailboxMetadata(
- ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
- ImmutableList.of(s), ImmutableMap.of()))
- .addMailBoxInfoMap(1, new MailboxMetadata(
- ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
- ImmutableList.of(s), ImmutableMap.of()))
- .addMailBoxInfoMap(2, new MailboxMetadata(
- ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
- ImmutableList.of(s), ImmutableMap.of()))
- .build()).collect(Collectors.toList()))
- .build();
+ _receivingStageMetadata = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(_serverAddress).map(
+ s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0,
+ new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s),
+ ImmutableMap.of())).addMailBoxInfoMap(1,
+ new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s),
+ ImmutableMap.of())).addMailBoxInfoMap(2,
+ new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s),
+ ImmutableMap.of())).build()).collect(Collectors.toList())).build();
when(_mailboxService1.getReceivingMailbox(any())).thenReturn(_mailbox1);
when(_mailboxService2.getReceivingMailbox(any())).thenReturn(_mailbox2);
@@ -198,9 +191,8 @@ public class OpChainTest {
int receivedStageId = 2;
int senderStageId = 1;
- OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, 1000,
- System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
+ OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress,
+ System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
Stack<MultiStageOperator> operators =
getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -214,7 +206,7 @@ public class OpChainTest {
OpChainExecutionContext secondStageContext =
new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress,
- 1000, System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
+ System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
MailboxReceiveOperator secondStageReceiveOp =
new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1);
@@ -238,9 +230,8 @@ public class OpChainTest {
int receivedStageId = 2;
int senderStageId = 1;
- OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, 1000,
- System.currentTimeMillis() + 1000, _receivingStageMetadata, false);
+ OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress,
+ System.currentTimeMillis() + 1000, _receivingStageMetadata, false);
Stack<MultiStageOperator> operators =
getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -251,7 +242,7 @@ public class OpChainTest {
opChain.getStats().queued();
OpChainExecutionContext secondStageContext =
- new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, 1000,
+ new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress,
System.currentTimeMillis() + 1000, _receivingStageMetadata, false);
MailboxReceiveOperator secondStageReceiveOp =
new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId);
@@ -290,8 +281,8 @@ public class OpChainTest {
List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
new SelectionResultsBlock(upStreamSchema, Arrays.asList(new Object[]{1}, new Object[]{2})), queryContext));
LeafStageTransferableBlockOperator leafOp = new LeafStageTransferableBlockOperator(context,
- LeafStageTransferableBlockOperatorTest.getStaticBlockProcessor(resultsBlockList),
- Collections.singletonList(mock(ServerQueryRequest.class)), upStreamSchema);
+ LeafStageTransferableBlockOperatorTest.getStaticBlockProcessor(resultsBlockList),
+ Collections.singletonList(mock(ServerQueryRequest.class)), upStreamSchema);
//Transform operator
RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index b5196e812f..7ecd344ab3 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -27,6 +27,7 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory;
+
public class OperatorTestUtil {
// simple key-value collision schema/data test set: "Aa" and "BB" have same hash code in java.
private static final List<List<Object[]>> SIMPLE_KV_DATA_ROWS =
@@ -63,19 +64,16 @@ public class OperatorTestUtil {
public static OpChainExecutionContext getDefaultContext() {
VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
- return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE,
- null, true);
+ return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, true);
}
public static OpChainExecutionContext getDefaultContextWithTracingDisabled() {
VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
- return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE,
- null, false);
+ return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, false);
}
public static OpChainExecutionContext getContext(long requestId, int stageId,
VirtualServerAddress virtualServerAddress) {
- return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE,
- null, true);
+ return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, null, true);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 5bd973f4c2..25ef09f279 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -84,40 +84,21 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailboxService.getPort()).thenReturn(123);
VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
- _stageMetadataBoth = new StageMetadata.Builder()
- .setWorkerMetadataList(Stream.of(server1, server2).map(
- s -> new WorkerMetadata.Builder()
- .setVirtualServerAddress(s)
- .addMailBoxInfoMap(0,
- new MailboxMetadata(
- ImmutableList.of(
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2), ImmutableMap.of()))
- .addMailBoxInfoMap(1,
- new MailboxMetadata(
- ImmutableList.of(
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2), ImmutableMap.of()))
- .build())
- .collect(Collectors.toList()))
- .build();
+ _stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map(
+ s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+ org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+ ImmutableList.of(server1, server2), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+ org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+ ImmutableList.of(server1, server2), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
// sending stage is 0, receiving stage is 1
- _stageMetadata1 = new StageMetadata.Builder()
- .setWorkerMetadataList(Stream.of(server1).map(
- s -> new WorkerMetadata.Builder()
- .setVirtualServerAddress(s)
- .addMailBoxInfoMap(0, new MailboxMetadata(
- ImmutableList.of(
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
- ImmutableList.of(server1), ImmutableMap.of()))
- .addMailBoxInfoMap(1, new MailboxMetadata(
- ImmutableList.of(
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
- ImmutableList.of(server1), ImmutableMap.of()))
- .build()).collect(Collectors.toList()))
- .build();
+ _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map(
+ s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+ ImmutableList.of(server1), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+ ImmutableList.of(server1), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
}
@AfterMethod
@@ -134,8 +115,7 @@ public class SortedMailboxReceiveOperatorTest {
Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build())
.collect(Collectors.toList())).build();
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- stageMetadata, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata, false);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS,
COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
@@ -144,8 +124,7 @@ public class SortedMailboxReceiveOperatorTest {
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
public void shouldThrowRangeDistributionNotSupported() {
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, null,
- false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, null, false);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS,
COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
@@ -155,7 +134,7 @@ public class SortedMailboxReceiveOperatorTest {
public void shouldThrowOnEmptyCollationKey() {
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L,
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
_stageMetadata1, false);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, Collections.emptyList(),
@@ -168,7 +147,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
// Short timeoutMs should result in timeout
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L,
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
_stageMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
@@ -181,8 +160,8 @@ public class SortedMailboxReceiveOperatorTest {
}
// Longer timeout or default timeout (10s) doesn't result in timeout
- context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10_000L,
- System.currentTimeMillis() + 10_000L, _stageMetadata1, false);
+ context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L,
+ _stageMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -196,8 +175,7 @@ public class SortedMailboxReceiveOperatorTest {
public void shouldReceiveSingletonNullMailbox() {
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -210,8 +188,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -226,8 +203,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -245,8 +221,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailbox1.poll()).thenReturn(
TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage)));
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadata1, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -265,8 +240,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -289,8 +263,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -315,8 +288,7 @@ public class SortedMailboxReceiveOperatorTest {
OperatorTestUtil.block(DATA_SCHEMA, row4), OperatorTestUtil.block(DATA_SCHEMA, row5),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -347,8 +319,7 @@ public class SortedMailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
- _stageMetadataBoth, false);
+ new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, dataSchema, collationKeys, collationDirections, collationNullDirections,
false, 1)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org