You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/03 08:57:20 UTC

[iotdb] branch ml_test_1_async created (now 2126e2febc)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch ml_test_1_async
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 2126e2febc leverage client insert thread to do serialization of request for dispatching

This branch includes the following new commits:

     new 2126e2febc leverage client insert thread to do serialization of request for dispatching

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: leverage client insert thread to do serialization of request for dispatching

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_test_1_async
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2126e2febc731d657ca6a112b87bc9cbd13a9eb6
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Aug 3 16:56:58 2022 +0800

    leverage client insert thread to do serialization of request for dispatching
---
 .../common/request/IndexedConsensusRequest.java    | 19 +++++++++++++-
 .../multileader/logdispatcher/LogDispatcher.java   | 29 +++++++++++++++++++---
 2 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index de3aca433b..1c004264dd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.common.request;
 
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 
@@ -29,13 +30,19 @@ public class IndexedConsensusRequest implements IConsensusRequest {
   /** we do not need to serialize these two fields as they are useless in other nodes. */
   private final long searchIndex;
 
-  private final List<IConsensusRequest> requests;
+  private List<IConsensusRequest> requests;
+  private List<ByteBuffer> serializedRequests;
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
     this.searchIndex = searchIndex;
     this.requests = requests;
   }
 
+  public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long searchIndex) {
+    this.searchIndex = searchIndex;
+    this.serializedRequests = serializedRequests;
+  }
+
   @Override
   public ByteBuffer serializeToByteBuffer() {
     throw new UnsupportedOperationException();
@@ -49,6 +56,16 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     return searchIndex;
   }
 
+  public List<ByteBuffer> getSerializedRequests() {
+    return serializedRequests;
+  }
+
+  public List<ByteBuffer> buildSerializedRequests() {
+    List<ByteBuffer> result = new LinkedList<>();
+    this.requests.forEach(r -> result.add(r.serializeToByteBuffer()));
+    return result;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index ed41ec5e16..da5ad41aaa 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -106,17 +107,27 @@ public class LogDispatcher {
   }
 
   public void offer(IndexedConsensusRequest request) {
+    List<ByteBuffer> serializedRequest = request.buildSerializedRequests();
     threads.forEach(
         thread -> {
           logger.debug(
               "{}: Push a log to the queue, where the queue length is {}",
               impl.getThisNode().getGroupId(),
               thread.getPendingRequest().size());
-          if (!thread.getPendingRequest().offer(request)) {
+          if (!thread
+              .getPendingRequest()
+              .offer(new IndexedConsensusRequest(serializedRequest, request.getSearchIndex()))) {
+            logger.info(
+                "{}: Log queue to {} is full. skip current request: {}",
+                impl.getThisNode().getGroupId(),
+                thread.getPeer().getEndpoint().getIp(),
+                request.getSearchIndex());
             logger.debug(
                 "{}: Log queue of {} is full, ignore the log to this node",
                 impl.getThisNode().getGroupId(),
                 thread.getPeer());
+          } else {
+            thread.countQueueUsage(request.getSearchIndex());
           }
         });
   }
@@ -139,6 +150,7 @@ public class LogDispatcher {
 
     private ConsensusReqReader.ReqIterator walEntryiterator;
     private long iteratorIndex = 1;
+    private long queueProcessCount = 0;
 
     public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
       this.peer = peer;
@@ -156,6 +168,16 @@ public class LogDispatcher {
       return controller;
     }
 
+    public void countQueueUsage(long searchIndex) {
+      this.queueProcessCount++;
+      logger.info(
+          "{}: queue to {}: put request to queue. count: {}, searchIndex {}",
+          impl.getThisNode().getGroupId(),
+          getPeer().getEndpoint().getIp(),
+          this.queueProcessCount,
+          searchIndex);
+    }
+
     public long getCurrentSyncIndex() {
       return controller.getCurrentIndex();
     }
@@ -355,9 +377,8 @@ public class LogDispatcher {
 
     private void constructBatchIndexedFromConsensusRequest(
         IndexedConsensusRequest request, List<TLogBatch> logBatches) {
-      for (IConsensusRequest innerRequest : request.getRequests()) {
-        logBatches.add(
-            new TLogBatch(innerRequest.serializeToByteBuffer(), request.getSearchIndex(), false));
+      for (ByteBuffer innerRequest : request.getSerializedRequests()) {
+        logBatches.add(new TLogBatch(innerRequest, request.getSearchIndex(), false));
       }
     }
   }