You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text rendering.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/13 06:02:09 UTC

[iotdb] branch native_raft updated: add compressed append entries

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 8f90527226 add compressed append entries
8f90527226 is described below

commit 8f90527226d3f5f341405ffcf16d870a42f0097a
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Mar 13 14:03:54 2023 +0800

    add compressed append entries
---
 .../natraft/client/SyncClientAdaptor.java          |   9 ++
 .../consensus/natraft/protocol/RaftConfig.java     |  20 ++++
 .../consensus/natraft/protocol/RaftMember.java     |  25 ++---
 .../protocol/log/appender/BlockingLogAppender.java | 108 ++++++---------------
 .../natraft/protocol/log/appender/LogAppender.java |   4 +-
 .../log/appender/SlidingWindowLogAppender.java     |  16 +--
 .../protocol/log/dispatch/LogDispatcher.java       |  73 +++++++++++++-
 .../natraft/service/RaftRPCServiceProcessor.java   |  40 ++++++++
 .../iotdb/consensus/natraft/utils/LogUtils.java    |  77 +++++++++++++++
 .../iotdb/consensus/natraft/utils/Timer.java       |  30 +++++-
 thrift-raft/src/main/thrift/raft.thrift            |  14 +++
 11 files changed, 304 insertions(+), 112 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
index 82196bf9fa..b1977de7d1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
@@ -9,6 +9,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 import org.apache.iotdb.consensus.raft.thrift.ExecuteReq;
@@ -83,6 +84,14 @@ public class SyncClientAdaptor {
     return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
   }
 
