You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/04 07:46:18 UTC

[iotdb] branch ml_test_1_async updated: change offer to put do disable wal

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_test_1_async
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_test_1_async by this push:
     new b3413871ee change offer to put do disable wal
b3413871ee is described below

commit b3413871ee2c6eac4bd0ab297e166961a3d18e1e
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Aug 2 15:46:01 2022 +0800

    change offer to put do disable wal
---
 .../multileader/logdispatcher/LogDispatcher.java   | 40 ++++++++++++++--------
 1 file changed, 26 insertions(+), 14 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index da5ad41aaa..faffa7bf60 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.multileader.logdispatcher;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.consensus.common.Peer;
@@ -114,21 +115,32 @@ public class LogDispatcher {
               "{}: Push a log to the queue, where the queue length is {}",
               impl.getThisNode().getGroupId(),
               thread.getPendingRequest().size());
-          if (!thread
-              .getPendingRequest()
-              .offer(new IndexedConsensusRequest(serializedRequest, request.getSearchIndex()))) {
-            logger.info(
-                "{}: Log queue to {} is full. skip current request: {}",
-                impl.getThisNode().getGroupId(),
-                thread.getPeer().getEndpoint().getIp(),
-                request.getSearchIndex());
-            logger.debug(
-                "{}: Log queue of {} is full, ignore the log to this node",
-                impl.getThisNode().getGroupId(),
-                thread.getPeer());
-          } else {
-            thread.countQueueUsage(request.getSearchIndex());
+          long startTime = System.nanoTime();
+          try {
+            thread
+                .getPendingRequest()
+                .put(new IndexedConsensusRequest(serializedRequest, request.getSearchIndex()));
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          } finally {
+            StepTracker.trace("putToQueue", startTime, System.nanoTime());
           }
+          //          if (!thread
+          //              .getPendingRequest()
+          //              .put(new IndexedConsensusRequest(serializedRequest,
+          // request.getSearchIndex()))) {
+          //            logger.info(
+          //                "{}: Log queue to {} is full. skip current request: {}",
+          //                impl.getThisNode().getGroupId(),
+          //                thread.getPeer().getEndpoint().getIp(),
+          //                request.getSearchIndex());
+          //            logger.debug(
+          //                "{}: Log queue of {} is full, ignore the log to this node",
+          //                impl.getThisNode().getGroupId(),
+          //                thread.getPeer());
+          //          } else {
+          //            thread.countQueueUsage(request.getSearchIndex());
+          //          }
         });
   }