You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/29 08:07:47 UTC

(pinot) branch master updated: [Multi-stage] Reduce the stats transferred (#12517)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 35c89c87a5 [Multi-stage] Reduce the stats transferred (#12517)
35c89c87a5 is described below

commit 35c89c87a5473465ca273b576d5bf1e554cb0c35
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Feb 29 00:07:41 2024 -0800

    [Multi-stage] Reduce the stats transferred (#12517)
---
 .../MultiStageBrokerRequestHandler.java            | 20 +++++++----
 .../pinot/common/datablock/DataBlockUtils.java     |  2 --
 .../runtime/blocks/TransferableBlockUtils.java     |  3 +-
 .../runtime/operator/exchange/BlockExchange.java   | 39 ++++++++++++++++------
 .../query/service/dispatch/QueryDispatcher.java    | 25 ++++++--------
 5 files changed, 55 insertions(+), 34 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 05fb1ddd52..35aff7efd2 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -19,8 +19,9 @@
 package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Maps;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -61,6 +62,7 @@ import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.catalog.PinotCatalog;
 import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
 import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
 import org.apache.pinot.query.routing.WorkerManager;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
@@ -179,14 +181,20 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
 
     Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();
     boolean traceEnabled = Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
-
-    ResultTable queryResults;
-    Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
-    for (int stageId = 0; stageId < dispatchableSubPlan.getQueryStageList().size(); stageId++) {
-      stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(traceEnabled));
+    Map<Integer, ExecutionStatsAggregator> stageIdStatsMap;
+    if (!traceEnabled) {
+      stageIdStatsMap = Collections.singletonMap(0, new ExecutionStatsAggregator(false));
+    } else {
+      List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
+      int numStages = stagePlans.size();
+      stageIdStatsMap = Maps.newHashMapWithExpectedSize(numStages);
+      for (int stageId = 0; stageId < numStages; stageId++) {
+        stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(true));
+      }
     }
 
     long executionStartTimeNs = System.nanoTime();
+    ResultTable queryResults;
     try {
       queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, queryOptions,
           stageIdStatsMap);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
index 5d38168712..27f1140328 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java
@@ -59,12 +59,10 @@ public final class DataBlockUtils {
   }
 
   public static MetadataBlock getEndOfStreamDataBlock() {
-    // TODO: add query statistics metadata for the block.
     return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS);
   }
 
   public static MetadataBlock getEndOfStreamDataBlock(Map<String, String> stats) {
-    // TODO: add query statistics metadata for the block.
     return new MetadataBlock(MetadataBlock.MetadataBlockType.EOS, stats);
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
index 01c5fd7ddd..355b6fe294 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlockUtils.java
@@ -30,13 +30,14 @@ import org.apache.pinot.common.datablock.DataBlockUtils;
 
 public final class TransferableBlockUtils {
   private static final int MEDIAN_COLUMN_SIZE_BYTES = 8;
+  private static final TransferableBlock EMPTY_EOS = new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
 
   private TransferableBlockUtils() {
     // do not instantiate.
   }
 
   public static TransferableBlock getEndOfStreamTransferableBlock() {
-    return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+    return EMPTY_EOS;
   }
 
   public static TransferableBlock getEndOfStreamTransferableBlock(Map<String, String> statsMap) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index 453288ecb3..f8d49b6328 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.runtime.operator.exchange;
 import com.google.common.base.Preconditions;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.datablock.DataBlock;
@@ -28,6 +29,7 @@ import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
 import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 
 
 /**
@@ -75,24 +77,39 @@ public abstract class BlockExchange {
    */
   public boolean send(TransferableBlock block)
       throws Exception {
-    if (block.isEndOfStreamBlock()) {
+    if (block.isErrorBlock()) {
+      // Send error block to all mailboxes to propagate the error
       for (SendingMailbox sendingMailbox : _sendingMailboxes) {
         sendBlock(sendingMailbox, block);
       }
       return false;
-    } else {
-      boolean isEarlyTerminated = true;
-      for (SendingMailbox sendingMailbox : _sendingMailboxes) {
-        if (!sendingMailbox.isEarlyTerminated()) {
-          isEarlyTerminated = false;
-          break;
-        }
+    }
+
+    if (block.isSuccessfulEndOfStreamBlock()) {
+      // Send metadata to only one randomly picked mailbox, and empty EOS block to other mailboxes
+      int numMailboxes = _sendingMailboxes.size();
+      int mailboxIdToSendMetadata = ThreadLocalRandom.current().nextInt(numMailboxes);
+      for (int i = 0; i < numMailboxes; i++) {
+        SendingMailbox sendingMailbox = _sendingMailboxes.get(i);
+        TransferableBlock blockToSend =
+            i == mailboxIdToSendMetadata ? block : TransferableBlockUtils.getEndOfStreamTransferableBlock();
+        sendBlock(sendingMailbox, blockToSend);
       }
-      if (!isEarlyTerminated) {
-        route(_sendingMailboxes, block);
+      return false;
+    }
+
+    assert block.isDataBlock();
+    boolean isEarlyTerminated = true;
+    for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+      if (!sendingMailbox.isEarlyTerminated()) {
+        isEarlyTerminated = false;
+        break;
       }
-      return isEarlyTerminated;
     }
+    if (!isEarlyTerminated) {
+      route(_sendingMailboxes, block);
+    }
+    return isEarlyTerminated;
   }
 
   protected void sendBlock(SendingMailbox sendingMailbox, TransferableBlock block)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index f217c9b01a..c13d6f6aee 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import org.apache.calcite.util.Pair;
+import org.apache.commons.collections.MapUtils;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.proto.Worker;
 import org.apache.pinot.common.response.broker.ResultTable;
@@ -89,7 +90,7 @@ public class QueryDispatcher {
   }
 
   public ResultTable submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
-      Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
+      Map<String, String> queryOptions, @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregator)
       throws Exception {
     long requestId = context.getRequestId();
     try {
@@ -278,20 +279,16 @@ public class QueryDispatcher {
   }
 
   private static void collectStats(DispatchableSubPlan dispatchableSubPlan, OpChainStats opChainStats,
-      @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) {
-    if (executionStatsAggregatorMap != null) {
-      LOGGER.info("Extracting broker query execution stats, Runtime: {}ms", opChainStats.getExecutionTime());
-      for (Map.Entry<String, OperatorStats> entry : opChainStats.getOperatorStatsMap().entrySet()) {
-        OperatorStats operatorStats = entry.getValue();
-        ExecutionStatsAggregator rootStatsAggregator = executionStatsAggregatorMap.get(0);
-        ExecutionStatsAggregator stageStatsAggregator = executionStatsAggregatorMap.get(operatorStats.getStageId());
-        rootStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
+      @Nullable Map<Integer, ExecutionStatsAggregator> statsAggregatorMap) {
+    if (MapUtils.isNotEmpty(statsAggregatorMap)) {
+      for (OperatorStats operatorStats : opChainStats.getOperatorStatsMap().values()) {
+        ExecutionStatsAggregator rootStatsAggregator = statsAggregatorMap.get(0);
+        rootStatsAggregator.aggregate(null, operatorStats.getExecutionStats(), new HashMap<>());
+        ExecutionStatsAggregator stageStatsAggregator = statsAggregatorMap.get(operatorStats.getStageId());
         if (stageStatsAggregator != null) {
-          if (dispatchableSubPlan != null) {
-            OperatorUtils.recordTableName(operatorStats,
-                dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId()));
-          }
-          stageStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap<>());
+          OperatorUtils.recordTableName(operatorStats,
+              dispatchableSubPlan.getQueryStageList().get(operatorStats.getStageId()));
+          stageStatsAggregator.aggregate(null, operatorStats.getExecutionStats(), new HashMap<>());
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org