You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/03 02:56:45 UTC

[iotdb] branch ml_0729_test_exp1 updated: change to serialize for all write request for multi leader

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test_exp1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_0729_test_exp1 by this push:
     new d0cf6a5487 change to serialize for all write request for multi leader
d0cf6a5487 is described below

commit d0cf6a5487d373cc4543d593cf695941d6d42eea
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Aug 3 10:56:33 2022 +0800

    change to serialize for all write request for multi leader
---
 .../multileader/logdispatcher/LogDispatcher.java   | 27 +++++++++++++---------
 1 file changed, 16 insertions(+), 11 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 6929e27831..e465c16698 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -107,20 +107,17 @@ public class LogDispatcher {
   }
 
   public void offer(IndexedConsensusRequest request) {
+    List<ByteBuffer> serializedRequests = request.buildSerializedRequests();
     threads.forEach(
         thread -> {
           logger.debug(
               "{}: Push a log to the queue, where the queue length is {}",
               impl.getThisNode().getGroupId(),
               thread.getPendingRequest().size());
-          if (thread.getPendingRequest().size()
-              < thread.config.getReplication().getMaxPendingRequestNumPerNode()) {
-            thread
-                .getPendingRequest()
-                .add(
-                    new IndexedConsensusRequest(
-                        request.buildSerializedRequests(), request.getSearchIndex()));
-            thread.countQueue();
+          if (thread
+              .getPendingRequest()
+              .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) {
+            thread.countQueue(request.getSearchIndex());
           } else {
             logger.debug(
                 "{}: Log queue of {} is full, ignore the log to this node",
@@ -130,6 +127,14 @@ public class LogDispatcher {
         });
   }
 
+  private boolean needPutIntoQueue() {
+    return threads.stream()
+        .anyMatch(
+            t ->
+                t.getPendingRequest().size()
+                    < t.config.getReplication().getMaxPendingRequestNumPerNode());
+  }
+
   public class LogDispatcherThread implements Runnable {
     private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
     private final MultiLeaderConfig config;
@@ -162,12 +167,12 @@ public class LogDispatcher {
       this.walEntryiterator = reader.getReqIterator(iteratorIndex);
     }
 
-    public void countQueue() {
+    public void countQueue(long searchIndex) {
       this.queueCount++;
       logger.info(
           String.format(
-              "DataRegion[%s]->%s: total request from queue - [%d]",
-              peer.getGroupId().getId(), peer.getEndpoint().ip, queueCount));
+              "DataRegion[%s]->%s: total request from queue: [%d], requestIndex: [%d]",
+              peer.getGroupId().getId(), peer.getEndpoint().ip, queueCount, searchIndex));
     }
 
     public IndexController getController() {