You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/13 03:47:52 UTC
[iotdb] 01/03: Add metrics for session
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch QueryMetrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bbb7fb74755b8aadc03f90341cda0081e26c544b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sun Nov 13 10:15:09 2022 +0800
Add metrics for session
---
.../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java | 3 +
.../src/main/java/org/apache/iotdb/rpc/RpcRT.java | 75 ++++++++++++++++++++++
.../apache/iotdb/session/SessionConnection.java | 4 ++
3 files changed, 82 insertions(+)
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
index 5e78af7bce..e1305baa6f 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java
@@ -297,6 +297,7 @@ public class IoTDBRpcDataSet {
public boolean fetchResults() throws StatementExecutionException, IoTDBConnectionException {
TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true);
req.setTimeout(timeout);
+ long startTime = System.nanoTime();
try {
TSFetchResultsResp resp = client.fetchResultsV2(req);
RpcUtils.verifySuccess(resp.getStatus());
@@ -317,6 +318,8 @@ public class IoTDBRpcDataSet {
} catch (TException e) {
throw new IoTDBConnectionException(
"Cannot fetch result from server, because of network connection: {} ", e);
+ } finally {
+ RpcRT.getInstance().addCost(System.nanoTime() - startTime);
}
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcRT.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcRT.java
new file mode 100644
index 0000000000..24309f059a
--- /dev/null
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcRT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.rpc;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Process-wide accumulator of client-side RPC cost.
+ *
+ * <p>Callers report the elapsed time of each RPC through {@link #addCost(long)}; a background
+ * task periodically logs the accumulated total time, the call count and the average time per
+ * call. The instance is created lazily on the first call to {@link #getInstance()}.
+ */
+public class RpcRT {
+
+  private static final long RPC_RT_PRINT_INTERVAL_IN_MS = 5_000;
+
+  private static final Logger RPC_RT_LOGGER = LoggerFactory.getLogger(RpcRT.class);
+
+  /** Sum of all reported costs, in nanoseconds. */
+  private final AtomicLong totalTime;
+
+  /** Number of costs reported so far. */
+  private final AtomicLong totalCount;
+
+  private RpcRT() {
+    this.totalTime = new AtomicLong(0);
+    this.totalCount = new AtomicLong(0);
+
+    // The worker must be a daemon thread: this executor is never shut down, and a non-daemon
+    // worker would keep the JVM of any client application alive after its main thread ends.
+    ScheduledExecutorService scheduledExecutor =
+        Executors.newScheduledThreadPool(
+            1,
+            runnable -> {
+              Thread thread = new Thread(runnable, "RpcRT-Statistics-Printer");
+              thread.setDaemon(true);
+              return thread;
+            });
+    // First report after two intervals, then one report per interval.
+    scheduledExecutor.scheduleAtFixedRate(
+        this::printRpcRTStatistics,
+        2 * RPC_RT_PRINT_INTERVAL_IN_MS,
+        RPC_RT_PRINT_INTERVAL_IN_MS,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Logs the accumulated total time (in microseconds), the call count and the average time per
+   * call. The two counters are read one after the other, not as a single atomic snapshot.
+   */
+  private void printRpcRTStatistics() {
+    long totalTimeInMicros = this.totalTime.get() / 1_000;
+    long count = this.totalCount.get();
+    // Guard against division by zero: an exception escaping a scheduleAtFixedRate task
+    // suppresses every subsequent execution, which would silently stop the reporting.
+    long avgTimeInMicros = count == 0 ? 0 : totalTimeInMicros / count;
+    RPC_RT_LOGGER.info(
+        "rpc total time: {}us, rpc total count: {}, rpc avg time: {}us",
+        totalTimeInMicros,
+        count,
+        avgTimeInMicros);
+  }
+
+  /**
+   * Records the cost of one RPC.
+   *
+   * @param costTimeInNanos elapsed time of the call, in nanoseconds
+   */
+  public void addCost(long costTimeInNanos) {
+    totalTime.addAndGet(costTimeInNanos);
+    totalCount.incrementAndGet();
+  }
+
+  /**
+   * Returns the shared instance, creating it (and starting the reporting task) on first use.
+   *
+   * @return the singleton {@code RpcRT}
+   */
+  public static RpcRT getInstance() {
+    return RpcRTHolder.INSTANCE;
+  }
+
+  /** Holder class that defers creation of the singleton until it is first requested. */
+  private static class RpcRTHolder {
+
+    private static final RpcRT INSTANCE = new RpcRT();
+
+    private RpcRTHolder() {}
+  }
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index bd2ca927bc..d162b15a6b 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
+import org.apache.iotdb.rpc.RpcRT;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
@@ -347,6 +348,7 @@ public class SessionConnection {
execReq.setFetchSize(session.fetchSize);
execReq.setTimeout(timeout);
TSExecuteStatementResp execResp;
+ long startTime = System.nanoTime();
try {
execReq.setEnableRedirectQuery(enableRedirect);
execResp = client.executeQueryStatementV2(execReq);
@@ -363,6 +365,8 @@ public class SessionConnection {
} else {
throw new IoTDBConnectionException(logForReconnectionFailure());
}
+ } finally {
+ RpcRT.getInstance().addCost(System.nanoTime() - startTime);
}
RpcUtils.verifySuccess(execResp.getStatus());