You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/20 01:15:49 UTC
[iotdb] 03/05: add metrics: data exchange
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/addQueryMetrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f9b21b6116351d851f9d84393afa6f721f8768e2
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Dec 19 22:25:26 2022 +0800
add metrics: data exchange
---
.../iotdb/commons/service/metric/enums/Metric.java | 3 +-
.../iotdb/commons/service/metric/enums/Tag.java | 3 +-
.../db/mpp/execution/exchange/LocalSinkHandle.java | 36 ++--
.../mpp/execution/exchange/LocalSourceHandle.java | 12 ++
.../execution/exchange/MPPDataExchangeManager.java | 18 ++
.../db/mpp/execution/exchange/SinkHandle.java | 61 ++++---
.../db/mpp/execution/exchange/SourceHandle.java | 34 +++-
.../iotdb/db/mpp/metric/DataExchangeMetricSet.java | 192 +++++++++++++++++++++
.../iotdb/db/mpp/metric/QueryMetricsManager.java | 21 +++
9 files changed, 339 insertions(+), 41 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 0fd3e8f258..e96be1f12e 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -69,7 +69,8 @@ public enum Metric {
DISPATCHER,
QUERY_EXECUTION,
AGGREGATION,
- QUERY_RESOURCE;
+ QUERY_RESOURCE,
+ DATA_EXCHANGE;
@Override
public String toString() {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java
index 178937bec4..b9019ba153 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java
@@ -25,7 +25,8 @@ public enum Tag {
REGION,
STATUS,
STAGE,
- FROM;
+ FROM,
+ OPERATION;
@Override
public String toString() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index c9d03ab833..82e049d76d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -33,6 +34,7 @@ import java.util.Optional;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SINK_HANDLE_SEND_TSBLOCK_LOCAL;
public class LocalSinkHandle implements ISinkHandle {
@@ -48,6 +50,8 @@ public class LocalSinkHandle implements ISinkHandle {
private boolean aborted = false;
private boolean closed = false;
+ private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
public LocalSinkHandle(
TFragmentInstanceId remoteFragmentInstanceId,
String remotePlanNodeId,
@@ -102,22 +106,28 @@ public class LocalSinkHandle implements ISinkHandle {
@Override
public void send(TsBlock tsBlock) {
- Validate.notNull(tsBlock, "tsBlocks is null");
- synchronized (this) {
- checkState();
- if (!blocked.isDone()) {
- throw new IllegalStateException("Sink handle is blocked.");
+ long startTime = System.nanoTime();
+ try {
+ Validate.notNull(tsBlock, "tsBlocks is null");
+ synchronized (this) {
+ checkState();
+ if (!blocked.isDone()) {
+ throw new IllegalStateException("Sink handle is blocked.");
+ }
}
- }
- synchronized (queue) {
- if (queue.hasNoMoreTsBlocks()) {
- return;
- }
- logger.debug("[StartSendTsBlockOnLocal]");
- synchronized (this) {
- blocked = queue.add(tsBlock);
+ synchronized (queue) {
+ if (queue.hasNoMoreTsBlocks()) {
+ return;
+ }
+ logger.debug("[StartSendTsBlockOnLocal]");
+ synchronized (this) {
+ blocked = queue.add(tsBlock);
+ }
}
+ } finally {
+ QUERY_METRICS.recordDataExchangeCost(
+ SINK_HANDLE_SEND_TSBLOCK_LOCAL, System.nanoTime() - startTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
index 64415f41dc..8a6ea77d4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSourceHandle.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -36,6 +37,8 @@ import java.nio.ByteBuffer;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SOURCE_HANDLE_GET_TSBLOCK_LOCAL;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SOURCE_HANDLE_SERIALIZE_TSBLOCK_LOCAL;
public class LocalSourceHandle implements ISourceHandle {
@@ -55,6 +58,7 @@ public class LocalSourceHandle implements ISourceHandle {
private final String threadName;
private static final TsBlockSerde serde = new TsBlockSerde();
+ private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
public LocalSourceHandle(
TFragmentInstanceId remoteFragmentInstanceId,
@@ -88,6 +92,7 @@ public class LocalSourceHandle implements ISourceHandle {
@Override
public TsBlock receive() {
+ long startTime = System.nanoTime();
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
checkState();
@@ -107,6 +112,9 @@ public class LocalSourceHandle implements ISourceHandle {
}
checkAndInvokeOnFinished();
return tsBlock;
+ } finally {
+ QUERY_METRICS.recordDataExchangeCost(
+ SOURCE_HANDLE_GET_TSBLOCK_LOCAL, System.nanoTime() - startTime);
}
}
@@ -114,10 +122,14 @@ public class LocalSourceHandle implements ISourceHandle {
public ByteBuffer getSerializedTsBlock() throws IoTDBException {
TsBlock tsBlock = receive();
if (tsBlock != null) {
+ long startTime = System.nanoTime();
try {
return serde.serialize(tsBlock);
} catch (Exception e) {
throw new IoTDBException(e, TSStatusCode.TSBLOCK_SERIALIZE_ERROR.getStatusCode());
+ } finally {
+ QUERY_METRICS.recordDataExchangeCost(
+ SOURCE_HANDLE_SERIALIZE_TSBLOCK_LOCAL, System.nanoTime() - startTime);
}
} else {
return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 9f7ccf8455..48a77c3dea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
@@ -50,6 +51,9 @@ import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import static org.apache.iotdb.db.mpp.common.FragmentInstanceId.createFullId;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.GET_DATA_BLOCK_TASK_SERVER;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_SERVER;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SEND_NEW_DATA_BLOCK_EVENT_TASK_SERVER;
public class MPPDataExchangeManager implements IMPPDataExchangeManager {
@@ -76,8 +80,11 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
/** Handle thrift communications. */
class MPPDataExchangeServiceImpl implements MPPDataExchangeService.Iface {
+ private final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
@Override
public TGetDataBlockResponse getDataBlock(TGetDataBlockRequest req) throws TException {
+ long startTime = System.nanoTime();
try (SetThreadName fragmentInstanceName =
new SetThreadName(
createFullId(
@@ -105,11 +112,15 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
}
}
return resp;
+ } finally {
+ QUERY_METRICS.recordDataExchangeCost(
+ GET_DATA_BLOCK_TASK_SERVER, System.nanoTime() - startTime);
}
}
@Override
public void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e) {
+ long startTime = System.nanoTime();
try (SetThreadName fragmentInstanceName =
new SetThreadName(
createFullId(
@@ -133,11 +144,15 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
logger.warn(
"ack TsBlock [{}, {}) failed.", e.getStartSequenceId(), e.getEndSequenceId(), t);
throw t;
+ } finally {
+ QUERY_METRICS.recordDataExchangeCost(
+ ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_SERVER, System.nanoTime() - startTime);
}
}
@Override
public void onNewDataBlockEvent(TNewDataBlockEvent e) throws TException {
+ long startTime = System.nanoTime();
try (SetThreadName fragmentInstanceName =
new SetThreadName(createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId))) {
logger.debug(
@@ -171,6 +186,9 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
(SourceHandle)
sourceHandles.get(e.getTargetFragmentInstanceId()).get(e.getTargetPlanNodeId());
sourceHandle.updatePendingDataBlockInfo(e.getStartSequenceId(), e.getBlockSizes());
+ } finally {
+ QUERY_METRICS.recordDataExchangeCost(
+ SEND_NEW_DATA_BLOCK_EVENT_TASK_SERVER, System.nanoTime() - startTime);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index d92171b029..354960b2b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
@@ -49,6 +50,8 @@ import java.util.concurrent.ExecutorService;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.mpp.common.FragmentInstanceId.createFullId;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SEND_NEW_DATA_BLOCK_EVENT_TASK_CALLER;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SINK_HANDLE_SEND_TSBLOCK_REMOTE;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
public class SinkHandle implements ISinkHandle {
@@ -92,6 +95,8 @@ public class SinkHandle implements ISinkHandle {
private boolean noMoreTsBlocks = false;
+ private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
public SinkHandle(
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
@@ -139,30 +144,36 @@ public class SinkHandle implements ISinkHandle {
@Override
public synchronized void send(TsBlock tsBlock) {
- Validate.notNull(tsBlock, "tsBlocks is null");
- checkState();
- if (!blocked.isDone()) {
- throw new IllegalStateException("Sink handle is blocked.");
- }
- if (noMoreTsBlocks) {
- return;
+ long startTime = System.nanoTime();
+ try {
+ Validate.notNull(tsBlock, "tsBlocks is null");
+ checkState();
+ if (!blocked.isDone()) {
+ throw new IllegalStateException("Sink handle is blocked.");
+ }
+ if (noMoreTsBlocks) {
+ return;
+ }
+ long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
+ int startSequenceId;
+ startSequenceId = nextSequenceId;
+ blocked =
+ localMemoryManager
+ .getQueryPool()
+ .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes)
+ .left;
+ bufferRetainedSizeInBytes += retainedSizeInBytes;
+
+ sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
+ nextSequenceId += 1;
+ currentTsBlockSize = retainedSizeInBytes;
+
+ // TODO: consider merge multiple NewDataBlockEvent for less network traffic.
+ submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(retainedSizeInBytes));
+ } finally {
+ QUERY_METRICS.recordDataExchangeCost(
+ SINK_HANDLE_SEND_TSBLOCK_REMOTE, System.nanoTime() - startTime);
}
- long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
- int startSequenceId;
- startSequenceId = nextSequenceId;
- blocked =
- localMemoryManager
- .getQueryPool()
- .reserve(localFragmentInstanceId.getQueryId(), retainedSizeInBytes)
- .left;
- bufferRetainedSizeInBytes += retainedSizeInBytes;
-
- sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
- nextSequenceId += 1;
- currentTsBlockSize = retainedSizeInBytes;
-
- // TODO: consider merge multiple NewDataBlockEvent for less network traffic.
- submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(retainedSizeInBytes));
}
@Override
@@ -358,6 +369,7 @@ public class SinkHandle implements ISinkHandle {
blockSizes);
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
+ long startTime = System.nanoTime();
try (SyncDataNodeMPPDataExchangeServiceClient client =
mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
client.onNewDataBlockEvent(newDataBlockEvent);
@@ -373,6 +385,9 @@ public class SinkHandle implements ISinkHandle {
Thread.currentThread().interrupt();
sinkHandleListener.onFailure(SinkHandle.this, e);
}
+ } finally {
+ QUERY_METRICS.recordDataExchangeCost(
+ SEND_NEW_DATA_BLOCK_EVENT_TASK_CALLER, System.nanoTime() - startTime);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index c672383ca3..2ce4fd5c75 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceCl
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
+import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
@@ -49,6 +50,10 @@ import java.util.concurrent.ExecutorService;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.GET_DATA_BLOCK_TASK_CALLER;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_CALLER;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SOURCE_HANDLE_GET_TSBLOCK_REMOTE;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.SOURCE_HANDLE_SERIALIZE_TSBLOCK_REMOTE;
public class SourceHandle implements ISourceHandle {
@@ -89,6 +94,8 @@ public class SourceHandle implements ISourceHandle {
private boolean closed = false;
+ private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
+
public SourceHandle(
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
@@ -118,7 +125,13 @@ public class SourceHandle implements ISourceHandle {
public synchronized TsBlock receive() {
ByteBuffer tsBlock = getSerializedTsBlock();
if (tsBlock != null) {
- return serde.deserialize(tsBlock);
+ long startTime = System.nanoTime();
+ try {
+ return serde.deserialize(tsBlock);
+ } finally {
+ QUERY_METRICS.recordDataExchangeCost(
+ SOURCE_HANDLE_SERIALIZE_TSBLOCK_REMOTE, System.nanoTime() - startTime);
+ }
} else {
return null;
}
@@ -126,6 +139,7 @@ public class SourceHandle implements ISourceHandle {
@Override
public synchronized ByteBuffer getSerializedTsBlock() {
+ long startTime = System.nanoTime();
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
checkState();
@@ -153,6 +167,9 @@ public class SourceHandle implements ISourceHandle {
}
trySubmitGetDataBlocksTask();
return tsBlock;
+ } finally {
+ QUERY_METRICS.recordDataExchangeCost(
+ SOURCE_HANDLE_GET_TSBLOCK_REMOTE, System.nanoTime() - startTime);
}
}
@@ -397,13 +414,17 @@ public class SourceHandle implements ISourceHandle {
int attempt = 0;
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
+ long startTime = System.nanoTime();
try (SyncDataNodeMPPDataExchangeServiceClient client =
mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
TGetDataBlockResponse resp = client.getDataBlock(req);
- List<ByteBuffer> tsBlocks = new ArrayList<>(resp.getTsBlocks().size());
+
+ int tsBlockNum = resp.getTsBlocks().size();
+ List<ByteBuffer> tsBlocks = new ArrayList<>(tsBlockNum);
tsBlocks.addAll(resp.getTsBlocks());
- logger.debug("[EndPullTsBlocksFromRemote] Count:{}", tsBlocks.size());
+ logger.debug("[EndPullTsBlocksFromRemote] Count:{}", tsBlockNum);
+ QUERY_METRICS.recordDataBlockNum(tsBlockNum);
executorService.submit(
new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
synchronized (SourceHandle.this) {
@@ -443,6 +464,9 @@ public class SourceHandle implements ISourceHandle {
fail(e);
return;
}
+ } finally {
+ QUERY_METRICS.recordDataExchangeCost(
+ GET_DATA_BLOCK_TASK_CALLER, System.nanoTime() - startTime);
}
}
}
@@ -480,6 +504,7 @@ public class SourceHandle implements ISourceHandle {
remoteFragmentInstanceId, startSequenceId, endSequenceId);
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
+ long startTime = System.nanoTime();
try (SyncDataNodeMPPDataExchangeServiceClient client =
mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
client.onAcknowledgeDataBlockEvent(acknowledgeDataBlockEvent);
@@ -504,6 +529,9 @@ public class SourceHandle implements ISourceHandle {
sourceHandleListener.onFailure(SourceHandle.this, e);
}
}
+ } finally {
+ QUERY_METRICS.recordDataExchangeCost(
+ ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_CALLER, System.nanoTime() - startTime);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/DataExchangeMetricSet.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/DataExchangeMetricSet.java
new file mode 100644
index 0000000000..5f56579437
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/DataExchangeMetricSet.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricInfo;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Metric set for the MPP data-exchange layer: one TIMER per exchange stage plus one RATE for the
+ * number of fetched data blocks, all published under the {@code Metric.DATA_EXCHANGE} name.
+ */
+public class DataExchangeMetricSet implements IMetricSet {
+
+  private static final String metric = Metric.DATA_EXCHANGE.toString();
+
+  /** Stage key -> timer descriptor; populated by the static initializer below. */
+  public static final Map<String, MetricInfo> metricInfoMap = new HashMap<>();
+
+  // Stage keys recorded by the local and remote source/sink handles.
+  public static final String SOURCE_HANDLE_GET_TSBLOCK_LOCAL = "source_handle_get_tsblock_local";
+  public static final String SOURCE_HANDLE_GET_TSBLOCK_REMOTE = "source_handle_get_tsblock_remote";
+  public static final String SOURCE_HANDLE_SERIALIZE_TSBLOCK_LOCAL =
+      "source_handle_serialize_tsblock_local";
+  // NOTE(review): SourceHandle.receive() records this stage around serde.deserialize(...), so on
+  // the remote path the "serialize" wording actually covers deserialization.
+  public static final String SOURCE_HANDLE_SERIALIZE_TSBLOCK_REMOTE =
+      "source_handle_serialize_tsblock_remote";
+  public static final String SINK_HANDLE_SEND_TSBLOCK_LOCAL = "sink_handle_send_tsblock_local";
+  public static final String SINK_HANDLE_SEND_TSBLOCK_REMOTE = "sink_handle_send_tsblock_remote";
+
+  // Stage keys recorded around the data-exchange RPCs, on the calling and on the serving side.
+  public static final String SEND_NEW_DATA_BLOCK_EVENT_TASK_CALLER =
+      "send_new_data_block_event_task_caller";
+  public static final String SEND_NEW_DATA_BLOCK_EVENT_TASK_SERVER =
+      "send_new_data_block_event_task_server";
+  public static final String ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_CALLER =
+      "on_acknowledge_data_block_event_task_caller";
+  public static final String ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_SERVER =
+      "on_acknowledge_data_block_event_task_server";
+  public static final String GET_DATA_BLOCK_TASK_CALLER = "get_data_block_task_caller";
+  public static final String GET_DATA_BLOCK_TASK_SERVER = "get_data_block_task_server";
+
+  /** Value of the {@code Tag.NAME} tag on the RATE metric bound in {@link #bindTo}. */
+  public static final String GET_DATA_BLOCK_NUM = "get_data_block_num";
+
+  static {
+    registerTimer(SOURCE_HANDLE_GET_TSBLOCK_LOCAL, "source_handle_get_tsblock", "local");
+    registerTimer(SOURCE_HANDLE_GET_TSBLOCK_REMOTE, "source_handle_get_tsblock", "remote");
+    registerTimer(
+        SOURCE_HANDLE_SERIALIZE_TSBLOCK_LOCAL, "source_handle_serialize_tsblock", "local");
+    registerTimer(
+        SOURCE_HANDLE_SERIALIZE_TSBLOCK_REMOTE, "source_handle_serialize_tsblock", "remote");
+    registerTimer(SINK_HANDLE_SEND_TSBLOCK_LOCAL, "sink_handle_send_tsblock", "local");
+    registerTimer(SINK_HANDLE_SEND_TSBLOCK_REMOTE, "sink_handle_send_tsblock", "remote");
+
+    registerTimer(
+        SEND_NEW_DATA_BLOCK_EVENT_TASK_CALLER, "send_new_data_block_event_task", "caller");
+    registerTimer(
+        SEND_NEW_DATA_BLOCK_EVENT_TASK_SERVER, "send_new_data_block_event_task", "server");
+    registerTimer(
+        ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_CALLER,
+        "on_acknowledge_data_block_event_task",
+        "caller");
+    registerTimer(
+        ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_SERVER,
+        "on_acknowledge_data_block_event_task",
+        "server");
+    registerTimer(GET_DATA_BLOCK_TASK_CALLER, "get_data_block_task", "caller");
+    registerTimer(GET_DATA_BLOCK_TASK_SERVER, "get_data_block_task", "server");
+  }
+
+  /**
+   * Stores a TIMER descriptor in {@link #metricInfoMap} under {@code stage}, named after the
+   * data-exchange metric and tagged with the given operation and type values.
+   */
+  private static void registerTimer(String stage, String operation, String type) {
+    MetricInfo timerInfo =
+        new MetricInfo(
+            MetricType.TIMER,
+            metric,
+            Tag.OPERATION.toString(),
+            operation,
+            Tag.TYPE.toString(),
+            type);
+    metricInfoMap.put(stage, timerInfo);
+  }
+
+  /** Creates (or looks up) every registered stage timer and the fetched-block-count rate. */
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    metricInfoMap
+        .values()
+        .forEach(
+            info ->
+                metricService.getOrCreateTimer(
+                    info.getName(), MetricLevel.IMPORTANT, info.getTagsInArray()));
+    metricService.getOrCreateRate(
+        metric, MetricLevel.IMPORTANT, Tag.NAME.toString(), GET_DATA_BLOCK_NUM);
+  }
+
+  /** Removes every metric that {@link #bindTo} created. */
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    metricInfoMap
+        .values()
+        .forEach(info -> metricService.remove(MetricType.TIMER, metric, info.getTagsInArray()));
+    metricService.remove(MetricType.RATE, metric, Tag.NAME.toString(), GET_DATA_BLOCK_NUM);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
index 6d6146c90d..a90f9b7241 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/metric/QueryMetricsManager.java
@@ -27,6 +27,8 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.db.mpp.metric.DataExchangeMetricSet.GET_DATA_BLOCK_NUM;
+
public class QueryMetricsManager {
private final MetricService metricService = MetricService.getInstance();
@@ -85,6 +87,25 @@ public class QueryMetricsManager {
count, Metric.QUERY_RESOURCE.toString(), MetricLevel.IMPORTANT, Tag.TYPE.toString(), type);
}
+  /**
+   * Records the elapsed time of one data-exchange stage on the timer registered for it.
+   *
+   * <p>The exchange handles call this from {@code finally} blocks, so it must not throw: a stage
+   * key with no entry in {@link DataExchangeMetricSet#metricInfoMap} is skipped instead of being
+   * dereferenced, which would otherwise replace the caller's in-flight exception with a {@link
+   * NullPointerException}.
+   *
+   * @param stage one of the stage keys declared in {@link DataExchangeMetricSet}
+   * @param costTimeInNanos elapsed time of the stage, in nanoseconds
+   */
+  public void recordDataExchangeCost(String stage, long costTimeInNanos) {
+    MetricInfo metricInfo = DataExchangeMetricSet.metricInfoMap.get(stage);
+    if (metricInfo == null) {
+      // Unregistered stage: there is no timer descriptor to record against.
+      return;
+    }
+    metricService.timer(
+        costTimeInNanos,
+        TimeUnit.NANOSECONDS,
+        metricInfo.getName(),
+        MetricLevel.IMPORTANT,
+        metricInfo.getTagsInArray());
+  }
+
+  /**
+   * Reports {@code num} to the rate metric named after {@code Metric.DATA_EXCHANGE} and tagged
+   * with the pair ({@code Tag.NAME}, {@code GET_DATA_BLOCK_NUM}).
+   *
+   * @param num value passed to the rate; SourceHandle passes the number of TsBlocks returned by
+   *     one getDataBlock call
+   */
+  public void recordDataBlockNum(int num) {
+    String metricName = Metric.DATA_EXCHANGE.toString();
+    String nameTagKey = Tag.NAME.toString();
+    metricService.rate(num, metricName, MetricLevel.IMPORTANT, nameTagKey, GET_DATA_BLOCK_NUM);
+  }
+
public static QueryMetricsManager getInstance() {
return QueryMetricsManager.QueryMetricsManagerHolder.INSTANCE;
}