You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text version.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/14 08:54:42 UTC

[iotdb] branch iotdb-3791 updated: add stepTracker

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch iotdb-3791
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/iotdb-3791 by this push:
     new 7d40cfdb8e add stepTracker
7d40cfdb8e is described below

commit 7d40cfdb8ed0391f00715d723d3356e5c294890d
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Jul 14 16:54:20 2022 +0800

    add stepTracker
---
 .../multileader/client/DispatchLogHandler.java     |  4 ++++
 .../multileader/logdispatcher/LogDispatcher.java   | 10 ++++++++
 .../multileader/util/TestStateMachine.java         |  7 ++----
 .../org/apache/iotdb/commons}/StepTracker.java     | 27 ++++++++++++++++++----
 4 files changed, 38 insertions(+), 10 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
index b4cefed078..318896b0c3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.multileader.client;
 
+import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher.LogDispatcherThread;
 import org.apache.iotdb.consensus.multileader.logdispatcher.PendingBatch;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
@@ -37,14 +38,17 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> {
   private final LogDispatcherThread thread;
   private final PendingBatch batch;
   private int retryCount;
+  private long startTime;
 
   public DispatchLogHandler(LogDispatcherThread thread, PendingBatch batch) {
     this.thread = thread;
     this.batch = batch;
+    this.startTime = System.nanoTime();
   }
 
   @Override
   public void onComplete(TSyncLogRes response) {
+    StepTracker.trace("asyncSendBatch", 10, startTime, System.nanoTime());
     if (response.getStatus().size() == 1
         && response.getStatus().get(0).getCode()
             == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 8ab54482f1..d8e0f3f64e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.multileader.logdispatcher;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.consensus.common.Peer;
@@ -186,6 +187,7 @@ public class LogDispatcher {
       try {
         PendingBatch batch;
         while (!Thread.interrupted() && !stopped) {
+          long getBatchStartTime = System.nanoTime();
           while ((batch = getBatch()).isEmpty()) {
             // we may block here if there is no requests in the queue
             IndexedConsensusRequest request =
@@ -200,6 +202,7 @@ public class LogDispatcher {
           }
           // we may block here if the synchronization pipeline is full
           syncStatus.addNextBatch(batch);
+          StepTracker.trace("getBatch()", 10, getBatchStartTime, System.nanoTime());
           // sends batch asynchronously and migrates the retry logic into the callback handler
           sendBatchAsync(batch, new DispatchLogHandler(this, batch));
         }
@@ -330,16 +333,23 @@ public class LogDispatcher {
           && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) {
         logger.debug("construct from WAL for one Entry, index : {}", currentIndex);
         try {
+          long waitForNextStartTime = System.nanoTime();
           walEntryiterator.waitForNextReady();
+          StepTracker.trace(
+              "walEntryiterator.waitForNextReady()", 400, waitForNextStartTime, System.nanoTime());
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           logger.warn("wait for next WAL entry is interrupted");
         }
+        long nextStartTime = System.nanoTime();
         IndexedConsensusRequest data = walEntryiterator.next();
+        StepTracker.trace("walEntryiterator.next()", 400, nextStartTime, System.nanoTime());
         currentIndex = data.getSearchIndex();
         iteratorIndex = currentIndex;
         for (IConsensusRequest innerRequest : data.getRequests()) {
+          long newTlogBatchStartTime = System.nanoTime();
           logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), true));
+          StepTracker.trace("newTLogBatch", 400, newTlogBatchStartTime, System.nanoTime());
         }
         if (currentIndex == maxIndex - 1) {
           break;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
index 176bf534cb..eab940cf87 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -72,10 +71,8 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
       IndexedConsensusRequest indexedConsensusRequest = (IndexedConsensusRequest) request;
       List<IConsensusRequest> transformedRequest = new ArrayList<>();
       for (IConsensusRequest innerRequest : indexedConsensusRequest.getRequests()) {
-        if (innerRequest instanceof MultiLeaderConsensusRequest) {
-          ByteBuffer buffer = innerRequest.serializeToByteBuffer();
-          transformedRequest.add(new TestEntry(buffer.getInt(), Peer.deserialize(buffer)));
-        }
+        ByteBuffer buffer = innerRequest.serializeToByteBuffer();
+        transformedRequest.add(new TestEntry(buffer.getInt(), Peer.deserialize(buffer)));
       }
       requestSets.add(
           new IndexedConsensusRequest(indexedConsensusRequest.getSearchIndex(), transformedRequest),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java b/node-commons/src/main/java/org/apache/iotdb/commons/StepTracker.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/StepTracker.java
index b5e1fc88ad..1aef5cf11a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/StepTracker.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan;
+package org.apache.iotdb.commons;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,18 +27,24 @@ import java.util.Map;
 
 class Metric {
   private static final Logger logger = LoggerFactory.getLogger(Metric.class);
-  private static final int PRINT_RATE = 1000;
+  private static final int DEFAULT_PRINT_RATE = 1000;
 
   public String stepName;
   public long invokeCount;
   public long totalTime;
   public long lastCycleTime;
+  public int printRate;
 
   public Metric(String stepName) {
+    this(stepName, DEFAULT_PRINT_RATE);
+  }
+
+  public Metric(String stepName, int printRate) {
     this.stepName = stepName;
     this.invokeCount = 0;
     this.totalTime = 0;
     this.lastCycleTime = 0;
+    this.printRate = printRate;
   }
 
   public void trace(long startTime, long endTime) {
@@ -47,7 +53,7 @@ class Metric {
   }
 
   public void tryPrint() {
-    if (invokeCount % PRINT_RATE == 0) {
+    if (invokeCount % printRate == 0) {
       logger.info(
           String.format(
               "step metrics [%d]-[%s] - Total: %d, SUM: %.2fms, AVG: %fms, Last%dAVG: %fms",
@@ -56,8 +62,8 @@ class Metric {
               invokeCount,
               totalTime * 1.0 / 1000000,
               totalTime * 1.0 / 1000000 / invokeCount,
-              PRINT_RATE,
-              (totalTime * 1.0 - lastCycleTime) / 1000000 / PRINT_RATE));
+              printRate,
+              (totalTime * 1.0 - lastCycleTime) / 1000000 / printRate));
       lastCycleTime = totalTime;
     }
   }
@@ -74,6 +80,17 @@ public class StepTracker {
     metrics.get().get(stepName).tryPrint();
   }
 
+  public static void trace(String stepName, int printRate, long startTime, long endTime) {
+    if (metrics.get() == null) {
+      metrics.set(new HashMap<>());
+    }
+    metrics
+        .get()
+        .computeIfAbsent(stepName, key -> new Metric(stepName, printRate))
+        .trace(startTime, endTime);
+    metrics.get().get(stepName).tryPrint();
+  }
+
   public static void cleanup() {
     metrics.set(null);
   }