You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/13 09:12:46 UTC

[iotdb] branch native_raft updated: refactor log dispatcher

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new f86efa19e7 refactor log dispatcher
f86efa19e7 is described below

commit f86efa19e76d8d2a8c01fc2d973fea34417c1387
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Apr 13 17:12:31 2023 +0800

    refactor log dispatcher
---
 .../protocol/log/dispatch/DispatcherGroup.java     | 111 +++++++
 .../protocol/log/dispatch/DispatcherThread.java    | 252 ++++++++++++++++
 .../protocol/log/dispatch/LogDispatcher.java       | 319 +++------------------
 .../log/dispatch/flowcontrol/FlowBalancer.java     |  23 +-
 4 files changed, 410 insertions(+), 295 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
new file mode 100644
index 0000000000..0104695c23
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.ratis.thirdparty.com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bundles all per-follower dispatching state that used to live in parallel maps inside
+ * {@code LogDispatcher}: a bounded queue of entries awaiting replication to one peer, a
+ * rate limiter for outgoing bytes, and a cached thread pool running the dispatcher
+ * threads bound to that peer.
+ */
+public class DispatcherGroup {
+  private static final Logger logger = LoggerFactory.getLogger(DispatcherGroup.class);
+  private final Peer peer;
+  // Entries waiting to be sent to this peer; capacity bounded by getMaxNumOfLogsInMem().
+  private final BlockingQueue<VotingEntry> entryQueue;
+  // Set to true in the constructor and never flipped in this class; LogDispatcher.offer()
+  // skips the group when this is false.
+  private boolean nodeEnabled;
+  // Throttles bytes sent to this peer; created effectively unlimited (Double.MAX_VALUE)
+  // and later adjusted via updateRate().
+  private final RateLimiter rateLimiter;
+  private final ExecutorService dispatcherThreadPool;
+  private final LogDispatcher logDispatcher;
+  // Count of dispatcher threads currently running for this group; incremented by
+  // addThread() and decremented by DispatcherThread when its run() exits.
+  private final AtomicInteger groupThreadNum = new AtomicInteger();
+
+  /**
+   * Creates the queue, rate limiter, and thread pool for {@code peer} and starts
+   * {@code bindingThreadNum} dispatcher threads.
+   */
+  public DispatcherGroup(Peer peer, LogDispatcher logDispatcher, int bindingThreadNum) {
+    this.logDispatcher = logDispatcher;
+    this.peer = peer;
+    this.entryQueue = new ArrayBlockingQueue<>(logDispatcher.getConfig().getMaxNumOfLogsInMem());
+    this.nodeEnabled = true;
+    this.rateLimiter = RateLimiter.create(Double.MAX_VALUE);
+    this.dispatcherThreadPool = createPool(peer, logDispatcher.getMember().getName());
+    for (int i = 0; i < bindingThreadNum; i++) {
+      addThread();
+    }
+  }
+
+  /**
+   * Interrupts all dispatcher threads and waits up to 10 seconds for the pool to
+   * terminate, logging a warning on timeout.
+   */
+  public void close() {
+    dispatcherThreadPool.shutdownNow();
+    boolean closeSucceeded = false;
+    try {
+      closeSucceeded = dispatcherThreadPool.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // ignore
+      // NOTE(review): the interrupt status is swallowed here; consider calling
+      // Thread.currentThread().interrupt() to preserve it for the caller.
+    }
+    if (!closeSucceeded) {
+      logger.warn("Cannot shut down dispatcher pool of {}-{}", logDispatcher.member.getName(),
+          peer);
+    }
+  }
+  /** Starts one more dispatcher thread for this peer and bumps the live-thread count. */
+  public void addThread() {
+    dispatcherThreadPool
+        .submit(newDispatcherThread(peer, entryQueue, rateLimiter));
+    groupThreadNum.incrementAndGet();
+  }
+
+  /** Factory hook for a dispatcher thread; overridable for testing. */
+  DispatcherThread newDispatcherThread(Peer node, BlockingQueue<VotingEntry> logBlockingQueue,
+      RateLimiter rateLimiter) {
+    return new DispatcherThread(logDispatcher, node, logBlockingQueue, rateLimiter, this);
+  }
+
+  /** Sets the permitted send rate (permits acquired per byte in DispatcherThread.sendLogs). */
+  public void updateRate(double rate) {
+    rateLimiter.setRate(rate);
+  }
+
+  /** Builds a cached thread pool named after the member and the peer's endpoint/node id. */
+  ExecutorService createPool(Peer node, String name) {
+    return IoTDBThreadPoolFactory.newCachedThreadPool(
+        "LogDispatcher-"
+            + name
+            + "-"
+            + node.getEndpoint().getIp()
+            + "-"
+            + node.getEndpoint().getPort()
+            + "-"
+            + node.getNodeId());
+  }
+
+  /** Current number of entries queued for this peer; used by FlowBalancer for sorting. */
+  public int getQueueSize() {
+    return entryQueue.size();
+  }
+
+  public boolean isNodeEnabled() {
+    return nodeEnabled;
+  }
+
+  public BlockingQueue<VotingEntry> getEntryQueue() {
+    return entryQueue;
+  }
+
+  public AtomicInteger getGroupThreadNum() {
+    return groupThreadNum;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
new file mode 100644
index 0000000000..8808479b45
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
+import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
+import org.apache.iotdb.consensus.natraft.utils.LogUtils;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
+import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
+import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
+import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
+import org.apache.ratis.thirdparty.com.google.common.util.concurrent.RateLimiter;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A worker that drains one peer's entry queue in batches, serializes the entries, and
+ * ships them to the follower as (optionally compressed) AppendEntries requests.
+ * Several instances may share the same queue within one {@link DispatcherGroup}.
+ */
+class DispatcherThread implements Runnable {
+  private static final Logger logger = LoggerFactory.getLogger(DispatcherThread.class);
+
+  private final LogDispatcher logDispatcher;
+  Peer receiver;
+  // Shared with the other threads of the same DispatcherGroup; also used as the
+  // monitor for the poll/wait loop in run().
+  private final BlockingQueue<VotingEntry> logBlockingDeque;
+  protected List<VotingEntry> currBatch = new ArrayList<>();
+  private final String baseName;
+  private final RateLimiter rateLimiter;
+  private final DispatcherGroup group;
+
+  protected DispatcherThread(LogDispatcher logDispatcher, Peer receiver,
+      BlockingQueue<VotingEntry> logBlockingDeque, RateLimiter rateLimiter,
+      DispatcherGroup group) {
+    this.logDispatcher = logDispatcher;
+    this.receiver = receiver;
+    this.logBlockingDeque = logBlockingDeque;
+    this.rateLimiter = rateLimiter;
+    this.group = group;
+    this.baseName = "LogDispatcher-" + logDispatcher.member.getName() + "-" + receiver;
+  }
+
+  /**
+   * Main loop: poll one entry, drain up to maxBatchSize-1 more, serialize, optionally
+   * restore log-index order, and send. Exits on interrupt and decrements the group's
+   * live-thread counter on the way out.
+   */
+  @Override
+  public void run() {
+    if (logger.isDebugEnabled()) {
+      Thread.currentThread().setName(baseName);
+    }
+    try {
+      while (!Thread.interrupted()) {
+        // Synchronize on the queue so poll+drainTo form one batch atomically with
+        // respect to sibling threads, and so wait(10) can be used when it is empty.
+        synchronized (logBlockingDeque) {
+          VotingEntry poll = logBlockingDeque.poll();
+          if (poll != null) {
+            currBatch.add(poll);
+            logBlockingDeque.drainTo(currBatch, logDispatcher.maxBatchSize - 1);
+          } else {
+            // Empty queue: sleep briefly (10 ms) and retry.
+            logBlockingDeque.wait(10);
+            continue;
+          }
+        }
+        if (logger.isDebugEnabled()) {
+          logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
+        }
+        serializeEntries();
+        if (!logDispatcher.queueOrdered) {
+          // Multiple producer threads may have enqueued out of order; restore index order.
+          currBatch.sort(Comparator.comparingLong(s -> s.getEntry().getCurrLogIndex()));
+        }
+        sendLogs(currBatch);
+        currBatch.clear();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (Exception e) {
+      logger.error("Unexpected error in log dispatcher", e);
+    }
+    logger.info("Dispatcher exits");
+    group.getGroupThreadNum().decrementAndGet();
+  }
+
+  /**
+   * Serializes each entry's payload into its request buffer and records the byte size.
+   * NOTE(review): declares InterruptedException but nothing in the body throws it.
+   */
+  protected void serializeEntries() throws InterruptedException {
+    for (VotingEntry request : currBatch) {
+
+      request.getAppendEntryRequest().entry = request.getEntry().serialize();
+      request.getEntry().setByteSize(request.getAppendEntryRequest().entry.limit());
+    }
+  }
+
+  /**
+   * Sends an uncompressed AppendEntries request and fans the single result out to the
+   * per-entry handlers. NOTE(review): despite the name, the call goes through
+   * SyncClientAdaptor and blocks; the handler is invoked inline.
+   */
+  private void appendEntriesAsync(
+      List<ByteBuffer> logList, AppendEntriesRequest request, List<VotingEntry> currBatch) {
+    AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
+    AsyncRaftServiceClient client = logDispatcher.member.getClient(receiver.getEndpoint());
+    try {
+      long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
+      AppendEntryResult appendEntryResult = SyncClientAdaptor.appendEntries(client, request);
+      Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
+      handler.onComplete(appendEntryResult);
+    } catch (Exception e) {
+      handler.onError(e);
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "{}: append entries {} with {} logs", logDispatcher.member.getName(), receiver,
+          logList.size());
+    }
+  }
+
+  /**
+   * Compressed-variant twin of the method above; same blocking behavior and same
+   * inline fan-out of the result.
+   */
+  private void appendEntriesAsync(
+      List<ByteBuffer> logList,
+      AppendCompressedEntriesRequest request,
+      List<VotingEntry> currBatch) {
+    AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
+    AsyncRaftServiceClient client = logDispatcher.member.getClient(receiver.getEndpoint());
+    try {
+      long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
+      AppendEntryResult appendEntryResult =
+          SyncClientAdaptor.appendCompressedEntries(client, request);
+      Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
+      handler.onComplete(appendEntryResult);
+    } catch (Exception e) {
+      handler.onError(e);
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "{}: append entries {} with {} logs", logDispatcher.member.getName(), receiver,
+          logList.size());
+    }
+  }
+
+  /** Fills leader/term/commit metadata from the member and attaches the raw entry buffers. */
+  protected AppendEntriesRequest prepareRequest(List<ByteBuffer> logList) {
+    AppendEntriesRequest request = new AppendEntriesRequest();
+
+    request.setGroupId(logDispatcher.member.getRaftGroupId().convertToTConsensusGroupId());
+    request.setLeader(logDispatcher.member.getThisNode().getEndpoint());
+    request.setLeaderId(logDispatcher.member.getThisNode().getNodeId());
+    request.setLeaderCommit(logDispatcher.member.getLogManager().getCommitLogIndex());
+    request.setTerm(logDispatcher.member.getStatus().getTerm().get());
+    request.setEntries(logList);
+    return request;
+  }
+
+  /** Same metadata as prepareRequest, but entries are compressed into one byte blob. */
+  protected AppendCompressedEntriesRequest prepareCompressedRequest(List<ByteBuffer> logList) {
+    AppendCompressedEntriesRequest request = new AppendCompressedEntriesRequest();
+
+    request.setGroupId(logDispatcher.member.getRaftGroupId().convertToTConsensusGroupId());
+    request.setLeader(logDispatcher.member.getThisNode().getEndpoint());
+    request.setLeaderId(logDispatcher.member.getThisNode().getNodeId());
+    request.setLeaderCommit(logDispatcher.member.getLogManager().getCommitLogIndex());
+    request.setTerm(logDispatcher.member.getStatus().getTerm().get());
+    request.setEntryBytes(LogUtils.compressEntries(logList, logDispatcher.compressor));
+    request.setCompressionType((byte) logDispatcher.compressor.getType().ordinal());
+    return request;
+  }
+
+  /**
+   * Splits the batch into sub-requests that fit within the thrift frame size, sends each
+   * (compressed or not per config), reports flow for load balancing, and applies the
+   * per-peer rate limit proportional to bytes sent.
+   */
+  private void sendLogs(List<VotingEntry> currBatch) {
+    if (currBatch.isEmpty()) {
+      return;
+    }
+
+    int logIndex = 0;
+    logger.debug(
+        "send logs from index {} to {}",
+        currBatch.get(0).getEntry().getCurrLogIndex(),
+        currBatch.get(currBatch.size() - 1).getEntry().getCurrLogIndex());
+    while (logIndex < currBatch.size()) {
+      long logSize = 0;
+      long logSizeLimit = logDispatcher.getConfig().getThriftMaxFrameSize();
+      List<ByteBuffer> logList = new ArrayList<>();
+      int prevIndex = logIndex;
+
+      // Greedily pack entries until adding the next one would leave less headroom
+      // than LEFT_SIZE_IN_REQUEST inside the thrift frame.
+      for (; logIndex < currBatch.size(); logIndex++) {
+        VotingEntry entry = currBatch.get(logIndex);
+        // NOTE(review): sizes here use entry.array().length while serializeEntries()
+        // records entry.limit(); these differ when the buffer wraps a larger backing
+        // array — confirm which is intended.
+        long curSize = entry.getAppendEntryRequest().entry.array().length;
+        if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
+          break;
+        }
+        logSize += curSize;
+        logList.add(entry.getAppendEntryRequest().entry);
+        Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
+            entry.getEntry().createTime);
+      }
+
+      if (!logDispatcher.enableCompressedDispatching) {
+        AppendEntriesRequest appendEntriesRequest = prepareRequest(logList);
+        appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
+      } else {
+        AppendCompressedEntriesRequest appendEntriesRequest = prepareCompressedRequest(logList);
+        appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
+      }
+
+      if (logDispatcher.getConfig().isUseFollowerLoadBalance()) {
+        FlowMonitorManager.INSTANCE.report(receiver, logSize);
+      }
+      // One permit per byte sent; rate configured via DispatcherGroup.updateRate().
+      rateLimiter.acquire((int) logSize);
+    }
+  }
+
+  /** Builds the per-entry result handler that records this peer as the direct receiver. */
+  public AppendNodeEntryHandler getAppendNodeEntryHandler(VotingEntry log, Peer node) {
+    AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
+    handler.setDirectReceiver(node);
+    handler.setVotingEntry(log);
+    handler.setMember(logDispatcher.member);
+    return handler;
+  }
+
+  /**
+   * Fans one batched AppendEntries result (or error) out to an AppendNodeEntryHandler
+   * per entry, so voting bookkeeping stays per-entry even though the RPC is batched.
+   */
+  class AppendEntriesHandler implements AsyncMethodCallback<AppendEntryResult> {
+
+    private final List<AsyncMethodCallback<AppendEntryResult>> singleEntryHandlers;
+
+    private AppendEntriesHandler(List<VotingEntry> batch) {
+      singleEntryHandlers = new ArrayList<>(batch.size());
+      for (VotingEntry sendLogRequest : batch) {
+        AppendNodeEntryHandler handler = getAppendNodeEntryHandler(sendLogRequest, receiver);
+        singleEntryHandlers.add(handler);
+      }
+    }
+
+    @Override
+    public void onComplete(AppendEntryResult aLong) {
+      for (AsyncMethodCallback<AppendEntryResult> singleEntryHandler : singleEntryHandlers) {
+        singleEntryHandler.onComplete(aLong);
+      }
+    }
+
+    @Override
+    public void onError(Exception e) {
+      for (AsyncMethodCallback<AppendEntryResult> singleEntryHandler : singleEntryHandlers) {
+        singleEntryHandler.onError(e);
+      }
+    }
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 463f9f0748..5ab499da20 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -19,42 +19,23 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
 
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
-import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
-import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
-import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
-import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
-import org.apache.iotdb.consensus.natraft.utils.LogUtils;
-import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
-import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
-import org.apache.iotdb.tsfile.compress.ICompressor;
-
-import com.google.common.util.concurrent.RateLimiter;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.iotdb.consensus.natraft.utils.NodeUtils.unionNodes;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.iotdb.consensus.natraft.utils.NodeUtils.unionNodes;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
@@ -67,18 +48,14 @@ public class LogDispatcher {
 
   private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
   protected RaftMember member;
-  private RaftConfig config;
+  protected RaftConfig config;
   protected List<Peer> allNodes;
   protected List<Peer> newNodes;
-  protected Map<Peer, BlockingQueue<VotingEntry>> nodesLogQueuesMap = new HashMap<>();
-  protected Map<Peer, Boolean> nodesEnabled;
-  protected Map<Peer, RateLimiter> nodesRateLimiter = new HashMap<>();
+  protected Map<Peer, DispatcherGroup> dispatcherGroupMap = new HashMap<>();
   protected Map<Peer, Double> nodesRate = new HashMap<>();
-  protected Map<Peer, ExecutorService> executorServices = new HashMap<>();
   protected boolean queueOrdered;
   protected boolean enableCompressedDispatching;
   protected ICompressor compressor;
-
   public int bindingThreadNum;
   public int maxBatchSize = 10;
 
@@ -98,39 +75,18 @@ public class LogDispatcher {
   public void updateRateLimiter() {
     logger.info("TEndPoint rates: {}", nodesRate);
     for (Entry<Peer, Double> nodeDoubleEntry : nodesRate.entrySet()) {
-      nodesRateLimiter.get(nodeDoubleEntry.getKey()).setRate(nodeDoubleEntry.getValue());
+      dispatcherGroupMap.get(nodeDoubleEntry.getKey()).updateRate(nodeDoubleEntry.getValue());
     }
   }
 
-  void createQueue(Peer node) {
-    BlockingQueue<VotingEntry> logBlockingQueue;
-    logBlockingQueue = new ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem());
-    nodesLogQueuesMap.put(node, logBlockingQueue);
-    nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
-
-    for (int i = 0; i < bindingThreadNum; i++) {
-      executorServices
-          .computeIfAbsent(node, n -> createPool(node))
-          .submit(newDispatcherThread(node, logBlockingQueue));
-    }
-  }
-
-  ExecutorService createPool(Peer node) {
-    return IoTDBThreadPoolFactory.newCachedThreadPool(
-        "LogDispatcher-"
-            + member.getName()
-            + "-"
-            + node.getEndpoint().getIp()
-            + "-"
-            + node.getEndpoint().getPort()
-            + "-"
-            + node.getNodeId());
+  void createDispatcherGroup(Peer node) {
+    dispatcherGroupMap.computeIfAbsent(node, n -> new DispatcherGroup(n, this, bindingThreadNum));
   }
 
   void createQueueAndBindingThreads(Collection<Peer> peers) {
     for (Peer node : peers) {
       if (!node.equals(member.getThisNode())) {
-        createQueue(node);
+        createDispatcherGroup(node);
       }
     }
     updateRateLimiter();
@@ -138,13 +94,9 @@ public class LogDispatcher {
 
   @TestOnly
   public void close() throws InterruptedException {
-    for (Entry<Peer, ExecutorService> entry : executorServices.entrySet()) {
-      ExecutorService pool = entry.getValue();
-      pool.shutdownNow();
-      boolean closeSucceeded = pool.awaitTermination(10, TimeUnit.SECONDS);
-      if (!closeSucceeded) {
-        logger.warn("Cannot shut down dispatcher pool of {}-{}", member.getName(), entry.getKey());
-      }
+    for (Entry<Peer, DispatcherGroup> entry : dispatcherGroupMap.entrySet()) {
+      DispatcherGroup group = entry.getValue();
+      group.close();
     }
   }
 
@@ -160,14 +112,14 @@ public class LogDispatcher {
 
   public void offer(VotingEntry request) {
 
-    for (Entry<Peer, BlockingQueue<VotingEntry>> entry : nodesLogQueuesMap.entrySet()) {
-      if (nodesEnabled != null && !this.nodesEnabled.getOrDefault(entry.getKey(), false)) {
+    for (Entry<Peer, DispatcherGroup> entry : dispatcherGroupMap.entrySet()) {
+      DispatcherGroup dispatcherGroup = entry.getValue();
+      if (!dispatcherGroup.isNodeEnabled()) {
         continue;
       }
 
-      BlockingQueue<VotingEntry> nodeLogQueue = entry.getValue();
       try {
-        boolean addSucceeded = addToQueue(nodeLogQueue, request);
+        boolean addSucceeded = addToQueue(dispatcherGroup.getEntryQueue(), request);
 
         if (!addSucceeded) {
           logger.debug(
@@ -184,226 +136,21 @@ public class LogDispatcher {
     }
   }
 
-  DispatcherThread newDispatcherThread(Peer node, BlockingQueue<VotingEntry> logBlockingQueue) {
-    return new DispatcherThread(node, logBlockingQueue);
-  }
+
 
   public void applyNewNodes() {
     allNodes = newNodes;
     newNodes = null;
 
     List<Peer> nodesToRemove = new ArrayList<>();
-    for (Entry<Peer, ExecutorService> entry : executorServices.entrySet()) {
+    for (Entry<Peer, DispatcherGroup> entry : dispatcherGroupMap.entrySet()) {
       if (!allNodes.contains(entry.getKey())) {
         nodesToRemove.add(entry.getKey());
       }
     }
     for (Peer peer : nodesToRemove) {
-      ExecutorService executorService = executorServices.remove(peer);
-      executorService.shutdownNow();
-      nodesRate.remove(peer);
-      nodesRateLimiter.remove(peer);
-      nodesEnabled.remove(peer);
-      nodesLogQueuesMap.remove(peer);
-    }
-  }
-
-  protected class DispatcherThread implements Runnable {
-
-    Peer receiver;
-    private final BlockingQueue<VotingEntry> logBlockingDeque;
-    protected List<VotingEntry> currBatch = new ArrayList<>();
-    private final String baseName;
-
-    protected DispatcherThread(Peer receiver, BlockingQueue<VotingEntry> logBlockingDeque) {
-      this.receiver = receiver;
-      this.logBlockingDeque = logBlockingDeque;
-      baseName = "LogDispatcher-" + member.getName() + "-" + receiver;
-    }
-
-    @Override
-    public void run() {
-      if (logger.isDebugEnabled()) {
-        Thread.currentThread().setName(baseName);
-      }
-      try {
-        while (!Thread.interrupted()) {
-          synchronized (logBlockingDeque) {
-            VotingEntry poll = logBlockingDeque.poll();
-            if (poll != null) {
-              currBatch.add(poll);
-              logBlockingDeque.drainTo(currBatch, maxBatchSize - 1);
-            } else {
-              logBlockingDeque.wait(10);
-              continue;
-            }
-          }
-          if (logger.isDebugEnabled()) {
-            logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
-          }
-          serializeEntries();
-          if (!queueOrdered) {
-            currBatch.sort(Comparator.comparingLong(s -> s.getEntry().getCurrLogIndex()));
-          }
-          sendLogs(currBatch);
-          currBatch.clear();
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      } catch (Exception e) {
-        logger.error("Unexpected error in log dispatcher", e);
-      }
-      logger.info("Dispatcher exits");
-    }
-
-    protected void serializeEntries() throws InterruptedException {
-      for (VotingEntry request : currBatch) {
-
-        request.getAppendEntryRequest().entry = request.getEntry().serialize();
-        request.getEntry().setByteSize(request.getAppendEntryRequest().entry.limit());
-      }
-    }
-
-    private void appendEntriesAsync(
-        List<ByteBuffer> logList, AppendEntriesRequest request, List<VotingEntry> currBatch) {
-      AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
-      AsyncRaftServiceClient client = member.getClient(receiver.getEndpoint());
-      try {
-        long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
-        AppendEntryResult appendEntryResult = SyncClientAdaptor.appendEntries(client, request);
-        Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
-        handler.onComplete(appendEntryResult);
-      } catch (Exception e) {
-        handler.onError(e);
-      }
-      if (logger.isDebugEnabled()) {
-        logger.debug(
-            "{}: append entries {} with {} logs", member.getName(), receiver, logList.size());
-      }
-    }
-
-    private void appendEntriesAsync(
-        List<ByteBuffer> logList,
-        AppendCompressedEntriesRequest request,
-        List<VotingEntry> currBatch) {
-      AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
-      AsyncRaftServiceClient client = member.getClient(receiver.getEndpoint());
-      try {
-        long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
-        AppendEntryResult appendEntryResult =
-            SyncClientAdaptor.appendCompressedEntries(client, request);
-        Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
-        handler.onComplete(appendEntryResult);
-      } catch (Exception e) {
-        handler.onError(e);
-      }
-      if (logger.isDebugEnabled()) {
-        logger.debug(
-            "{}: append entries {} with {} logs", member.getName(), receiver, logList.size());
-      }
-    }
-
-    protected AppendEntriesRequest prepareRequest(List<ByteBuffer> logList) {
-      AppendEntriesRequest request = new AppendEntriesRequest();
-
-      request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
-      request.setLeader(member.getThisNode().getEndpoint());
-      request.setLeaderId(member.getThisNode().getNodeId());
-      request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
-      request.setTerm(member.getStatus().getTerm().get());
-      request.setEntries(logList);
-      return request;
-    }
-
-    protected AppendCompressedEntriesRequest prepareCompressedRequest(List<ByteBuffer> logList) {
-      AppendCompressedEntriesRequest request = new AppendCompressedEntriesRequest();
-
-      request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
-      request.setLeader(member.getThisNode().getEndpoint());
-      request.setLeaderId(member.getThisNode().getNodeId());
-      request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
-      request.setTerm(member.getStatus().getTerm().get());
-      request.setEntryBytes(LogUtils.compressEntries(logList, compressor));
-      request.setCompressionType((byte) compressor.getType().ordinal());
-      return request;
-    }
-
-    private void sendLogs(List<VotingEntry> currBatch) {
-      if (currBatch.isEmpty()) {
-        return;
-      }
-
-      int logIndex = 0;
-      logger.debug(
-          "send logs from index {} to {}",
-          currBatch.get(0).getEntry().getCurrLogIndex(),
-          currBatch.get(currBatch.size() - 1).getEntry().getCurrLogIndex());
-      while (logIndex < currBatch.size()) {
-        long logSize = 0;
-        long logSizeLimit = config.getThriftMaxFrameSize();
-        List<ByteBuffer> logList = new ArrayList<>();
-        int prevIndex = logIndex;
-
-        for (; logIndex < currBatch.size(); logIndex++) {
-          VotingEntry entry = currBatch.get(logIndex);
-          long curSize = entry.getAppendEntryRequest().entry.array().length;
-          if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
-            break;
-          }
-          logSize += curSize;
-          logList.add(entry.getAppendEntryRequest().entry);
-          Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
-              entry.getEntry().createTime);
-        }
-
-        if (!enableCompressedDispatching) {
-          AppendEntriesRequest appendEntriesRequest = prepareRequest(logList);
-          appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
-        } else {
-          AppendCompressedEntriesRequest appendEntriesRequest = prepareCompressedRequest(logList);
-          appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
-        }
-
-        if (config.isUseFollowerLoadBalance()) {
-          FlowMonitorManager.INSTANCE.report(receiver, logSize);
-        }
-        nodesRateLimiter.get(receiver).acquire((int) logSize);
-      }
-    }
-
-    public AppendNodeEntryHandler getAppendNodeEntryHandler(VotingEntry log, Peer node) {
-      AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
-      handler.setDirectReceiver(node);
-      handler.setVotingEntry(log);
-      handler.setMember(member);
-      return handler;
-    }
-
-    class AppendEntriesHandler implements AsyncMethodCallback<AppendEntryResult> {
-
-      private final List<AsyncMethodCallback<AppendEntryResult>> singleEntryHandlers;
-
-      private AppendEntriesHandler(List<VotingEntry> batch) {
-        singleEntryHandlers = new ArrayList<>(batch.size());
-        for (VotingEntry sendLogRequest : batch) {
-          AppendNodeEntryHandler handler = getAppendNodeEntryHandler(sendLogRequest, receiver);
-          singleEntryHandlers.add(handler);
-        }
-      }
-
-      @Override
-      public void onComplete(AppendEntryResult aLong) {
-        for (AsyncMethodCallback<AppendEntryResult> singleEntryHandler : singleEntryHandlers) {
-          singleEntryHandler.onComplete(aLong);
-        }
-      }
-
-      @Override
-      public void onError(Exception e) {
-        for (AsyncMethodCallback<AppendEntryResult> singleEntryHandler : singleEntryHandlers) {
-          singleEntryHandler.onError(e);
-        }
-      }
+      DispatcherGroup removed = dispatcherGroupMap.remove(peer);
+      removed.close();
     }
   }
 
@@ -411,16 +158,24 @@ public class LogDispatcher {
     return nodesRate;
   }
 
-  public Map<Peer, BlockingQueue<VotingEntry>> getNodesLogQueuesMap() {
-    return nodesLogQueuesMap;
+  public Map<Peer, DispatcherGroup> getDispatcherGroupMap() {
+    return dispatcherGroupMap;
   }
 
   public void setNewNodes(List<Peer> newNodes) {
     this.newNodes = newNodes;
     for (Peer newNode : newNodes) {
       if (!allNodes.contains(newNode)) {
-        createQueue(newNode);
+        createDispatcherGroup(newNode);
       }
     }
   }
+
+  public RaftConfig getConfig() {
+    return config;
+  }
+
+  public RaftMember getMember() {
+    return member;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index 3c838f81b7..27ffe6b86d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -19,26 +19,23 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
 
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
-import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.DispatcherGroup;
 import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.LogDispatcher;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 public class FlowBalancer {
 
   private static final Logger logger = LoggerFactory.getLogger(FlowBalancer.class);
@@ -88,11 +85,11 @@ public class FlowBalancer {
     double thisNodeFlow = flowMonitorManager.averageFlow(member.getThisNode(), windowsToUse);
     double assumedFlow = thisNodeFlow * overestimateFactor;
     logger.info("Flow of this node: {}", thisNodeFlow);
-    Map<Peer, BlockingQueue<VotingEntry>> nodesLogQueuesMap = logDispatcher.getNodesLogQueuesMap();
+    Map<Peer, DispatcherGroup> dispatcherGroupMap = logDispatcher.getDispatcherGroupMap();
     Map<Peer, Double> nodesRate = logDispatcher.getNodesRate();
 
     // sort followers according to their queue length
-    followers.sort(Comparator.comparing(node -> nodesLogQueuesMap.get(node).size()));
+    followers.sort(Comparator.comparing(node -> dispatcherGroupMap.get(node).getQueueSize()));
     if (assumedFlow * followerNum > maxFlow) {
       enterBurst(nodesRate, nodeNum, assumedFlow, followers);
     } else {