You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/24 02:57:22 UTC
[pinot] branch master updated: [multistage][feature] add pipeline breaker stats collector (#10958)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6a47311c03 [multistage][feature] add pipeline breaker stats collector (#10958)
6a47311c03 is described below
commit 6a47311c03fdc020a4e05a3acf5cacb7843e3307
Author: Rong Rong <ro...@apache.org>
AuthorDate: Fri Jun 23 19:57:16 2023 -0700
[multistage][feature] add pipeline breaker stats collector (#10958)
- [multistage][pb][stats] add stats support for pipeline breaker results
- add tests for stats collection
- refactor the test setup format to be easier to understand
---------
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../runtime/plan/OpChainExecutionContext.java | 11 +++-
.../plan/pipeline/PipelineBreakerExecutor.java | 17 +++---
.../plan/pipeline/PipelineBreakerOperator.java | 1 -
.../plan/pipeline/PipelineBreakerResult.java | 12 +++-
.../executor/OpChainSchedulerServiceTest.java | 2 +-
.../runtime/executor/RoundRobinSchedulerTest.java | 2 +-
.../operator/MailboxReceiveOperatorTest.java | 58 +++++++++++---------
.../runtime/operator/MailboxSendOperatorTest.java | 2 +-
.../pinot/query/runtime/operator/OpChainTest.java | 8 +--
.../query/runtime/operator/OperatorTestUtil.java | 23 +++++++-
.../operator/SortedMailboxReceiveOperatorTest.java | 64 ++++++++++++----------
.../plan/pipeline/PipelineBreakerExecutorTest.java | 23 ++++++--
12 files changed, 139 insertions(+), 84 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index b23f41400a..b179900e0c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -24,6 +24,7 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.operator.OpChainId;
import org.apache.pinot.query.runtime.operator.OpChainStats;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
/**
@@ -45,7 +46,7 @@ public class OpChainExecutionContext {
@VisibleForTesting
public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId,
VirtualServerAddress server, long deadlineMs, StageMetadata stageMetadata,
- boolean traceEnabled) {
+ PipelineBreakerResult pipelineBreakerResult, boolean traceEnabled) {
_mailboxService = mailboxService;
_requestId = requestId;
_stageId = stageId;
@@ -54,13 +55,17 @@ public class OpChainExecutionContext {
_stageMetadata = stageMetadata;
_id = new OpChainId(requestId, server.workerId(), stageId);
_stats = new OpChainStats(_id.toString());
+ if (pipelineBreakerResult != null && pipelineBreakerResult.getOpChainStats() != null) {
+ _stats.getOperatorStatsMap().putAll(
+ pipelineBreakerResult.getOpChainStats().getOperatorStatsMap());
+ }
_traceEnabled = traceEnabled;
}
public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) {
this(physicalPlanContext.getMailboxService(), physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(),
- physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(),
- physicalPlanContext.getStageMetadata(), physicalPlanContext.isTraceEnabled());
+ physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(), physicalPlanContext.getStageMetadata(),
+ physicalPlanContext.getPipelineBreakerResult(), physicalPlanContext.isTraceEnabled());
}
public MailboxService getMailboxService() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index fa33192a64..c148b37b0c 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -77,9 +77,7 @@ public class PipelineBreakerExecutor {
PhysicalPlanContext physicalPlanContext =
new PhysicalPlanContext(mailboxService, requestId, stageRoot.getPlanFragmentId(), deadlineMs,
distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled);
- Map<Integer, List<TransferableBlock>> resultMap =
- PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext);
- return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap);
+ return PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext);
} catch (Exception e) {
LOGGER.error("Unable to create pipeline breaker results for Req: " + requestId + ", Stage: "
+ distributedStagePlan.getStageId(), e);
@@ -91,14 +89,14 @@ public class PipelineBreakerExecutor {
resultMap.put(key, Collections.singletonList(errorBlock));
}
}
- return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap);
+ return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap, null);
}
} else {
return null;
}
}
- private static Map<Integer, List<TransferableBlock>> execute(OpChainSchedulerService scheduler,
+ private static PipelineBreakerResult execute(OpChainSchedulerService scheduler,
PipelineBreakerContext context, PhysicalPlanContext physicalPlanContext)
throws Exception {
Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap = new HashMap<>();
@@ -111,11 +109,11 @@ public class PipelineBreakerExecutor {
OpChain tempOpChain = PhysicalPlanVisitor.walkPlanNode(planNode, physicalPlanContext);
pipelineWorkerMap.put(key, tempOpChain.getRoot());
}
- return runMailboxReceivePipelineBreaker(scheduler, pipelineWorkerMap, physicalPlanContext);
+ return runMailboxReceivePipelineBreaker(scheduler, context, pipelineWorkerMap, physicalPlanContext);
}
- private static Map<Integer, List<TransferableBlock>> runMailboxReceivePipelineBreaker(
- OpChainSchedulerService scheduler, Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap,
+ private static PipelineBreakerResult runMailboxReceivePipelineBreaker(OpChainSchedulerService scheduler,
+ PipelineBreakerContext context, Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap,
PhysicalPlanContext physicalPlanContext)
throws Exception {
PipelineBreakerOperator pipelineBreakerOperator = new PipelineBreakerOperator(
@@ -126,7 +124,8 @@ public class PipelineBreakerExecutor {
scheduler.register(pipelineBreakerOpChain);
long timeoutMs = physicalPlanContext.getDeadlineMs() - System.currentTimeMillis();
if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
- return pipelineBreakerOperator.getResultMap();
+ return new PipelineBreakerResult(context.getNodeIdMap(), pipelineBreakerOperator.getResultMap(),
+ pipelineBreakerOpChain.getStats());
} else {
throw new IOException("Exception occur when awaiting breaker results!");
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
index 438ce15a37..c9d4d58682 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -93,7 +93,6 @@ class PipelineBreakerOperator extends MultiStageOperator {
return _errorBlock;
}
List<TransferableBlock> blockList = _resultMap.computeIfAbsent(worker.getKey(), (k) -> new ArrayList<>());
- // TODO: only data block is handled, we also need to handle metadata block from upstream in the future.
if (!block.isEndOfStreamBlock()) {
blockList.add(block);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
index 8388337ff1..fe0f2cba61 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
@@ -20,8 +20,10 @@ package org.apache.pinot.query.runtime.plan.pipeline;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChainStats;
/**
@@ -30,10 +32,13 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
public class PipelineBreakerResult {
private final Map<PlanNode, Integer> _nodeIdMap;
private final Map<Integer, List<TransferableBlock>> _resultMap;
+ private final OpChainStats _opChainStats;
- public PipelineBreakerResult(Map<PlanNode, Integer> nodeIdMap, Map<Integer, List<TransferableBlock>> resultMap) {
+ public PipelineBreakerResult(Map<PlanNode, Integer> nodeIdMap, Map<Integer, List<TransferableBlock>> resultMap,
+ OpChainStats opChainStats) {
_nodeIdMap = nodeIdMap;
_resultMap = resultMap;
+ _opChainStats = opChainStats;
}
public Map<PlanNode, Integer> getNodeIdMap() {
@@ -43,4 +48,9 @@ public class PipelineBreakerResult {
public Map<Integer, List<TransferableBlock>> getResultMap() {
return _resultMap;
}
+
+ @Nullable
+ public OpChainStats getOpChainStats() {
+ return _opChainStats;
+ }
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index 332ceaaaab..5088decbbe 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -74,7 +74,7 @@ public class OpChainSchedulerServiceTest {
private OpChain getChain(MultiStageOperator operator) {
VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 1);
- OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, null, true);
+ OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, null, null, true);
return new OpChain(context, operator, ImmutableList.of());
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 798eb534ba..1c400c6e0f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -180,6 +180,6 @@ public class RoundRobinSchedulerTest {
private OpChainExecutionContext getOpChainExecutionContext(long requestId, int stageId, int virtualServerId) {
return new OpChainExecutionContext(null, requestId, stageId,
- new VirtualServerAddress("localhost", 1234, virtualServerId), 0, null, true);
+ new VirtualServerAddress("localhost", 1234, virtualServerId), 0, null, null, true);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index fed5ce422e..48ace11a3f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -75,20 +75,25 @@ public class MailboxReceiveOperatorTest {
VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
_stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map(
- s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
- // sending stage is 0, receiving stage is 1
+ s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
+ .addMailBoxInfoMap(0, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+ org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+ ImmutableList.of(server1, server2), ImmutableMap.of()))
+ .addMailBoxInfoMap(1, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+ org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+ ImmutableList.of(server1, server2), ImmutableMap.of()))
+ .build()).collect(Collectors.toList())).build();
_stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map(
- s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
- ImmutableList.of(server1), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
- ImmutableList.of(server1), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
+ s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
+ .addMailBoxInfoMap(0, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+ ImmutableList.of(server1), ImmutableMap.of()))
+ .addMailBoxInfoMap(1, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+ ImmutableList.of(server1), ImmutableMap.of()))
+ .build()).collect(Collectors.toList())).build();
}
@AfterMethod
@@ -105,7 +110,7 @@ public class MailboxReceiveOperatorTest {
Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build())
.collect(Collectors.toList())).build();
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata);
//noinspection resource
new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1);
}
@@ -113,7 +118,7 @@ public class MailboxReceiveOperatorTest {
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
public void shouldThrowRangeDistributionNotSupported() {
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, null, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, null);
//noinspection resource
new MailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, 1);
}
@@ -125,8 +130,8 @@ public class MailboxReceiveOperatorTest {
// Short timeoutMs should result in timeout
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
- _stageMetadata1, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
+ _stageMetadata1);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
Thread.sleep(100L);
TransferableBlock mailbox = receiveOp.nextBlock();
@@ -136,8 +141,9 @@ public class MailboxReceiveOperatorTest {
}
// Longer timeout or default timeout (10s) doesn't result in timeout
- context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L,
- _stageMetadata1, false);
+ context =
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L,
+ _stageMetadata1);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
Thread.sleep(100L);
TransferableBlock mailbox = receiveOp.nextBlock();
@@ -150,7 +156,7 @@ public class MailboxReceiveOperatorTest {
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
assertTrue(receiveOp.nextBlock().isNoOpBlock());
}
@@ -162,7 +168,7 @@ public class MailboxReceiveOperatorTest {
when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
}
@@ -176,7 +182,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
assertEquals(actualRows.size(), 1);
@@ -193,7 +199,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage)));
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
TransferableBlock block = receiveOp.nextBlock();
assertTrue(block.isErrorBlock());
@@ -211,7 +217,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
1)) {
List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
@@ -234,7 +240,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
1)) {
// Receive first block from server1
@@ -259,7 +265,7 @@ public class MailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
1)) {
TransferableBlock block = receiveOp.nextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 2b74bb727a..c35714aa16 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -184,7 +184,7 @@ public class MailboxSendOperatorTest {
new WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build();
OpChainExecutionContext context =
new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE,
- stageMetadata, false);
+ stageMetadata, null, false);
return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 910cc38523..e7fdb70f3b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -192,7 +192,7 @@ public class OpChainTest {
int receivedStageId = 2;
int senderStageId = 1;
OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress,
- System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
+ System.currentTimeMillis() + 1000, _receivingStageMetadata, null, true);
Stack<MultiStageOperator> operators =
getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -206,7 +206,7 @@ public class OpChainTest {
OpChainExecutionContext secondStageContext =
new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress,
- System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
+ System.currentTimeMillis() + 1000, _receivingStageMetadata, null, true);
MailboxReceiveOperator secondStageReceiveOp =
new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1);
@@ -231,7 +231,7 @@ public class OpChainTest {
int receivedStageId = 2;
int senderStageId = 1;
OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress,
- System.currentTimeMillis() + 1000, _receivingStageMetadata, false);
+ System.currentTimeMillis() + 1000, _receivingStageMetadata, null, false);
Stack<MultiStageOperator> operators =
getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -243,7 +243,7 @@ public class OpChainTest {
OpChainExecutionContext secondStageContext =
new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress,
- System.currentTimeMillis() + 1000, _receivingStageMetadata, false);
+ System.currentTimeMillis() + 1000, _receivingStageMetadata, null, false);
MailboxReceiveOperator secondStageReceiveOp =
new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index 7ecd344ab3..55878658f7 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -18,13 +18,18 @@
*/
package org.apache.pinot.query.runtime.operator;
+import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory;
@@ -41,6 +46,12 @@ public class OperatorTestUtil {
public static final String OP_1 = "op1";
public static final String OP_2 = "op2";
+ public static Map<String, String> getDummyStats(long requestId, int stageId, VirtualServerAddress serverAddress) {
+ OperatorStats operatorStats = new OperatorStats(requestId, stageId, serverAddress);
+ String statsId = new OpChainId(requestId, serverAddress.workerId(), stageId).toString();
+ return OperatorUtils.getMetadataFromOperatorStats(ImmutableMap.of(statsId, operatorStats));
+ }
+
static {
MOCK_OPERATOR_FACTORY = new MockDataBlockOperatorFactory().registerOperator(OP_1, SIMPLE_KV_DATA_SCHEMA)
.registerOperator(OP_2, SIMPLE_KV_DATA_SCHEMA).addRows(OP_1, SIMPLE_KV_DATA_ROWS.get(0))
@@ -62,18 +73,24 @@ public class OperatorTestUtil {
return new TransferableBlock(Arrays.asList(rows), schema, DataBlock.Type.ROW);
}
+ public static OpChainExecutionContext getOpChainContext(MailboxService mailboxService,
+ VirtualServerAddress receiverAddress, long deadlineMs, StageMetadata stageMetadata) {
+ return new OpChainExecutionContext(mailboxService, 0, 0, receiverAddress, deadlineMs, stageMetadata, null, false);
+ }
+
public static OpChainExecutionContext getDefaultContext() {
VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
- return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, true);
+ return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, null, true);
}
public static OpChainExecutionContext getDefaultContextWithTracingDisabled() {
VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
- return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, false);
+ return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, null, false);
}
public static OpChainExecutionContext getContext(long requestId, int stageId,
VirtualServerAddress virtualServerAddress) {
- return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, null, true);
+ return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, null, null,
+ true);
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 25ef09f279..80cf7ed1c4 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -85,20 +85,25 @@ public class SortedMailboxReceiveOperatorTest {
VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
_stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map(
- s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
- org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
- ImmutableList.of(server1, server2), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
- // sending stage is 0, receiving stage is 1
+ s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
+ .addMailBoxInfoMap(0, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+ org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+ ImmutableList.of(server1, server2), ImmutableMap.of()))
+ .addMailBoxInfoMap(1, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+ org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+ ImmutableList.of(server1, server2), ImmutableMap.of()))
+ .build()).collect(Collectors.toList())).build();
_stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map(
- s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
- ImmutableList.of(server1), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
- ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
- ImmutableList.of(server1), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
+ s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
+ .addMailBoxInfoMap(0, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+ ImmutableList.of(server1), ImmutableMap.of()))
+ .addMailBoxInfoMap(1, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+ ImmutableList.of(server1), ImmutableMap.of()))
+ .build()).collect(Collectors.toList())).build();
}
@AfterMethod
@@ -115,7 +120,7 @@ public class SortedMailboxReceiveOperatorTest {
Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build())
.collect(Collectors.toList())).build();
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS,
COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
@@ -124,7 +129,7 @@ public class SortedMailboxReceiveOperatorTest {
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
public void shouldThrowRangeDistributionNotSupported() {
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, null, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, null);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS,
COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
@@ -134,8 +139,8 @@ public class SortedMailboxReceiveOperatorTest {
public void shouldThrowOnEmptyCollationKey() {
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
- _stageMetadata1, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
+ _stageMetadata1);
//noinspection resource
new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), false, 1);
@@ -147,8 +152,8 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
// Short timeoutMs should result in timeout
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
- _stageMetadata1, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
+ _stageMetadata1);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -160,8 +165,9 @@ public class SortedMailboxReceiveOperatorTest {
}
// Longer timeout or default timeout (10s) doesn't result in timeout
- context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L,
- _stageMetadata1, false);
+ context =
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L,
+ _stageMetadata1);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -175,7 +181,7 @@ public class SortedMailboxReceiveOperatorTest {
public void shouldReceiveSingletonNullMailbox() {
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -188,7 +194,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -203,7 +209,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -221,7 +227,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailbox1.poll()).thenReturn(
TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage)));
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
false, 1)) {
@@ -240,7 +246,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -263,7 +269,7 @@ public class SortedMailboxReceiveOperatorTest {
when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -288,7 +294,7 @@ public class SortedMailboxReceiveOperatorTest {
OperatorTestUtil.block(DATA_SCHEMA, row4), OperatorTestUtil.block(DATA_SCHEMA, row5),
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -319,7 +325,7 @@ public class SortedMailboxReceiveOperatorTest {
TransferableBlockUtils.getEndOfStreamTransferableBlock());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
+ OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth);
try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
RelDistribution.Type.HASH_DISTRIBUTED, dataSchema, collationKeys, collationDirections, collationNullDirections,
false, 1)) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index 56d78effe4..6680b56942 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -87,8 +87,7 @@ public class PipelineBreakerExecutorTest {
.addMailBoxInfoMap(2, new MailboxMetadata(
ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)),
ImmutableList.of(_server), ImmutableMap.of()))
- .build())
- .collect(Collectors.toList())).build();
+ .build()).collect(Collectors.toList())).build();
@BeforeClass
public void setUpClass() {
@@ -127,7 +126,7 @@ public class PipelineBreakerExecutorTest {
Object[] row2 = new Object[]{2, 3};
when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
OperatorTestUtil.block(DATA_SCHEMA, row2),
- TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0, 1, _server)));
PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
@@ -138,6 +137,10 @@ public class PipelineBreakerExecutorTest {
Assert.assertNotNull(pipelineBreakerResult);
Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
Assert.assertEquals(pipelineBreakerResult.getResultMap().values().iterator().next().size(), 2);
+
+ // should collect stats from previous stage here
+ Assert.assertNotNull(pipelineBreakerResult.getOpChainStats());
+ Assert.assertEquals(pipelineBreakerResult.getOpChainStats().getOperatorStatsMap().size(), 1);
}
@Test
@@ -160,9 +163,9 @@ public class PipelineBreakerExecutorTest {
Object[] row1 = new Object[]{1, 1};
Object[] row2 = new Object[]{2, 3};
when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
- TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0, 1, _server)));
when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row2),
- TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ TransferableBlockUtils.getEndOfStreamTransferableBlock(OperatorTestUtil.getDummyStats(0, 2, _server)));
PipelineBreakerResult pipelineBreakerResult =
PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
@@ -176,6 +179,10 @@ public class PipelineBreakerExecutorTest {
Assert.assertEquals(it.next().size(), 1);
Assert.assertEquals(it.next().size(), 1);
Assert.assertFalse(it.hasNext());
+
+ // should collect stats from previous stage here
+ Assert.assertNotNull(pipelineBreakerResult.getOpChainStats());
+ Assert.assertEquals(pipelineBreakerResult.getOpChainStats().getOperatorStatsMap().size(), 2);
}
@Test
@@ -199,6 +206,9 @@ public class PipelineBreakerExecutorTest {
Assert.assertEquals(resultBlocks.size(), 1);
Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
+
+ // should have null stats from previous stage here
+ Assert.assertNull(pipelineBreakerResult.getOpChainStats());
}
@Test
@@ -268,6 +278,9 @@ public class PipelineBreakerExecutorTest {
Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
}
+
+ // should have null stats from previous stage here
+ Assert.assertNull(pipelineBreakerResult.getOpChainStats());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org