You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/10/04 11:09:56 UTC

[iotdb] branch expr_vgraft updated: tmp save

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_vgraft by this push:
     new 31cdac29ac tmp save
31cdac29ac is described below

commit 31cdac29ac40a8fbbbf21d06af4550dba10cf73a
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue Oct 4 19:09:51 2022 +0800

    tmp save
---
 .../iotdb/cluster/config/ClusterDescriptor.java    |   8 +-
 .../iotdb/cluster/expr/vgraft/KeyManager.java      |  25 +--
 .../cluster/expr/vgraft/TrustValueHolder.java      |  80 +++++-----
 .../iotdb/cluster/log/FragmentedLogDispatcher.java |   3 +-
 .../iotdb/cluster/log/IndirectLogDispatcher.java   |  15 +-
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  33 ++--
 .../apache/iotdb/cluster/log/VotingLogList.java    |  67 ++++----
 .../cluster/log/appender/BlockingLogAppender.java  |   5 +-
 .../log/appender/SlidingWindowLogAppender.java     |   5 +-
 .../iotdb/cluster/log/applier/DataLogApplier.java  |   1 -
 .../manage/FilePartitionedSnapshotLogManager.java  |   9 +-
 .../log/manage/MetaSingleSnapshotLogManager.java   |   1 -
 .../server/handlers/caller/LogCatchUpHandler.java  |   5 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 173 ++++++++-------------
 .../cluster/server/service/BaseAsyncService.java   |   3 +-
 .../cluster/server/service/BaseSyncService.java    |   3 +-
 .../cluster/server/service/DataAsyncService.java   |   4 +-
 .../cluster/server/service/DataGroupEngine.java    |  40 ++---
 .../server/service/DataGroupServiceImpls.java      |   7 +-
 .../cluster/server/service/DataSyncService.java    |   4 +-
 .../cluster/server/service/MetaAsyncService.java   |   3 +-
 .../cluster/server/service/MetaSyncService.java    |   7 +-
 .../threadpool/WrappedThreadPoolExecutor.java      |  10 +-
 23 files changed, 245 insertions(+), 266 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index b3af782f2b..562cff9c07 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -386,15 +386,11 @@ public class ClusterDescriptor {
 
     config.setUseVGRaft(
         Boolean.parseBoolean(
-            properties.getProperty(
-                "use_vg_raft",
-                String.valueOf(config.isUseVGRaft()))));
+            properties.getProperty("use_vg_raft", String.valueOf(config.isUseVGRaft()))));
 
     config.setUseCRaft(
         Boolean.parseBoolean(
-            properties.getProperty(
-                "use_c_raft",
-                String.valueOf(config.isUseCRaft()))));
+            properties.getProperty("use_c_raft", String.valueOf(config.isUseCRaft()))));
 
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/KeyManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/KeyManager.java
index 67faaec11e..7b2af5ad6d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/KeyManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/KeyManager.java
@@ -1,5 +1,8 @@
 package org.apache.iotdb.cluster.expr.vgraft;
 
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.security.InvalidKeyException;
@@ -13,16 +16,12 @@ import java.security.SignatureException;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.utils.ClusterUtils;
 
 public class KeyManager {
 
   public static KeyManager INSTANCE = new KeyManager();
 
-  private KeyManager() {
-
-  }
+  private KeyManager() {}
 
   private KeyPairGenerator keygen;
   private SecureRandom sRandom;
@@ -42,8 +41,10 @@ public class KeyManager {
       dsa.update(ClusterUtils.nodeToString(thisNode).getBytes(StandardCharsets.UTF_8));
       nodeSignature = dsa.sign();
       dummySignature = ClusterUtils.nodeToString(thisNode).getBytes(StandardCharsets.UTF_8);
-    } catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException |
-             SignatureException e) {
+    } catch (NoSuchAlgorithmException
+        | NoSuchProviderException
+        | InvalidKeyException
+        | SignatureException e) {
       throw new RuntimeException(e);
     }
   }
@@ -59,13 +60,15 @@ public class KeyManager {
       dsa = Signature.getInstance("SHA1withDSA", "SUN");
       dsa.initVerify(keyPair.getPublic());
       dsa.verify(nodeSignature);
-    } catch (NoSuchAlgorithmException | NoSuchProviderException | SignatureException |
-             InvalidKeyException e) {
+    } catch (NoSuchAlgorithmException
+        | NoSuchProviderException
+        | SignatureException
+        | InvalidKeyException e) {
       throw new RuntimeException(e);
     }
 
-    return Arrays.equals(ClusterUtils.nodeToString(node).getBytes(StandardCharsets.UTF_8),
-        signature);
+    return Arrays.equals(
+        ClusterUtils.nodeToString(node).getBytes(StandardCharsets.UTF_8), signature);
   }
 
   public boolean verifyNodeSignature(Node node, ByteBuffer buffer) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/TrustValueHolder.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/TrustValueHolder.java
index d8669faa31..806290fa6d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/TrustValueHolder.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/vgraft/TrustValueHolder.java
@@ -1,23 +1,5 @@
 package org.apache.iotdb.cluster.expr.vgraft;
 
