You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/08/02 09:12:29 UTC

[iotdb] branch ml_0729_test_exp1 updated: remove serialization operation before dispatch from queue

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test_exp1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_0729_test_exp1 by this push:
     new 4ccd7e9cc1 remove serialization operation before dispatch from queue
4ccd7e9cc1 is described below

commit 4ccd7e9cc1ce974f6011bdea08e4b00405d922af
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Aug 2 17:12:18 2022 +0800

    remove serialization operation before dispatch from queue
---
 .../common/request/IndexedConsensusRequest.java      | 20 +++++++++++++++++++-
 .../iotdb/consensus/config/MultiLeaderConfig.java    |  2 +-
 .../multileader/logdispatcher/LogDispatcher.java     | 15 +++++++++++----
 3 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 9c5112fa2b..f90dc6d86b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.common.request;
 
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 
@@ -30,7 +31,8 @@ public class IndexedConsensusRequest implements IConsensusRequest {
   private final long searchIndex;
 
   private final long syncIndex;
-  private final List<IConsensusRequest> requests;
+  private List<IConsensusRequest> requests;
+  private List<ByteBuffer> serializedRequests;
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
     this.searchIndex = searchIndex;
@@ -38,6 +40,12 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     this.syncIndex = -1L;
   }
 
+  public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long searchIndex) {
+    this.searchIndex = searchIndex;
+    this.serializedRequests = serializedRequests;
+    this.syncIndex = -1L;
+  }
+
   public IndexedConsensusRequest(
       long searchIndex, long syncIndex, List<IConsensusRequest> requests) {
     this.searchIndex = searchIndex;
@@ -54,6 +62,16 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     return requests;
   }
 
+  public List<ByteBuffer> getSerializedRequests() {
+    return serializedRequests;
+  }
+
+  public List<ByteBuffer> buildSerializedRequests() {
+    List<ByteBuffer> result = new LinkedList<>();
+    this.requests.forEach(r -> this.serializedRequests.add(r.serializeToByteBuffer()));
+    return result;
+  }
+
   public long getSearchIndex() {
     return searchIndex;
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index fa6b9db6a5..1a9b4dac9f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -247,7 +247,7 @@ public class MultiLeaderConfig {
     }
 
     public static class Builder {
-      private int maxPendingRequestNumPerNode = 1000;
+      private int maxPendingRequestNumPerNode = 200;
       private int maxRequestPerBatch = 40;
       private int maxPendingBatch = 5;
       private int maxWaitingTimeForAccumulatingBatchInMs = 500;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index ed41ec5e16..ab35102254 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -112,7 +113,14 @@ public class LogDispatcher {
               "{}: Push a log to the queue, where the queue length is {}",
               impl.getThisNode().getGroupId(),
               thread.getPendingRequest().size());
-          if (!thread.getPendingRequest().offer(request)) {
+          if (thread.getPendingRequest().size()
+              < thread.config.getReplication().getMaxPendingRequestNumPerNode()) {
+            thread
+                .getPendingRequest()
+                .add(
+                    new IndexedConsensusRequest(
+                        request.buildSerializedRequests(), request.getSearchIndex()));
+          } else {
             logger.debug(
                 "{}: Log queue of {} is full, ignore the log to this node",
                 impl.getThisNode().getGroupId(),
@@ -355,9 +363,8 @@ public class LogDispatcher {
 
     private void constructBatchIndexedFromConsensusRequest(
         IndexedConsensusRequest request, List<TLogBatch> logBatches) {
-      for (IConsensusRequest innerRequest : request.getRequests()) {
-        logBatches.add(
-            new TLogBatch(innerRequest.serializeToByteBuffer(), request.getSearchIndex(), false));
+      for (ByteBuffer innerRequest : request.getSerializedRequests()) {
+        logBatches.add(new TLogBatch(innerRequest, request.getSearchIndex(), false));
       }
     }
   }