You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2023/04/01 14:11:08 UTC
[pinot] branch master updated: Do not serialize metrics in each Operator (#10473)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 941cbf84f9 Do not serialize metrics in each Operator (#10473)
941cbf84f9 is described below
commit 941cbf84f912d7cd47f86299c8f4652cd046e097
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Sat Apr 1 19:41:00 2023 +0530
Do not serialize metrics in each Operator (#10473)
* WIP: Do not serialize metrics
* No need to pass stats between operators. They are only collected at the end, in the send operator
* Use opchain stats to record operatorStats
* No need to serialize metrics in receive operator
* Remove attachStats method and create stats object inside context itself
* Make stats thread safe
* Add test for opchain stats
* Ensure SendOperator stats are populated before serializing stats
* Fix variable scope
* Use operator stats map directly from opchain stats
* unify return statements outside inner for loop in MailboxSendOperator
---------
Co-authored-by: Kartik Khare <kh...@Kartiks-MacBook-Pro.local>
---
.../apache/pinot/query/runtime/QueryRunner.java | 6 +-
.../runtime/executor/OpChainSchedulerService.java | 1 -
.../LeafStageTransferableBlockOperator.java | 3 +-
.../runtime/operator/MailboxReceiveOperator.java | 7 +-
.../runtime/operator/MailboxSendOperator.java | 12 +-
.../query/runtime/operator/MultiStageOperator.java | 42 ++-----
.../pinot/query/runtime/operator/OpChain.java | 11 +-
.../pinot/query/runtime/operator/OpChainStats.java | 21 +++-
.../query/runtime/operator/OperatorStats.java | 14 +--
.../runtime/operator/utils/OperatorUtils.java | 4 +-
.../runtime/plan/OpChainExecutionContext.java | 14 +++
.../query/service/dispatch/QueryDispatcher.java | 25 +++--
.../executor/OpChainSchedulerServiceTest.java | 6 +-
.../runtime/executor/RoundRobinSchedulerTest.java | 34 ++++--
.../runtime/operator/MailboxSendOperatorTest.java | 2 +-
.../pinot/query/runtime/operator/OpChainTest.java | 124 +++++++++++++++++++++
16 files changed, 237 insertions(+), 89 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index b0d0433f42..097b05b455 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -52,6 +52,7 @@ import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
@@ -213,8 +214,9 @@ public class QueryRunner {
OpChainExecutionContext opChainExecutionContext =
new OpChainExecutionContext(_mailboxService, requestId, sendNode.getStageId(), _rootServer, deadlineMs,
deadlineMs, distributedStagePlan.getMetadataMap());
- mailboxSendOperator = new MailboxSendOperator(opChainExecutionContext,
- new LeafStageTransferableBlockOperator(opChainExecutionContext, serverQueryResults, sendNode.getDataSchema()),
+ MultiStageOperator leafStageOperator =
+ new LeafStageTransferableBlockOperator(opChainExecutionContext, serverQueryResults, sendNode.getDataSchema());
+ mailboxSendOperator = new MailboxSendOperator(opChainExecutionContext, leafStageOperator,
sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(),
sendNode.getCollationDirections(), sendNode.isSortOnSender(), sendNode.getStageId(),
sendNode.getReceiverStageId());
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 3706627349..1f29584dcc 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -111,7 +111,6 @@ public class OpChainSchedulerService extends AbstractExecutionThreadService {
LOGGER.error("({}): Completed erroneously {} {}", operatorChain, operatorChain.getStats(),
result.getDataBlock().getExceptions());
} else {
- operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata());
LOGGER.debug("({}): Completed {}", operatorChain, operatorChain.getStats());
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 9af6e8819d..a6ed81a3d8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -75,7 +75,8 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator {
_errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null);
_currentIndex = 0;
for (InstanceResponseBlock instanceResponseBlock : baseResultBlock) {
- _operatorStats.recordExecutionStats(instanceResponseBlock.getResponseMetadata());
+ OperatorStats operatorStats = _opChainStats.getOperatorStats(context, getOperatorId());
+ operatorStats.recordExecutionStats(instanceResponseBlock.getResponseMetadata());
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index fe9782c4c6..f65f9b8d15 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import javax.annotation.Nullable;
@@ -215,8 +216,10 @@ public class MailboxReceiveOperator extends MultiStageOperator {
return block;
}
} else {
- if (!block.getResultMetadata().isEmpty()) {
- _operatorStatsMap.putAll(block.getResultMetadata());
+ if (_opChainStats != null && !block.getResultMetadata().isEmpty()) {
+ for (Map.Entry<String, OperatorStats> entry : block.getResultMetadata().entrySet()) {
+ _opChainStats.getOperatorStatsMap().compute(entry.getKey(), (_key, _value) -> entry.getValue());
+ }
}
eosMailboxCount++;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 3be593a70a..ad0c24abb7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -40,6 +40,7 @@ import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
+import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -151,10 +152,19 @@ public class MailboxSendOperator extends MultiStageOperator {
try {
transferableBlock = _dataTableBlockBaseOperator.nextBlock();
while (!transferableBlock.isNoOpBlock()) {
- _exchange.send(transferableBlock);
if (transferableBlock.isEndOfStreamBlock()) {
+ if (transferableBlock.isSuccessfulEndOfStreamBlock()) {
+ //Stats need to be populated here because the block is being sent to the mailbox
+ // and the receiving opChain will not be able to access the stats from the previous opChain
+ TransferableBlock eosBlockWithStats = TransferableBlockUtils.getEndOfStreamTransferableBlock(
+ OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
+ _exchange.send(eosBlockWithStats);
+ } else {
+ _exchange.send(transferableBlock);
+ }
return transferableBlock;
}
+ _exchange.send(transferableBlock);
transferableBlock = _dataTableBlockBaseOperator.nextBlock();
}
} catch (final Exception e) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 160b3d0f58..881c499c0f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -19,14 +19,9 @@
package org.apache.pinot.query.runtime.operator;
import com.google.common.base.Joiner;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.trace.InvocationScope;
@@ -37,23 +32,15 @@ import org.slf4j.LoggerFactory;
public abstract class MultiStageOperator implements Operator<TransferableBlock>, AutoCloseable {
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MultiStageOperator.class);
- // TODO: Move to OperatorContext class.
- protected final OperatorStats _operatorStats;
- protected final Map<String, OperatorStats> _operatorStatsMap;
private final String _operatorId;
private final OpChainExecutionContext _context;
+ protected final OpChainStats _opChainStats;
public MultiStageOperator(OpChainExecutionContext context) {
_context = context;
- _operatorStats =
- new OperatorStats(_context, toExplainString());
- _operatorStatsMap = new HashMap<>();
_operatorId =
Joiner.on("_").join(toExplainString(), _context.getRequestId(), _context.getStageId(), _context.getServer());
- }
-
- public Map<String, OperatorStats> getOperatorStatsMap() {
- return _operatorStatsMap;
+ _opChainStats = _context.getStats();
}
@Override
@@ -62,28 +49,19 @@ public abstract class MultiStageOperator implements Operator<TransferableBlock>,
throw new EarlyTerminationException("Interrupted while processing next block");
}
try (InvocationScope ignored = Tracing.getTracer().createScope(getClass())) {
- _operatorStats.startTimer();
+ OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, _operatorId);
+ operatorStats.startTimer();
TransferableBlock nextBlock = getNextBlock();
- _operatorStats.endTimer(nextBlock);
-
- _operatorStats.recordRow(1, nextBlock.getNumRows());
- if (nextBlock.isEndOfStreamBlock()) {
- if (nextBlock.isSuccessfulEndOfStreamBlock()) {
- for (MultiStageOperator op : getChildOperators()) {
- _operatorStatsMap.putAll(op.getOperatorStatsMap());
- }
- if (!_operatorStats.getExecutionStats().isEmpty()) {
- _operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(), _operatorId);
- _operatorStatsMap.put(_operatorId, _operatorStats);
- }
- return TransferableBlockUtils.getEndOfStreamTransferableBlock(
- OperatorUtils.getMetadataFromOperatorStats(_operatorStatsMap));
- }
- }
+ operatorStats.recordRow(1, nextBlock.getNumRows());
+ operatorStats.endTimer(nextBlock);
return nextBlock;
}
}
+ public String getOperatorId() {
+ return _operatorId;
+ }
+
// Make it protected because we should always call nextBlock()
protected abstract TransferableBlock getNextBlock();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
index 84fac943c2..53176787ce 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java
@@ -37,16 +37,11 @@ public class OpChain implements AutoCloseable {
private final OpChainStats _stats;
private final OpChainId _id;
- public OpChain(MultiStageOperator root, List<MailboxIdentifier> receivingMailboxes, int virtualServerId,
- long requestId, int stageId) {
+ public OpChain(OpChainExecutionContext context, MultiStageOperator root, List<MailboxIdentifier> receivingMailboxes) {
_root = root;
_receivingMailbox = new HashSet<>(receivingMailboxes);
- _id = new OpChainId(requestId, virtualServerId, stageId);
- _stats = new OpChainStats(_id.toString());
- }
-
- public OpChain(OpChainExecutionContext context, MultiStageOperator root, List<MailboxIdentifier> receivingMailboxes) {
- this(root, receivingMailboxes, context.getServer().virtualId(), context.getRequestId(), context.getStageId());
+ _id = context.getId();
+ _stats = context.getStats();
}
public Operator<TransferableBlock> getRoot() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
index 60dba3a0de..5b2cc2a065 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainStats.java
@@ -21,12 +21,13 @@ package org.apache.pinot.query.runtime.operator;
import com.google.common.base.Stopwatch;
import com.google.common.base.Suppliers;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
@@ -50,7 +51,7 @@ public class OpChainStats {
private final AtomicLong _queuedCount = new AtomicLong();
private final String _id;
- private Map<String, OperatorStats> _operatorStatsMap = new HashMap<>();
+ private final ConcurrentHashMap<String, OperatorStats> _operatorStatsMap = new ConcurrentHashMap<>();
public OpChainStats(String id) {
_id = id;
@@ -73,12 +74,16 @@ public class OpChainStats {
}
}
- public Map<String, OperatorStats> getOperatorStatsMap() {
+ public ConcurrentHashMap<String, OperatorStats> getOperatorStatsMap() {
return _operatorStatsMap;
}
- public void setOperatorStatsMap(Map<String, OperatorStats> operatorStatsMap) {
- _operatorStatsMap = operatorStatsMap;
+ public OperatorStats getOperatorStats(OpChainExecutionContext context, String operatorId) {
+ return _operatorStatsMap.computeIfAbsent(operatorId, (id) -> {
+ OperatorStats operatorStats = new OperatorStats(context);
+ operatorStats.recordSingleStat(DataTable.MetadataKey.OPERATOR_ID.getName(), operatorId);
+ return operatorStats;
+ });
}
private void startExecutionTimer() {
@@ -89,6 +94,10 @@ public class OpChainStats {
}
}
+ public long getExecutionTime() {
+ return _executeStopwatch.elapsed(TimeUnit.MILLISECONDS);
+ }
+
@Override
public String toString() {
return String.format("(%s) Queued Count: %s, Executing Time: %sms, Queued Time: %sms", _id, _queuedCount.get(),
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
index 52ca89a3ee..2655a5c286 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java
@@ -35,9 +35,8 @@ public class OperatorStats {
// TODO: add a operatorId for better tracking purpose.
private final int _stageId;
private final long _requestId;
- private final VirtualServerAddress _serverAddress;
- private final String _operatorType;
+ private final VirtualServerAddress _serverAddress;
private int _numBlock = 0;
private int _numRows = 0;
@@ -45,16 +44,15 @@ public class OperatorStats {
private final Map<String, String> _executionStats;
private boolean _processingStarted = false;
- public OperatorStats(OpChainExecutionContext context, String operatorType) {
- this(context.getRequestId(), context.getStageId(), context.getServer(), operatorType);
+ public OperatorStats(OpChainExecutionContext context) {
+ this(context.getRequestId(), context.getStageId(), context.getServer());
}
//TODO: remove this constructor after the context constructor can be used in serialization and deserialization
- public OperatorStats(long requestId, int stageId, VirtualServerAddress serverAddress, String operatorType) {
+ public OperatorStats(long requestId, int stageId, VirtualServerAddress serverAddress) {
_stageId = stageId;
_requestId = requestId;
_serverAddress = serverAddress;
- _operatorType = operatorType;
_executionStats = new HashMap<>();
}
@@ -115,10 +113,6 @@ public class OperatorStats {
return _serverAddress;
}
- public String getOperatorType() {
- return _operatorType;
- }
-
@Override
public String toString() {
return OperatorUtils.operatorStatsToJson(this);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
index ab79339891..601f169cb3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
@@ -81,7 +81,6 @@ public class OperatorUtils {
jsonOut.put("requestId", operatorStats.getRequestId());
jsonOut.put("stageId", operatorStats.getStageId());
jsonOut.put("serverAddress", operatorStats.getServerAddress().toString());
- jsonOut.put("operatorType", operatorStats.getOperatorType());
jsonOut.put("executionStats", operatorStats.getExecutionStats());
return JsonUtils.objectToString(jsonOut);
} catch (Exception e) {
@@ -97,10 +96,9 @@ public class OperatorUtils {
int stageId = operatorStatsNode.get("stageId").asInt();
String serverAddressStr = operatorStatsNode.get("serverAddress").asText();
VirtualServerAddress serverAddress = VirtualServerAddress.parse(serverAddressStr);
- String operatorType = operatorStatsNode.get("operatorType").asText();
OperatorStats operatorStats =
- new OperatorStats(requestId, stageId, serverAddress, operatorType);
+ new OperatorStats(requestId, stageId, serverAddress);
operatorStats.recordExecutionStats(
JsonUtils.jsonNodeToObject(operatorStatsNode.get("executionStats"), new TypeReference<Map<String, String>>() {
}));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 1f500ce8b2..64141a024b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -23,6 +23,8 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.OpChainId;
+import org.apache.pinot.query.runtime.operator.OpChainStats;
/**
@@ -38,6 +40,8 @@ public class OpChainExecutionContext {
private final long _timeoutMs;
private final long _deadlineMs;
private final Map<Integer, StageMetadata> _metadataMap;
+ private final OpChainId _id;
+ private final OpChainStats _stats;
public OpChainExecutionContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId,
VirtualServerAddress server, long timeoutMs, long deadlineMs, Map<Integer, StageMetadata> metadataMap) {
@@ -48,6 +52,8 @@ public class OpChainExecutionContext {
_timeoutMs = timeoutMs;
_deadlineMs = deadlineMs;
_metadataMap = metadataMap;
+ _id = new OpChainId(requestId, server.virtualId(), stageId);
+ _stats = new OpChainStats(_id.toString());
}
public OpChainExecutionContext(PlanRequestContext planRequestContext) {
@@ -83,4 +89,12 @@ public class OpChainExecutionContext {
public Map<Integer, StageMetadata> getMetadataMap() {
return _metadataMap;
}
+
+ public OpChainId getId() {
+ return _id;
+ }
+
+ public OpChainStats getStats() {
+ return _stats;
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 4d01c67105..36c93a5827 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -55,6 +55,7 @@ import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
+import org.apache.pinot.query.runtime.operator.OpChainStats;
import org.apache.pinot.query.runtime.operator.OperatorStats;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -175,12 +176,16 @@ public class QueryDispatcher {
public static ResultTable runReducer(long requestId, QueryPlan queryPlan, int reduceStageId, long timeoutMs,
MailboxService<TransferableBlock> mailboxService, Map<Integer, ExecutionStatsAggregator> statsAggregatorMap) {
MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId);
+ VirtualServerAddress server =
+ new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0);
+ OpChainExecutionContext context =
+ new OpChainExecutionContext(mailboxService, requestId, reduceStageId, server, timeoutMs, timeoutMs,
+ queryPlan.getStageMetadataMap());
MailboxReceiveOperator mailboxReceiveOperator =
- createReduceStageOperator(mailboxService, queryPlan.getStageMetadataMap(), requestId,
- reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(),
- new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0), timeoutMs);
+ createReduceStageOperator(
+ reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(), context);
List<DataBlock> resultDataBlocks =
- reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, statsAggregatorMap, queryPlan);
+ reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, statsAggregatorMap, queryPlan, context.getStats());
return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(),
queryPlan.getQueryStageMap().get(0).getDataSchema());
}
@@ -193,7 +198,8 @@ public class QueryDispatcher {
}
private static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long timeoutMs,
- @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap, QueryPlan queryPlan) {
+ @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap, QueryPlan queryPlan,
+ OpChainStats stats) {
List<DataBlock> resultDataBlocks = new ArrayList<>();
TransferableBlock transferableBlock;
long timeoutWatermark = System.nanoTime() + timeoutMs * 1_000_000L;
@@ -209,7 +215,7 @@ public class QueryDispatcher {
continue;
} else if (transferableBlock.isEndOfStreamBlock()) {
if (executionStatsAggregatorMap != null) {
- for (Map.Entry<String, OperatorStats> entry : transferableBlock.getResultMetadata().entrySet()) {
+ for (Map.Entry<String, OperatorStats> entry : stats.getOperatorStatsMap().entrySet()) {
LOGGER.info("Broker Query Execution Stats - OperatorId: {}, OperatorStats: {}", entry.getKey(),
OperatorUtils.operatorStatsToJson(entry.getValue()));
OperatorStats operatorStats = entry.getValue();
@@ -276,11 +282,8 @@ public class QueryDispatcher {
return new DataSchema(colNames, colTypes);
}
- private static MailboxReceiveOperator createReduceStageOperator(MailboxService<TransferableBlock> mailboxService,
- Map<Integer, StageMetadata> stageMetadataMap, long jobId, int stageId, int reducerStageId, DataSchema dataSchema,
- VirtualServerAddress server, long timeoutMs) {
- OpChainExecutionContext context =
- new OpChainExecutionContext(mailboxService, jobId, stageId, server, timeoutMs, timeoutMs, stageMetadataMap);
+ private static MailboxReceiveOperator createReduceStageOperator(int stageId, int reducerStageId,
+ DataSchema dataSchema, OpChainExecutionContext context) {
// timeout is set for reduce stage
MailboxReceiveOperator mailboxReceiveOperator =
new MailboxReceiveOperator(context, RelDistribution.Type.RANDOM_DISTRIBUTED, Collections.emptyList(),
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index 345b4ca51f..a30adf04f4 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -24,9 +24,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.stubbing.Answer;
@@ -71,7 +73,9 @@ public class OpChainSchedulerServiceTest {
}
private OpChain getChain(MultiStageOperator operator) {
- return new OpChain(operator, ImmutableList.of(), 1, 123, 1);
+ VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 1);
+ OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, 0, null);
+ return new OpChain(context, operator, ImmutableList.of());
}
@Test
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 88974b1cb3..34b39a697b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -22,8 +22,10 @@ import com.google.common.collect.ImmutableList;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.query.mailbox.JsonMailboxIdentifier;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
@@ -76,8 +78,9 @@ public class RoundRobinSchedulerTest {
@Test
public void testSchedulerHappyPath()
throws InterruptedException {
- OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), DEFAULT_VIRTUAL_SERVER_ID,
- 123, DEFAULT_RECEIVER_STAGE_ID);
+ OpChain chain =
+ new OpChain(getOpChainExecutionContext(DEFAULT_RECEIVER_STAGE_ID, 123, DEFAULT_VIRTUAL_SERVER_ID), _operator,
+ ImmutableList.of(MAILBOX_1));
_scheduler = new RoundRobinScheduler(DEFAULT_RELEASE_TIMEOUT_MS);
_scheduler.register(chain);
@@ -102,8 +105,9 @@ public class RoundRobinSchedulerTest {
@Test
public void testSchedulerWhenSenderDies()
throws InterruptedException {
- OpChain chain = new OpChain(_operator, ImmutableList.of(MAILBOX_1), DEFAULT_VIRTUAL_SERVER_ID,
- DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+ OpChain chain = new OpChain(
+ getOpChainExecutionContext(DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID, DEFAULT_VIRTUAL_SERVER_ID), _operator,
+ ImmutableList.of(MAILBOX_1));
_scheduler = new RoundRobinScheduler(DEFAULT_RELEASE_TIMEOUT_MS);
_scheduler.register(chain);
@@ -132,12 +136,17 @@ public class RoundRobinSchedulerTest {
// When parallelism is > 1, multiple OpChains with the same requestId and stageId would be registered in the same
// scheduler. Data received on a given mailbox should wake up exactly 1 OpChain corresponding to the virtual
// server-id determined by the Mailbox.
- OpChain chain1 = new OpChain(_operator, ImmutableList.of(MAILBOX_1), MAILBOX_1.getToHost().virtualId(),
- DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
- OpChain chain2 = new OpChain(_operator, ImmutableList.of(MAILBOX_2), MAILBOX_2.getToHost().virtualId(),
- DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
- OpChain chain3 = new OpChain(_operator, ImmutableList.of(MAILBOX_3), MAILBOX_3.getToHost().virtualId(),
- DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID);
+ OpChain chain1 = new OpChain(
+ getOpChainExecutionContext(DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID, MAILBOX_1.getToHost().virtualId()),
+ _operator, ImmutableList.of(MAILBOX_1));
+ OpChain chain2 = new OpChain(
+ getOpChainExecutionContext(DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID, MAILBOX_2.getToHost().virtualId()),
+ _operator, ImmutableList.of(MAILBOX_2));
+ OpChain chain3 = new OpChain(
+ getOpChainExecutionContext(DEFAULT_REQUEST_ID, DEFAULT_RECEIVER_STAGE_ID, MAILBOX_3.getToHost().virtualId()),
+ _operator, ImmutableList.of(MAILBOX_3));
+
+
// Register 3 OpChains. Keep release timeout high to avoid unintended OpChain wake-ups.
_scheduler = new RoundRobinScheduler(10_000);
_scheduler.register(chain1);
@@ -172,4 +181,9 @@ public class RoundRobinSchedulerTest {
Assert.assertEquals(0,
_scheduler.aliveChainsSize() + _scheduler.readySize() + _scheduler.seenMailSize() + _scheduler.availableSize());
}
+
+ private OpChainExecutionContext getOpChainExecutionContext(long requestId, int stageId, int virtualServerId) {
+ return new OpChainExecutionContext(null, requestId, stageId,
+ new VirtualServerAddress("localhost", 1234, virtualServerId), 0, 0, null);
+ }
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 72d5ba66b8..25b307b22b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -169,7 +169,7 @@ public class MailboxSendOperatorTest {
// Then:
Assert.assertTrue(block.isEndOfStreamBlock(), "expected EOS block to propagate");
- Mockito.verify(_exchange).send(eosBlock);
+ Assert.assertEquals(block, eosBlock);
}
@Test
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
new file mode 100644
index 0000000000..6f83f9ac77
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import java.util.ArrayList;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests the timing statistics exposed through {@code OpChain.getStats()}, using either a mocked upstream operator
+ * or {@link DummyMultiStageOperator} as the root of the chain.
+ */
+public class OpChainTest {
+  // Sleep used by the slow upstream stub and by DummyMultiStageOperator, in milliseconds.
+  private static final long LONG_DELAY_MS = 1000;
+  // Sleep used by the fast upstream stub, in milliseconds.
+  private static final long SHORT_DELAY_MS = 20;
+
+  private AutoCloseable _mocks;
+  @Mock
+  private MultiStageOperator _upstreamOperator;
+
+  @BeforeMethod
+  public void setUp() {
+    // Initializes the @Mock fields; the returned handle is closed in tearDown().
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  /**
+   * Runs an {@code executing()} / {@code nextBlock()} / {@code queued()} sequence on two separately constructed
+   * OpChains (slow upstream first, fast upstream second) and asserts that each chain's
+   * {@code getExecutionTime()} is at least the delay of its own upstream.
+   */
+  @Test
+  public void testExecutionTimerStats() {
+    stubUpstreamDelay(LONG_DELAY_MS);
+
+    OpChain opChain = new OpChain(OperatorTestUtil.getDefaultContext(), _upstreamOperator, new ArrayList<>());
+    opChain.getStats().executing();
+    opChain.getRoot().nextBlock();
+    opChain.getStats().queued();
+
+    Assert.assertTrue(opChain.getStats().getExecutionTime() >= LONG_DELAY_MS);
+
+    stubUpstreamDelay(SHORT_DELAY_MS);
+
+    opChain = new OpChain(OperatorTestUtil.getDefaultContext(), _upstreamOperator, new ArrayList<>());
+    opChain.getStats().executing();
+    opChain.getRoot().nextBlock();
+    opChain.getStats().queued();
+
+    Assert.assertTrue(opChain.getStats().getExecutionTime() >= SHORT_DELAY_MS);
+    // The upper bound only has to show that the second chain did not pick up the first chain's time, so compare
+    // against the long delay instead of a tight wall-clock limit that scheduling jitter could exceed.
+    Assert.assertTrue(opChain.getStats().getExecutionTime() < LONG_DELAY_MS);
+  }
+
+  /**
+   * Runs a chain rooted at {@link DummyMultiStageOperator} and asserts that the chain stats contain exactly one
+   * operator entry, keyed by that operator's id, whose {@code OPERATOR_EXECUTION_TIME_MS} value lies between one
+   * and two times the operator's sleep.
+   */
+  @Test
+  public void testStatsCollection() {
+    OpChainExecutionContext context = OperatorTestUtil.getDefaultContext();
+    DummyMultiStageOperator dummyMultiStageOperator = new DummyMultiStageOperator(context);
+
+    OpChain opChain = new OpChain(context, dummyMultiStageOperator, new ArrayList<>());
+    opChain.getStats().executing();
+    opChain.getRoot().nextBlock();
+    opChain.getStats().queued();
+
+    Assert.assertTrue(opChain.getStats().getExecutionTime() >= LONG_DELAY_MS);
+    Assert.assertEquals(opChain.getStats().getOperatorStatsMap().size(), 1);
+    Assert.assertTrue(opChain.getStats().getOperatorStatsMap().containsKey(dummyMultiStageOperator.getOperatorId()));
+
+    Map<String, String> executionStats =
+        opChain.getStats().getOperatorStatsMap().get(dummyMultiStageOperator.getOperatorId()).getExecutionStats();
+    // Parse the stat once and check both bounds against the same value.
+    long operatorExecutionTimeMs =
+        Long.parseLong(executionStats.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName()));
+    Assert.assertTrue(operatorExecutionTimeMs >= LONG_DELAY_MS);
+    Assert.assertTrue(operatorExecutionTimeMs <= 2 * LONG_DELAY_MS);
+  }
+
+  /**
+   * Stubs the mocked upstream operator so that {@code nextBlock()} sleeps for {@code delayMs} milliseconds and
+   * then returns an end-of-stream block.
+   *
+   * <p>{@code doAnswer(..).when(mock)} is used instead of {@code when(mock.nextBlock())}: the latter calls the mock
+   * while stubbing, which executes any answer installed earlier (including its sleep) when the stub is replaced.
+   */
+  private void stubUpstreamDelay(long delayMs) {
+    Mockito.doAnswer(invocation -> {
+      Thread.sleep(delayMs);
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    }).when(_upstreamOperator).nextBlock();
+  }
+
+  /**
+   * Operator whose {@code getNextBlock()} sleeps for {@code LONG_DELAY_MS} milliseconds and then returns an
+   * end-of-stream block.
+   */
+  static class DummyMultiStageOperator extends MultiStageOperator {
+    public DummyMultiStageOperator(OpChainExecutionContext context) {
+      super(context);
+    }
+
+    @Override
+    protected TransferableBlock getNextBlock() {
+      try {
+        Thread.sleep(LONG_DELAY_MS);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so the calling thread can still observe the interruption.
+        Thread.currentThread().interrupt();
+      }
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    }
+
+    @Nullable
+    @Override
+    public String toExplainString() {
+      return "DUMMY";
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org