-import java.nio.charset.StandardCharsets;
-import java.security.InvalidKeyException;
-import java.security.KeyPair;
-import java.security.KeyPairGenerator;
-import java.security.NoSuchAlgorithmException;
-import java.security.NoSuchProviderException;
-import java.security.SecureRandom;
-import java.security.Signature;
-import java.security.SignatureException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -25,12 +7,22 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.utils.ClientUtils;
-import org.apache.iotdb.cluster.utils.ClusterUtils;
+
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
 public class TrustValueHolder {
 
   private static final Logger logger = LoggerFactory.getLogger(TrustValueHolder.class);
@@ -39,7 +31,6 @@ public class TrustValueHolder {
   private final Map<Node, Long> trustValueMap = new ConcurrentHashMap<>();
   private final RaftMember member;
 
-
   public TrustValueHolder(RaftMember member) {
     this.member = member;
   }
@@ -86,17 +77,19 @@ public class TrustValueHolder {
           TimeUnit.MILLISECONDS)) {
         return verifiers;
       } else {
-        return nodeTrustValueList.subList(0, M).stream().map(Entry::getKey)
+        return nodeTrustValueList.subList(0, M).stream()
+            .map(Entry::getKey)
             .collect(Collectors.toList());
       }
     } catch (InterruptedException e) {
-      return nodeTrustValueList.subList(0, M).stream().map(Entry::getKey)
+      return nodeTrustValueList.subList(0, M).stream()
+          .map(Entry::getKey)
           .collect(Collectors.toList());
     }
   }
 
