You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/04 08:58:14 UTC

[iotdb] branch ml_test_1_async updated: add more metrics for each step in leader and follower

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_test_1_async
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_test_1_async by this push:
     new cf18b64979 add more metrics for each step in leader and follower
cf18b64979 is described below

commit cf18b649798382fc160438ebf565462bbd94b04f
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Aug 4 16:57:59 2022 +0800

    add more metrics for each step in leader and follower
---
 .../multileader/MultiLeaderServerImpl.java         |  11 +-
 .../multileader/client/DispatchLogHandler.java     |   3 +
 .../multileader/logdispatcher/LogDispatcher.java   | 146 +++++++++++----------
 .../service/MultiLeaderRPCServiceProcessor.java    |   8 +-
 4 files changed, 95 insertions(+), 73 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index f8a08aa553..0fb33fce8e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -110,8 +110,10 @@ public class MultiLeaderServerImpl {
    * records the index of the log and writes locally, and then asynchronous replication is performed
    */
   public TSStatus write(IConsensusRequest request) {
+    long leaderWriteStartTime = System.nanoTime();
     synchronized (stateMachine) {
-      long startTime = System.nanoTime();
+      StepTracker.trace("LeaderWriteWaitLock", leaderWriteStartTime, System.nanoTime());
+      long startTimeAfterLock = System.nanoTime();
       try {
         IndexedConsensusRequest indexedConsensusRequest =
             buildIndexedConsensusRequestForLocalRequest(request);
@@ -124,6 +126,8 @@ public class MultiLeaderServerImpl {
         }
         // TODO wal and memtable
         TSStatus result = stateMachine.write(indexedConsensusRequest);
+        StepTracker.trace("stateMachineWrite", startTimeAfterLock, System.nanoTime());
+        long offerStartTime = System.nanoTime();
         if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           logDispatcher.offer(indexedConsensusRequest);
         } else {
@@ -134,10 +138,11 @@ public class MultiLeaderServerImpl {
               result.getCode());
           index.decrementAndGet();
         }
-
+        StepTracker.trace("serializeAndOfferToQueue", offerStartTime, System.nanoTime());
         return result;
       } finally {
-        StepTracker.trace("MultiLeaderWrite", startTime, System.nanoTime());
+        StepTracker.trace("MultiLeaderWriteAfterLock", startTimeAfterLock, System.nanoTime());
+        StepTracker.trace("MultiLeaderWriteWhole", leaderWriteStartTime, System.nanoTime());
       }
     }
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
index b4cefed078..ab6cfe655f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.multileader.client;
 
+import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher.LogDispatcherThread;
 import org.apache.iotdb.consensus.multileader.logdispatcher.PendingBatch;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
@@ -37,6 +38,7 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> {
   private final LogDispatcherThread thread;
   private final PendingBatch batch;
   private int retryCount;
+  private final long startTime = System.nanoTime();
 
   public DispatchLogHandler(LogDispatcherThread thread, PendingBatch batch) {
     this.thread = thread;
@@ -45,6 +47,7 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> {
 
   @Override
   public void onComplete(TSyncLogRes response) {
+    StepTracker.trace("leaderSendUtilResponse", 25, startTime, System.nanoTime());
     if (response.getStatus().size() == 1
         && response.getStatus().get(0).getCode()
             == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 6eb7c2e7de..3c3f5ae113 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.multileader.logdispatcher;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.consensus.common.Peer;
@@ -221,7 +222,9 @@ public class LogDispatcher {
             }
           }
           // we may block here if the synchronization pipeline is full
+          long getBatchSlotStartTime = System.nanoTime();
           syncStatus.addNextBatch(batch);
+          StepTracker.trace("getBatchSlot", getBatchSlotStartTime, System.nanoTime());
           // sends batch asynchronously and migrates the retry logic into the callback handler
           sendBatchAsync(batch, new DispatchLogHandler(this, batch));
         }
@@ -241,82 +244,89 @@ public class LogDispatcher {
     }
 
     public PendingBatch getBatch() {
-      PendingBatch batch;
-      List<TLogBatch> logBatches = new ArrayList<>();
-      long startIndex = syncStatus.getNextSendingIndex();
-      logger.debug("[GetBatch] startIndex: {}", startIndex);
-      long endIndex;
-      if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
-        // Use drainTo instead of poll to reduce lock overhead
-        logger.debug(
-            "{} : pendingRequest Size: {}, bufferedRequest size: {}",
-            impl.getThisNode().getGroupId(),
-            pendingRequest.size(),
-            bufferedRequest.size());
-        pendingRequest.drainTo(
-            bufferedRequest,
-            config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
-        // remove all request that searchIndex < startIndex
-        Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
-        while (iterator.hasNext()) {
-          IndexedConsensusRequest request = iterator.next();
-          if (request.getSearchIndex() < startIndex) {
-            iterator.remove();
-          } else {
-            break;
+      long getBatchStartTime = System.nanoTime();
+      try {
+        PendingBatch batch;
+        List<TLogBatch> logBatches = new ArrayList<>();
+        long startIndex = syncStatus.getNextSendingIndex();
+        logger.debug("[GetBatch] startIndex: {}", startIndex);
+        long endIndex;
+        if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
+          // Use drainTo instead of poll to reduce lock overhead
+          logger.debug(
+              "{} : pendingRequest Size: {}, bufferedRequest size: {}",
+              impl.getThisNode().getGroupId(),
+              pendingRequest.size(),
+              bufferedRequest.size());
+          pendingRequest.drainTo(
+              bufferedRequest,
+              config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size());
+          // remove all requests whose searchIndex < startIndex
+          Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
+          while (iterator.hasNext()) {
+            IndexedConsensusRequest request = iterator.next();
+            if (request.getSearchIndex() < startIndex) {
+              iterator.remove();
+            } else {
+              break;
+            }
           }
         }
-      }
-      if (bufferedRequest.isEmpty()) { // only execute this after a restart
-        endIndex = constructBatchFromWAL(startIndex, impl.getIndex() + 1, logBatches);
-        batch = new PendingBatch(startIndex, endIndex, logBatches);
-        logger.debug(
-            "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batch);
-      } else {
-        // Notice that prev searchIndex >= startIndex
-        Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
-        IndexedConsensusRequest prev = iterator.next();
-        // Prevents gap between logs. For example, some requests are not written into the queue when
-        // the queue is full. In this case, requests need to be loaded from the WAL
-        endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches);
-        if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
+        if (bufferedRequest.isEmpty()) { // only execute this after a restart
+          endIndex = constructBatchFromWAL(startIndex, impl.getIndex() + 1, logBatches);
           batch = new PendingBatch(startIndex, endIndex, logBatches);
-          logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch);
-          return batch;
-        }
-        constructBatchIndexedFromConsensusRequest(prev, logBatches);
-        endIndex = prev.getSearchIndex();
-        iterator.remove();
-        while (iterator.hasNext()
-            && logBatches.size() <= config.getReplication().getMaxRequestPerBatch()) {
-          IndexedConsensusRequest current = iterator.next();
-          // Prevents gap between logs. For example, some logs are not written into the queue when
+          logger.debug(
+              "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batch);
+        } else {
+          // Notice that prev searchIndex >= startIndex
+          Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
+          IndexedConsensusRequest prev = iterator.next();
+          // Prevents gaps between logs. For example, some requests are not written into the
+          // queue when the queue is full. In this case, the missing requests need to be
+          // loaded from the WAL
-          if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
-            endIndex =
-                constructBatchFromWAL(prev.getSearchIndex(), current.getSearchIndex(), logBatches);
-            if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
-              batch = new PendingBatch(startIndex, endIndex, logBatches);
-              logger.debug(
-                  "gap {} : accumulated a {} from queue and wal when gap",
-                  impl.getThisNode().getGroupId(),
-                  batch);
-              return batch;
-            }
+          endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches);
+          if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
+            batch = new PendingBatch(startIndex, endIndex, logBatches);
+            logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch);
+            return batch;
           }
-          constructBatchIndexedFromConsensusRequest(current, logBatches);
-          endIndex = current.getSearchIndex();
-          prev = current;
-          // We might not be able to remove all the elements in the bufferedRequest in the
-          // current function, but that's fine, we'll continue processing these elements in the
-          // bufferedRequest the next time we go into the function, they're never lost
+          constructBatchIndexedFromConsensusRequest(prev, logBatches);
+          endIndex = prev.getSearchIndex();
           iterator.remove();
+          while (iterator.hasNext()
+              && logBatches.size() <= config.getReplication().getMaxRequestPerBatch()) {
+            IndexedConsensusRequest current = iterator.next();
+            // Prevents gap between logs. For example, some logs are not written into the queue when
+            // the queue is full. In this case, requests need to be loaded from the WAL
+            if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
+              endIndex =
+                  constructBatchFromWAL(
+                      prev.getSearchIndex(), current.getSearchIndex(), logBatches);
+              if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
+                batch = new PendingBatch(startIndex, endIndex, logBatches);
+                logger.debug(
+                    "gap {} : accumulated a {} from queue and wal when gap",
+                    impl.getThisNode().getGroupId(),
+                    batch);
+                return batch;
+              }
+            }
+            constructBatchIndexedFromConsensusRequest(current, logBatches);
+            endIndex = current.getSearchIndex();
+            prev = current;
+            // We might not be able to remove all the elements in the bufferedRequest in the
+            // current function, but that's fine, we'll continue processing these elements in the
+            // bufferedRequest the next time we go into the function, they're never lost
+            iterator.remove();
+          }
+          batch = new PendingBatch(startIndex, endIndex, logBatches);
+          logger.debug(
+              "{} : accumulated a {} from queue and wal", impl.getThisNode().getGroupId(), batch);
         }
-        batch = new PendingBatch(startIndex, endIndex, logBatches);
-        logger.debug(
-            "{} : accumulated a {} from queue and wal", impl.getThisNode().getGroupId(), batch);
+        return batch;
+      } finally {
+        StepTracker.trace("getBatch()", 25, getBatchStartTime, System.nanoTime());
       }
-      return batch;
     }
 
     public void sendBatchAsync(PendingBatch batch, DispatchLogHandler handler) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index b2018db1d8..c6d675be5e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -53,7 +53,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
 
   @Override
   public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes> resultHandler) {
-    long startTime = System.nanoTime();
+    long syncLogStartTime = System.nanoTime();
     try {
       ConsensusGroupId groupId =
           ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -83,18 +83,22 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
                     : new ByteBufferConsensusRequest(batch.data);
             // merge TLogBatch with same search index into one request
             if (batch.getSearchIndex() != currentSearchIndex) {
+              long followerWriteStartTime = System.nanoTime();
               statuses.add(
                   impl.getStateMachine()
                       .write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+              StepTracker.trace("followerWrite", followerWriteStartTime, System.nanoTime());
               consensusRequests = new ArrayList<>();
             }
             consensusRequests.add(request);
           }
           // write last request
           if (!consensusRequests.isEmpty()) {
+            long followerWriteStartTime = System.nanoTime();
             statuses.add(
                 impl.getStateMachine()
                     .write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+            StepTracker.trace("followerWrite", followerWriteStartTime, System.nanoTime());
           }
           StepTracker.trace("stateMachineWriteBatch", stateMachineStartTime, System.nanoTime());
         }
@@ -104,7 +108,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
     } catch (Exception e) {
       resultHandler.onError(e);
     } finally {
-      StepTracker.trace("syncLog", 25, startTime, System.nanoTime());
+      StepTracker.trace("syncLog", 25, syncLogStartTime, System.nanoTime());
     }
   }