You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/26 02:54:37 UTC

[iotdb] 01/02: cherry-pick from xinyu. make serialization to aysnc in iot

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fast_write_test_with_guoneng
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3f77ddf116a76718c5521183d19bf2e93cded5ae
Author: BUAAserein <18...@buaa.edu.cn>
AuthorDate: Tue Apr 11 21:04:37 2023 +0800

    cherry-pick from xinyu. make serialization to aysnc in iot
    
    improve iot
---
 .../consensus/iot/IoTConsensusServerImpl.java      | 17 +++++++++
 .../consensus/iot/logdispatcher/LogDispatcher.java | 42 ++++++++++++----------
 2 files changed, 41 insertions(+), 18 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 201322fa48..032d9ad187 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics;
@@ -81,6 +82,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -115,6 +117,8 @@ public class IoTConsensusServerImpl {
 
   private final String consensusGroupId;
 
+  private ExecutorService executorService;
+
   public IoTConsensusServerImpl(
       String storageDir,
       Peer thisNode,
@@ -143,6 +147,9 @@ public class IoTConsensusServerImpl {
     this.searchIndex = new AtomicLong(currentSearchIndex);
     this.consensusGroupId = thisNode.getGroupId().toString();
     this.metrics = new IoTConsensusServerMetrics(this);
+    this.executorService =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            Runtime.getRuntime().availableProcessors(), "Serialization");
   }
 
   public IStateMachine getStateMachine() {
@@ -158,6 +165,16 @@ public class IoTConsensusServerImpl {
   public void stop() {
     logDispatcher.stop();
     stateMachine.stop();
+    executorService.shutdownNow();
+    int timeout = 10;
+    try {
+      if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
+        logger.error("Unable to shutdown serialization service after {} seconds", timeout);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      logger.error("Unexpected Interruption when closing serialization service ");
+    }
     MetricService.getInstance().removeMetricSet(this.metrics);
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index f6da0efd52..dac55282f7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -158,24 +158,27 @@ public class LogDispatcher {
   public void offer(IndexedConsensusRequest request) {
     // we don't need to serialize and offer request when replicaNum is 1.
     if (!threads.isEmpty()) {
-      request.buildSerializedRequests();
-      synchronized (this) {
-        threads.forEach(
-            thread -> {
-              logger.debug(
-                  "{}->{}: Push a log to the queue, where the queue length is {}",
-                  impl.getThisNode().getGroupId(),
-                  thread.getPeer().getEndpoint().getIp(),
-                  thread.getPendingEntriesSize());
-              if (!thread.offer(request)) {
-                logger.debug(
-                    "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
-                    impl.getThisNode().getGroupId(),
-                    thread.getPeer(),
-                    request.getSearchIndex());
-              }
-            });
-      }
+      executorService.submit(
+          () -> {
+            request.buildSerializedRequests();
+            synchronized (this) {
+              threads.forEach(
+                  thread -> {
+                    logger.debug(
+                        "{}->{}: Push a log to the queue, where the queue length is {}",
+                        impl.getThisNode().getGroupId(),
+                        thread.getPeer().getEndpoint().getIp(),
+                        thread.getPendingEntriesSize());
+                    if (!thread.offer(request)) {
+                      logger.debug(
+                          "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}",
+                          impl.getThisNode().getGroupId(),
+                          thread.getPeer(),
+                          request.getSearchIndex());
+                    }
+                  });
+            }
+          });
     }
   }
 
@@ -252,6 +255,9 @@ public class LogDispatcher {
 
     /** try to offer a request into queue with memory control. */
     public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
+      if (indexedConsensusRequest.getSearchIndex() - getCurrentSyncIndex() > 10000) {
+        return false;
+      }
       if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getSerializedSize(), true)) {
         return false;
       }