You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/01/05 02:25:35 UTC

[iotdb] branch reduce_serialization_pool created (now b686e94)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch reduce_serialization_pool
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at b686e94  make serialization pool in LogDispatcher static to reduece the number of pools

This branch includes the following new commits:

     new b686e94  make serialization pool in LogDispatcher static to reduece the number of pools

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: make serialization pool in LogDispatcher static to reduece the number of pools

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch reduce_serialization_pool
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b686e94118036dd2885da46d83893b7cdf5b5ac6
Author: jt <jt...@163.com>
AuthorDate: Tue Jan 5 10:22:30 2021 +0800

    make serialization pool in LogDispatcher static to reduece the number of pools
---
 .../java/org/apache/iotdb/cluster/log/LogDispatcher.java     | 12 ++++++------
 .../java/org/apache/iotdb/cluster/server/RaftServer.java     |  7 ++++++-
 .../org/apache/iotdb/cluster/integration/SingleNodeTest.java |  4 +++-
 3 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index a813fa4..bd0bd3f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.cluster.log;
 
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.concurrent.Future;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
@@ -53,8 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * A LogDispatcher servers a raft leader by queuing logs that the leader wants to send to the
- * follower and send the logs in an ordered manner so that the followers will not wait for previous
+ * A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
+ * followers and send the logs in an ordered manner so that the followers will not wait for previous
  * logs for too long. For example: if the leader send 3 logs, log1, log2, log3, concurrently to
  * follower A, the actual reach order may be log3, log2, and log1. According to the protocol, log3
  * and log2 must halt until log1 reaches, as a result, the total delay may increase significantly.
@@ -68,12 +69,13 @@ public class LogDispatcher {
   private List<BlockingQueue<SendLogRequest>> nodeLogQueues =
       new ArrayList<>();
   private ExecutorService executorService;
-  private ExecutorService serializationService;
+  private static ExecutorService serializationService =
+      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build());
 
   public LogDispatcher(RaftMember member) {
     this.member = member;
     executorService = Executors.newCachedThreadPool();
-    serializationService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
     for (Node node : member.getAllNodes()) {
       if (!node.equals(member.getThisNode())) {
         nodeLogQueues.add(createQueueAndBindingThread(node));
@@ -85,8 +87,6 @@ public class LogDispatcher {
   public void close() throws InterruptedException {
     executorService.shutdownNow();
     executorService.awaitTermination(10, TimeUnit.SECONDS);
-    serializationService.shutdownNow();
-    serializationService.awaitTermination(10, TimeUnit.SECONDS);
   }
 
   public void offer(SendLogRequest log) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index fe0cc63..2e925c1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.server;
 
+import java.util.ConcurrentModificationException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.SynchronousQueue;
@@ -144,7 +145,11 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
       return;
     }
 
-    poolServer.stop();
+    try {
+      poolServer.stop();
+    } catch (ConcurrentModificationException e) {
+      // ignore
+    }
     socket.close();
     clientService.shutdownNow();
     socket = null;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
index b1a30c9..5027ad4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
@@ -49,7 +49,9 @@ public class SingleNodeTest extends BaseSingleNodeTest {
   @After
   public void tearDown() throws Exception {
     super.tearDown();
-    session.close();
+    if (session != null) {
+      session.close();
+    }
   }
 
   @Test