+  public static AppendEntryResult appendCompressedEntries(
+      AsyncRaftServiceClient client, AppendCompressedEntriesRequest request)
+      throws TException, InterruptedException {
+    GenericHandler<AppendEntryResult> matchTermHandler = new GenericHandler<>(client.getEndpoint());
+    client.appendCompressedEntries(request, matchTermHandler);
+    return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
+  }
+
   public static TSStatus forceElection(AsyncRaftServiceClient client, ConsensusGroupId groupId)
       throws TException, InterruptedException {
     GenericHandler<TSStatus> matchTermHandler = new GenericHandler<>(client.getEndpoint());
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index 78fb556cbd..6414a0ccfc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -25,6 +25,7 @@ package org.apache.iotdb.consensus.natraft.protocol;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.RPCConfig;
 import org.apache.iotdb.consensus.natraft.protocol.consistency.ConsistencyLevel;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 
 import java.io.File;
 import java.util.concurrent.TimeUnit;
@@ -89,6 +90,9 @@ public class RaftConfig {
   private int flushRaftLogThreshold = 100_000;
   private long maxSyncLogLag = 100_000;
   private long syncLeaderMaxWaitMs = 30_000;
+
+  private boolean enableCompressedDispatching = true;
+  private CompressionType dispatchingCompressionType = CompressionType.SNAPPY;
   private ConsistencyLevel consistencyLevel = ConsistencyLevel.STRONG_CONSISTENCY;
   private RPCConfig rpcConfig;
 
@@ -423,4 +427,20 @@ public class RaftConfig {
   public RPCConfig getRpcConfig() {
     return rpcConfig;
   }
+
+  public boolean isEnableCompressedDispatching() {
+    return enableCompressedDispatching;
+  }
+
+  public void setEnableCompressedDispatching(boolean enableCompressedDispatching) {
+    this.enableCompressedDispatching = enableCompressedDispatching;
+  }
+
+  public CompressionType getDispatchingCompressionType() {
+    return dispatchingCompressionType;
+  }
+
+  public void setDispatchingCompressionType(CompressionType dispatchingCompressionType) {
+    this.dispatchingCompressionType = dispatchingCompressionType;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index ea26a6d3c9..8d96d73179 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -47,11 +47,11 @@ import org.apache.iotdb.consensus.natraft.protocol.heartbeat.ElectionReqHandler;
 import org.apache.iotdb.consensus.natraft.protocol.heartbeat.HeartbeatReqHandler;
 import org.apache.iotdb.consensus.natraft.protocol.heartbeat.HeartbeatThread;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
-import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
 import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
 import org.apache.iotdb.consensus.natraft.protocol.log.appender.BlockingLogAppender;
 import org.apache.iotdb.consensus.natraft.protocol.log.appender.LogAppender;
 import org.apache.iotdb.consensus.natraft.protocol.log.appender.LogAppenderFactory;
+import org.apache.iotdb.consensus.natraft.protocol.log.appender.SlidingWindowLogAppender.Factory;
 import org.apache.iotdb.consensus.natraft.protocol.log.applier.AsyncLogApplier;
 import org.apache.iotdb.consensus.natraft.protocol.log.applier.BaseApplier;
 import org.apache.iotdb.consensus.natraft.protocol.log.catchup.CatchUpManager;
@@ -93,7 +93,6 @@ import org.slf4j.LoggerFactory;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -116,7 +115,7 @@ public class RaftMember {
   private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
 
   private RaftConfig config;
-  protected static final LogAppenderFactory appenderFactory = new BlockingLogAppender.Factory();
+  protected final LogAppenderFactory appenderFactory;
 
   protected static final LogSequencerFactory SEQUENCER_FACTORY = new SynchronousSequencer.Factory();
 
@@ -235,6 +234,8 @@ public class RaftMember {
             stateMachine,
             config,
             this::examineUnappliedEntry);
+    this.appenderFactory =
+        config.isUseFollowerSlidingWindow() ? new Factory() : new BlockingLogAppender.Factory();
     this.logAppender = appenderFactory.create(this, config);
     this.logSequencer = SEQUENCER_FACTORY.create(this, config);
     this.logDispatcher = new LogDispatcher(this, config);
@@ -517,21 +518,11 @@ public class RaftMember {
     }
 
     AppendEntryResult response;
-    List<Entry> entries = new ArrayList<>();
-    for (ByteBuffer buffer : request.getEntries()) {
-      buffer.mark();
-      Entry e;
-      try {
-        e = LogParser.getINSTANCE().parse(buffer);
-        e.setByteSize(buffer.limit() - buffer.position());
-      } catch (BufferUnderflowException ex) {
-        buffer.reset();
-        throw ex;
-      }
-      entries.add(e);
-    }
+    List<Entry> entries = LogUtils.parseEntries(request.entries);
 
-    response = logAppender.appendEntries(request, entries);
+    response =
+        logAppender.appendEntries(
+            request.prevLogIndex, request.prevLogTerm, request.leaderCommit, request.term, entries);
 
     if (logger.isDebugEnabled()) {
       logger.debug(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
index ed4cb9bff0..8138562e3e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
@@ -25,14 +25,12 @@ import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 import org.apache.iotdb.consensus.natraft.protocol.log.logtype.ConfigChangeEntry;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
 import org.apache.iotdb.consensus.natraft.utils.Response;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -56,56 +54,6 @@ public class BlockingLogAppender implements LogAppender {
     this.config = config;
   }
 
-  /**
-   * Find the local previous log of "log". If such log is found, discard all local logs behind it
-   * and append "log" to it. Otherwise report a log mismatch.
-   *
-   * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
-   */
-  public AppendEntryResult appendEntry(AppendEntryRequest request, Entry log) {
-    long resp = checkPrevLogIndex(request.prevLogIndex);
-    if (resp != Response.RESPONSE_AGREE) {
-      return new AppendEntryResult(resp)
-          .setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
-    }
-
-    long startWaitingTime = System.currentTimeMillis();
-    long success;
-    AppendEntryResult result = new AppendEntryResult();
-    while (true) {
-      // TODO: Consider memory footprint to execute a precise rejection
-      if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
-          <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
-        success =
-            logManager.maybeAppend(
-                request.prevLogIndex, request.prevLogTerm, Collections.singletonList(log));
-        member.tryUpdateCommitIndex(
-            request.getTerm(), request.leaderCommit, logManager.getTerm(request.leaderCommit));
-        break;
-      }
-      try {
-        TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
-        if (System.currentTimeMillis() - startWaitingTime
-            > config.getMaxWaitingTimeWhenInsertBlocked()) {
-          result.status = Response.RESPONSE_TOO_BUSY;
-          return result;
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-    if (success != -1) {
-      logger.debug("{} append a new log {}", member.getName(), log);
-      result.status = Response.RESPONSE_STRONG_ACCEPT;
-    } else {
-      // the incoming log points to an illegal position, reject it
-      result.status = Response.RESPONSE_LOG_MISMATCH;
-    }
-    result.setGroupId(request.getGroupId());
-    return result;
-  }
-
   /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
   private boolean waitForPrevLog(long prevLogIndex) {
     long waitStart = System.currentTimeMillis();
@@ -153,19 +101,23 @@ public class BlockingLogAppender implements LogAppender {
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
    *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
-  public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Entry> logs) {
+  public AppendEntryResult appendEntries(
+      long prevLogIndex, long prevLogTerm, long leaderCommit, long term, List<Entry> logs) {
     logger.debug(
         "{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",
         member.getName(),
-        request.prevLogIndex,
-        request.prevLogTerm,
-        request.leaderCommit);
+        prevLogIndex,
+        prevLogTerm,
+        leaderCommit);
     if (logs.isEmpty()) {
       return new AppendEntryResult(Response.RESPONSE_AGREE)
           .setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
     }
 
-    long resp = checkPrevLogIndex(request.prevLogIndex);
+    long startTime = Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime();
+    long resp = checkPrevLogIndex(prevLogIndex);
+    Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
+
     if (resp != Response.RESPONSE_AGREE) {
       return new AppendEntryResult(resp)
           .setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
@@ -178,6 +130,7 @@ public class BlockingLogAppender implements LogAppender {
       }
     }
 
+    startTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
     AppendEntryResult result = new AppendEntryResult();
     long startWaitingTime = System.currentTimeMillis();
     while (true) {
@@ -185,8 +138,10 @@ public class BlockingLogAppender implements LogAppender {
           <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
         resp =
             lastConfigEntry == null
-                ? appendWithoutConfigChange(request, logs, result)
-                : appendWithConfigChange(request, logs, result, lastConfigEntry);
+                ? appendWithoutConfigChange(
+                    prevLogIndex, prevLogTerm, leaderCommit, term, logs, result)
+                : appendWithConfigChange(
+                    prevLogIndex, prevLogTerm, leaderCommit, term, logs, result, lastConfigEntry);
         break;
       }
 
@@ -201,35 +156,35 @@ public class BlockingLogAppender implements LogAppender {
         Thread.currentThread().interrupt();
       }
     }
+    Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
 
     return result;
   }
 
   protected long appendWithConfigChange(
-      AppendEntriesRequest request,
+      long prevLogIndex,
+      long prevLogTerm,
+      long leaderCommit,
+      long term,
       List<Entry> logs,
       AppendEntryResult result,
       ConfigChangeEntry configChangeEntry) {
     long resp;
     try {
       logManager.getLock().writeLock().lock();
-      resp = logManager.maybeAppend(request.prevLogIndex, request.prevLogTerm, logs);
+      resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, logs);
 
       if (resp != -1) {
         if (logger.isDebugEnabled()) {
           logger.debug(
-              "{} append a new log list {}, commit to {}",
-              member.getName(),
-              logs,
-              request.leaderCommit);
+              "{} append a new log list {}, commit to {}", member.getName(), logs, leaderCommit);
         }
 
         result.status = Response.RESPONSE_STRONG_ACCEPT;
         result.setLastLogIndex(logManager.getLastLogIndex());
         result.setLastLogTerm(logManager.getLastLogTerm());
         member.setNewNodes(configChangeEntry.getNewPeers());
-        member.tryUpdateCommitIndex(
-            request.getTerm(), request.leaderCommit, logManager.getTerm(request.leaderCommit));
+        member.tryUpdateCommitIndex(term, leaderCommit, logManager.getTerm(leaderCommit));
       } else {
         // the incoming log points to an illegal position, reject it
         result.status = Response.RESPONSE_LOG_MISMATCH;
@@ -241,21 +196,22 @@ public class BlockingLogAppender implements LogAppender {
   }
 
   protected long appendWithoutConfigChange(
-      AppendEntriesRequest request, List<Entry> logs, AppendEntryResult result) {
-    long resp = logManager.maybeAppend(request.prevLogIndex, request.prevLogTerm, logs);
+      long prevLogIndex,
+      long prevLogTerm,
+      long leaderCommit,
+      long term,
+      List<Entry> logs,
+      AppendEntryResult result) {
+    long resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, logs);
     if (resp != -1) {
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "{} append a new log list {}, commit to {}",
-            member.getName(),
-            logs,
-            request.leaderCommit);
+            "{} append a new log list {}, commit to {}", member.getName(), logs, leaderCommit);
       }
       result.status = Response.RESPONSE_STRONG_ACCEPT;
       result.setLastLogIndex(logManager.getLastLogIndex());
       result.setLastLogTerm(logManager.getLastLogTerm());
-      member.tryUpdateCommitIndex(
-          request.getTerm(), request.leaderCommit, logManager.getTerm(request.leaderCommit));
+      member.tryUpdateCommitIndex(term, leaderCommit, logManager.getTerm(leaderCommit));
 
     } else {
       // the incoming log points to an illegal position, reject it
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java
index 2ec3eceb52..1d0bb8f56d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.consensus.natraft.protocol.log.appender;
 
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 
 import java.util.List;
@@ -31,7 +30,8 @@ import java.util.List;
  */
 public interface LogAppender {
 
-  AppendEntryResult appendEntries(AppendEntriesRequest request, List<Entry> entries);
+  AppendEntryResult appendEntries(
+      long prevLogIndex, long prevLogTerm, long leaderCommit, long term, List<Entry> entries);
 
   void reset();
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index ab89f354cf..0985204aa5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
 import org.apache.iotdb.consensus.natraft.utils.Response;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 
 import org.slf4j.Logger;
@@ -62,8 +61,6 @@ public class SlidingWindowLogAppender implements LogAppender {
   /**
    * After insert an entry into the window, check if its previous and latter entries should be
    * removed if it mismatches.
-   *
-   * @param pos
    */
   private void checkLog(int pos) {
     checkLogPrev(pos);
@@ -108,10 +105,6 @@ public class SlidingWindowLogAppender implements LogAppender {
   /**
    * Flush window range [0, flushPos) into the LogManager, where flushPos is the first null position
    * in the window.
-   *
-   * @param result
-   * @param leaderCommit
-   * @return
    */
   private long flushWindow(AppendEntryResult result, long leaderCommit) {
     long windowPrevLogIndex = firstPosPrevIndex;
@@ -183,7 +176,8 @@ public class SlidingWindowLogAppender implements LogAppender {
   }
 
   @Override
-  public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Entry> entries) {
+  public AppendEntryResult appendEntries(
+      long prevLogIndex, long prevLogTerm, long leaderCommit, long term, List<Entry> entries) {
     if (entries.isEmpty()) {
       return new AppendEntryResult(Response.RESPONSE_AGREE)
           .setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
@@ -191,15 +185,15 @@ public class SlidingWindowLogAppender implements LogAppender {
 
     AppendEntryResult result = null;
     for (Entry entry : entries) {
-      result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, entry);
+      result = appendEntry(prevLogIndex, prevLogTerm, leaderCommit, entry);
 
       if (result.status != Response.RESPONSE_AGREE
           && result.status != Response.RESPONSE_STRONG_ACCEPT
           && result.status != Response.RESPONSE_WEAK_ACCEPT) {
         return result;
       }
-      request.prevLogIndex = entry.getCurrLogIndex();
-      request.prevLogTerm = entry.getCurrLogTerm();
+      prevLogIndex = entry.getCurrLogIndex();
+      prevLogTerm = entry.getCurrLogTerm();
     }
 
     return result;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 27ea860aad..3b83a0cd7d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -29,8 +29,12 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
 import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
+import org.apache.iotdb.consensus.natraft.utils.LogUtils;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
+import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
+import org.apache.iotdb.tsfile.compress.ICompressor;
 
 import com.google.common.util.concurrent.RateLimiter;
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -74,6 +78,8 @@ public class LogDispatcher {
   protected ExecutorService resultHandlerThread =
       IoTDBThreadPoolFactory.newFixedThreadPool(2, "AppendResultHandler");
   protected boolean queueOrdered;
+  protected boolean enableCompressedDispatching;
+  protected ICompressor compressor;
 
   public int bindingThreadNum;
   public static int maxBatchSize = 10;
@@ -82,6 +88,8 @@ public class LogDispatcher {
     this.member = member;
     this.config = config;
     this.queueOrdered = !(config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance());
+    this.enableCompressedDispatching = config.isEnableCompressedDispatching();
+    this.compressor = ICompressor.getCompressor(config.getDispatchingCompressionType());
     this.bindingThreadNum = config.getDispatcherBindingThreadNum();
     if (!queueOrdered) {
       maxBatchSize = 1;
@@ -266,7 +274,30 @@ public class LogDispatcher {
       AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
       AsyncRaftServiceClient client = member.getClient(receiver.getEndpoint());
       try {
+        long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
         AppendEntryResult appendEntryResult = SyncClientAdaptor.appendEntries(client, request);
+        Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
+        handler.onComplete(appendEntryResult);
+      } catch (Exception e) {
+        handler.onError(e);
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "{}: append entries {} with {} logs", member.getName(), receiver, logList.size());
+      }
+    }
+
+    private void appendEntriesAsync(
+        List<ByteBuffer> logList,
+        AppendCompressedEntriesRequest request,
+        List<VotingEntry> currBatch) {
+      AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
+      AsyncRaftServiceClient client = member.getClient(receiver.getEndpoint());
+      try {
+        long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
+        AppendEntryResult appendEntryResult =
+            SyncClientAdaptor.appendCompressedEntries(client, request);
+        Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
         handler.onComplete(appendEntryResult);
       } catch (Exception e) {
         handler.onError(e);
@@ -299,6 +330,29 @@ public class LogDispatcher {
       return request;
     }
 
+    protected AppendCompressedEntriesRequest prepareCompressedRequest(
+        List<ByteBuffer> logList, List<VotingEntry> currBatch, int firstIndex) {
+      AppendCompressedEntriesRequest request = new AppendCompressedEntriesRequest();
+
+      request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
+      request.setLeader(member.getThisNode().getEndpoint());
+      request.setLeaderId(member.getThisNode().getNodeId());
+      request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
+
+      request.setTerm(member.getStatus().getTerm().get());
+
+      request.setEntryBytes(LogUtils.compressEntries(logList, compressor));
+      request.setCompressionType((byte) compressor.getType().ordinal());
+      // set index for raft
+      request.setPrevLogIndex(currBatch.get(firstIndex).getEntry().getCurrLogIndex() - 1);
+      try {
+        request.setPrevLogTerm(currBatch.get(firstIndex).getAppendEntryRequest().prevLogTerm);
+      } catch (Exception e) {
+        logger.error("getTerm failed for newly append entries", e);
+      }
+      return request;
+    }
+
     private void sendLogs(List<VotingEntry> currBatch) {
       if (currBatch.isEmpty()) {
         return;
@@ -316,21 +370,30 @@ public class LogDispatcher {
         int prevIndex = logIndex;
 
         for (; logIndex < currBatch.size(); logIndex++) {
-          long curSize = currBatch.get(logIndex).getAppendEntryRequest().entry.array().length;
+          VotingEntry entry = currBatch.get(logIndex);
+          long curSize = entry.getAppendEntryRequest().entry.array().length;
           if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
             break;
           }
           logSize += curSize;
-          logList.add(currBatch.get(logIndex).getAppendEntryRequest().entry);
+          logList.add(entry.getAppendEntryRequest().entry);
+          Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
+              entry.getEntry().createTime);
+        }
+
+        if (!enableCompressedDispatching) {
+          AppendEntriesRequest appendEntriesRequest = prepareRequest(logList, currBatch, prevIndex);
+          appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
+        } else {
+          AppendCompressedEntriesRequest appendEntriesRequest =
+              prepareCompressedRequest(logList, currBatch, prevIndex);
+          appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
         }
 
-        AppendEntriesRequest appendEntriesRequest = prepareRequest(logList, currBatch, prevIndex);
         if (config.isUseFollowerLoadBalance()) {
           FlowMonitorManager.INSTANCE.report(receiver, logSize);
         }
         nodesRateLimiter.get(receiver).acquire((int) logSize);
-
-        appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
       }
     }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index df31eeba40..fced0f868a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -32,6 +32,9 @@ import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
 import org.apache.iotdb.consensus.natraft.protocol.log.logtype.ConfigChangeEntry;
 import org.apache.iotdb.consensus.natraft.utils.IOUtils;
+import org.apache.iotdb.consensus.natraft.utils.LogUtils;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
+import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 import org.apache.iotdb.consensus.raft.thrift.ElectionRequest;
@@ -42,6 +45,8 @@ import org.apache.iotdb.consensus.raft.thrift.NoMemberException;
 import org.apache.iotdb.consensus.raft.thrift.RaftService;
 import org.apache.iotdb.consensus.raft.thrift.RequestCommitIndexResponse;
 import org.apache.iotdb.consensus.raft.thrift.SendSnapshotRequest;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -50,6 +55,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
 
@@ -135,12 +141,46 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
   public void appendEntries(
       AppendEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler)
       throws TException {
+    long startTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.getOperationStartTime();
     RaftMember member = getMemberOrCreate(request.groupId, request);
     try {
       resultHandler.onComplete(member.appendEntries(request));
     } catch (UnknownLogTypeException e) {
       throw new TException(e);
     }
+    Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.calOperationCostTimeFromStart(startTime);
+  }
+
+  @Override
+  public void appendCompressedEntries(
+      AppendCompressedEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler)
+      throws TException {
+    long startTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.getOperationStartTime();
+    AppendEntriesRequest decompressedRequest = new AppendEntriesRequest();
+    decompressedRequest
+        .setTerm(request.getTerm())
+        .setLeader(request.leader)
+        .setPrevLogIndex(request.prevLogIndex)
+        .setPrevLogTerm(request.prevLogTerm)
+        .setLeaderCommit(request.leaderCommit)
+        .setGroupId(request.groupId)
+        .setLeaderId(request.leaderId);
+
+    try {
+      long compressionStartTime = Statistic.RAFT_RECEIVER_DECOMPRESS_ENTRY.getOperationStartTime();
+      List<ByteBuffer> buffers =
+          LogUtils.decompressEntries(
+              request.entryBytes,
+              IUnCompressor.getUnCompressor(CompressionType.values()[request.compressionType]));
+      decompressedRequest.setEntries(buffers);
+      Statistic.RAFT_RECEIVER_DECOMPRESS_ENTRY.calOperationCostTimeFromStart(compressionStartTime);
+
+      RaftMember member = getMemberOrCreate(request.groupId, decompressedRequest);
+      resultHandler.onComplete(member.appendEntries(decompressedRequest));
+    } catch (UnknownLogTypeException | IOException e) {
+      throw new TException(e);
+    }
+    Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.calOperationCostTimeFromStart(startTime);
   }
 
   @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
index 745e1d5fef..c536763f37 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
@@ -19,15 +19,26 @@
 
 package org.apache.iotdb.consensus.natraft.utils;
 
+import org.apache.iotdb.consensus.natraft.exception.UnknownLogTypeException;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
 import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 public class LogUtils {
 
@@ -77,4 +88,70 @@ public class LogUtils {
     }
     return sendLogRequest;
   }
+
+  /**
+   * Serializes a batch of entry buffers into one length-prefixed byte stream and compresses it.
+   *
+   * <p>Layout before compression: an int entry count, then for each entry an int length followed
+   * by that many bytes. {@link #decompressEntries} reads this layout back. Only each input buffer's
+   * remaining bytes are written, and the input buffers' positions and limits are not modified.
+   * The raw and compressed sizes are added to the dispatcher size statistics.
+   *
+   * @param entryByteList serialized entries, written in list order
+   * @param compressor compressor applied to the whole serialized batch
+   * @return a buffer wrapping the compressed bytes, or {@code null} if an {@link IOException} was
+   *     raised while serializing or compressing (the failure is logged)
+   */
+  public static ByteBuffer compressEntries(List<ByteBuffer> entryByteList, ICompressor compressor) {
+    PublicBAOS baos = new PublicBAOS();
+    DataOutputStream dataOutputStream = new DataOutputStream(baos);
+    try {
+      dataOutputStream.writeInt(entryByteList.size());
+      for (ByteBuffer byteBuffer : entryByteList) {
+        int length = byteBuffer.remaining();
+        dataOutputStream.writeInt(length);
+        if (byteBuffer.hasArray()) {
+          // Fast path: write straight from the backing array without an extra copy.
+          dataOutputStream.write(
+              byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), length);
+        } else {
+          // Direct or read-only buffers expose no accessible array (array() would throw); copy
+          // through a duplicate so the caller's buffer position is left untouched.
+          byte[] copy = new byte[length];
+          byteBuffer.duplicate().get(copy);
+          dataOutputStream.write(copy);
+        }
+      }
+      dataOutputStream.flush();
+      Statistic.LOG_DISPATCHER_RAW_SIZE.add(baos.size());
+      byte[] compressed = compressor.compress(baos.getBuf(), 0, baos.size());
+      Statistic.LOG_DISPATCHER_COMPRESSED_SIZE.add(compressed.length);
+      return ByteBuffer.wrap(compressed);
+    } catch (IOException e) {
+      logger.warn("Failed to compress entries", e);
+    }
+    return null;
+  }
+
+  /**
+   * Decompresses a batch produced by {@link #compressEntries} and splits it into one buffer per
+   * entry.
+   *
+   * <p>The returned buffers are slices of a single freshly allocated array. The input buffer's
+   * position and limit are not modified. Bytes after the last declared entry are ignored.
+   *
+   * @param buffer compressed batch; its remaining bytes are decompressed
+   * @param unCompressor decompressor for the algorithm the batch was compressed with
+   * @return the entry buffers, in the order they were written
+   * @throws IOException if decompression fails, or if the decompressed data is malformed: negative
+   *     sizes, an entry count larger than the data can hold, a length running past the end of the
+   *     data, or a truncated header
+   */
+  public static List<ByteBuffer> decompressEntries(ByteBuffer buffer, IUnCompressor unCompressor)
+      throws IOException {
+    int compressedLength = buffer.remaining();
+    byte[] compressed;
+    int compressedOffset;
+    if (buffer.hasArray()) {
+      compressed = buffer.array();
+      compressedOffset = buffer.arrayOffset() + buffer.position();
+    } else {
+      // Direct or read-only buffers have no accessible array; copy without moving the position.
+      compressed = new byte[compressedLength];
+      buffer.duplicate().get(compressed);
+      compressedOffset = 0;
+    }
+
+    int uncompressedLength =
+        unCompressor.getUncompressedLength(compressed, compressedOffset, compressedLength);
+    if (uncompressedLength < 0) {
+      throw new IOException("Invalid uncompressed length of entry batch: " + uncompressedLength);
+    }
+    byte[] uncompressed = new byte[uncompressedLength];
+    unCompressor.uncompress(compressed, compressedOffset, compressedLength, uncompressed, 0);
+    ByteBuffer uncompressedBuffer = ByteBuffer.wrap(uncompressed);
+
+    try {
+      int count = uncompressedBuffer.getInt();
+      // Every entry needs at least an int length prefix, which bounds a sane count. The check
+      // also stops a corrupt count from triggering a huge list pre-allocation.
+      if (count < 0 || count > uncompressedBuffer.remaining() / Integer.BYTES) {
+        throw new IOException("Invalid entry count in compressed batch: " + count);
+      }
+      List<ByteBuffer> buffers = new ArrayList<>(count);
+      for (int i = 0; i < count; i++) {
+        int size = uncompressedBuffer.getInt();
+        if (size < 0 || size > uncompressedBuffer.remaining()) {
+          throw new IOException(
+              "Invalid size "
+                  + size
+                  + " for entry "
+                  + i
+                  + ", only "
+                  + uncompressedBuffer.remaining()
+                  + " bytes remain");
+        }
+        ByteBuffer slice = uncompressedBuffer.slice();
+        slice.limit(size);
+        buffers.add(slice);
+        uncompressedBuffer.position(uncompressedBuffer.position() + size);
+      }
+      return buffers;
+    } catch (BufferUnderflowException e) {
+      // A count or length prefix was cut off; report it as malformed input.
+      throw new IOException("Truncated compressed entry batch", e);
+    }
+  }
+
+  /**
+   * Deserializes every buffer into an {@link Entry}, keeping the input order.
+   *
+   * <p>Each buffer is marked before parsing. If parsing runs out of bytes
+   * ({@link BufferUnderflowException}), the buffer is reset to that mark and the exception is
+   * rethrown, so its position is back where it started. Other failures leave the buffer wherever
+   * parsing stopped.
+   *
+   * @param buffers serialized entries, one per buffer
+   * @return the parsed entries
+   * @throws UnknownLogTypeException propagated from {@link LogParser#parse}
+   */
+  public static List<Entry> parseEntries(List<ByteBuffer> buffers) throws UnknownLogTypeException {
+    List<Entry> parsed = new ArrayList<>();
+    for (ByteBuffer entryBuffer : buffers) {
+      parsed.add(parseSingleEntry(entryBuffer));
+    }
+    return parsed;
+  }
+
+  /** Parses one entry, restoring {@code entryBuffer}'s position if it underflows. */
+  private static Entry parseSingleEntry(ByteBuffer entryBuffer) throws UnknownLogTypeException {
+    entryBuffer.mark();
+    try {
+      Entry entry = LogParser.getINSTANCE().parse(entryBuffer);
+      // NOTE(review): this records the bytes still remaining AFTER parsing, which is 0 when the
+      // parser consumes the whole buffer; confirm whether the pre-parse size was intended.
+      entry.setByteSize(entryBuffer.limit() - entryBuffer.position());
+      return entry;
+    } catch (BufferUnderflowException underflow) {
+      entryBuffer.reset();
+      throw underflow;
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
index ba3a85ac31..df654f5bfd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -103,6 +103,12 @@ public class Timer {
         TIME_SCALE,
         true,
         DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+    RAFT_SENDER_SEND_LOG(
+        RAFT_MEMBER_SENDER,
+        "send log to a follower",
+        TIME_SCALE,
+        true,
+        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
     RAFT_SENDER_BUILD_LOG_REQUEST(
         RAFT_MEMBER_SENDER,
         "build SendLogRequest",
@@ -184,6 +190,12 @@ public class Timer {
     RAFT_SENDER_DATA_LOG_APPLY(
         RAFT_MEMBER_SENDER, "apply data log", TIME_SCALE, true, RAFT_SENDER_COMMIT_WAIT_LOG_APPLY),
     // raft member - receiver
+    RAFT_RECEIVER_DECOMPRESS_ENTRY(
+        RAFT_MEMBER_RECEIVER,
+        "receiver decompress entries",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     RAFT_RECEIVER_WAIT_FOR_PREV_LOG(
         RAFT_MEMBER_RECEIVER,
         "receiver wait for prev log",
@@ -302,6 +314,18 @@ public class Timer {
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_RAW_SIZE(
+        LOG_DISPATCHER,
+        "raw dispatching size",
+        1,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_COMPRESSED_SIZE(
+        LOG_DISPATCHER,
+        "compressed dispatching size",
+        1,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     RAFT_WINDOW_LENGTH(RAFT_MEMBER_RECEIVER, "window length", 1, true, ROOT),
     RAFT_RELAYED_ENTRY(RAFT_MEMBER_RECEIVER, "number of relayed entries", 1, true, ROOT),
     RAFT_SEND_RELAY_ACK(RAFT_MEMBER_RECEIVER, "send relay ack", 1, true, ROOT),
@@ -421,8 +445,12 @@ public class Timer {
       if (!ENABLE_INSTRUMENTING) {
         return "";
       }
-      StringBuilder result = new StringBuilder();
+      StringBuilder result = new StringBuilder("\n");
       printTo(Statistic.ROOT, result);
+      result
+          .append("Dispatcher compression ratio: ")
+          .append(LOG_DISPATCHER_COMPRESSED_SIZE.getSum() * 1.0 / LOG_DISPATCHER_RAW_SIZE.getSum())
+          .append("\n");
       return result.toString();
     }
 
diff --git a/thrift-raft/src/main/thrift/raft.thrift b/thrift-raft/src/main/thrift/raft.thrift
index e2e6fa7e80..67466b2080 100644
--- a/thrift-raft/src/main/thrift/raft.thrift
+++ b/thrift-raft/src/main/thrift/raft.thrift
@@ -31,6 +31,18 @@ struct AppendEntriesRequest {
   8: required i32 leaderId
 }
 
+/**
+ * AppendEntries variant whose entries travel as a single compressed batch. The receiver copies
+ * term, leader, prevLogIndex, prevLogTerm, leaderCommit, groupId and leaderId into an
+ * AppendEntriesRequest. It decompresses entryBytes into that request's entry list.
+ */
+struct AppendCompressedEntriesRequest {
+  1: required i64 term // leader's
+  2: required common.TEndPoint leader
+  3: required binary entryBytes // compressed batch of serialized entries
+  4: required i64 prevLogIndex
+  5: required i64 prevLogTerm
+  6: required i64 leaderCommit
+  7: required common.TConsensusGroupId groupId
+  8: required i32 leaderId
+  // Decoded by the receiver as an index into the Java CompressionType.values() array.
+  9: required i8 compressionType
+}
+
 struct AppendEntryResult {
   1: required i64 status;
   2: optional i64 lastLogTerm;
@@ -151,6 +163,8 @@ service RaftService {
   **/
   AppendEntryResult appendEntries(1:AppendEntriesRequest request)
 
+  AppendEntryResult appendCompressedEntries(1:AppendCompressedEntriesRequest request)
+
   common.TSStatus sendSnapshot(1:SendSnapshotRequest request)
 
   /**