You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/29 08:07:47 UTC
(pinot) branch master updated: [Multi-stage] Reduce the stats transferred (#12517)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 35c89c87a5 [Multi-stage] Reduce the stats transfered (#12517)
35c89c87a5 is described below
commit 35c89c87a5473465ca273b576d5bf1e554cb0c35
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Feb 29 00:07:41 2024 -0800
[Multi-stage] Reduce the stats transferred (#12517)
---
.../MultiStageBrokerRequestHandler.java | 20 +++++++----
.../pinot/common/datablock/DataBlockUtils.java | 2 --
.../runtime/blocks/TransferableBlockUtils.java | 3 +-
.../runtime/operator/exchange/BlockExchange.java | 39 ++++++++++++++++------
.../query/service/dispatch/QueryDispatcher.java | 25 ++++++--------
5 files changed, 55 insertions(+), 34 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 05fb1ddd52..35aff7efd2 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -19,8 +19,9 @@
package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Maps;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -61,6 +62,7 @@ import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
@@ -179,14 +181,20 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();
boolean traceEnabled = Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
-
- ResultTable queryResults;
- Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
- for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) {
- stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(traceEnabled));
+ Map<Integer, ExecutionStatsAggregator> stageIdStatsMap;
+ if (!traceEnabled) {
+ stageIdStatsMap = Collections.singletonMap(0, new ExecutionStatsAggregator(false));
+ } else {
+ List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
+ int numStages = stagePlans.size();
+ stageIdStatsMap = Maps.newHashMapWithExpectedSize(numStages);
+ for (int stageId = 0; stageId < numStages; stageId++) {
+ stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(true));
+ }
}
long executionStartTimeNs = System.nanoTime();
+ ResultTable queryResults;
try {
queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, queryOptions,
stageIdStatsMap);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
index 5d38168712..27f1140328 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
@@ -59,12 +59,10 @@ public final class DataBlockUtils {
}
public static MetadataBlock getEndOfStreamDataBlock() {
- // TODO: add query statistics metadata for the block.
return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS);
}
public static MetadataBlock getEndOfStreamDataBlock(Map<String, String> stats) {
- // TODO: add query statistics metadata for the block.
return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS, stats);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
index 01c5fd7ddd..355b6fe294 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
@@ -30,13 +30,14 @@ import org.apache.pinot.common.datablock.DataBlockUtils;
public final class TransferableBlockUtils {
private static final int MEDIAN_COLUMN_SIZE_BYTES = 8;
+ private static final TransferableBlock EMPTY_EOS = new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
private TransferableBlockUtils() {
// do not instantiate.
}
public static TransferableBlock getEndOfStreamTransferableBlock() {
- return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+ return EMPTY_EOS;
}
public static TransferableBlock getEndOfStreamTransferableBlock(Map<String, String> statsMap) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index 453288ecb3..f8d49b6328 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.runtime.operator.exchange;
import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.datablock.DataBlock;
@@ -28,6 +29,7 @@ import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
/**
@@ -75,24 +77,39 @@ public abstract class BlockExchange {
*/
public boolean send(TransferableBlock block)
throws Exception {
- if (block.isEndOfStreamBlock()) {
+ if (block.isErrorBlock()) {
+ // Send error block to all mailboxes to propagate the error
for (SendingMailbox sendingMailbox : _sendingMailboxes) {
sendBlock(sendingMailbox, block);
}
return false;
- } else {
- boolean isEarlyTerminated = true;
- for (SendingMailbox sendingMailbox : _sendingMailboxes) {
- if (!sendingMailbox.isEarlyTerminated()) {
- isEarlyTerminated = false;
- break;
- }
+ }
+
+ if (block.isSuccessfulEndOfStreamBlock()) {
+ // Send metadata to only one randomly picked mailbox, and empty EOS block to other mailboxes
+ int numMailboxes = _sendingMailboxes.size();
+ int mailboxIdToSendMetadata = ThreadLocalRandom.current().nextInt(numMailboxes);
+ for (int i = 0; i < numMailboxes; i++) {
+ SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
+ TransferableBlock blockToSend =
+ i == mailboxIdToSendMetadata ? block : TransferableBlockUtils.getEndOfStreamTransferableBlock();
+ sendBlock(sendingMailbox, blockToSend);
}
- if (!isEarlyTerminated) {
- route(_sendingMailboxes, block);
+ return false;
+ }
+
+ assert block.isDataBlock();
+ boolean isEarlyTerminated = true;
+ for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+ if (!sendingMailbox.isEarlyTerminated()) {
+ isEarlyTerminated = false;
+ break;
}
- return isEarlyTerminated;
}
+ if (!isEarlyTerminated) {
+ route(_sendingMailboxes, block);
+ }
+ return isEarlyTerminated;
}
protected void sendBlock(SendingMailbox sendingMailbox, TransferableBlock block)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index f217c9b01a..c13d6f6aee 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.calcite.util.Pair;
+import org.apache.commons.collections.MapUtils;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.broker.ResultTable;
@@ -89,7 +90,7 @@ public class QueryDispatcher {
}
public ResultTable submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
- Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
+ Map<String, String> queryOptions, @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
throws Exception {
long requestId = context.getRequestId();
try {
@@ -278,20 +279,16 @@ public class QueryDispatcher {
}
private static void collectStats(DispatchableSubPlan dispatchableSubPlan, OpChainStats opChainStats,
- @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) {
- if (executionStatsAggregatorMap != null) {
- LOGGER.info("Extracting broker query execution stats, Runtime: {}ms", opChainStats.getExecutionTime());
- for (Map.Entry<String, OperatorStats> entry : opChainStats.getOperatorStatsMap().entrySet()) {
- OperatorStats operatorStats = entry.getValue();
- ExecutionStatsAggregator rootStatsAggregator = executionStatsAggregatorMap.get(0);
- ExecutionStatsAggregator stageStatsAggregator = executionStatsAggregatorMap.get(operatorStats.getStageId());
- rootStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
+ @Nullable Map<Integer, ExecutionStatsAggregator> statsAggregatorMap) {
+ if (MapUtils.isNotEmpty(statsAggregatorMap)) {
+ for (OperatorStats operatorStats : opChainStats.getOperatorStatsMap().values()) {
+ ExecutionStatsAggregator rootStatsAggregator = statsAggregatorMap.get(0);
+ rootStatsAggregator.aggregate(null, operatorStats.getExecutionStats(), new HashMap<>());
+ ExecutionStatsAggregator stageStatsAggregator = statsAggregatorMap.get(operatorStats.getStageId());
if (stageStatsAggregator != null) {
- if (dispatchableSubPlan != null) {
- OperatorUtils.recordTableName(operatorStats,
- dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId()));
- }
- stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
+ OperatorUtils.recordTableName(operatorStats,
+ dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId()));
+ stageStatsAggregator.aggregate(null, operatorStats.getExecutionStats(), new HashMap<>());
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org