You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/03/04 13:52:36 UTC

[iotdb] branch jira5613 created (now 3d03d468ea)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch jira5613
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 3d03d468ea [IOTDB-5613] Remove unnecessary serialization in IoTConsensus when replicaNum is 1 to improve write performance (#9204)

This branch includes the following new commits:

     new 3d03d468ea [IOTDB-5613] Remove unnecessary serialization in IoTConsensus when replicaNum is 1 to improve write performance (#9204)

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-5613] Remove unnecessary serialization in IoTConsensus when replicaNum is 1 to improve write performance (#9204)

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira5613
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3d03d468ea5853ab325f9390075c0f8e57814db1
Author: Potato <ta...@apache.org>
AuthorDate: Fri Mar 3 23:13:34 2023 +0800

    [IOTDB-5613] Remove unnecessary serialization in IoTConsensus when replicaNum is 1 to improve write performance (#9204)
    
    * finish
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
    
    * remove unused code
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
    
    * add wal & UT buildSerializedRequest
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
    
    * fix UT
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
    
    ---------
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../common/request/IndexedConsensusRequest.java    | 19 ++++++++-----
 .../consensus/iot/IoTConsensusServerImpl.java      |  5 ++--
 .../consensus/iot/logdispatcher/LogDispatcher.java | 33 +++++++++++++---------
 .../consensus/iot/util/FakeConsensusReqReader.java |  4 ++-
 4 files changed, 36 insertions(+), 25 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 0be89ce1be..58789dbd0e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -32,19 +32,14 @@ public class IndexedConsensusRequest implements IConsensusRequest {
 
   private final long syncIndex;
   private final List<IConsensusRequest> requests;
-  private final List<ByteBuffer> serializedRequests = new ArrayList<>();
+  private final List<ByteBuffer> serializedRequests;
   private long serializedSize = 0;
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) {
     this.searchIndex = searchIndex;
     this.requests = requests;
     this.syncIndex = -1L;
-    this.requests.forEach(
-        r -> {
-          ByteBuffer buffer = r.serializeToByteBuffer();
-          this.serializedRequests.add(buffer);
-          this.serializedSize += buffer.capacity();
-        });
+    this.serializedRequests = new ArrayList<>(requests.size());
   }
 
   public IndexedConsensusRequest(
@@ -52,6 +47,16 @@ public class IndexedConsensusRequest implements IConsensusRequest {
     this.searchIndex = searchIndex;
     this.requests = requests;
     this.syncIndex = syncIndex;
+    this.serializedRequests = new ArrayList<>(requests.size());
+  }
+
+  public void buildSerializedRequests() {
+    this.requests.forEach(
+        r -> {
+          ByteBuffer buffer = r.serializeToByteBuffer();
+          this.serializedRequests.add(buffer);
+          this.serializedSize += buffer.capacity();
+        });
   }
 
   @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 2ab9746a31..16e378cde4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -199,8 +199,6 @@ public class IoTConsensusServerImpl {
         }
       }
       long writeToStateMachineStartTime = System.nanoTime();
-      IndexedConsensusRequest indexedConsensusRequest =
-          buildIndexedConsensusRequestForLocalRequest(request);
       // statistic the time of checking write block
       MetricService.getInstance()
           .getOrCreateHistogram(
@@ -213,6 +211,8 @@ public class IoTConsensusServerImpl {
               Tag.REGION.toString(),
               this.consensusGroupId)
           .update(writeToStateMachineStartTime - getStateMachineLockTime);
+      IndexedConsensusRequest indexedConsensusRequest =
+          buildIndexedConsensusRequestForLocalRequest(request);
       if (indexedConsensusRequest.getSearchIndex() % 1000 == 0) {
         logger.info(
             "DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}",
@@ -220,7 +220,6 @@ public class IoTConsensusServerImpl {
             getCurrentSafelyDeletedSearchIndex(),
             indexedConsensusRequest.getSearchIndex());
       }
-      // TODO wal and memtable
       IConsensusRequest planNode = stateMachine.deserializeRequest(indexedConsensusRequest);
       TSStatus result = stateMachine.write(planNode);
       long writeToStateMachineEndTime = System.nanoTime();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 68a988558e..f6da0efd52 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -156,22 +156,26 @@ public class LogDispatcher {
   }
 
   public void offer(IndexedConsensusRequest request) {
-    synchronized (this) {
-      threads.forEach(
-          thread -> {
-            logger.debug(
-                "{}->{}: Push a log to the queue, where the queue length is {}",
-                impl.getThisNode().getGroupId(),
-                thread.getPeer().getEndpoint().getIp(),
-                thread.getPendingEntriesSize());
-            if (!thread.offer(request)) {
+    // we don't need to serialize and offer request when replicaNum is 1.
+    if (!threads.isEmpty()) {
+      request.buildSerializedRequests();
+      synchronized (this) {
+        threads.forEach(
+            thread -> {
               logger.debug(
-                  "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
+                  "{}->{}: Push a log to the queue, where the queue length is {}",
                   impl.getThisNode().getGroupId(),
-                  thread.getPeer(),
-                  request.getSearchIndex());
-            }
-          });
+                  thread.getPeer().getEndpoint().getIp(),
+                  thread.getPendingEntriesSize());
+              if (!thread.offer(request)) {
+                logger.debug(
+                    "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
+                    impl.getThisNode().getGroupId(),
+                    thread.getPeer(),
+                    request.getSearchIndex());
+              }
+            });
+      }
     }
   }
 
@@ -510,6 +514,7 @@ public class LogDispatcher {
           }
         }
         targetIndex = data.getSearchIndex() + 1;
+        data.buildSerializedRequests();
         // construct request from wal
         logBatches.addTLogEntry(
             new TLogEntry(data.getSerializedRequests(), data.getSearchIndex(), true));
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
index c2560201a5..3a8c8e4e8c 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/FakeConsensusReqReader.java
@@ -53,6 +53,7 @@ public class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
   }
 
   private class FakeConsensusReqIterator implements ConsensusReqReader.ReqIterator {
+
     private long nextSearchIndex;
 
     public FakeConsensusReqIterator(long startIndex) {
@@ -70,7 +71,8 @@ public class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
         for (IndexedConsensusRequest indexedConsensusRequest : requestSets.getRequestSet()) {
           if (indexedConsensusRequest.getSearchIndex() == nextSearchIndex) {
             nextSearchIndex++;
-            return indexedConsensusRequest;
+            return new IndexedConsensusRequest(
+                indexedConsensusRequest.getSearchIndex(), indexedConsensusRequest.getRequests());
           }
         }
         return null;