-  private AsyncMethodCallback<Void> getPingCallback(Entry<Node, Long> entry, int M,
-      List<Node> verifiers, CountDownLatch countDownLatch) {
+  private AsyncMethodCallback<Void> getPingCallback(
+      Entry<Node, Long> entry, int M, List<Node> verifiers, CountDownLatch countDownLatch) {
     return new AsyncMethodCallback<Void>() {
       @Override
       public void onComplete(Void unused) {
@@ -115,28 +108,37 @@ public class TrustValueHolder {
     };
   }
 
-  private void pingVerifiersSync(List<Map.Entry<Node, Long>> nodeTrustValueList, int M,
-      List<Node> verifiers, CountDownLatch countDownLatch) {
+  private void pingVerifiersSync(
+      List<Map.Entry<Node, Long>> nodeTrustValueList,
+      int M,
+      List<Node> verifiers,
+      CountDownLatch countDownLatch) {
 
     for (Map.Entry<Node, Long> entry : nodeTrustValueList) {
       AsyncMethodCallback<Void> callback = getPingCallback(entry, M, verifiers, countDownLatch);
 
-      member.getSerialToParallelPool().submit(() -> {
-        Client syncClient = member.getSyncClient(entry.getKey());
-        try {
-          syncClient.ping();
-          callback.onComplete(null);
-        } catch (TException e) {
-          callback.onError(e);
-        } finally {
-          ClientUtils.putBackSyncClient(syncClient);
-        }
-      });
+      member
+          .getSerialToParallelPool()
+          .submit(
+              () -> {
+                Client syncClient = member.getSyncClient(entry.getKey());
+                try {
+                  syncClient.ping();
+                  callback.onComplete(null);
+                } catch (TException e) {
+                  callback.onError(e);
+                } finally {
+                  ClientUtils.putBackSyncClient(syncClient);
+                }
+              });
     }
   }
 
-  private void pingVerifiersAsync(List<Map.Entry<Node, Long>> nodeTrustValueList, int M,
-      List<Node> verifiers, CountDownLatch countDownLatch) {
+  private void pingVerifiersAsync(
+      List<Map.Entry<Node, Long>> nodeTrustValueList,
+      int M,
+      List<Node> verifiers,
+      CountDownLatch countDownLatch) {
 
     for (Map.Entry<Node, Long> entry : nodeTrustValueList) {
       AsyncMethodCallback<Void> callback = getPingCallback(entry, M, verifiers, countDownLatch);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
index 4a08476869..3a77d15eb0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java
@@ -19,15 +19,14 @@
 
 package org.apache.iotdb.cluster.log;
 
-import java.util.Map.Entry;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
-
 import org.apache.iotdb.tsfile.utils.Pair;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index 1665edbb58..0f99bb9fe7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.cluster.log;
 
-import java.util.concurrent.ArrayBlockingQueue;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -29,9 +28,9 @@ import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.WeightedList;
-
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.tsfile.utils.Pair;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +40,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -87,8 +87,15 @@ public class IndirectLogDispatcher extends LogDispatcher {
 
     for (int i = 0; i < bindingThreadNum; i++) {
       for (Pair<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesList) {
-        executorServices.computeIfAbsent(pair.left, n -> IoTDBThreadPoolFactory.newCachedThreadPool(
-                "LogDispatcher-" + member.getName() + "-" + ClusterUtils.nodeToString(pair.left)))
+        executorServices
+            .computeIfAbsent(
+                pair.left,
+                n ->
+                    IoTDBThreadPoolFactory.newCachedThreadPool(
+                        "LogDispatcher-"
+                            + member.getName()
+                            + "-"
+                            + ClusterUtils.nodeToString(pair.left)))
             .submit(newDispatcherThread(pair.left, pair.right));
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 7032ef6e52..12e9a2927e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -41,8 +41,8 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
 import org.apache.iotdb.tsfile.utils.Pair;
+
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
@@ -103,8 +103,15 @@ public class LogDispatcher {
 
     for (int i = 0; i < bindingThreadNum; i++) {
       for (Pair<Node, BlockingQueue<SendLogRequest>> pair : nodesLogQueuesList) {
-        executorServices.computeIfAbsent(pair.left, n -> IoTDBThreadPoolFactory.newCachedThreadPool(
-                "LogDispatcher-" + member.getName() + "-" + ClusterUtils.nodeToString(pair.left)))
+        executorServices
+            .computeIfAbsent(
+                pair.left,
+                n ->
+                    IoTDBThreadPoolFactory.newCachedThreadPool(
+                        "LogDispatcher-"
+                            + member.getName()
+                            + "-"
+                            + ClusterUtils.nodeToString(pair.left)))
             .submit(newDispatcherThread(pair.left, pair.right));
       }
     }
@@ -124,7 +131,6 @@ public class LogDispatcher {
     return newRequest;
   }
 
-
   private boolean addToQueue(BlockingQueue<SendLogRequest> nodeLogQueue, SendLogRequest request) {
     if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) {
       long waitStart = System.currentTimeMillis();
@@ -146,6 +152,7 @@ public class LogDispatcher {
       return nodeLogQueue.add(request);
     }
   }
+
   public void offer(SendLogRequest request) {
 
     long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime();
@@ -347,10 +354,13 @@ public class LogDispatcher {
             request.getVotingLog().getLog().getCreateTime());
         long start = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
         request.getAppendEntryRequest().entry = request.getVotingLog().getLog().serialize();
-        request.getVotingLog().getLog()
+        request
+            .getVotingLog()
+            .getLog()
             .setByteSize(request.getAppendEntryRequest().entry.capacity());
         if (clusterConfig.isUseVGRaft()) {
-          request.getAppendEntryRequest()
+          request
+              .getAppendEntryRequest()
               .setEntryHash(request.getAppendEntryRequest().entry.hashCode());
         }
         Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
@@ -474,9 +484,6 @@ public class LogDispatcher {
               logRequest.newLeaderTerm,
               logRequest.quorumSize);
       // TODO add async interface
-      if (getSyncClient() == null) {
-        setSyncClient(member.getSyncClient(receiver));
-      }
 
       int retries = 5;
       try {
@@ -484,8 +491,12 @@ public class LogDispatcher {
         for (int i = 0; i < retries; i++) {
           int concurrentSender = concurrentSenderNum.incrementAndGet();
           Statistic.RAFT_CONCURRENT_SENDER.add(concurrentSender);
-          AppendEntryResult result = getSyncClient().appendEntry(logRequest.appendEntryRequest,
-              logRequest.isVerifier);
+          Client client = getSyncClient();
+          if (client == null) {
+            continue;
+          }
+          AppendEntryResult result;
+          result = client.appendEntry(logRequest.appendEntryRequest, logRequest.isVerifier);
           concurrentSenderNum.decrementAndGet();
           if (result.status == Response.RESPONSE_OUT_OF_WINDOW) {
             Thread.sleep(100);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index 29a72f1612..9b180d9073 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -19,23 +19,23 @@
 
 package org.apache.iotdb.cluster.log;
 
-import java.nio.ByteBuffer;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 public class VotingLogList {
 
@@ -45,15 +45,35 @@ public class VotingLogList {
   private int quorumSize;
   private RaftMember member;
   private Map<Integer, Long> stronglyAcceptedIndices = new ConcurrentHashMap<>();
+  private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
 
   public VotingLogList(int quorumSize, RaftMember member) {
     this.quorumSize = quorumSize;
     this.member = member;
+    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+        service,
+        () -> {
+          long newCommitIndex = computeNewCommitIndex();
+          if (newCommitIndex > member.getLogManager().getCommitLogIndex()) {
+            synchronized (member.getLogManager()) {
+              try {
+                member.getLogManager().commitTo(newCommitIndex);
+              } catch (LogExecutionException e) {
+                logger.error("Fail to commit {}", newCommitIndex, e);
+              }
+            }
+          }
+        },
+        0,
+        1,
+        TimeUnit.MILLISECONDS);
   }
 
-
   private long computeNewCommitIndex() {
     List<Entry<Integer, Long>> nodeIndices = new ArrayList<>(stronglyAcceptedIndices.entrySet());
+    if (nodeIndices.size() < quorumSize) {
+      return -1;
+    }
     nodeIndices.sort(Entry.comparingByValue());
     return nodeIndices.get(quorumSize - 1).getValue();
   }
@@ -71,24 +91,15 @@ public class VotingLogList {
   public void onStronglyAccept(long index, long term, Node acceptingNode, ByteBuffer signature) {
     logger.debug("{}-{} is strongly accepted by {}", index, term, acceptingNode);
 
-    stronglyAcceptedIndices.compute(acceptingNode.nodeIdentifier, (nid, idx) -> {
-      if (idx == null) {
-        return index;
-      } else {
-        return Math.max(index, idx);
-      }
-    });
-
-    long newCommitIndex = computeNewCommitIndex();
-    if (newCommitIndex > member.getCommitIndex()) {
-      synchronized (member.getLogManager()) {
-        try {
-          member.getLogManager().commitTo(newCommitIndex);
-        } catch (LogExecutionException e) {
-          logger.error("Fail to commit {}", newCommitIndex, e);
-        }
-      }
-    }
+    stronglyAcceptedIndices.compute(
+        acceptingNode.nodeIdentifier,
+        (nid, idx) -> {
+          if (idx == null) {
+            return index;
+          } else {
+            return Math.max(index, idx);
+          }
+        });
   }
 
   public int totalAcceptedNodeNum(VotingLog log) {
@@ -96,7 +107,7 @@ public class VotingLogList {
     int num = log.getWeaklyAcceptedNodeIds().size();
     for (Entry<Integer, Long> entry : stronglyAcceptedIndices.entrySet()) {
       if (entry.getValue() >= index) {
-        num ++;
+        num++;
       }
     }
     return num;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
index f60523a514..ffae4ccb37 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
@@ -77,8 +77,8 @@ public class BlockingLogAppender implements LogAppender {
       // TODO: Consider memory footprint to execute a precise rejection
       if ((logManager.getCommitLogIndex() - logManager.getMaxHaveAppliedCommitIndex())
           <= ClusterDescriptor.getInstance()
-          .getConfig()
-          .getUnAppliedRaftLogNumForRejectThreshold()) {
+              .getConfig()
+              .getUnAppliedRaftLogNumForRejectThreshold()) {
         synchronized (logManager) {
           success =
               logManager.maybeAppend(
@@ -97,7 +97,6 @@ public class BlockingLogAppender implements LogAppender {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
-
     }
     Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
     if (success != -1) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
index 2f4d21dbf0..9303580d31 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
@@ -136,8 +136,8 @@ public class SlidingWindowLogAppender implements LogAppender {
       // TODO: Consider memory footprint to execute a precise rejection
       if ((logManager.getCommitLogIndex() - logManager.getMaxHaveAppliedCommitIndex())
           <= ClusterDescriptor.getInstance()
-          .getConfig()
-          .getUnAppliedRaftLogNumForRejectThreshold()) {
+              .getConfig()
+              .getUnAppliedRaftLogNumForRejectThreshold()) {
         synchronized (logManager) {
           success =
               logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit, logs);
@@ -155,7 +155,6 @@ public class SlidingWindowLogAppender implements LogAppender {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
-
     }
     if (success != -1) {
       moveWindowRightward(flushPos);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index f17ce02b13..ff04103529 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index d1b682257d..eaee82bf39 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -35,7 +35,6 @@ import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -66,10 +65,7 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
       LoggerFactory.getLogger(FilePartitionedSnapshotLogManager.class);
 
   public FilePartitionedSnapshotLogManager(
-      PartitionTable partitionTable,
-      Node header,
-      Node thisNode,
-      DataGroupMember dataGroupMember) {
+      PartitionTable partitionTable, Node header, Node thisNode, DataGroupMember dataGroupMember) {
     super(
         createLogApplier(dataGroupMember),
         partitionTable,
@@ -80,8 +76,7 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
         dataGroupMember.getStateMachine());
   }
 
-  private static LogApplier createLogApplier(
-      DataGroupMember dataGroupMember) {
+  private static LogApplier createLogApplier(DataGroupMember dataGroupMember) {
     LogApplier applier = new DataLogApplier(dataGroupMember);
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()
         && ClusterDescriptor.getInstance().getConfig().getReplicationNum() != 1) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
index 25a7e02ac7..0cb405f76a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.commons.auth.authorizer.IAuthorizer;
 import org.apache.iotdb.commons.auth.entity.Role;
 import org.apache.iotdb.commons.auth.entity.User;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.template.TemplateManager;
 import org.apache.iotdb.db.service.IoTDB;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
index e946ea06c1..0f41aae793 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.cluster.server.handlers.caller;
 
-import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -29,6 +28,7 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
@@ -54,8 +54,7 @@ public class LogCatchUpHandler implements AsyncMethodCallback<AppendEntryResult>
   private void processStrongAccept(ByteBuffer signature) {
     raftMember
         .getVotingLogList()
-        .onStronglyAccept(log.getCurrLogIndex(), log.getCurrLogTerm(), follower,
-           signature);
+        .onStronglyAccept(log.getCurrLogIndex(), log.getCurrLogTerm(), follower, signature);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 027b706af0..5c29f6e3f3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -141,8 +141,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
 
 /**
- * RaftMember process the common raft logic like leader election, log appending, catch-up and so
- * on.
+ * RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
  */
 @SuppressWarnings("java:S3077") // reference volatile is enough
 public abstract class RaftMember implements RaftMemberMBean {
@@ -177,34 +176,24 @@ public abstract class RaftMember implements RaftMemberMBean {
    * on this may be woken.
    */
   private final Object waitLeaderCondition = new Object();
-  /**
-   * the lock is to make sure that only one thread can apply snapshot at the same time
-   */
+  /** the lock is to make sure that only one thread can apply snapshot at the same time */
   private final Lock snapshotApplyLock = new ReentrantLock();
 
   private final Object heartBeatWaitObject = new Object();
 
   protected Node thisNode = ClusterIoTDB.getInstance().getThisNode();
 
-  /**
-   * the nodes that belong to the same raft group as thisNode.
-   */
+  /** the nodes that belong to the same raft group as thisNode. */
   protected PartitionGroup allNodes;
 
   ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
-  /**
-   * the name of the member, to distinguish several members in the logs.
-   */
+  /** the name of the member, to distinguish several members in the logs. */
   ConsensusGroupId groupId;
 
   String name;
-  /**
-   * to choose nodes to send request of joining cluster randomly.
-   */
+  /** to choose nodes to send request of joining cluster randomly. */
   Random random = new Random();
-  /**
-   * when the node is a leader, this map is used to track log progress of each follower.
-   */
+  /** when the node is a leader, this map is used to track log progress of each follower. */
   Map<Node, PeerInfo> peerMap;
   /**
    * the current term of the node, this object also works as lock of some transactions of the member
@@ -226,9 +215,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    */
   volatile long lastHeartbeatReceivedTime;
 
-  /**
-   * the raft logs are all stored and maintained in the log manager
-   */
+  /** the raft logs are all stored and maintained in the log manager */
   protected RaftLogManager logManager;
 
   /**
@@ -247,9 +234,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    * member by comparing it with the current last log index.
    */
   long lastReportedLogIndex;
-  /**
-   * the thread pool that runs catch-up tasks
-   */
+  /** the thread pool that runs catch-up tasks */
   private ExecutorService catchUpService;
   /**
    * lastCatchUpResponseTime records when is the latest response of each node's catch-up. There
@@ -280,15 +265,12 @@ public abstract class RaftMember implements RaftMemberMBean {
    * one slow node.
    */
   private ExecutorService serialToParallelPool;
-  /**
-   * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread
-   */
+  /** a thread pool that is used to do commit log tasks asynchronously in the heartbeat thread */
   private ExecutorService commitLogPool;
 
   /**
    * logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
-   * which avoids the followers receiving out-of-order logs, forcing them to wait for previous
-   * logs.
+   * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
    */
   private volatile LogDispatcher logDispatcher;
 
@@ -300,9 +282,7 @@ public abstract class RaftMember implements RaftMemberMBean {
 
   protected IStateMachine stateMachine;
 
-  /**
-   * (logIndex, logTerm) -> append handler
-   */
+  /** (logIndex, logTerm) -> append handler */
   protected Map<Pair<Long, Long>, AppendNodeEntryHandler> sentLogHandlers =
       new ConcurrentHashMap<>();
 
@@ -321,7 +301,6 @@ public abstract class RaftMember implements RaftMemberMBean {
   //  VG-Raft related
   private TrustValueHolder trustValueHolder = null;
 
-
   protected RaftMember(IStateMachine stateMachine) {
     this.stateMachine = stateMachine;
   }
@@ -640,7 +619,8 @@ public abstract class RaftMember implements RaftMemberMBean {
    * Process an AppendEntryRequest. First check the term of the leader, then parse the log and
    * finally see if we can find a position to append the log.
    */
-  public AppendEntryResult appendEntry(AppendEntryRequest request, boolean isVerifier) throws UnknownLogTypeException {
+  public AppendEntryResult appendEntry(AppendEntryRequest request, boolean isVerifier)
+      throws UnknownLogTypeException {
     long operationStartTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.getOperationStartTime();
     logger.debug("{} received an AppendEntryRequest: {}", name, request);
     if (logger.isDebugEnabled()) {
@@ -665,11 +645,13 @@ public abstract class RaftMember implements RaftMemberMBean {
       if (request.entryHash != request.entry.hashCode()) {
         return new AppendEntryResult(Response.RESPONSE_HASH_INCONSISTENT).setHeader(getHeader());
       }
-      if (!KeyManager.INSTANCE.verifyNodeSignature(request.leader,
-          request.leaderSignature.array(), request.leaderSignature.arrayOffset(),
+      if (!KeyManager.INSTANCE.verifyNodeSignature(
+          request.leader,
+          request.leaderSignature.array(),
+          request.leaderSignature.arrayOffset(),
           request.leaderSignature.remaining())) {
-        return new AppendEntryResult(Response.RESPONSE_SIGNATURE_INCONSISTENT).setHeader(
-            getHeader());
+        return new AppendEntryResult(Response.RESPONSE_SIGNATURE_INCONSISTENT)
+            .setHeader(getHeader());
       }
     }
 
@@ -693,17 +675,13 @@ public abstract class RaftMember implements RaftMemberMBean {
     return result;
   }
 
-  /**
-   * Similar to appendEntry, while the incoming load is batch of logs instead of a single log.
-   */
+  /** Similar to appendEntry, but the incoming load is a batch of logs instead of a single log. */
   public AppendEntryResult appendEntries(AppendEntriesRequest request)
       throws UnknownLogTypeException {
     return appendEntriesInternal(request);
   }
 
-  /**
-   * Similar to appendEntry, while the incoming load is batch of logs instead of a single log.
-   */
+  /** Similar to appendEntry, but the incoming load is a batch of logs instead of a single log. */
   private AppendEntryResult appendEntriesInternal(AppendEntriesRequest request)
       throws UnknownLogTypeException {
     logger.debug("{} received an AppendEntriesRequest", name);
@@ -865,22 +843,16 @@ public abstract class RaftMember implements RaftMemberMBean {
     return lastCatchUpResponseTime;
   }
 
-  /**
-   * Sub-classes will add their own process of HeartBeatResponse in this method.
-   */
-  public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {
-  }
+  /** Sub-classes will add their own process of HeartBeatResponse in this method. */
+  public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {}
 
-  /**
-   * The actions performed when the node wins in an election (becoming a leader).
-   */
-  public void onElectionWins() {
-  }
+  /** The actions performed when the node wins in an election (becoming a leader). */
+  public void onElectionWins() {}
 
   /**
    * Update the followers' log by sending logs whose index >= followerLastMatchedLogIndex to the
-   * follower. If some of the required logs are removed, also send the snapshot. <br> notice that if
-   * a part of data is in the snapshot, then it is not in the logs.
+   * follower. If some of the required logs are removed, also send the snapshot. <br>
+   * notice that if a part of data is in the snapshot, then it is not in the logs.
    */
   public void catchUp(Node follower, long lastLogIdx) {
     // for one follower, there is at most one ongoing catch-up, so the same data will not be sent
@@ -963,9 +935,7 @@ public abstract class RaftMember implements RaftMemberMBean {
         "%s:%s=%s", "org.apache.iotdb.cluster.service", IoTDBConstant.JMX_TYPE, "Engine");
   }
 
-  /**
-   * call back after syncLeader
-   */
+  /** call back after syncLeader */
   public interface CheckConsistency {
 
     /**
@@ -974,7 +944,7 @@ public abstract class RaftMember implements RaftMemberMBean {
      * @param leaderCommitId leader commit id
      * @param localAppliedId local applied id
      * @throws CheckConsistencyException maybe throw CheckConsistencyException, which is defined in
-     *                                   implements.
+     *     implements.
      */
     void postCheckConsistency(long leaderCommitId, long localAppliedId)
         throws CheckConsistencyException;
@@ -996,7 +966,7 @@ public abstract class RaftMember implements RaftMemberMBean {
       if (leaderCommitId == Long.MAX_VALUE
           || leaderCommitId == Long.MIN_VALUE
           || leaderCommitId - localAppliedId
-          > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
+              > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
         throw CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION;
       }
     }
@@ -1029,7 +999,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    * @param checkConsistency check after syncleader
    * @return true if the node has caught up, false otherwise
    * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
-   *                                   value after timeout
+   *     value after timeout
    */
   public boolean syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException {
     if (character == NodeCharacter.LEADER) {
@@ -1048,9 +1018,7 @@ public abstract class RaftMember implements RaftMemberMBean {
     return waitUntilCatchUp(checkConsistency);
   }
 
-  /**
-   * Wait until the leader of this node becomes known or time out.
-   */
+  /** Wait until the leader of this node becomes known or time out. */
   public void waitLeader() {
     long startTime = System.currentTimeMillis();
     while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) {
@@ -1077,7 +1045,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    *
    * @return true if this node has caught up before timeout, false otherwise
    * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
-   *                                   value after timeout
+   *     value after timeout
    */
   protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
       throws CheckConsistencyException {
@@ -1110,7 +1078,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    * sync local applyId to leader commitId
    *
    * @param leaderCommitId leader commit id
-   * @param fastFail       if enable, when log differ too much, return false directly.
+   * @param fastFail if enabled, return false directly when the logs differ too much.
    * @return true if leaderCommitId <= localAppliedId
    */
   public boolean syncLocalApply(long leaderCommitId, boolean fastFail) {
@@ -1163,7 +1131,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    * call this method. Will commit the log locally and send it to followers
    *
    * @return OK if over half of the followers accept the log or null if the leadership is lost
-   * during the appending
+   *     during the appending
    */
   public TSStatus processPlanLocally(IConsensusRequest request) {
     if (USE_LOG_DISPATCHER) {
@@ -1194,7 +1162,7 @@ public abstract class RaftMember implements RaftMemberMBean {
     // we need to return error code to the client as in server mode
     if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()
         && log.serialize().capacity() + Integer.BYTES
-        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+            >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
       logger.error(
           "Log cannot fit into buffer, please increase raft_log_buffer_size;"
               + "or reduce the size of requests you send.");
@@ -1268,7 +1236,7 @@ public abstract class RaftMember implements RaftMemberMBean {
     // just like processPlanLocally,we need to check the size of log
     if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()
         && log.serialize().capacity() + Integer.BYTES
-        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+            >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
       logger.error(
           "Log cannot fit into buffer, please increase raft_log_buffer_size;"
               + "or reduce the size of requests you send.");
@@ -1431,9 +1399,7 @@ public abstract class RaftMember implements RaftMemberMBean {
     return peerMap.computeIfAbsent(node, r -> new PeerInfo(getLogManager().getLastLogIndex()));
   }
 
-  /**
-   * @return true if there is a log whose index is "index" and term is "term", false otherwise
-   */
+  /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
   public boolean matchLog(long index, long term) {
     boolean matched = logManager.matchTerm(term, index);
     logger.debug("Log {}-{} matched: {}", index, term, matched);
@@ -1452,18 +1418,15 @@ public abstract class RaftMember implements RaftMemberMBean {
     return syncLock;
   }
 
-  /**
-   * Sub-classes will add their own process of HeartBeatRequest in this method.
-   */
-  void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {
-  }
+  /** Sub-classes will add their own process of HeartBeatRequest in this method. */
+  void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {}
 
   /**
    * Verify the validity of an ElectionRequest, and make itself a follower of the elector if the
    * request is valid.
    *
    * @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
-   * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+   *     smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
    */
   long checkElectorLogProgress(ElectionRequest electionRequest) {
 
@@ -1507,7 +1470,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    * lastLogIndex is smaller than the voter's Otherwise accept the election.
    *
    * @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
-   * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+   *     smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
    */
   long checkLogProgress(long lastLogIndex, long lastLogTerm) {
     long response;
@@ -1524,10 +1487,10 @@ public abstract class RaftMember implements RaftMemberMBean {
   /**
    * Forward a non-query plan to a node using the default client.
    *
-   * @param plan   a non-query plan
-   * @param node   cannot be the local node
+   * @param plan a non-query plan
+   * @param node cannot be the local node
    * @param header must be set for data group communication, set to null for meta group
-   *               communication
+   *     communication
    * @return a TSStatus indicating if the forwarding is successful.
    */
   public TSStatus forwardPlan(IConsensusRequest plan, Node node, RaftNode header) {
@@ -1558,7 +1521,7 @@ public abstract class RaftMember implements RaftMemberMBean {
   /**
    * Forward a non-query plan to "receiver" using "client".
    *
-   * @param plan   a non-query plan
+   * @param plan a non-query plan
    * @param header to determine which DataGroupMember of "receiver" will process the request.
    * @return a TSStatus indicating if the forwarding is successful.
    */
@@ -1637,7 +1600,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    * Get an asynchronous thrift client of the given node.
    *
    * @return an asynchronous thrift client or null if the caller tries to connect the local node or
-   * the node cannot be reached.
+   *     the node cannot be reached.
    */
   public AsyncClient getAsyncClient(Node node) {
     try {
@@ -1668,7 +1631,7 @@ public abstract class RaftMember implements RaftMemberMBean {
     try {
       return clientManager.borrowSyncClient(node, getClientCategory());
     } catch (IOException e) {
-      logger.error("borrow sync client fail", e);
+      logger.debug("borrow sync client fail", e);
       return null;
     }
   }
@@ -1773,13 +1736,14 @@ public abstract class RaftMember implements RaftMemberMBean {
     synchronized (log.getLog()) {
       while (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
           || (!ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
-          && getCommitIndex() < log.getLog().getCurrLogIndex()
-          || ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
-          && log.getSignatures().size() < TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
-          && (!(ENABLE_WEAK_ACCEPTANCE && canBeWeaklyAccepted(log.getLog()))
-          || (totalAccepted < quorumSize))
-          && alreadyWait < ClusterConstant.getWriteOperationTimeoutMS()
-          && !log.isHasFailed()) {
+                      && getCommitIndex() < log.getLog().getCurrLogIndex()
+                  || ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
+                      && log.getSignatures().size()
+                          < TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
+              && (!(ENABLE_WEAK_ACCEPTANCE && canBeWeaklyAccepted(log.getLog()))
+                  || (totalAccepted < quorumSize))
+              && alreadyWait < ClusterConstant.getWriteOperationTimeoutMS()
+              && !log.isHasFailed()) {
         try {
           log.getLog().wait(waitTime);
         } catch (InterruptedException e) {
@@ -1831,12 +1795,12 @@ public abstract class RaftMember implements RaftMemberMBean {
 
     if (log.getLog().getCurrLogIndex() == Long.MIN_VALUE
         || ((!ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
-        && log.getLog().getCurrLogIndex() > getCommitIndex()
-        || ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
-        && log.getSignatures().size() < TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
-        && (!ENABLE_WEAK_ACCEPTANCE
-        || (totalAccepted < quorumSize))
-        && !log.isHasFailed())) {
+                    && log.getLog().getCurrLogIndex() > getCommitIndex()
+                || ClusterDescriptor.getInstance().getConfig().isUseVGRaft()
+                    && log.getSignatures().size()
+                        < TrustValueHolder.verifierGroupSize(allNodes.size()) / 2)
+            && (!ENABLE_WEAK_ACCEPTANCE || (totalAccepted < quorumSize))
+            && !log.isHasFailed())) {
 
       waitAppendResultLoop(log, quorumSize);
     }
@@ -1958,7 +1922,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    * heartbeat timer.
    *
    * @param fromLeader true if the request is from a leader, false if the request is from an
-   *                   elector.
+   *     elector.
    */
   public void stepDown(long newTerm, boolean fromLeader) {
     synchronized (logManager) {
@@ -1994,9 +1958,7 @@ public abstract class RaftMember implements RaftMemberMBean {
     this.thisNode = thisNode;
   }
 
-  /**
-   * @return the header of the data raft group or null if this is in a meta group.
-   */
+  /** @return the header of the data raft group or null if this is in a meta group. */
   public RaftNode getHeader() {
     return null;
   }
@@ -2156,9 +2118,7 @@ public abstract class RaftMember implements RaftMemberMBean {
     return waitAppendResult(log, leaderShipStale, newLeaderTerm, quorumSize);
   }
 
-  /**
-   * Send "log" to "node".
-   */
+  /** Send "log" to "node". */
   public void sendLogToFollower(
       VotingLog log,
       Node node,
@@ -2228,7 +2188,8 @@ public abstract class RaftMember implements RaftMemberMBean {
       AtomicBoolean leaderShipStale,
       AtomicLong newLeaderTerm,
       AppendEntryRequest request,
-      int quorumSize, boolean isVerifier) {
+      int quorumSize,
+      boolean isVerifier) {
     Client client = getSyncClient(node);
     if (client != null) {
       AppendNodeEntryHandler handler =
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 0eafbd76cd..37261fe9bf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -76,7 +76,8 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
   @Override
   public void appendEntry(
       AppendEntryRequest request,
-      boolean isVerifier, AsyncMethodCallback<AppendEntryResult> resultHandler) {
+      boolean isVerifier,
+      AsyncMethodCallback<AppendEntryResult> resultHandler) {
     try {
       resultHandler.onComplete(member.appendEntry(request, isVerifier));
     } catch (UnknownLogTypeException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index 1b2c9be4eb..63297012a7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -70,7 +70,8 @@ public abstract class BaseSyncService implements RaftService.Iface {
   }
 
   @Override
-  public AppendEntryResult appendEntry(AppendEntryRequest request, boolean isVerifier) throws TException {
+  public AppendEntryResult appendEntry(AppendEntryRequest request, boolean isVerifier)
+      throws TException {
     try {
       return member.appendEntry(request, isVerifier);
     } catch (UnknownLogTypeException e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index a6c0f47224..5dbefe805d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -502,7 +502,5 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   }
 
   @Override
-  public void ping(AsyncMethodCallback<Void> resultHandler) throws TException {
-
-  }
+  public void ping(AsyncMethodCallback<Void> resultHandler) throws TException {}
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
index 6b7efa2365..ee56ffbf63 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
@@ -100,8 +100,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
   }
 
   @Override
-  public void start() throws StartupException {
-  }
+  public void start() throws StartupException {}
 
   @Override
   public void stop() {
@@ -141,8 +140,8 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
   public void waitPartitionTableReady() throws PartitionTableUnavailableException {
     long waitStart = System.currentTimeMillis();
     while (!metaGroupMember.isReady()
-        && (System.currentTimeMillis() - waitStart) < ClusterDescriptor.getInstance().getConfig()
-        .getConnectionTimeoutInMS()) {
+        && (System.currentTimeMillis() - waitStart)
+            < ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS()) {
       try {
         Thread.sleep(100);
       } catch (InterruptedException e) {
@@ -194,10 +193,10 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
   }
 
   /**
-   * @param header        the header of the group which the local node is in
+   * @param header the header of the group which the local node is in
    * @param resultHandler can be set to null if the request is an internal request
-   * @param request       the toString() of this parameter should explain what the request is and it
-   *                      is only used in logs for tracing
+   * @param request the toString() of this parameter should explain what the request is and it is
+   *     only used in logs for tracing
    * @return
    */
   public <T> DataGroupMember getDataMember(
@@ -351,9 +350,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
     }
   }
 
-  /**
-   * Make sure the group will not receive new raft logs.
-   */
+  /** Make sure the group will not receive new raft logs. */
   private void removeMember(
       RaftNode header, DataGroupMember dataGroupMember, boolean removedGroup) {
     dataGroupMember.setReadOnly();
@@ -362,14 +359,14 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
     } else {
       if (dataGroupMember.getCharacter() != NodeCharacter.LEADER) {
         new Thread(
-            () -> {
-              try {
-                dataGroupMember.syncLeader(null);
-                dataGroupMember.stop();
-              } catch (CheckConsistencyException e) {
-                logger.warn("Failed to check consistency.", e);
-              }
-            })
+                () -> {
+                  try {
+                    dataGroupMember.syncLeader(null);
+                    dataGroupMember.stop();
+                  } catch (CheckConsistencyException e) {
+                    logger.warn("Failed to check consistency.", e);
+                  }
+                })
             .start();
       }
     }
@@ -488,9 +485,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
     this.partitionTable = partitionTable;
   }
 
-  /**
-   * @return The reports of every DataGroupMember in this node.
-   */
+  /** @return The reports of every DataGroupMember in this node. */
   public List<DataMemberReport> genMemberReports() {
     List<DataMemberReport> dataMemberReports = new ArrayList<>();
     for (DataGroupMember value : headerGroupMap.values()) {
@@ -514,8 +509,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
 
   private static class InstanceHolder {
 
-    private InstanceHolder() {
-    }
+    private InstanceHolder() {}
 
     private static final DataGroupEngine Instance = new DataGroupEngine();
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
index d449542ded..d39d527d95 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
@@ -96,7 +96,8 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
 
   @Override
   public void appendEntry(
-      AppendEntryRequest request, boolean isVerifier,
+      AppendEntryRequest request,
+      boolean isVerifier,
       AsyncMethodCallback<AppendEntryResult> resultHandler)
       throws TException {
     DataAsyncService service =
@@ -789,9 +790,7 @@ public class DataGroupServiceImpls implements TSDataService.AsyncIface, TSDataSe
   }
 
   @Override
-  public void ping() throws TException {
-
-  }
+  public void ping() throws TException {}
 
   @Override
   public void ping(AsyncMethodCallback<Void> resultHandler) throws TException {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 47801ff829..4995b07a6b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -457,7 +457,5 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
-  public void ping() throws TException {
-
-  }
+  public void ping() throws TException {}
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 736a083abc..f406078d35 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -62,7 +62,8 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
   @Override
   public void appendEntry(
       AppendEntryRequest request,
-      boolean isVerifier, AsyncMethodCallback<AppendEntryResult> resultHandler) {
+      boolean isVerifier,
+      AsyncMethodCallback<AppendEntryResult> resultHandler) {
     // if the metaGroupMember is not ready (e.g., as a follower the PartitionTable is loaded
     // locally, but the partition table is not verified), we do not handle the RPC requests.
     if (!metaGroupMember.isReady() && metaGroupMember.getPartitionTable() == null) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index ed5f27974f..e782adab3f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -62,7 +62,8 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
 
   // behavior of followers
   @Override
-  public AppendEntryResult appendEntry(AppendEntryRequest request, boolean isVerifier) throws TException {
+  public AppendEntryResult appendEntry(AppendEntryRequest request, boolean isVerifier)
+      throws TException {
     // if the metaGroupMember is not ready (e.g., as a follower the PartitionTable is loaded
     // locally, but the partition table is not verified), we do not handle the RPC requests.
     if (!metaGroupMember.isReady()) {
@@ -259,7 +260,5 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
   }
 
   @Override
-  public void ping() throws TException {
-
-  }
+  public void ping() throws TException {}
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
index 834f5d5500..4d126607f9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java
@@ -23,6 +23,9 @@ import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.service.JMXService;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -32,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 public class WrappedThreadPoolExecutor extends ThreadPoolExecutor
     implements WrappedThreadPoolExecutorMBean {
   private final String mbeanName;
+  private static final Logger logger = LoggerFactory.getLogger(WrappedThreadPoolExecutor.class);
 
   public WrappedThreadPoolExecutor(
       int corePoolSize,
@@ -60,7 +64,11 @@ public class WrappedThreadPoolExecutor extends ThreadPoolExecutor
     this.mbeanName =
         String.format(
             "%s:%s=%s", IoTDBConstant.IOTDB_THREADPOOL_PACKAGE, IoTDBConstant.JMX_TYPE, mbeanName);
-    JMXService.registerMBean(this, this.mbeanName);
+    try {
+      JMXService.registerMBean(this, this.mbeanName);
+    } catch (Exception e) {
+      logger.warn("Cannot register pool {}", mbeanName, e);
+    }
   }
 
   @Override