You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/14 08:54:42 UTC
[iotdb] branch iotdb-3791 updated: add stepTracker
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch iotdb-3791
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/iotdb-3791 by this push:
new 7d40cfdb8e add stepTracker
7d40cfdb8e is described below
commit 7d40cfdb8ed0391f00715d723d3356e5c294890d
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Jul 14 16:54:20 2022 +0800
add stepTracker
---
.../multileader/client/DispatchLogHandler.java | 4 ++++
.../multileader/logdispatcher/LogDispatcher.java | 10 ++++++++
.../multileader/util/TestStateMachine.java | 7 ++----
.../org/apache/iotdb/commons}/StepTracker.java | 27 ++++++++++++++++++----
4 files changed, 38 insertions(+), 10 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
index b4cefed078..318896b0c3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.multileader.client;
+import org.apache.iotdb.commons.StepTracker;
import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher.LogDispatcherThread;
import org.apache.iotdb.consensus.multileader.logdispatcher.PendingBatch;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
@@ -37,14 +38,17 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> {
private final LogDispatcherThread thread;
private final PendingBatch batch;
private int retryCount;
+ private long startTime;
public DispatchLogHandler(LogDispatcherThread thread, PendingBatch batch) {
this.thread = thread;
this.batch = batch;
+ this.startTime = System.nanoTime();
}
@Override
public void onComplete(TSyncLogRes response) {
+ StepTracker.trace("asyncSendBatch", 10, startTime, System.nanoTime());
if (response.getStatus().size() == 1
&& response.getStatus().get(0).getCode()
== TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 8ab54482f1..d8e0f3f64e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.consensus.multileader.logdispatcher;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.StepTracker;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.consensus.common.Peer;
@@ -186,6 +187,7 @@ public class LogDispatcher {
try {
PendingBatch batch;
while (!Thread.interrupted() && !stopped) {
+ long getBatchStartTime = System.nanoTime();
while ((batch = getBatch()).isEmpty()) {
// we may block here if there is no requests in the queue
IndexedConsensusRequest request =
@@ -200,6 +202,7 @@ public class LogDispatcher {
}
// we may block here if the synchronization pipeline is full
syncStatus.addNextBatch(batch);
+ StepTracker.trace("getBatch()", 10, getBatchStartTime, System.nanoTime());
// sends batch asynchronously and migrates the retry logic into the callback handler
sendBatchAsync(batch, new DispatchLogHandler(this, batch));
}
@@ -330,16 +333,23 @@ public class LogDispatcher {
&& logBatches.size() < config.getReplication().getMaxRequestPerBatch()) {
logger.debug("construct from WAL for one Entry, index : {}", currentIndex);
try {
+ long waitForNextStartTime = System.nanoTime();
walEntryiterator.waitForNextReady();
+ StepTracker.trace(
+ "walEntryiterator.waitForNextReady()", 400, waitForNextStartTime, System.nanoTime());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("wait for next WAL entry is interrupted");
}
+ long nextStartTime = System.nanoTime();
IndexedConsensusRequest data = walEntryiterator.next();
+ StepTracker.trace("walEntryiterator.next()", 400, nextStartTime, System.nanoTime());
currentIndex = data.getSearchIndex();
iteratorIndex = currentIndex;
for (IConsensusRequest innerRequest : data.getRequests()) {
+ long newTlogBatchStartTime = System.nanoTime();
logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), true));
+ StepTracker.trace("newTLogBatch", 400, newTlogBatchStartTime, System.nanoTime());
}
if (currentIndex == maxIndex - 1) {
break;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
index 176bf534cb..eab940cf87 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.rpc.RpcUtils;
@@ -72,10 +71,8 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
IndexedConsensusRequest indexedConsensusRequest = (IndexedConsensusRequest) request;
List<IConsensusRequest> transformedRequest = new ArrayList<>();
for (IConsensusRequest innerRequest : indexedConsensusRequest.getRequests()) {
- if (innerRequest instanceof MultiLeaderConsensusRequest) {
- ByteBuffer buffer = innerRequest.serializeToByteBuffer();
- transformedRequest.add(new TestEntry(buffer.getInt(), Peer.deserialize(buffer)));
- }
+ ByteBuffer buffer = innerRequest.serializeToByteBuffer();
+ transformedRequest.add(new TestEntry(buffer.getInt(), Peer.deserialize(buffer)));
}
requestSets.add(
new IndexedConsensusRequest(indexedConsensusRequest.getSearchIndex(), transformedRequest),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java b/node-commons/src/main/java/org/apache/iotdb/commons/StepTracker.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/StepTracker.java
index b5e1fc88ad..1aef5cf11a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/StepTracker.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.commons;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,18 +27,24 @@ import java.util.Map;
class Metric {
private static final Logger logger = LoggerFactory.getLogger(Metric.class);
- private static final int PRINT_RATE = 1000;
+ private static final int DEFAULT_PRINT_RATE = 1000;
public String stepName;
public long invokeCount;
public long totalTime;
public long lastCycleTime;
+ public int printRate;
public Metric(String stepName) {
+ this(stepName, DEFAULT_PRINT_RATE);
+ }
+
+ public Metric(String stepName, int printRate) {
this.stepName = stepName;
this.invokeCount = 0;
this.totalTime = 0;
this.lastCycleTime = 0;
+ this.printRate = printRate;
}
public void trace(long startTime, long endTime) {
@@ -47,7 +53,7 @@ class Metric {
}
public void tryPrint() {
- if (invokeCount % PRINT_RATE == 0) {
+ if (invokeCount % printRate == 0) {
logger.info(
String.format(
"step metrics [%d]-[%s] - Total: %d, SUM: %.2fms, AVG: %fms, Last%dAVG: %fms",
@@ -56,8 +62,8 @@ class Metric {
invokeCount,
totalTime * 1.0 / 1000000,
totalTime * 1.0 / 1000000 / invokeCount,
- PRINT_RATE,
- (totalTime * 1.0 - lastCycleTime) / 1000000 / PRINT_RATE));
+ printRate,
+ (totalTime * 1.0 - lastCycleTime) / 1000000 / printRate));
lastCycleTime = totalTime;
}
}
@@ -74,6 +80,17 @@ public class StepTracker {
metrics.get().get(stepName).tryPrint();
}
+ public static void trace(String stepName, int printRate, long startTime, long endTime) {
+ if (metrics.get() == null) {
+ metrics.set(new HashMap<>());
+ }
+ metrics
+ .get()
+ .computeIfAbsent(stepName, key -> new Metric(stepName, printRate))
+ .trace(startTime, endTime);
+ metrics.get().get(stepName).tryPrint();
+ }
+
public static void cleanup() {
metrics.set(null);
}