You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/20 01:15:49 UTC

[iotdb] 03/05: add metrics: data exchange

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/addQueryMetrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f9b21b6116351d851f9d84393afa6f721f8768e2
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Dec 19 22:25:26 2022 +0800

    add metrics: data exchange
---
 .../iotdb/commons/service/metric/enums/Metric.java |   3 +-
 .../iotdb/commons/service/metric/enums/Tag.java    |   3 +-
 .../db/mpp/execution/exchange/LocalSinkHandle.java |  36 ++--
 .../mpp/execution/exchange/LocalSourceHandle.java  |  12 ++
 .../execution/exchange/MPPDataExchangeManager.java |  18 ++
 .../db/mpp/execution/exchange/SinkHandle.java      |  61 ++++---
 .../db/mpp/execution/exchange/SourceHandle.java    |  34 +++-
 .../iotdb/db/mpp/metric/DataExchangeMetricSet.java | 192 +++++++++++++++++++++
 .../iotdb/db/mpp/metric/QueryMetricsManager.java   |  21 +++
 9 files changed, 339 insertions(+), 41 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 0fd3e8f258..e96be1f12e 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -69,7 +69,8 @@ public enum Metric {
   DISPATCHER,
   QUERY_EXECUTION,
   AGGREGATION,
-  QUERY_RESOURCE;
+  QUERY_RESOURCE,
+  DATA_EXCHANGE;
 
   @Override
   public String toString() {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java
index 178937bec4..b9019ba153 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java
@@ -25,7 +25,8 @@ public enum Tag {
   REGION,
   STATUS,
   STAGE,
-  FROM;
+  FROM,
+  OPERATION;
 
   @Override
   public String toString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index c9d03ab833..82e049d76d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
@@ -33,6 +34,7 @@ import java.util.Optional;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SINK_HANDLE_SEND_TSBLOCK_LOCAL;
 
 public class LocalSinkHandle implements ISinkHandle {
 
@@ -48,6 +50,8 @@ public class LocalSinkHandle implements ISinkHandle {
   private boolean aborted = false;
   private boolean closed = false;
 
+  private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
   public LocalSinkHandle(
       TFragmentInstanceId remoteFragmentInstanceId,
       String remotePlanNodeId,
@@ -102,22 +106,28 @@ public class LocalSinkHandle implements ISinkHandle {
 
   @Override
   public void send(TsBlock tsBlock) {
-    Validate.notNull(tsBlock, "tsBlocks is null");
-    synchronized (this) {
-      checkState();
-      if (!blocked.isDone()) {
-        throw new IllegalStateException("Sink handle is blocked.");
+    long startTime = System.nanoTime();
+    try {
+      Validate.notNull(tsBlock, "tsBlocks is null");
+      synchronized (this) {
+        checkState();
+        if (!blocked.isDone()) {
+          throw new IllegalStateException("Sink handle is blocked.");
+        }
       }
-    }
 
-    synchronized (queue) {
-      if (queue.hasNoMoreTsBlocks()) {
-        return;
-      }
-      logger.debug("[StartSendTsBlockOnLocal]");
-      synchronized (this) {
-        blocked = queue.add(tsBlock);
+      synchronized (queue) {
+        if (queue.hasNoMoreTsBlocks()) {
+          return;
+        }
+        logger.debug("[StartSendTsBlockOnLocal]");
+        synchronized (this) {
+          blocked = queue.add(tsBlock);
+        }
       }
+    } finally {
+      QUERY_METRICS.recordDataExchangeCost(
+          SINK_HANDLE_SEND_TSBLOCK_LOCAL, System.nanoTime() - startTime);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 64415f41dc..8a6ea77d4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -36,6 +37,8 @@ import java.nio.ByteBuffer;
 
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SOURCE_HANDLE_GET_TSBLOCK_LOCAL;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SOURCE_HANDLE_SERIALIZE_TSBLOCK_LOCAL;
 
 public class LocalSourceHandle implements ISourceHandle {
 
@@ -55,6 +58,7 @@ public class LocalSourceHandle implements ISourceHandle {
   private final String threadName;
 
   private static final TsBlockSerde serde = new TsBlockSerde();
+  private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
 
   public LocalSourceHandle(
       TFragmentInstanceId remoteFragmentInstanceId,
@@ -88,6 +92,7 @@ public class LocalSourceHandle implements ISourceHandle {
 
   @Override
   public TsBlock receive() {
+    long startTime = System.nanoTime();
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
       checkState();
 
@@ -107,6 +112,9 @@ public class LocalSourceHandle implements ISourceHandle {
       }
       checkAndInvokeOnFinished();
       return tsBlock;
+    } finally {
+      QUERY_METRICS.recordDataExchangeCost(
+          SOURCE_HANDLE_GET_TSBLOCK_LOCAL, System.nanoTime() - startTime);
     }
   }
 
@@ -114,10 +122,14 @@ public class LocalSourceHandle implements ISourceHandle {
   public ByteBuffer getSerializedTsBlock() throws IoTDBException {
     TsBlock tsBlock = receive();
     if (tsBlock != null) {
+      long startTime = System.nanoTime();
       try {
         return serde.serialize(tsBlock);
       } catch (Exception e) {
         throw new IoTDBException(e, TSStatusCode.TSBLOCK_SERIALIZE_ERROR.getStatusCode());
+      } finally {
+        QUERY_METRICS.recordDataExchangeCost(
+            SOURCE_HANDLE_SERIALIZE_TSBLOCK_LOCAL, System.nanoTime() - startTime);
       }
     } else {
       return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 9f7ccf8455..48a77c3dea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
 import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
@@ -50,6 +51,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 
 import static org.apache.iotdb.db.mpp.common.FragmentInstanceId.createFullId;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.GET_DATA_BLOCK_TASK_SERVER;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_SERVER;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SEND_NEW_DATA_BLOCK_EVENT_TASK_SERVER;
 
 public class MPPDataExchangeManager implements IMPPDataExchangeManager {
 
@@ -76,8 +80,11 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
   /** Handle thrift communications. */
   class MPPDataExchangeServiceImpl implements MPPDataExchangeService.Iface {
 
+    private final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
     @Override
     public TGetDataBlockResponse getDataBlock(TGetDataBlockRequest req) throws TException {
+      long startTime = System.nanoTime();
       try (SetThreadName fragmentInstanceName =
           new SetThreadName(
               createFullId(
@@ -105,11 +112,15 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
           }
         }
         return resp;
+      } finally {
+        QUERY_METRICS.recordDataExchangeCost(
+            GET_DATA_BLOCK_TASK_SERVER, System.nanoTime() - startTime);
       }
     }
 
     @Override
     public void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e) {
+      long startTime = System.nanoTime();
       try (SetThreadName fragmentInstanceName =
           new SetThreadName(
               createFullId(
@@ -133,11 +144,15 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
         logger.warn(
             "ack TsBlock [{}, {}) failed.", e.getStartSequenceId(), e.getEndSequenceId(), t);
         throw t;
+      } finally {
+        QUERY_METRICS.recordDataExchangeCost(
+            ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_SERVER, System.nanoTime() - startTime);
       }
     }
 
     @Override
     public void onNewDataBlockEvent(TNewDataBlockEvent e) throws TException {
+      long startTime = System.nanoTime();
       try (SetThreadName fragmentInstanceName =
           new SetThreadName(createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId))) {
         logger.debug(
@@ -171,6 +186,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
             (SourceHandle)
                 sourceHandles.get(e.getTargetFragmentInstanceId()).get(e.getTargetPlanNodeId());
         sourceHandle.updatePendingDataBlockInfo(e.getStartSequenceId(), e.getBlockSizes());
+      } finally {
+        QUERY_METRICS.recordDataExchangeCost(
+            SEND_NEW_DATA_BLOCK_EVENT_TASK_SERVER, System.nanoTime() - startTime);
       }
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index d92171b029..354960b2b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
@@ -49,6 +50,8 @@ import java.util.concurrent.ExecutorService;
 
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static org.apache.iotdb.db.mpp.common.FragmentInstanceId.createFullId;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SEND_NEW_DATA_BLOCK_EVENT_TASK_CALLER;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SINK_HANDLE_SEND_TSBLOCK_REMOTE;
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 
 public class SinkHandle implements ISinkHandle {
@@ -92,6 +95,8 @@ public class SinkHandle implements ISinkHandle {
 
   private boolean noMoreTsBlocks = false;
 
+  private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
   public SinkHandle(
       TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
@@ -139,30 +144,36 @@ public class SinkHandle implements ISinkHandle {
 
   @Override
   public synchronized void send(TsBlock tsBlock) {
-    Validate.notNull(tsBlock, "tsBlocks is null");
-    checkState();
-    if (!blocked.isDone()) {
-      throw new IllegalStateException("Sink handle is blocked.");
-    }
-    if (noMoreTsBlocks) {
-      return;
+    long startTime = System.nanoTime();
+    try {
+      Validate.notNull(tsBlock, "tsBlocks is null");
+      checkState();
+      if (!blocked.isDone()) {
+        throw new IllegalStateException("Sink handle is blocked.");
+      }
+      if (noMoreTsBlocks) {
+        return;
+      }
+      long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
+      int startSequenceId;
+      startSequenceId = nextSequenceId;
+      blocked =
+          localMemoryManager
+              .getQueryPool()
+              .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes)
+              .left;
+      bufferRetainedSizeInBytes += retainedSizeInBytes;
+
+      sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
+      nextSequenceId += 1;
+      currentTsBlockSize = retainedSizeInBytes;
+
+      // TODO: consider merge multiple NewDataBlockEvent for less network traffic.
+      submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(retainedSizeInBytes));
+    } finally {
+      QUERY_METRICS.recordDataExchangeCost(
+          SINK_HANDLE_SEND_TSBLOCK_REMOTE, System.nanoTime() - startTime);
     }
-    long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
-    int startSequenceId;
-    startSequenceId = nextSequenceId;
-    blocked =
-        localMemoryManager
-            .getQueryPool()
-            .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes)
-            .left;
-    bufferRetainedSizeInBytes += retainedSizeInBytes;
-
-    sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
-    nextSequenceId += 1;
-    currentTsBlockSize = retainedSizeInBytes;
-
-    // TODO: consider merge multiple NewDataBlockEvent for less network traffic.
-    submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(retainedSizeInBytes));
   }
 
   @Override
@@ -358,6 +369,7 @@ public class SinkHandle implements ISinkHandle {
                 blockSizes);
         while (attempt < MAX_ATTEMPT_TIMES) {
           attempt += 1;
+          long startTime = System.nanoTime();
           try (SyncDataNodeMPPDataExchangeServiceClient client =
               mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
             client.onNewDataBlockEvent(newDataBlockEvent);
@@ -373,6 +385,9 @@ public class SinkHandle implements ISinkHandle {
               Thread.currentThread().interrupt();
               sinkHandleListener.onFailure(SinkHandle.this, e);
             }
+          } finally {
+            QUERY_METRICS.recordDataExchangeCost(
+                SEND_NEW_DATA_BLOCK_EVENT_TASK_CALLER, System.nanoTime() - startTime);
           }
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index c672383ca3..2ce4fd5c75 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
@@ -49,6 +50,10 @@ import java.util.concurrent.ExecutorService;
 
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.GET_DATA_BLOCK_TASK_CALLER;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_CALLER;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SOURCE_HANDLE_GET_TSBLOCK_REMOTE;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SOURCE_HANDLE_SERIALIZE_TSBLOCK_REMOTE;
 
 public class SourceHandle implements ISourceHandle {
 
@@ -89,6 +94,8 @@ public class SourceHandle implements ISourceHandle {
 
   private boolean closed = false;
 
+  private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
   public SourceHandle(
       TEndPoint remoteEndpoint,
       TFragmentInstanceId remoteFragmentInstanceId,
@@ -118,7 +125,13 @@ public class SourceHandle implements ISourceHandle {
   public synchronized TsBlock receive() {
     ByteBuffer tsBlock = getSerializedTsBlock();
     if (tsBlock != null) {
-      return serde.deserialize(tsBlock);
+      long startTime = System.nanoTime();
+      try {
+        return serde.deserialize(tsBlock);
+      } finally {
+        QUERY_METRICS.recordDataExchangeCost(
+            SOURCE_HANDLE_SERIALIZE_TSBLOCK_REMOTE, System.nanoTime() - startTime);
+      }
     } else {
       return null;
     }
@@ -126,6 +139,7 @@ public class SourceHandle implements ISourceHandle {
 
   @Override
   public synchronized ByteBuffer getSerializedTsBlock() {
+    long startTime = System.nanoTime();
     try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
 
       checkState();
@@ -153,6 +167,9 @@ public class SourceHandle implements ISourceHandle {
       }
       trySubmitGetDataBlocksTask();
       return tsBlock;
+    } finally {
+      QUERY_METRICS.recordDataExchangeCost(
+          SOURCE_HANDLE_GET_TSBLOCK_REMOTE, System.nanoTime() - startTime);
     }
   }
 
@@ -397,13 +414,17 @@ public class SourceHandle implements ISourceHandle {
         int attempt = 0;
         while (attempt < MAX_ATTEMPT_TIMES) {
           attempt += 1;
+          long startTime = System.nanoTime();
           try (SyncDataNodeMPPDataExchangeServiceClient client =
               mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
             TGetDataBlockResponse resp = client.getDataBlock(req);
-            List<ByteBuffer> tsBlocks = new ArrayList<>(resp.getTsBlocks().size());
+
+            int tsBlockNum = resp.getTsBlocks().size();
+            List<ByteBuffer> tsBlocks = new ArrayList<>(tsBlockNum);
             tsBlocks.addAll(resp.getTsBlocks());
 
-            logger.debug("[EndPullTsBlocksFromRemote] Count:{}", tsBlocks.size());
+            logger.debug("[EndPullTsBlocksFromRemote] Count:{}", tsBlockNum);
+            QUERY_METRICS.recordDataBlockNum(tsBlockNum);
             executorService.submit(
                 new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
             synchronized (SourceHandle.this) {
@@ -443,6 +464,9 @@ public class SourceHandle implements ISourceHandle {
               fail(e);
               return;
             }
+          } finally {
+            QUERY_METRICS.recordDataExchangeCost(
+                GET_DATA_BLOCK_TASK_CALLER, System.nanoTime() - startTime);
           }
         }
       }
@@ -480,6 +504,7 @@ public class SourceHandle implements ISourceHandle {
                 remoteFragmentInstanceId, startSequenceId, endSequenceId);
         while (attempt < MAX_ATTEMPT_TIMES) {
           attempt += 1;
+          long startTime = System.nanoTime();
           try (SyncDataNodeMPPDataExchangeServiceClient client =
               mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
             client.onAcknowledgeDataBlockEvent(acknowledgeDataBlockEvent);
@@ -504,6 +529,9 @@ public class SourceHandle implements ISourceHandle {
                 sourceHandleListener.onFailure(SourceHandle.this, e);
               }
             }
+          } finally {
+            QUERY_METRICS.recordDataExchangeCost(
+                ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_CALLER, System.nanoTime() - startTime);
           }
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/DataExchangeMetricSet.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/DataExchangeMetricSet.java
new file mode 100644
index 0000000000..5f56579437
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/DataExchangeMetricSet.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricInfo;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DataExchangeMetricSet implements IMetricSet {
+
+  private static final String metric = Metric.DATA_EXCHANGE.toString();
+
+  public static final Map<String, MetricInfo> metricInfoMap = new HashMap<>();
+
+  public static final String SOURCE_HANDLE_GET_TSBLOCK_LOCAL = "source_handle_get_tsblock_local";
+  public static final String SOURCE_HANDLE_GET_TSBLOCK_REMOTE = "source_handle_get_tsblock_remote";
+  public static final String SOURCE_HANDLE_SERIALIZE_TSBLOCK_LOCAL =
+      "source_handle_serialize_tsblock_local";
+  public static final String SOURCE_HANDLE_SERIALIZE_TSBLOCK_REMOTE =
+      "source_handle_serialize_tsblock_remote";
+  public static final String SINK_HANDLE_SEND_TSBLOCK_LOCAL = "sink_handle_send_tsblock_local";
+  public static final String SINK_HANDLE_SEND_TSBLOCK_REMOTE = "sink_handle_send_tsblock_remote";
+
+  static {
+    metricInfoMap.put(
+        SOURCE_HANDLE_GET_TSBLOCK_LOCAL,
+        new MetricInfo(
+            MetricType.TIMER,
+            metric,
+            Tag.OPERATION.toString(),
+            "source_handle_get_tsblock",
+            Tag.TYPE.toString(),
+            "local"));
+    metricInfoMap.put(
+        SOURCE_HANDLE_GET_TSBLOCK_REMOTE,
+        new MetricInfo(
+            MetricType.TIMER,
+            metric,
+            Tag.OPERATION.toString(),
+            "source_handle_get_tsblock",
+            Tag.TYPE.toString(),
+            "remote"));
+    metricInfoMap.put(
+        SOURCE_HANDLE_SERIALIZE_TSBLOCK_LOCAL,
+        new MetricInfo(
+            MetricType.TIMER,
+            metric,
+            Tag.OPERATION.toString(),
+            "source_handle_serialize_tsblock",
+            Tag.TYPE.toString(),
+            "local"));
+    metricInfoMap.put(
+        SOURCE_HANDLE_SERIALIZE_TSBLOCK_REMOTE,
+        new MetricInfo(
+            MetricType.TIMER,
+            metric,
+            Tag.OPERATION.toString(),
+            "source_handle_serialize_tsblock",
+            Tag.TYPE.toString(),
+            "remote"));
+    metricInfoMap.put(
+        SINK_HANDLE_SEND_TSBLOCK_LOCAL,
+        new MetricInfo(
+            MetricType.TIMER,
+            metric,
+            Tag.OPERATION.toString(),
+            "sink_handle_send_tsblock",
+            Tag.TYPE.toString(),
+            "local"));
+    metricInfoMap.put(
+        SINK_HANDLE_SEND_TSBLOCK_REMOTE,
+        new MetricInfo(
+            MetricType.TIMER,
+            metric,
+            Tag.OPERATION.toString(),
+            "sink_handle_send_tsblock",
+            Tag.TYPE.toString(),
+            "remote"));
+  }
+
+  public static final String SEND_NEW_DATA_BLOCK_EVENT_TASK_CALLER =
+      "send_new_data_block_event_task_caller";
+  public static final String SEND_NEW_DATA_BLOCK_EVENT_TASK_SERVER =
+      "send_new_data_block_event_task_server";
+  public static final String ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_CALLER =
+      "on_acknowledge_data_block_event_task_caller";
+  public static final String ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_SERVER =
+      "on_acknowledge_data_block_event_task_server";
+  public static final String GET_DATA_BLOCK_TASK_CALLER = "get_data_block_task_caller";
+  public static final String GET_DATA_BLOCK_TASK_SERVER = "get_data_block_task_server";
+
+  static {
+    metricInfoMap.put(
+        SEND_NEW_DATA_BLOCK_EVENT_TASK_CALLER,
+        new MetricInfo(
+            MetricType.TIMER,
+            metric,
+            Tag.OPERATION.toString(),
+            "send_new_data_block_event_task",
+            Tag.TYPE.toString(),
+            "caller"));
+    metricInfoMap.put(
+        SEND_NEW_DATA_BLOCK_EVENT_TASK_SERVER,
+        new MetricInfo(
+            MetricType.TIMER,
+            metric,
+            Tag.OPERATION.toString(),
+            "send_new_data_block_event_task",
+            Tag.TYPE.toString(),
+            "server"));
+    metricInfoMap.put(
+        ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_CALLER,
+        new MetricInfo(
+            MetricType.TIMER,
+            metric,
+            Tag.OPERATION.toString(),
+            "on_acknowledge_data_block_event_task",
+            Tag.TYPE.toString(),
+            "caller"));
+    metricInfoMap.put(
+        ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_SERVER,
+        new MetricInfo(
+            MetricType.TIMER,
+            metric,
+            Tag.OPERATION.toString(),
+            "on_acknowledge_data_block_event_task",
+            Tag.TYPE.toString(),
+            "server"));
+    metricInfoMap.put(
+        GET_DATA_BLOCK_TASK_CALLER,
+        new MetricInfo(
+            MetricType.TIMER,
+            metric,
+            Tag.OPERATION.toString(),
+            "get_data_block_task",
+            Tag.TYPE.toString(),
+            "caller"));
+    metricInfoMap.put(
+        GET_DATA_BLOCK_TASK_SERVER,
+        new MetricInfo(
+            MetricType.TIMER,
+            metric,
+            Tag.OPERATION.toString(),
+            "get_data_block_task",
+            Tag.TYPE.toString(),
+            "server"));
+  }
+
+  public static final String GET_DATA_BLOCK_NUM = "get_data_block_num";
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    for (MetricInfo metricInfo : metricInfoMap.values()) {
+      metricService.getOrCreateTimer(
+          metricInfo.getName(), MetricLevel.IMPORTANT, metricInfo.getTagsInArray());
+    }
+    metricService.getOrCreateRate(
+        metric, MetricLevel.IMPORTANT, Tag.NAME.toString(), GET_DATA_BLOCK_NUM);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    for (MetricInfo metricInfo : metricInfoMap.values()) {
+      metricService.remove(MetricType.TIMER, metric, metricInfo.getTagsInArray());
+    }
+    metricService.remove(MetricType.RATE, metric, Tag.NAME.toString(), GET_DATA_BLOCK_NUM);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
index 6d6146c90d..a90f9b7241 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.GET_DATA_BLOCK_NUM;
+
 public class QueryMetricsManager {
 
   private final MetricService metricService = MetricService.getInstance();
@@ -85,6 +87,25 @@ public class QueryMetricsManager {
         count, Metric.QUERY_RESOURCE.toString(), MetricLevel.IMPORTANT, Tag.TYPE.toString(), type);
   }
 
+  public void recordDataExchangeCost(String stage, long costTimeInNanos) {
+    MetricInfo metricInfo = DataExchangeMetricSet.metricInfoMap.get(stage);
+    metricService.timer(
+        costTimeInNanos,
+        TimeUnit.NANOSECONDS,
+        metricInfo.getName(),
+        MetricLevel.IMPORTANT,
+        metricInfo.getTagsInArray());
+  }
+
+  public void recordDataBlockNum(int num) {
+    metricService.rate(
+        num,
+        Metric.DATA_EXCHANGE.toString(),
+        MetricLevel.IMPORTANT,
+        Tag.NAME.toString(),
+        GET_DATA_BLOCK_NUM);
+  }
+
   public static QueryMetricsManager getInstance() {
     return QueryMetricsManager.QueryMetricsManagerHolder.INSTANCE;
   }