You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/03/04 13:52:37 UTC

[iotdb] 01/01: [IOTDB-5613] Remove unnecessary serialization in IoTConsensus when replicaNum is 1 to improve write performance (#9204)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira5613
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3d03d468ea5853ab325f9390075c0f8e57814db1
Author: Potato <ta...@apache.org>
AuthorDate: Fri Mar 3 23:13:34 2023 +0800

    [IOTDB-5613] Remove unnecessary serialization in IoTConsensus when replicaNum is 1 to improve write performance (#9204)
    
    * finish
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
    
    * remove unused code
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
    
    * add wal & UT buildSerializedRequest
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
    
    * fix UT
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
    
    ---------
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../common/request/IndexedConsensusRequest.java    | 19 ++++++++-----
 .../consensus/iot/IoTConsensusServerImpl.java      |  5 ++--
 .../consensus/iot/logdispatcher/LogDispatcher.java | 33 +++++++++++++---------
 .../consensus/iot/util/FakeConsensusReqReader.java |  4 ++-
 4 files changed, 36 insertions(+), 25 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 0be89ce1be..58789dbd0e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -32,19 +32,14 @@ public class IndexedConsensusRequest implements IConsensusRequest {
 
   private final long syncIndex;
   private final List<IConsensusRequest> requests;
-  private final List<ByteBuffer> serializedRequests = new ArrayList<>();
+  private final List<ByteBuffer> serializedRequests;
   private long serializedSize = 0;
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
     this.searchIndex = searchIndex;
     this.requests = requests;
     this.syncIndex = -1L;
-    this.requests.forEach(
-        r -> {
-          ByteBuffer buffer = r.serializeToByteBuffer();
-          this.serializedRequests.add(buffer);
-          this.serializedSize += buffer.capacity();
-        });
+    this.serializedRequests = new ArrayList<>(requests.size());
   }
 
   public IndexedConsensusRequest(
@@ -52,6 +47,16 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     this.searchIndex = searchIndex;
     this.requests = requests;
     this.syncIndex = syncIndex;
+    this.serializedRequests = new ArrayList<>(requests.size());
+  }
+
+  public void buildSerializedRequests() {
+    this.requests.forEach(
+        r -> {
+          ByteBuffer buffer = r.serializeToByteBuffer();
+          this.serializedRequests.add(buffer);
+          this.serializedSize += buffer.capacity();
+        });
   }
 
   @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 2ab9746a31..16e378cde4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -199,8 +199,6 @@ public class IoTConsensusServerImpl {
         }
       }
       long writeToStateMachineStartTime = System.nanoTime();
-      IndexedConsensusRequest indexedConsensusRequest =
-          buildIndexedConsensusRequestForLocalRequest(request);
       // statistic the time of checking write block
       MetricService.getInstance()
           .getOrCreateHistogram(
@@ -213,6 +211,8 @@ public class IoTConsensusServerImpl {
               Tag.REGION.toString(),
               this.consensusGroupId)
           .update(writeToStateMachineStartTime - getStateMachineLockTime);
+      IndexedConsensusRequest indexedConsensusRequest =
+          buildIndexedConsensusRequestForLocalRequest(request);
       if (indexedConsensusRequest.getSearchIndex() % 1000 == 0) {
         logger.info(
             "DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}",
@@ -220,7 +220,6 @@ public class IoTConsensusServerImpl {
             getCurrentSafelyDeletedSearchIndex(),
             indexedConsensusRequest.getSearchIndex());
       }
-      // TODO wal and memtable
       IConsensusRequest planNode = stateMachine.deserializeRequest(indexedConsensusRequest);
       TSStatus result = stateMachine.write(planNode);
       long writeToStateMachineEndTime = System.nanoTime();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 68a988558e..f6da0efd52 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -156,22 +156,26 @@ public class LogDispatcher {
   }
 
   public void offer(IndexedConsensusRequest request) {
-    synchronized (this) {
-      threads.forEach(
-          thread -> {
-            logger.debug(
-                "{}->{}: Push a log to the queue, where the queue length is {}",
-                impl.getThisNode().getGroupId(),
-                thread.getPeer().getEndpoint().getIp(),
-                thread.getPendingEntriesSize());
-            if (!thread.offer(request)) {
+    // we don't need to serialize and offer request when replicaNum is 1.
+    if (!threads.isEmpty()) {
+      request.buildSerializedRequests();
+      synchronized (this) {
+        threads.forEach(
+            thread -> {
               logger.debug(
-                  "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
+                  "{}->{}: Push a log to the queue, where the queue length is {}",
                   impl.getThisNode().getGroupId(),
-                  thread.getPeer(),
-                  request.getSearchIndex());
-            }
-          });
+                  thread.getPeer().getEndpoint().getIp(),
+                  thread.getPendingEntriesSize());
+              if (!thread.offer(request)) {
+                logger.debug(
+                    "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
+                    impl.getThisNode().getGroupId(),
+                    thread.getPeer(),
+                    request.getSearchIndex());
+              }
+            });
+      }
     }
   }
 
@@ -510,6 +514,7 @@ public class LogDispatcher {
           }
         }
         targetIndex = data.getSearchIndex() + 1;
+        data.buildSerializedRequests();
         // construct request from wal
         logBatches.addTLogEntry(
             new TLogEntry(data.getSerializedRequests(), data.getSearchIndex(), true));
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
index c2560201a5..3a8c8e4e8c 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
@@ -53,6 +53,7 @@ public class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
   }
 
   private class FakeConsensusReqIterator implements ConsensusReqReader.ReqIterator {
+
     private long nextSearchIndex;
 
     public FakeConsensusReqIterator(long startIndex) {
@@ -70,7 +71,8 @@ public class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
         for (IndexedConsensusRequest indexedConsensusRequest : requestSets.getRequestSet()) {
           if (indexedConsensusRequest.getSearchIndex() == nextSearchIndex) {
             nextSearchIndex++;
-            return indexedConsensusRequest;
+            return new IndexedConsensusRequest(
+                indexedConsensusRequest.getSearchIndex(), indexedConsensusRequest.getRequests());
           }
         }
         return null;