You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/11/15 03:10:25 UTC

[iotdb] 01/01: enhance TLogBatch struct

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch multi_leader_memory_pendingBatch_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c7944ce18bfe9ccd2d05746a3482cfcf9942225a
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Tue Nov 15 11:08:30 2022 +0800

    enhance TLogBatch struct
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../common/request/IndexedConsensusRequest.java    | 15 ++++-----
 .../multileader/logdispatcher/LogDispatcher.java   | 16 +++------
 .../multileader/logdispatcher/PendingBatch.java    |  4 ++-
 .../service/MultiLeaderRPCServiceProcessor.java    | 38 +++++++---------------
 .../src/main/thrift/mutlileader.thrift             |  2 +-
 5 files changed, 25 insertions(+), 50 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index ca236da1c3..6b0cb2634d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -39,6 +39,12 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     this.searchIndex = searchIndex;
     this.requests = requests;
     this.syncIndex = -1L;
+    this.requests.forEach(
+        r -> {
+          ByteBuffer buffer = r.serializeToByteBuffer();
+          this.serializedRequests.add(buffer);
+          this.serializedSize += buffer.capacity();
+        });
   }
 
   public IndexedConsensusRequest(
@@ -61,15 +67,6 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     return serializedRequests;
   }
 
-  public void buildSerializedRequests() {
-    this.requests.forEach(
-        r -> {
-          ByteBuffer buffer = r.serializeToByteBuffer();
-          this.serializedRequests.add(buffer);
-          this.serializedSize += buffer.capacity();
-        });
-  }
-
   public long getSerializedSize() {
     return serializedSize;
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 8c7a443d3c..f0b4791ea8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -44,7 +43,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -156,9 +154,6 @@ public class LogDispatcher {
   }
 
   public void offer(IndexedConsensusRequest request) {
-    request.buildSerializedRequests();
-    // we put the serialization step outside the synchronized block because it is stateless and
-    // time-consuming
     synchronized (this) {
       threads.forEach(
           thread -> {
@@ -500,18 +495,15 @@ public class LogDispatcher {
         }
         targetIndex = data.getSearchIndex() + 1;
         // construct request from wal
-        for (IConsensusRequest innerRequest : data.getRequests()) {
-          logBatches.addTLogBatch(
-              new TLogBatch(innerRequest.serializeToByteBuffer(), data.getSearchIndex(), true));
-        }
+        logBatches.addTLogBatch(
+            new TLogBatch(data.getSerializedRequests(), data.getSearchIndex(), true));
       }
     }
 
     private void constructBatchIndexedFromConsensusRequest(
         IndexedConsensusRequest request, PendingBatch logBatches) {
-      for (ByteBuffer innerRequest : request.getSerializedRequests()) {
-        logBatches.addTLogBatch(new TLogBatch(innerRequest, request.getSearchIndex(), false));
-      }
+      logBatches.addTLogBatch(
+          new TLogBatch(request.getSerializedRequests(), request.getSearchIndex(), false));
     }
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
index 5d4685b19e..dbfb52d515 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.multileader.logdispatcher;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
 
+import java.nio.Buffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -55,7 +56,8 @@ public class PendingBatch {
   public void addTLogBatch(TLogBatch batch) {
     batches.add(batch);
     // TODO Maybe we need to add in additional fields for more accurate calculations
-    serializedSize += batch.getData() == null ? 0 : batch.getData().length;
+    serializedSize +=
+        batch.getData() == null ? 0 : batch.getData().stream().mapToInt(Buffer::capacity).sum();
   }
 
   public boolean canAccumulate() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 5f90cfffb5..67bceadd12 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
 import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
 import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
@@ -56,9 +55,8 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
+import java.util.stream.Collectors;
 
 public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface {
 
@@ -104,30 +102,16 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
       BatchIndexedConsensusRequest requestsInThisBatch =
           new BatchIndexedConsensusRequest(req.peerId);
       // We use synchronized to ensure atomicity of executing multiple logs
-      if (!req.getBatches().isEmpty()) {
-        List<IConsensusRequest> consensusRequests = new ArrayList<>();
-        long currentSearchIndex = req.getBatches().get(0).getSearchIndex();
-        for (TLogBatch batch : req.getBatches()) {
-          IConsensusRequest request =
-              batch.isFromWAL()
-                  ? new MultiLeaderConsensusRequest(batch.data)
-                  : new ByteBufferConsensusRequest(batch.data);
-          // merge TLogBatch with same search index into one request
-          if (batch.getSearchIndex() != currentSearchIndex) {
-            requestsInThisBatch.add(
-                impl.buildIndexedConsensusRequestForRemoteRequest(
-                    currentSearchIndex, consensusRequests));
-            consensusRequests = new ArrayList<>();
-            currentSearchIndex = batch.getSearchIndex();
-          }
-          consensusRequests.add(request);
-        }
-        // write last request
-        if (!consensusRequests.isEmpty()) {
-          requestsInThisBatch.add(
-              impl.buildIndexedConsensusRequestForRemoteRequest(
-                  currentSearchIndex, consensusRequests));
-        }
+      for (TLogBatch batch : req.getBatches()) {
+        requestsInThisBatch.add(
+            impl.buildIndexedConsensusRequestForRemoteRequest(
+                batch.getSearchIndex(),
+                batch.getData().stream()
+                    .map(
+                        batch.isFromWAL()
+                            ? MultiLeaderConsensusRequest::new
+                            : ByteBufferConsensusRequest::new)
+                    .collect(Collectors.toList())));
       }
       TSStatus writeStatus = impl.getStateMachine().write(requestsInThisBatch);
       logger.debug(
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 0336c19303..28e9047178 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -21,7 +21,7 @@ include "common.thrift"
 namespace java org.apache.iotdb.consensus.multileader.thrift
 
 struct TLogBatch {
-  1: required binary data
+  1: required list<binary> data
   2: required i64 searchIndex
   3: required bool fromWAL
 }