You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/06/01 11:08:54 UTC

[iotdb] 01/02: temp save

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4aa5448b1eceb0e499266678d3677787e0d9a010
Author: jt <jt...@163.com>
AuthorDate: Tue May 25 09:40:24 2021 +0800

    temp save
---
 cluster/distribute.sh                              |  19 ++
 cluster/src/assembly/resources/sbin/expr-bench.sh  |  73 ++++++
 cluster/src/assembly/resources/sbin/expr-server.sh |  73 ++++++
 .../apache/iotdb/cluster/config/ClusterConfig.java |  10 +
 .../org/apache/iotdb/cluster/expr/ExprBench.java   | 103 ++++++++
 .../ExprLogDispatcher.java}                        | 217 ++++-------------
 .../org/apache/iotdb/cluster/expr/ExprMember.java  | 151 ++++++++++++
 .../org/apache/iotdb/cluster/expr/ExprServer.java  |  90 +++++++
 .../iotdb/cluster/log/IndirectLogDispatcher.java   | 103 ++++++++
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  35 ++-
 .../iotdb/cluster/log/manage/RaftLogManager.java   |  33 +++
 .../log/manage/UnCommittedEntryManager.java        |  12 +-
 .../serializable/SyncLogDequeSerializer.java       |   2 +-
 .../iotdb/cluster/server/DataClusterServer.java    |   8 +-
 .../iotdb/cluster/server/MetaClusterServer.java    |  24 +-
 .../apache/iotdb/cluster/server/RaftServer.java    |  14 +-
 .../server/handlers/caller/HeartbeatHandler.java   |   2 +-
 .../handlers/forwarder/IndirectAppendHandler.java  |  49 ++++
 .../cluster/server/heartbeat/HeartbeatThread.java  |   3 +-
 .../server/heartbeat/MetaHeartbeatServer.java      |   4 +-
 .../server/heartbeat/MetaHeartbeatThread.java      |  10 +-
 .../cluster/server/member/MetaGroupMember.java     |  26 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 264 ++++++++++++++++-----
 .../cluster/server/service/BaseAsyncService.java   |  19 ++
 .../cluster/server/service/BaseSyncService.java    |  52 ++--
 .../apache/iotdb/cluster/utils/ClientUtils.java    |   2 +
 .../apache/iotdb/cluster/utils/ClusterNode.java    |   2 +-
 .../apache/iotdb/cluster/utils/ClusterUtils.java   |  12 +
 pom.xml                                            |  52 ++--
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   2 +
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   1 +
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   7 +-
 .../apache/iotdb/db/qp/physical/sys/ExprPlan.java  |  84 +++++++
 thrift-cluster/src/main/thrift/cluster.thrift      |  21 ++
 34 files changed, 1239 insertions(+), 340 deletions(-)

diff --git a/cluster/distribute.sh b/cluster/distribute.sh
new file mode 100644
index 0000000..98758da
--- /dev/null
+++ b/cluster/distribute.sh
@@ -0,0 +1,19 @@
+src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
+
+ips=(fit36 fit38 fit39)
+target_lib_path=/data/iotdb_expr/lib/
+
+for ip in ${ips[*]}
+  do
+    ssh fit@$ip "mkdir $target_lib_path"
+    scp -r $src_lib_path fit@$ip:$target_lib_path
+  done
+
+ips=(fit31 fit32 fit33 fit34)
+target_lib_path=/disk/iotdb_expr/lib/
+
+for ip in ${ips[*]}
+  do
+    ssh fit@$ip "mkdir $target_lib_path"
+    scp -r $src_lib_path fit@$ip:$target_lib_path
+  done
\ No newline at end of file
diff --git a/cluster/src/assembly/resources/sbin/expr-bench.sh b/cluster/src/assembly/resources/sbin/expr-bench.sh
new file mode 100644
index 0000000..d4fa092
--- /dev/null
+++ b/cluster/src/assembly/resources/sbin/expr-bench.sh
@@ -0,0 +1,73 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+echo ---------------------
+echo "Starting IoTDB (Cluster Mode)"
+echo ---------------------
+
+if [ -z "${IOTDB_HOME}" ]; then
+  export IOTDB_HOME="`dirname "$0"`/.."
+fi
+
+enable_printgc=false
+if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
+  enable_printgc=true;
+  shift
+fi
+
+if [ -n "$JAVA_HOME" ]; then
+    for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+        if [ -x "$java" ]; then
+            JAVA="$java"
+            break
+        fi
+    done
+else
+    JAVA=java
+fi
+
+if [ -z $JAVA ] ; then
+    echo Unable to find java executable. Check JAVA_HOME and PATH environment variables.  > /dev/stderr
+    exit 1;
+fi
+
+CLASSPATH=""
+for f in ${IOTDB_HOME}/lib/*.jar; do
+  CLASSPATH=${CLASSPATH}":"$f
+done
+classname=org.apache.iotdb.cluster.expr.ExprBench
+
+launch_service()
+{
+	iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback.xml"
+	iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}"
+	iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
+	iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
+	iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
+	iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
+	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" $@
+	return $?
+}
+
+# Start up the service
+launch_service "$classname" $@
+
+exit $?
diff --git a/cluster/src/assembly/resources/sbin/expr-server.sh b/cluster/src/assembly/resources/sbin/expr-server.sh
new file mode 100644
index 0000000..f323d0b
--- /dev/null
+++ b/cluster/src/assembly/resources/sbin/expr-server.sh
@@ -0,0 +1,73 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+echo ---------------------
+echo "Starting IoTDB (Cluster Mode)"
+echo ---------------------
+
+if [ -z "${IOTDB_HOME}" ]; then
+  export IOTDB_HOME="`dirname "$0"`/.."
+fi
+
+enable_printgc=false
+if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
+  enable_printgc=true;
+  shift
+fi
+
+if [ -n "$JAVA_HOME" ]; then
+    for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+        if [ -x "$java" ]; then
+            JAVA="$java"
+            break
+        fi
+    done
+else
+    JAVA=java
+fi
+
+if [ -z $JAVA ] ; then
+    echo Unable to find java executable. Check JAVA_HOME and PATH environment variables.  > /dev/stderr
+    exit 1;
+fi
+
+CLASSPATH=""
+for f in ${IOTDB_HOME}/lib/*.jar; do
+  CLASSPATH=${CLASSPATH}":"$f
+done
+classname=org.apache.iotdb.cluster.expr.ExprServer
+
+launch_service()
+{
+	iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback.xml"
+	iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}"
+	iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
+	iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
+	iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
+	iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
+	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" $@
+	return $?
+}
+
+# Start up the service
+launch_service "$classname" $@
+
+exit $?
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 7d3e42a..540201c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -168,6 +168,8 @@ public class ClusterConfig {
 
   private boolean openServerRpcPort = false;
 
+  private boolean useIndirectBroadcasting = false;
+
   /**
    * create a clusterConfig class. The internalIP will be set according to the server's hostname. If
    * there is something error for getting the ip of the hostname, then set the internalIp as
@@ -183,6 +185,14 @@ public class ClusterConfig {
     seedNodeUrls = Arrays.asList(String.format("%s:%d", internalIp, internalMetaPort));
   }
 
+  public boolean isUseIndirectBroadcasting() {
+    return useIndirectBroadcasting;
+  }
+
+  public void setUseIndirectBroadcasting(boolean useIndirectBroadcasting) {
+    this.useIndirectBroadcasting = useIndirectBroadcasting;
+  }
+
   public int getSelectorNumOfClientPool() {
     return selectorNumOfClientPool;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
new file mode 100644
index 0000000..b625f2c
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.cluster.client.sync.SyncClientFactory;
+import org.apache.iotdb.cluster.client.sync.SyncClientPool;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient.FactorySync;
+import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.db.qp.physical.sys.ExprPlan;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+
+public class ExprBench {
+
+  private AtomicLong requestCounter = new AtomicLong();
+  private int threadNum = 64;
+  private int workloadSize = 64 * 1024;
+  private SyncClientPool clientPool;
+  private Node target;
+  private int maxRequestNum;
+
+  public ExprBench(Node target) {
+    this.target = target;
+    SyncClientFactory factory = new FactorySync(new Factory());
+    clientPool = new SyncClientPool(factory);
+  }
+
+  public void benchmark() {
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < threadNum; i++) {
+      new Thread(
+          () -> {
+            Client client = clientPool.getClient(target);
+            ExecutNonQueryReq request = new ExecutNonQueryReq();
+            ExprPlan plan = new ExprPlan();
+            plan.setWorkload(new byte[workloadSize]);
+            plan.setNeedForward(true);
+            ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
+            plan.serialize(byteBuffer);
+            byteBuffer.flip();
+            request.setPlanBytes(byteBuffer);
+            long currRequsetNum = -1;
+            while (true) {
+
+              try {
+                client.executeNonQueryPlan(request);
+                currRequsetNum = requestCounter.incrementAndGet();
+              } catch (TException e) {
+                e.printStackTrace();
+              }
+
+              if (currRequsetNum % 1000 == 0) {
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                System.out.println(String.format("%d %d %f(%f)", elapsedTime,
+                    currRequsetNum,
+                    (currRequsetNum + 0.0) / elapsedTime,
+                    currRequsetNum * workloadSize / (1024.0*1024.0)  / elapsedTime));
+              }
+
+              if (currRequsetNum >= maxRequestNum) {
+                break;
+              }
+            }
+          })
+          .start();
+    }
+  }
+
+  public void setMaxRequestNum(int maxRequestNum) {
+    this.maxRequestNum = maxRequestNum;
+  }
+
+  public static void main(String[] args) {
+    Node target = new Node();
+    target.setInternalIp(args[0]);
+    target.setMetaPort(Integer.parseInt(args[1]));
+    ExprBench bench = new ExprBench(target);
+    bench.maxRequestNum = Integer.parseInt(args[2]);
+    bench.threadNum = Integer.parseInt(args[3]);
+    bench.benchmark();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java
similarity index 63%
copy from cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java
index a600c4a..4b302a7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java
@@ -17,9 +17,27 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.log;
+package org.apache.iotdb.cluster.expr;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.cluster.client.sync.SyncClientPool;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient.FactorySync;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -33,26 +51,13 @@ import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.utils.TestOnly;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
  * followers and send the logs in an ordered manner so that the followers will not wait for previous
@@ -60,27 +65,25 @@ import java.util.concurrent.atomic.AtomicLong;
  * follower A, the actual reach order may be log3, log2, and log1. According to the protocol, log3
  * and log2 must halt until log1 reaches, as a result, the total delay may increase significantly.
  */
-public class LogDispatcher {
+public class ExprLogDispatcher {
 
-  private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
-  private RaftMember member;
-  private boolean useBatchInLogCatchUp =
-      ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp();
+  private static final Logger logger = LoggerFactory.getLogger(ExprLogDispatcher.class);
   private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
   private ExecutorService executorService;
   private static ExecutorService serializationService =
       Executors.newFixedThreadPool(
           Runtime.getRuntime().availableProcessors(),
           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build());
+  private SyncClientPool clientPool;
+  private Node leader;
 
-  public LogDispatcher(RaftMember member) {
-    this.member = member;
+  public ExprLogDispatcher(List<Node> nodes, Node leader) {
     executorService = Executors.newCachedThreadPool();
-    for (Node node : member.getAllNodes()) {
-      if (!node.equals(member.getThisNode())) {
-        nodeLogQueues.add(createQueueAndBindingThread(node));
-      }
+    for (Node node : nodes) {
+      nodeLogQueues.add(createQueueAndBindingThread(node));
     }
+    clientPool = new SyncClientPool(new FactorySync(new Factory()));
+    this.leader = leader;
   }
 
   @TestOnly
@@ -114,15 +117,9 @@ public class LogDispatcher {
           addSucceeded = nodeLogQueue.add(log);
         }
 
-        if (!addSucceeded) {
-          logger.debug(
-              "Log queue[{}] of {} is full, ignore the log to this node", i, member.getName());
-        } else {
+        if (addSucceeded) {
           log.setEnqueueTime(System.nanoTime());
         }
-      } catch (IllegalStateException e) {
-        logger.debug(
-            "Log queue[{}] of {} is full, ignore the log to this node", i, member.getName());
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
@@ -219,30 +216,26 @@ public class LogDispatcher {
 
   class DispatcherThread implements Runnable {
 
-    private Node receiver;
+    private Node node;
+    private Client client;
     private BlockingQueue<SendLogRequest> logBlockingDeque;
     private List<SendLogRequest> currBatch = new ArrayList<>();
-    private Peer peer;
 
-    DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
-      this.receiver = receiver;
+    DispatcherThread(Node node, BlockingQueue<SendLogRequest> logBlockingDeque) {
+      this.client = clientPool.getClient(node);
       this.logBlockingDeque = logBlockingDeque;
-      this.peer =
-          member
-              .getPeerMap()
-              .computeIfAbsent(receiver, r -> new Peer(member.getLogManager().getLastLogIndex()));
     }
 
     @Override
     public void run() {
-      Thread.currentThread().setName("LogDispatcher-" + member.getName() + "-" + receiver);
+      Thread.currentThread().setName("LogDispatcher-" + node);
       try {
         while (!Thread.interrupted()) {
           SendLogRequest poll = logBlockingDeque.take();
           currBatch.add(poll);
           logBlockingDeque.drainTo(currBatch);
           if (logger.isDebugEnabled()) {
-            logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
+            logger.debug("Sending {} logs to {}", currBatch.size(), node);
           }
           for (SendLogRequest request : currBatch) {
             request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
@@ -258,59 +251,25 @@ public class LogDispatcher {
       logger.info("Dispatcher exits");
     }
 
-    private void appendEntriesAsync(
-        List<ByteBuffer> logList, AppendEntriesRequest request, List<SendLogRequest> currBatch)
-        throws TException {
-      AsyncMethodCallback<Long> handler = new AppendEntriesHandler(currBatch);
-      AsyncClient client = member.getSendLogAsyncClient(receiver);
-      if (logger.isDebugEnabled()) {
-        logger.debug(
-            "{}: append entries {} with {} logs", member.getName(), receiver, logList.size());
-      }
-      if (client != null) {
-        client.appendEntries(request, handler);
-      }
-    }
-
     private void appendEntriesSync(
         List<ByteBuffer> logList, AppendEntriesRequest request, List<SendLogRequest> currBatch) {
 
       long startTime = Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime();
-      if (!member.waitForPrevLog(peer, currBatch.get(0).getLog())) {
-        logger.warn(
-            "{}: node {} timed out when appending {}",
-            member.getName(),
-            receiver,
-            currBatch.get(0).getLog());
-        return;
-      }
       Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
 
-      Client client = member.getSyncClient(receiver);
-      if (client == null) {
-        logger.error("No available client for {}", receiver);
-        return;
-      }
-      AsyncMethodCallback<Long> handler = new AppendEntriesHandler(currBatch);
       startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
       try {
         long result = client.appendEntries(request);
         Timer.Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
         if (result != -1 && logger.isInfoEnabled()) {
           logger.info(
-              "{}: Append {} logs to {}, resp: {}",
-              member.getName(),
+              "Append {} logs to {}, resp: {}",
               logList.size(),
-              receiver,
+              node,
               result);
         }
-        handler.onComplete(result);
       } catch (TException e) {
-        client.getInputProtocol().getTransport().close();
-        handler.onError(e);
         logger.warn("Failed logs: {}, first index: {}", logList, request.prevLogIndex + 1);
-      } finally {
-        ClientUtils.putBackSyncClient(client);
       }
     }
 
@@ -318,15 +277,8 @@ public class LogDispatcher {
         List<ByteBuffer> logList, List<SendLogRequest> currBatch, int firstIndex) {
       AppendEntriesRequest request = new AppendEntriesRequest();
 
-      if (member.getHeader() != null) {
-        request.setHeader(member.getHeader());
-      }
-      request.setLeader(member.getThisNode());
-      request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
-
-      synchronized (member.getTerm()) {
-        request.setTerm(member.getTerm().get());
-      }
+      request.setLeader(leader);
+      request.setTerm(1);
 
       request.setEntries(logList);
       // set index for raft
@@ -339,7 +291,7 @@ public class LogDispatcher {
       return request;
     }
 
-    private void sendLogs(List<SendLogRequest> currBatch) throws TException {
+    private void sendLogs(List<SendLogRequest> currBatch) {
       int logIndex = 0;
       logger.debug(
           "send logs from index {} to {}",
@@ -362,11 +314,7 @@ public class LogDispatcher {
         }
 
         AppendEntriesRequest appendEntriesRequest = prepareRequest(logList, currBatch, prevIndex);
-        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-          appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
-        } else {
-          appendEntriesSync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
-        }
+        appendEntriesSync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
         for (; prevIndex < logIndex; prevIndex++) {
           Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
               currBatch.get(prevIndex).getLog().getCreateTime());
@@ -374,84 +322,9 @@ public class LogDispatcher {
       }
     }
 
-    private void sendBatchLogs(List<SendLogRequest> currBatch) throws TException {
-      if (currBatch.size() > 1) {
-        if (useBatchInLogCatchUp) {
-          sendLogs(currBatch);
-        } else {
-          for (SendLogRequest batch : currBatch) {
-            sendLog(batch);
-          }
-        }
-      } else {
-        sendLog(currBatch.get(0));
-      }
-    }
-
-    private void sendLog(SendLogRequest logRequest) {
-      Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
-          logRequest.getLog().getCreateTime());
-      member.sendLogToFollower(
-          logRequest.getLog(),
-          logRequest.getVoteCounter(),
-          receiver,
-          logRequest.getLeaderShipStale(),
-          logRequest.getNewLeaderTerm(),
-          logRequest.getAppendEntryRequest());
-      Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
-          logRequest.getLog().getCreateTime());
+    private void sendBatchLogs(List<SendLogRequest> currBatch) {
+      sendLogs(currBatch);
     }
 
-    class AppendEntriesHandler implements AsyncMethodCallback<Long> {
-
-      private final List<AsyncMethodCallback<Long>> singleEntryHandlers;
-
-      private AppendEntriesHandler(List<SendLogRequest> batch) {
-        singleEntryHandlers = new ArrayList<>(batch.size());
-        for (SendLogRequest sendLogRequest : batch) {
-          AppendNodeEntryHandler handler =
-              getAppendNodeEntryHandler(
-                  sendLogRequest.getLog(),
-                  sendLogRequest.getVoteCounter(),
-                  receiver,
-                  sendLogRequest.getLeaderShipStale(),
-                  sendLogRequest.getNewLeaderTerm(),
-                  peer);
-          singleEntryHandlers.add(handler);
-        }
-      }
-
-      @Override
-      public void onComplete(Long aLong) {
-        for (AsyncMethodCallback<Long> singleEntryHandler : singleEntryHandlers) {
-          singleEntryHandler.onComplete(aLong);
-        }
-      }
-
-      @Override
-      public void onError(Exception e) {
-        for (AsyncMethodCallback<Long> singleEntryHandler : singleEntryHandlers) {
-          singleEntryHandler.onError(e);
-        }
-      }
-
-      private AppendNodeEntryHandler getAppendNodeEntryHandler(
-          Log log,
-          AtomicInteger voteCounter,
-          Node node,
-          AtomicBoolean leaderShipStale,
-          AtomicLong newLeaderTerm,
-          Peer peer) {
-        AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
-        handler.setReceiver(node);
-        handler.setVoteCounter(voteCounter);
-        handler.setLeaderShipStale(leaderShipStale);
-        handler.setLog(log);
-        handler.setMember(member);
-        handler.setPeer(peer);
-        handler.setReceiverTerm(newLeaderTerm);
-        return handler;
-      }
-    }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
new file mode 100644
index 0000000..efa63c2
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.ExprPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+public class ExprMember extends MetaGroupMember {
+
+  public ExprMember() {
+  }
+
+  public ExprMember(Node thisNode, List<Node> allNodes) {
+    this.thisNode = thisNode;
+    this.allNodes = allNodes;
+  }
+
+  public ExprMember(TProtocolFactory factory,
+      Node thisNode, Coordinator coordinator)
+      throws QueryProcessException {
+    super(factory, thisNode, coordinator);
+  }
+
+  private int windowSize = 10000;
+  private Log[] logWindow = new Log[windowSize];
+  private long windowPrevLogIndex;
+  private long windowPrevLogTerm;
+  private long windowTerm;
+
+  @Override
+  protected synchronized void startSubServers() {
+
+  }
+
+  @Override
+  public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+    if (false) {
+      if (plan instanceof ExprPlan && !((ExprPlan) plan).isNeedForward()) {
+        return StatusUtils.OK;
+      } else if (plan instanceof ExprPlan) {
+        ((ExprPlan) plan).setNeedForward(false);
+      }
+
+      ExecutNonQueryReq req = new ExecutNonQueryReq();
+      ByteBuffer byteBuffer = ByteBuffer.allocate(128 * 1024);
+      plan.serialize(byteBuffer);
+      byteBuffer.flip();
+      req.setPlanBytes(byteBuffer);
+
+      for (Node node : getAllNodes()) {
+        if (!ClusterUtils.isNodeEquals(node, thisNode)) {
+          Client syncClient = getSyncClient(node);
+          try {
+            syncClient.executeNonQueryPlan(req);
+          } catch (TException e) {
+            ClientUtils.putBackSyncClient(syncClient);
+            return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
+          }
+          ClientUtils.putBackSyncClient(syncClient);
+        }
+      }
+      return StatusUtils.OK;
+    }
+    return processNonPartitionedMetaPlan(plan);
+  }
+
+  protected long appendEntry1(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+    long resp;
+    long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+    long success = 0;
+
+    synchronized (logManager) {
+      long windowPos = log.getCurrLogIndex() - logManager.getLastLogIndex() - 1;
+      if (windowPos < 0) {
+        success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
+      } else if (windowPos < windowSize) {
+        logWindow[(int) windowPos] = log;
+        if (windowPos == 0) {
+          windowPrevLogIndex = prevLogIndex;
+          windowPrevLogTerm = prevLogTerm;
+
+          int flushPos = 0;
+          for (; flushPos < windowSize; flushPos++) {
+            if (logWindow[flushPos] == null) {
+              break;
+            }
+          }
+          // flush [0, flushPos)
+          List<Log> logs = Arrays.asList(logWindow).subList(0, flushPos);
+          success = logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit,
+              logs);
+          if (success != -1) {
+            System.arraycopy(logWindow, flushPos, logWindow, 0, windowSize - flushPos);
+            for (int i = 1; i <= flushPos; i++) {
+              logWindow[windowSize - i] = null;
+            }
+          } else {
+            System.out.println("not success");
+          }
+        }
+      } else {
+        return Response.RESPONSE_LOG_MISMATCH;
+      }
+    }
+
+    Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+    if (success != -1) {
+      resp = Response.RESPONSE_AGREE;
+    } else {
+      // the incoming log points to an illegal position, reject it
+      resp = Response.RESPONSE_LOG_MISMATCH;
+    }
+    return resp;
+  }
+
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
new file mode 100644
index 0000000..338755e
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr;
+
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
+import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.log.LogDispatcher;
+import org.apache.iotdb.cluster.server.MetaClusterServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class ExprServer extends MetaClusterServer {
+
+  protected ExprServer() throws QueryProcessException {
+    super();
+  }
+
+  @Override
+  protected MetaGroupMember createMetaGroupMember() throws QueryProcessException {
+    return new ExprMember(new Factory(), thisNode, coordinator);
+  }
+
+  @Override
+  protected String getClientThreadPrefix() {
+    return "FollowerClient";
+  }
+
+  @Override
+  protected String getServerClientName() {
+    return "FollowerServer";
+  }
+
+  @Override
+  protected TServerTransport getServerSocket() throws TTransportException {
+    return new TServerSocket(
+        new InetSocketAddress(thisNode.getInternalIp(), thisNode.getMetaPort()));
+  }
+
+
+  public static void main(String[] args)
+      throws StartupException, TTransportException, QueryProcessException, ConfigInconsistentException, StartUpCheckFailureException {
+
+    String[] nodeStrings = args[0].split(":");
+    String ip = nodeStrings[0];
+    int port = Integer.parseInt(nodeStrings[1]);
+    String[] allNodeStr = args[1].split(",");
+
+    int dispatcherThreadNum = Integer.parseInt(args[2]);
+
+    ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(Arrays.asList(allNodeStr));
+    ClusterDescriptor.getInstance().getConfig().setInternalMetaPort(port);
+    ClusterDescriptor.getInstance().getConfig().setInternalIp(ip);
+    ClusterDescriptor.getInstance().getConfig().setEnableRaftLogPersistence(false);
+    ClusterDescriptor.getInstance().getConfig().setUseBatchInLogCatchUp(false);
+    RaftMember.USE_LOG_DISPATCHER = true;
+    LogDispatcher.bindingThreadNum = dispatcherThreadNum;
+
+    ExprServer server = new ExprServer();
+    server.start();
+    server.buildCluster();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
new file mode 100644
index 0000000..75f1b16
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import org.apache.iotdb.cluster.log.LogDispatcher.DispatcherThread;
+import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+
+/**
+ * IndirectLogDispatcher sends entries only to a pre-selected subset of followers instead of all
+ * followers and lets the selected followers broadcast the log to the other followers.
+ */
+public class IndirectLogDispatcher extends LogDispatcher {
+
+  private Map<Node, List<Node>> directToIndirectFollowerMap = new HashMap<>();
+
+  public IndirectLogDispatcher(RaftMember member) {
+    super(member);
+    recalculateDirectFollowerMap();
+    createQueueAndBindingThreads();
+  }
+
+  @Override
+  LogDispatcher.DispatcherThread newDispatcherThread(Node node,
+      BlockingQueue<SendLogRequest> logBlockingQueue) {
+    return new DispatcherThread(node, logBlockingQueue);
+  }
+
+  @Override
+  void createQueueAndBindingThreads() {
+    // Invoked by the super constructor before this class's field initializers run; skip then.
+    if (directToIndirectFollowerMap == null) { return; }
+    directToIndirectFollowerMap.keySet().forEach(n -> nodeLogQueues.add(createQueueAndBindingThread(n)));
+
+  public void recalculateDirectFollowerMap() {
+    List<Node> allNodes = new ArrayList<>(member.getAllNodes());
+    allNodes.removeIf(n -> ClusterUtils.isNodeEquals(n, member.getThisNode()));
+    QueryCoordinator instance = QueryCoordinator.getINSTANCE();
+    List<Node> orderedNodes = instance.reorderNodes(allNodes);
+
+    synchronized (this) {
+      directToIndirectFollowerMap.clear();
+      for (int i = 0, j = orderedNodes.size() - 1; i <= j; i++, j--) {
+        if (i != j) {
+          directToIndirectFollowerMap.put(orderedNodes.get(i),
+              Collections.singletonList(orderedNodes.get(j)));
+        } else {
+          directToIndirectFollowerMap.put(orderedNodes.get(i), Collections.emptyList());
+        }
+      }
+    }
+  }
+
+  class DispatcherThread extends LogDispatcher.DispatcherThread {
+
+    DispatcherThread(Node receiver,
+        BlockingQueue<SendLogRequest> logBlockingDeque) {
+      super(receiver, logBlockingDeque);
+    }
+
+    @Override
+    void sendLog(SendLogRequest logRequest) {
+      Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
+          logRequest.getLog().getCreateTime());
+      member.sendLogToFollower(
+          logRequest.getLog(),
+          logRequest.getVoteCounter(),
+          receiver,
+          logRequest.getLeaderShipStale(),
+          logRequest.getNewLeaderTerm(),
+          logRequest.getAppendEntryRequest(), directToIndirectFollowerMap.get(receiver));
+      Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
+          logRequest.getLog().getCreateTime());
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index a600c4a..711b511 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.log;
 
+import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -63,19 +64,24 @@ import java.util.concurrent.atomic.AtomicLong;
 public class LogDispatcher {
 
   private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
-  private RaftMember member;
-  private boolean useBatchInLogCatchUp =
-      ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp();
-  private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
+  RaftMember member;
+  private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
+  private boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
+  List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
   private ExecutorService executorService;
   private static ExecutorService serializationService =
       Executors.newFixedThreadPool(
           Runtime.getRuntime().availableProcessors(),
           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build());
+  public static int bindingThreadNum = 1;
 
   public LogDispatcher(RaftMember member) {
     this.member = member;
     executorService = Executors.newCachedThreadPool();
+    createQueueAndBindingThreads();
+  }
+
+  void createQueueAndBindingThreads() {
     for (Node node : member.getAllNodes()) {
       if (!node.equals(member.getThisNode())) {
         nodeLogQueues.add(createQueueAndBindingThread(node));
@@ -129,17 +135,20 @@ public class LogDispatcher {
     }
   }
 
-  private BlockingQueue<SendLogRequest> createQueueAndBindingThread(Node node) {
+  BlockingQueue<SendLogRequest> createQueueAndBindingThread(Node node) {
     BlockingQueue<SendLogRequest> logBlockingQueue =
         new ArrayBlockingQueue<>(
             ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
-    int bindingThreadNum = 1;
     for (int i = 0; i < bindingThreadNum; i++) {
-      executorService.submit(new DispatcherThread(node, logBlockingQueue));
+      executorService.submit(newDispatcherThread(node, logBlockingQueue));
     }
     return logBlockingQueue;
   }
 
+  DispatcherThread newDispatcherThread(Node node, BlockingQueue<SendLogRequest> logBlockingQueue) {
+    return new DispatcherThread(node, logBlockingQueue);
+  }
+
   public static class SendLogRequest {
 
     private Log log;
@@ -219,7 +228,7 @@ public class LogDispatcher {
 
   class DispatcherThread implements Runnable {
 
-    private Node receiver;
+    Node receiver;
     private BlockingQueue<SendLogRequest> logBlockingDeque;
     private List<SendLogRequest> currBatch = new ArrayList<>();
     private Peer peer;
@@ -238,9 +247,11 @@ public class LogDispatcher {
       Thread.currentThread().setName("LogDispatcher-" + member.getName() + "-" + receiver);
       try {
         while (!Thread.interrupted()) {
-          SendLogRequest poll = logBlockingDeque.take();
-          currBatch.add(poll);
-          logBlockingDeque.drainTo(currBatch);
+          synchronized (logBlockingDeque) {
+            SendLogRequest poll = logBlockingDeque.take();
+            currBatch.add(poll);
+            logBlockingDeque.drainTo(currBatch);
+          }
           if (logger.isDebugEnabled()) {
             logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
           }
@@ -388,7 +399,7 @@ public class LogDispatcher {
       }
     }
 
-    private void sendLog(SendLogRequest logRequest) {
+    void sendLog(SendLogRequest logRequest) {
       Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
           logRequest.getLog().getCreateTime());
       member.sendLogToFollower(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 446eefc..eae40e1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -420,6 +420,25 @@ public abstract class RaftLogManager {
     return -1;
   }
 
+  public long append(long lastIndex, long lastTerm, long leaderCommit, Log entry) {
+    long newLastIndex = lastIndex + 1;
+    if (entry.getCurrLogIndex() <= commitIndex) {
+      logger.debug(
+          "{}: entry {} conflict with committed entry [commitIndex({})]",
+          name,
+          entry.getCurrLogIndex(),
+          commitIndex);
+    } else {
+      append(entry);
+    }
+    try {
+      commitTo(Math.min(leaderCommit, newLastIndex));
+    } catch (LogExecutionException e) {
+      // exceptions are ignored on follower side
+    }
+    return newLastIndex;
+  }
+
   /**
    * Used by leader node or MaybeAppend to directly append to unCommittedEntryManager. Note that the
    * caller should ensure entries[0].index > committed.
@@ -466,6 +485,20 @@ public abstract class RaftLogManager {
     return getLastLogIndex();
   }
 
+  public long appendDirectly(Log entry) {
+    long after = entry.getCurrLogIndex();
+    if (after <= commitIndex) {
+      logger.error("{}: after({}) is out of range [commitIndex({})]", name, after, commitIndex);
+      return -1;
+    }
+    getUnCommittedEntryManager().truncateAndAppend(entry);
+    Object logUpdateCondition = getLogUpdateCondition(entry.getCurrLogIndex());
+    synchronized (logUpdateCondition) {
+      logUpdateCondition.notifyAll();
+    }
+    return getLastLogIndex();
+  }
+
   /**
    * Used by leader node to try to commit entries.
    *
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
index a3026c9..c85e709 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
@@ -185,10 +185,10 @@ public class UnCommittedEntryManager {
    * @param appendingEntry request entry
    */
   void truncateAndAppend(Log appendingEntry) {
-    if (maybeTerm(appendingEntry.getCurrLogIndex()) == appendingEntry.getCurrLogTerm()) {
-      // skip existing entry
-      return;
-    }
+//    if (maybeTerm(appendingEntry.getCurrLogIndex()) == appendingEntry.getCurrLogTerm()) {
+//      // skip existing entry
+//      return;
+//    }
 
     long after = appendingEntry.getCurrLogIndex();
     long len = after - offset;
@@ -203,8 +203,8 @@ public class UnCommittedEntryManager {
     } else {
       // clear conflict entries
       // then append
-      logger.info(
-          "truncate the entries after index {}, append a new entry {}", after, appendingEntry);
+//      logger.info(
+//          "truncate the entries after index {}, append a new entry {}", after, appendingEntry);
       int truncateIndex = (int) (after - offset);
       if (truncateIndex < entries.size()) {
         entries.subList(truncateIndex, entries.size()).clear();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
index c81e6d0..a945a92 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
@@ -431,7 +431,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   @Override
   public void setHardStateAndFlush(HardState state) {
     this.state = state;
-    serializeMeta(meta);
+    // serializeMeta(meta);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index e4c81f8..d88d1a3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -521,7 +521,7 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
-  TProcessor getProcessor() {
+  protected TProcessor getProcessor() {
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
       return new AsyncProcessor<>(this);
     } else {
@@ -530,7 +530,7 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
-  TServerTransport getServerSocket() throws TTransportException {
+  protected TServerTransport getServerSocket() throws TTransportException {
     logger.info(
         "[{}] Cluster node will listen {}:{}",
         getServerClientName(),
@@ -547,12 +547,12 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
-  String getClientThreadPrefix() {
+  protected String getClientThreadPrefix() {
     return "DataClientThread-";
   }
 
   @Override
-  String getServerClientName() {
+  protected String getServerClientName() {
     return "DataServerThread-";
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index 12e286f..ddc64cc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -74,27 +74,31 @@ public class MetaClusterServer extends RaftServer
   private static Logger logger = LoggerFactory.getLogger(MetaClusterServer.class);
 
   // each node only contains one MetaGroupMember
-  private MetaGroupMember member;
-  private Coordinator coordinator;
+  protected MetaGroupMember member;
+  protected Coordinator coordinator;
   // the single-node IoTDB instance
   private IoTDB ioTDB;
   // to register the ClusterMonitor that helps monitoring the cluster
   private RegisterManager registerManager = new RegisterManager();
-  private MetaAsyncService asyncService;
-  private MetaSyncService syncService;
-  private MetaHeartbeatServer metaHeartbeatServer;
+  protected MetaAsyncService asyncService;
+  protected MetaSyncService syncService;
+  protected MetaHeartbeatServer metaHeartbeatServer;
 
   public MetaClusterServer() throws QueryProcessException {
     super();
     metaHeartbeatServer = new MetaHeartbeatServer(thisNode, this);
     coordinator = new Coordinator();
-    member = new MetaGroupMember(protocolFactory, thisNode, coordinator);
+    member = createMetaGroupMember();
     coordinator.setMetaGroupMember(member);
     asyncService = new MetaAsyncService(member);
     syncService = new MetaSyncService(member);
     MetaPuller.getInstance().init(member);
   }
 
+  protected MetaGroupMember createMetaGroupMember() throws QueryProcessException {
+    return new MetaGroupMember(protocolFactory, thisNode, coordinator);
+  }
+
   /**
    * Besides the standard RaftServer start-up, the IoTDB instance, a MetaGroupMember and the
    * ClusterMonitor are also started.
@@ -150,7 +154,7 @@ public class MetaClusterServer extends RaftServer
    * @throws TTransportException if create the socket fails
    */
   @Override
-  TServerTransport getServerSocket() throws TTransportException {
+  protected TServerTransport getServerSocket() throws TTransportException {
     logger.info(
         "[{}] Cluster node will listen {}:{}",
         getServerClientName(),
@@ -167,17 +171,17 @@ public class MetaClusterServer extends RaftServer
   }
 
   @Override
-  String getClientThreadPrefix() {
+  protected String getClientThreadPrefix() {
     return "MetaClientThread-";
   }
 
   @Override
-  String getServerClientName() {
+  protected String getServerClientName() {
     return "MetaServerThread-";
   }
 
   @Override
-  TProcessor getProcessor() {
+  protected TProcessor getProcessor() {
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
       return new AsyncProcessor<>(this);
     } else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index 09956d2..405c6e2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
@@ -71,9 +71,9 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
   private TServerTransport socket;
   // RPC processing server
   private TServer poolServer;
-  Node thisNode;
+  protected Node thisNode;
 
-  TProtocolFactory protocolFactory =
+  protected TProtocolFactory protocolFactory =
       config.isRpcThriftCompressionEnabled()
           ? new TCompactProtocol.Factory()
           : new TBinaryProtocol.Factory();
@@ -92,7 +92,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
     thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
   }
 
-  RaftServer(Node thisNode) {
+  protected RaftServer(Node thisNode) {
     this.thisNode = thisNode;
   }
 
@@ -167,14 +167,14 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
    * @return An AsyncProcessor that contains the extended interfaces of a non-abstract subclass of
    *     RaftService (DataService or MetaService).
    */
-  abstract TProcessor getProcessor();
+  protected abstract TProcessor getProcessor();
 
   /**
    * @return A socket that will be used to establish a thrift server to listen to RPC requests.
    *     DataServer and MetaServer use different port, so this is to be determined.
    * @throws TTransportException
    */
-  abstract TServerTransport getServerSocket() throws TTransportException;
+  protected abstract TServerTransport getServerSocket() throws TTransportException;
 
   /**
    * Each thrift RPC request will be processed in a separate thread and this will return the name
@@ -183,7 +183,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
    *
    * @return name prefix of RPC processing threads.
    */
-  abstract String getClientThreadPrefix();
+  protected abstract String getClientThreadPrefix();
 
   /**
    * The thrift server will be run in a separate thread, and this will be its name. It help you
@@ -191,7 +191,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
    *
    * @return The name of the thread running the thrift server.
    */
-  abstract String getServerClientName();
+  protected abstract String getServerClientName();
 
   private TServer createAsyncServer() throws TTransportException {
     socket = getServerSocket();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index de0b83f..6d95e06 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -111,7 +111,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
       if (lastLogIdx == peer.getLastHeartBeatIndex()) {
         // the follower's lastLogIndex is unchanged, increase inconsistent counter
         int inconsistentNum = peer.incInconsistentHeartbeatNum();
-        if (inconsistentNum >= 5) {
+        if (inconsistentNum >= 10000) {
           logger.info(
               "{}: catching up node {}, index-term: {}-{}/{}-{}, peer match index {}",
               memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
new file mode 100644
index 0000000..71903d4
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.handlers.forwarder;
+
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IndirectAppendHandler implements AsyncMethodCallback<Long> {
+
+  private static final Logger logger = LoggerFactory.getLogger(IndirectAppendHandler.class);
+  private Node receiver;
+  private AppendEntryRequest request;
+
+  public IndirectAppendHandler(Node receiver,
+      AppendEntryRequest request) {
+    this.receiver = receiver;
+    this.request = request;
+  }
+
+  @Override
+  public void onComplete(Long response) {
+    // ignore response from indirect appender
+  }
+
+  @Override
+  public void onError(Exception exception) {
+    logger.warn("Cannot send request {} to {}", request, receiver, exception);
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index 0459fef..50a46bb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.server.handlers.caller.HeartbeatHandler;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 
+import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -152,7 +153,7 @@ public class HeartbeatThread implements Runnable {
     synchronized (nodes) {
       // avoid concurrent modification
       for (Node node : nodes) {
-        if (node.equals(localMember.getThisNode())) {
+        if (ClusterUtils.isNodeEquals(node, localMember.getThisNode())) {
           continue;
         }
         if (Thread.currentThread().isInterrupted()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
index ed99c3d..e744fdb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
@@ -74,8 +74,8 @@ public class MetaHeartbeatServer extends HeartbeatServer {
     } else {
       return new TServerSocket(
           new InetSocketAddress(
-              config.getInternalIp(),
-              config.getInternalMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET));
+              metaClusterServer.getMember().getThisNode().getInternalIp(),
+              metaClusterServer.getMember().getThisNode().getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET));
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
index 002c4fe..2c5cf35 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
@@ -40,7 +40,15 @@ public class MetaHeartbeatThread extends HeartbeatThread {
     request.setRequireIdentifier(!node.isSetNodeIdentifier());
     synchronized (localMetaMember.getIdConflictNodes()) {
       request.unsetRegenerateIdentifier();
-      if (localMetaMember.getIdConflictNodes().contains(node)) {
+      boolean hasIdConflict = false;
+      for (Node idConflictNode : localMetaMember.getIdConflictNodes()) {
+        if (idConflictNode.getInternalIp().equals(node.internalIp)
+            && idConflictNode.getMetaPort() == node.metaPort) {
+          hasIdConflict = true;
+          break;
+        }
+      }
+      if (hasIdConflict) {
         request.setRegenerateIdentifier(true);
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index eb10d44..056a485 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.server.member;
 
+import java.util.Comparator;
 import org.apache.iotdb.cluster.client.DataClientProvider;
 import org.apache.iotdb.cluster.client.async.AsyncClientPool;
 import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
@@ -1021,21 +1022,26 @@ public class MetaGroupMember extends RaftMember {
     ExecutorService pool = new ScheduledThreadPoolExecutor(getAllNodes().size() - 1);
     for (Node seedNode : getAllNodes()) {
       Node thisNode = getThisNode();
-      if (seedNode.equals(thisNode)) {
+      if (ClusterUtils.isNodeEquals(thisNode, seedNode)) {
         continue;
       }
       pool.submit(
           () -> {
-            CheckStatusResponse response = checkStatus(seedNode);
-            if (response != null) {
-              // check the response
-              ClusterUtils.examineCheckStatusResponse(
-                  response, consistentNum, inconsistentNum, seedNode);
-            } else {
-              logger.warn(
-                  "Start up exception. Cannot connect to node {}. Try again in next turn.",
-                  seedNode);
+            try {
+              CheckStatusResponse response = checkStatus(seedNode);
+              if (response != null) {
+                // check the response
+                ClusterUtils.examineCheckStatusResponse(
+                    response, consistentNum, inconsistentNum, seedNode);
+              } else {
+                logger.warn(
+                    "Start up exception. Cannot connect to node {}. Try again in next turn.",
+                    seedNode);
+              }
+            } catch (Exception e) {
+              logger.error("Start up exception:", e);
             }
+
           });
     }
     pool.shutdown();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index f2187f9..daa3f4e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.server.member;
 
+import java.util.Collections;
 import org.apache.iotdb.cluster.client.async.AsyncClientPool;
 import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
 import org.apache.iotdb.cluster.client.sync.SyncClientPool;
@@ -39,6 +40,7 @@ import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
@@ -53,6 +55,7 @@ import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.server.handlers.forwarder.IndirectAppendHandler;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
@@ -77,7 +80,9 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.TException;
+import org.checkerframework.checker.units.qual.A;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -107,12 +112,14 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
+ * RaftMember process the common raft logic like leader election, log appending, catch-up and so
+ * on.
  */
 @SuppressWarnings("java:S3077") // reference volatile is enough
 public abstract class RaftMember {
+
   private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
-  public static final boolean USE_LOG_DISPATCHER = false;
+  public static boolean USE_LOG_DISPATCHER = false;
 
   private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time out";
   private static final String MSG_FORWARD_ERROR =
@@ -132,19 +139,29 @@ public abstract class RaftMember {
    * on this may be woken.
    */
   private final Object waitLeaderCondition = new Object();
-  /** the lock is to make sure that only one thread can apply snapshot at the same time */
+  /**
+   * This lock ensures that only one thread can apply a snapshot at a time.
+   */
   private final Object snapshotApplyLock = new Object();
 
   protected Node thisNode = ClusterConstant.EMPTY_NODE;
-  /** the nodes that belong to the same raft group as thisNode. */
+  /**
+   * the nodes that belong to the same raft group as thisNode.
+   */
   protected List<Node> allNodes;
 
   ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
-  /** the name of the member, to distinguish several members in the logs. */
+  /**
+   * the name of the member, to distinguish several members in the logs.
+   */
   String name;
-  /** to choose nodes to send request of joining cluster randomly. */
+  /**
+   * to choose nodes to send request of joining cluster randomly.
+   */
   Random random = new Random();
-  /** when the node is a leader, this map is used to track log progress of each follower. */
+  /**
+   * when the node is a leader, this map is used to track log progress of each follower.
+   */
   Map<Node, Peer> peerMap;
   /**
    * the current term of the node, this object also works as lock of some transactions of the member
@@ -165,8 +182,10 @@ public abstract class RaftMember {
    * offline.
    */
   volatile long lastHeartbeatReceivedTime;
-  /** the raft logs are all stored and maintained in the log manager */
-  RaftLogManager logManager;
+  /**
+   * the raft logs are all stored and maintained in the log manager
+   */
+  protected RaftLogManager logManager;
   /**
    * the single thread pool that runs the heartbeat thread, which send heartbeats to the follower
    * when this node is a leader, or start elections when this node is an elector.
@@ -183,7 +202,9 @@ public abstract class RaftMember {
    * member by comparing it with the current last log index.
    */
   long lastReportedLogIndex;
-  /** the thread pool that runs catch-up tasks */
+  /**
+   * the thread pool that runs catch-up tasks
+   */
   private ExecutorService catchUpService;
   /**
    * lastCatchUpResponseTime records when is the latest response of each node's catch-up. There
@@ -219,20 +240,30 @@ public abstract class RaftMember {
    * one slow node.
    */
   private ExecutorService serialToParallelPool;
-  /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
+  /**
+   * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread
+   */
   private ExecutorService commitLogPool;
   /**
    * logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
-   * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
+   * which avoids the followers receiving out-of-order logs, forcing them to wait for previous
+   * logs.
    */
   private LogDispatcher logDispatcher;
 
   /**
-   * localExecutor is used to directly execute plans like load configuration in the underlying IoTDB
+   * localExecutor is used to directly execute plans like load configuration in the underlying
+   * IoTDB
    */
   protected PlanExecutor localExecutor;
 
-  protected RaftMember() {}
+  /**
+   * Maps (logIndex, logTerm) of a sent entry to the handler awaiting its acknowledgement.
+   */
+  protected Map<Pair<Long, Long>, AppendNodeEntryHandler> sentLogHandlers = new ConcurrentHashMap<>();
+
+  protected RaftMember() {
+  }
 
   protected RaftMember(
       String name,
@@ -527,10 +558,68 @@ public abstract class RaftMember {
     long result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
     logger.debug("{} AppendEntryRequest of {} completed with result {}", name, log, result);
 
+    if (!request.isFromLeader) {
+      appendAckLeader(request.leader, log, result);
+    }
+
     return result;
   }
 
-  /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
+  private void appendAckLeader(Node leader, Log log, long response) {
+    AppendEntryAcknowledgement appendEntryAcknowledgement = new AppendEntryAcknowledgement();
+    appendEntryAcknowledgement.index = log.getCurrLogIndex();
+    appendEntryAcknowledgement.term = log.getCurrLogTerm();
+    appendEntryAcknowledgement.response = response;
+
+    Client syncClient = null;
+    try {
+      if (config.isUseAsyncServer()) {
+        GenericHandler<Void> handler = new GenericHandler<>(leader, null);
+        getAsyncClient(leader).acknowledgeAppendEntry(appendEntryAcknowledgement, handler);
+      } else {
+        syncClient = getSyncClient(leader);
+        syncClient.acknowledgeAppendEntry(appendEntryAcknowledgement);
+      }
+    } catch (TException e) {
+      logger.warn("Cannot send ack of {} to leader {}", log, leader, e);
+    } finally {
+      if (syncClient != null) {
+        ClientUtils.putBackSyncClient(syncClient);
+      }
+    }
+  }
+
+  public long appendEntryIndirect(AppendEntryRequest request, List<Node> subFollowers) throws UnknownLogTypeException {
+    long result = appendEntry(request);
+    appendLogThreadPool.submit(() -> sendLogToSubFollowers(request, subFollowers));
+    return result;
+  }
+
+  private void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) {
+    request.setIsFromLeader(false);
+    for (Node subFollower : subFollowers) {
+      Client syncClient = null;
+      try {
+        if (config.isUseAsyncServer()) {
+          getAsyncClient(subFollower).appendEntry(request, new IndirectAppendHandler(subFollower,
+              request));
+        } else {
+          syncClient = getSyncClient(subFollower);
+          syncClient.appendEntry(request);
+        }
+      } catch (TException e) {
+        logger.error("Cannot send {} to {}", request, subFollower, e);
+      } finally {
+        if (syncClient != null) {
+          ClientUtils.putBackSyncClient(syncClient);
+        }
+      }
+    }
+  }
+
+  /**
+   * Similar to appendEntry, but the incoming payload is a batch of logs instead of a single log.
+   */
   public long appendEntries(AppendEntriesRequest request) throws UnknownLogTypeException {
     logger.debug("{} received an AppendEntriesRequest", name);
 
@@ -603,13 +692,17 @@ public abstract class RaftMember {
       AtomicBoolean leaderShipStale,
       AtomicLong newLeaderTerm,
       AppendEntryRequest request,
-      Peer peer) {
+      Peer peer, List<Node> indirectReceivers) {
     AsyncClient client = getSendLogAsyncClient(node);
     if (client != null) {
       AppendNodeEntryHandler handler =
           getAppendNodeEntryHandler(log, voteCounter, node, leaderShipStale, newLeaderTerm, peer);
       try {
-        client.appendEntry(request, handler);
+        if (indirectReceivers == null || indirectReceivers.isEmpty()) {
+          client.appendEntry(request, handler);
+        } else {
+          client.appendEntryIndirect(request, indirectReceivers, handler);
+        }
         logger.debug("{} sending a log to {}: {}", name, node, log);
       } catch (Exception e) {
         logger.warn("{} cannot append log to node {}", name, node, e);
@@ -680,16 +773,22 @@ public abstract class RaftMember {
     return lastCatchUpResponseTime;
   }
 
-  /** Sub-classes will add their own process of HeartBeatResponse in this method. */
-  public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {}
+  /**
+   * Sub-classes will add their own process of HeartBeatResponse in this method.
+   */
+  public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {
+  }
 
-  /** The actions performed when the node wins in an election (becoming a leader). */
-  public void onElectionWins() {}
+  /**
+   * The actions performed when the node wins in an election (becoming a leader).
+   */
+  public void onElectionWins() {
+  }
 
   /**
    * Update the followers' log by sending logs whose index >= followerLastMatchedLogIndex to the
-   * follower. If some of the required logs are removed, also send the snapshot. <br>
-   * notice that if a part of data is in the snapshot, then it is not in the logs.
+   * follower. If some of the required logs are removed, also send the snapshot. <br> notice that if
+   * a part of data is in the snapshot, then it is not in the logs.
    */
   public void catchUp(Node follower, long lastLogIdx) {
     // for one follower, there is at most one ongoing catch-up, so the same data will not be sent
@@ -744,7 +843,7 @@ public abstract class RaftMember {
    * @param plan a non-query plan.
    * @return A TSStatus indicating the execution result.
    */
-  abstract TSStatus executeNonQueryPlan(PhysicalPlan plan);
+  protected abstract TSStatus executeNonQueryPlan(PhysicalPlan plan);
 
   /**
    * according to the consistency configuration, decide whether to execute syncLeader or not and
@@ -775,7 +874,9 @@ public abstract class RaftMember {
     }
   }
 
-  /** call back after syncLeader */
+  /**
+   * call back after syncLeader
+   */
   public interface CheckConsistency {
 
     /**
@@ -784,7 +885,7 @@ public abstract class RaftMember {
      * @param leaderCommitId leader commit id
      * @param localAppliedId local applied id
      * @throws CheckConsistencyException maybe throw CheckConsistencyException, which is defined in
-     *     implements.
+     *                                   implements.
      */
     void postCheckConsistency(long leaderCommitId, long localAppliedId)
         throws CheckConsistencyException;
@@ -793,8 +894,7 @@ public abstract class RaftMember {
   public static class MidCheckConsistency implements CheckConsistency {
 
     /**
-     * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw
-     * CHECK_MID_CONSISTENCY_EXCEPTION
+     * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw CHECK_MID_CONSISTENCY_EXCEPTION
      *
      * @param leaderCommitId leader commit id
      * @param localAppliedId local applied id
@@ -806,7 +906,7 @@ public abstract class RaftMember {
       if (leaderCommitId == Long.MAX_VALUE
           || leaderCommitId == Long.MIN_VALUE
           || leaderCommitId - localAppliedId
-              > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
+          > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
         throw CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION;
       }
     }
@@ -839,7 +939,7 @@ public abstract class RaftMember {
    * @param checkConsistency check after syncleader
    * @return true if the node has caught up, false otherwise
    * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
-   *     value after timeout
+   *                                   value after timeout
    */
   public boolean syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException {
     if (character == NodeCharacter.LEADER) {
@@ -858,7 +958,9 @@ public abstract class RaftMember {
     return waitUntilCatchUp(checkConsistency);
   }
 
-  /** Wait until the leader of this node becomes known or time out. */
+  /**
+   * Wait until the leader of this node becomes known or time out.
+   */
   public void waitLeader() {
     long startTime = System.currentTimeMillis();
     while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) {
@@ -885,7 +987,7 @@ public abstract class RaftMember {
    *
    * @return true if this node has caught up before timeout, false otherwise
    * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
-   *     value after timeout
+   *                                   value after timeout
    */
   protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
       throws CheckConsistencyException {
@@ -957,7 +1059,7 @@ public abstract class RaftMember {
    * call this method. Will commit the log locally and send it to followers
    *
    * @return OK if over half of the followers accept the log or null if the leadership is lost
-   *     during the appending
+   * during the appending
    */
   TSStatus processPlanLocally(PhysicalPlan plan) {
     if (USE_LOG_DISPATCHER) {
@@ -1156,7 +1258,9 @@ public abstract class RaftMember {
     return peerMap;
   }
 
-  /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
+  /**
+   * @return true if there is a log whose index is "index" and term is "term", false otherwise
+   */
   public boolean matchLog(long index, long term) {
     boolean matched = logManager.matchTerm(term, index);
     logger.debug("Log {}-{} matched: {}", index, term, matched);
@@ -1171,15 +1275,18 @@ public abstract class RaftMember {
     return syncLock;
   }
 
-  /** Sub-classes will add their own process of HeartBeatRequest in this method. */
-  void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {}
+  /**
+   * Sub-classes will add their own process of HeartBeatRequest in this method.
+   */
+  void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {
+  }
 
   /**
    * Verify the validity of an ElectionRequest, and make itself a follower of the elector if the
    * request is valid.
    *
    * @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
-   *     smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+   * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
    */
   long checkElectorLogProgress(ElectionRequest electionRequest) {
 
@@ -1222,7 +1329,7 @@ public abstract class RaftMember {
    * lastLogIndex is smaller than the voter's Otherwise accept the election.
    *
    * @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
-   *     smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+   * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
    */
   long checkLogProgress(long lastLogIndex, long lastLogTerm) {
     long response;
@@ -1239,10 +1346,10 @@ public abstract class RaftMember {
   /**
    * Forward a non-query plan to a node using the default client.
    *
-   * @param plan a non-query plan
-   * @param node cannot be the local node
+   * @param plan   a non-query plan
+   * @param node   cannot be the local node
    * @param header must be set for data group communication, set to null for meta group
-   *     communication
+   *               communication
    * @return a TSStatus indicating if the forwarding is successful.
    */
   public TSStatus forwardPlan(PhysicalPlan plan, Node node, Node header) {
@@ -1273,7 +1380,7 @@ public abstract class RaftMember {
   /**
    * Forward a non-query plan to "receiver" using "client".
    *
-   * @param plan a non-query plan
+   * @param plan   a non-query plan
    * @param header to determine which DataGroupMember of "receiver" will process the request.
    * @return a TSStatus indicating if the forwarding is successful.
    */
@@ -1354,7 +1461,7 @@ public abstract class RaftMember {
    * Get an asynchronous thrift client of the given node.
    *
    * @return an asynchronous thrift client or null if the caller tries to connect the local node or
-   *     the node cannot be reached.
+   * the node cannot be reached.
    */
   public AsyncClient getAsyncClient(Node node) {
     return getAsyncClient(node, asyncClientPool, true);
@@ -1541,7 +1648,7 @@ public abstract class RaftMember {
    * heartbeat timer.
    *
    * @param fromLeader true if the request is from a leader, false if the request is from an
-   *     elector.
+   *                   elector.
    */
   public void stepDown(long newTerm, boolean fromLeader) {
     synchronized (term) {
@@ -1572,7 +1679,9 @@ public abstract class RaftMember {
     this.thisNode = thisNode;
   }
 
-  /** @return the header of the data raft group or null if this is in a meta group. */
+  /**
+   * @return the header of the data raft group or null if this is in a meta group.
+   */
   public Node getHeader() {
     return null;
   }
@@ -1653,7 +1762,7 @@ public abstract class RaftMember {
    * success.
    *
    * @param requiredQuorum the number of votes needed to make the log valid, when requiredQuorum <=
-   *     0, half of the cluster size will be used.
+   *                       0, half of the cluster size will be used.
    * @return an AppendLogResult
    */
   private AppendLogResult sendLogToFollowers(Log log, int requiredQuorum) {
@@ -1722,7 +1831,6 @@ public abstract class RaftMember {
     return waitAppendResult(voteCounter, leaderShipStale, newLeaderTerm);
   }
 
-  /** Send "log" to "node". */
   public void sendLogToFollower(
       Log log,
       AtomicInteger voteCounter,
@@ -1730,6 +1838,21 @@ public abstract class RaftMember {
       AtomicBoolean leaderShipStale,
       AtomicLong newLeaderTerm,
       AppendEntryRequest request) {
+    sendLogToFollower(log, voteCounter, node, leaderShipStale, newLeaderTerm, request,
+        Collections.emptyList());
+  }
+
+  /**
+   * Send "log" to "node".
+   */
+  public void sendLogToFollower(
+      Log log,
+      AtomicInteger voteCounter,
+      Node node,
+      AtomicBoolean leaderShipStale,
+      AtomicLong newLeaderTerm,
+      AppendEntryRequest request,
+      List<Node> indirectReceivers) {
     if (node.equals(thisNode)) {
       return;
     }
@@ -1750,9 +1873,9 @@ public abstract class RaftMember {
     }
 
     if (config.isUseAsyncServer()) {
-      sendLogAsync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer);
+      sendLogAsync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer, indirectReceivers);
     } else {
-      sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer);
+      sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer, indirectReceivers);
     }
   }
 
@@ -1791,14 +1914,20 @@ public abstract class RaftMember {
       AtomicBoolean leaderShipStale,
       AtomicLong newLeaderTerm,
       AppendEntryRequest request,
-      Peer peer) {
+      Peer peer, List<Node> indirectReceivers) {
     Client client = getSyncClient(node);
     if (client != null) {
       AppendNodeEntryHandler handler =
           getAppendNodeEntryHandler(log, voteCounter, node, leaderShipStale, newLeaderTerm, peer);
       try {
         logger.debug("{} sending a log to {}: {}", name, node, log);
-        long result = client.appendEntry(request);
+        long result;
+        if (indirectReceivers == null || indirectReceivers.isEmpty()) {
+          result = client.appendEntry(request);
+        } else {
+          result = client.appendEntryIndirect(request, indirectReceivers);
+        }
+
         handler.onComplete(result);
       } catch (TException e) {
         client.getInputProtocol().getTransport().close();
@@ -1843,9 +1972,9 @@ public abstract class RaftMember {
    * and append "log" to it. Otherwise report a log mismatch.
    *
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
-  private long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+  protected long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
     long resp = checkPrevLogIndex(prevLogIndex);
     if (resp != Response.RESPONSE_AGREE) {
       return resp;
@@ -1867,7 +1996,9 @@ public abstract class RaftMember {
     return resp;
   }
 
-  /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
+  /**
+   * Wait until all logs before "prevLogIndex" arrive or a timeout is reached.
+   */
   private boolean waitForPrevLog(long prevLogIndex) {
     long waitStart = System.currentTimeMillis();
     long alreadyWait = 0;
@@ -1893,7 +2024,7 @@ public abstract class RaftMember {
     return alreadyWait <= RaftServer.getWriteOperationTimeoutMS();
   }
 
-  private long checkPrevLogIndex(long prevLogIndex) {
+  protected long checkPrevLogIndex(long prevLogIndex) {
     long lastLogIndex = logManager.getLastLogIndex();
     long startTime = Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime();
     if (lastLogIndex < prevLogIndex && !waitForPrevLog(prevLogIndex)) {
@@ -1913,7 +2044,7 @@ public abstract class RaftMember {
    *
    * @param logs append logs
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
   private long appendEntries(
       long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
@@ -1987,4 +2118,25 @@ public abstract class RaftMember {
     TIME_OUT,
     LEADERSHIP_STALE
   }
+
+  /**
+   * Process the result from an indirect receiver of an entry.
+   * @param ack acknowledgement from an indirect receiver.
+   */
+  public void acknowledgeAppendLog(AppendEntryAcknowledgement ack) {
+    AppendNodeEntryHandler appendNodeEntryHandler = sentLogHandlers
+        .get(new Pair<>(ack.index, ack.term));
+    if (appendNodeEntryHandler != null) {
+      appendNodeEntryHandler.onComplete(ack.response);
+    }
+  }
+
+  public void registerAppendLogHandler(Pair<Long, Long> indexTerm,
+      AppendNodeEntryHandler appendNodeEntryHandler) {
+    sentLogHandlers.put(indexTerm, appendNodeEntryHandler);
+  }
+
+  public void removeAppendLogHandler(Pair<Long, Long> indexTerm) {
+    sentLogHandlers.remove(indexTerm);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 8673078..eecf26e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -19,9 +19,11 @@
 
 package org.apache.iotdb.cluster.server.service;
 
+import java.util.List;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
@@ -173,4 +175,21 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
       resultHandler.onError(e);
     }
   }
+
+  @Override
+  public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack,
+      AsyncMethodCallback<Void> resultHandler) {
+    member.acknowledgeAppendLog(ack);
+    resultHandler.onComplete(null);
+  }
+
+  @Override
+  public void appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers,
+      AsyncMethodCallback<Long> resultHandler) {
+    try {
+      resultHandler.onComplete(member.appendEntryIndirect(request, subReceivers));
+    } catch (UnknownLogTypeException e) {
+      resultHandler.onError(e);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index ce200ab..181ad8a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -19,9 +19,16 @@
 
 package org.apache.iotdb.cluster.server.service;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.List;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
@@ -31,30 +38,21 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
-import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.IOUtils;
-import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-
 public abstract class BaseSyncService implements RaftService.Iface {
 
   private static final Logger logger = LoggerFactory.getLogger(BaseSyncService.class);
   RaftMember member;
   String name;
 
-  BaseSyncService(RaftMember member) {
+  protected BaseSyncService(RaftMember member) {
     this.member = member;
     this.name = member.getName();
   }
@@ -153,29 +151,25 @@ public abstract class BaseSyncService implements RaftService.Iface {
 
   @Override
   public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException {
-    if (member.getCharacter() != NodeCharacter.LEADER) {
-      // forward the plan to the leader
-      Client client = member.getSyncClient(member.getLeader());
-      if (client != null) {
-        TSStatus status;
-        try {
-          status = client.executeNonQueryPlan(request);
-        } catch (TException e) {
-          client.getInputProtocol().getTransport().close();
-          throw e;
-        } finally {
-          ClientUtils.putBackSyncClient(client);
-        }
-        return status;
-      } else {
-        return StatusUtils.NO_LEADER;
-      }
-    }
-
     try {
       return member.executeNonQueryPlan(request);
     } catch (Exception e) {
       throw new TException(e);
     }
   }
+
+  @Override
+  public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack) {
+    member.acknowledgeAppendLog(ack);
+  }
+
+  @Override
+  public long appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
+      throws TException {
+    try {
+      return member.appendEntryIndirect(request, subReceivers);
+    } catch (UnknownLogTypeException e) {
+      throw new TException(e);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
index 255cb4b..0de64a6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.utils;
 
+import java.util.Objects;
 import org.apache.iotdb.cluster.client.async.AsyncDataClient;
 import org.apache.iotdb.cluster.client.async.AsyncDataHeartbeatClient;
 import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
@@ -27,6 +28,7 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
 import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
 import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java
index be87eb1..905019f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java
@@ -58,7 +58,7 @@ public class ClusterNode extends Node {
         && this.dataPort == that.dataPort
         && this.metaPort == that.metaPort
         && this.clientPort == that.clientPort
-        && this.clientIp.equals(that.clientIp);
+        && Objects.equals(this.clientIp, that.clientIp);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index 4c64587..ef656fe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -175,6 +175,18 @@ public class ClusterUtils {
     }
   }
 
+  public static boolean isNodeEquals(Node node1, Node node2) {
+    if (node1 == node2) {
+      return true;
+    }
+    if (node1 == null || node2 == null) {
+      return false;
+    }
+    return Objects.equals(node1.internalIp, node2.internalIp) && Objects.equals(node1.metaPort,
+        node2.metaPort);
+  }
+
+
   private static boolean seedNodesContains(List<Node> seedNodeList, List<Node> subSeedNodeList) {
     // Because identifier is not compared here, List.contains() is not suitable
     if (subSeedNodeList == null) {
diff --git a/pom.xml b/pom.xml
index aed72ae..879a584 100644
--- a/pom.xml
+++ b/pom.xml
@@ -621,32 +621,32 @@
                         </excludes>
                     </configuration>
                 </plugin>
-                <plugin>
-                    <groupId>com.diffplug.spotless</groupId>
-                    <artifactId>spotless-maven-plugin</artifactId>
-                    <version>${spotless.version}</version>
-                    <configuration>
-                        <java>
-                            <googleJavaFormat>
-                                <version>1.7</version>
-                                <style>GOOGLE</style>
-                            </googleJavaFormat>
-                            <importOrder>
-                                <order>org.apache.iotdb,,javax,java,\#</order>
-                            </importOrder>
-                            <removeUnusedImports/>
-                        </java>
-                    </configuration>
-                    <executions>
-                        <execution>
-                            <id>spotless-check</id>
-                            <phase>validate</phase>
-                            <goals>
-                                <goal>check</goal>
-                            </goals>
-                        </execution>
-                    </executions>
-                </plugin>
+                <!--                <plugin>-->
+                <!--                    <groupId>com.diffplug.spotless</groupId>-->
+                <!--                    <artifactId>spotless-maven-plugin</artifactId>-->
+                <!--                    <version>${spotless.version}</version>-->
+                <!--                    <configuration>-->
+                <!--                        <java>-->
+                <!--                            <googleJavaFormat>-->
+                <!--                                <version>1.7</version>-->
+                <!--                                <style>GOOGLE</style>-->
+                <!--                            </googleJavaFormat>-->
+                <!--                            <importOrder>-->
+                <!--                                <order>org.apache.iotdb,,javax,java,\#</order>-->
+                <!--                            </importOrder>-->
+                <!--                            <removeUnusedImports/>-->
+                <!--                        </java>-->
+                <!--                    </configuration>-->
+                <!--                    <executions>-->
+                <!--                        <execution>-->
+                <!--                            <id>spotless-check</id>-->
+                <!--                            <phase>validate</phase>-->
+                <!--                            <goals>-->
+                <!--                                <goal>check</goal>-->
+                <!--                            </goals>-->
+                <!--                        </execution>-->
+                <!--                    </executions>-->
+                <!--                </plugin>-->
             </plugins>
         </pluginManagement>
         <plugins>
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index d4d08c1..0ef0033 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -349,6 +349,8 @@ public class PlanExecutor implements IPlanExecutor {
         return createDeviceTemplate((CreateTemplatePlan) plan);
       case SET_DEVICE_TEMPLATE:
         return setDeviceTemplate((SetDeviceTemplatePlan) plan);
+      case EMPTY:
+        return true;
       default:
         throw new UnsupportedOperationException(
             String.format("operation %s is not supported", plan.getOperatorType()));
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index e1fb10f..fddda60 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -157,5 +157,6 @@ public abstract class Operator {
     CREATE_TEMPLATE,
     SET_DEVICE_TEMPLATE,
     SET_USING_DEVICE_TEMPLATE,
+    EMPTY,
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 4c5b1a1..457bd22 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.ExprPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
 import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
@@ -368,6 +369,9 @@ public abstract class PhysicalPlan {
         case AUTO_CREATE_DEVICE_MNODE:
           plan = new AutoCreateDeviceMNodePlan();
           break;
+        case EXPR:
+          plan = new ExprPlan();
+          break;
         default:
           throw new IOException("unrecognized log type " + type);
       }
@@ -422,7 +426,8 @@ public abstract class PhysicalPlan {
     CREATE_TRIGGER,
     DROP_TRIGGER,
     START_TRIGGER,
-    STOP_TRIGGER
+    STOP_TRIGGER,
+    EXPR
   }
 
   public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java
new file mode 100644
index 0000000..2a1e16b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+public class ExprPlan extends PhysicalPlan {
+
+  private byte[] workload;
+  private boolean needForward;
+
+  public ExprPlan() {
+    super(false, OperatorType.EMPTY);
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return null;
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.write((byte) PhysicalPlanType.EXPR.ordinal());
+    stream.writeInt(workload == null ? 0 : workload.length);
+    if (workload != null) {
+      stream.write(workload);
+    }
+    stream.write(needForward ? 1 : 0);
+  }
+
+  @Override
+  public void serialize(ByteBuffer buffer) {
+    buffer.put((byte) PhysicalPlanType.EXPR.ordinal());
+    buffer.putInt(workload == null ? 0 : workload.length);
+    if (workload != null) {
+      buffer.put(workload);
+    }
+    buffer.put(needForward ? (byte) 1 : 0);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) throws IllegalPathException {
+    int size = buffer.getInt();
+    workload = new byte[size];
+    buffer.get(workload);
+    needForward = buffer.get() == 1;
+  }
+
+  public void setWorkload(byte[] workload) {
+    this.workload = workload;
+  }
+
+  public boolean isNeedForward() {
+    return needForward;
+  }
+
+  public void setNeedForward(boolean needForward) {
+    this.needForward = needForward;
+  }
+}
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index f23130e..449e843 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -90,6 +90,16 @@ struct AppendEntryRequest {
   // because a data server may play many data groups members, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   7: optional Node header
+  // true if the request is sent from the leader, and the receiver just responds to the sender;
+  // otherwise, the receiver should also notify the leader
+  8: optional bool isFromLeader
+}
+
+
+struct AppendEntryAcknowledgement {
+  1: required long term
+  2: required long index
+  3: required long response
 }
 
 // leader -> follower
@@ -306,6 +316,11 @@ service RaftService {
   **/
   long appendEntry(1:AppendEntryRequest request)
 
+  /**
+  * Like appendEntry, but the receiver should forward the request to nodes in subReceivers.
+  **/
+  long appendEntryIndirect(1:AppendEntryRequest request, 2:list<Node> subReceivers)
+
   void sendSnapshot(1:SendSnapshotRequest request)
 
   /**
@@ -337,6 +352,12 @@ service RaftService {
   * interface to notify the leader to remove the associated hardlink.
   **/
   void removeHardLink(1: string hardLinkPath)
+
+  /**
+  * when a follower receives an AppendEntryRequest from a non-leader node, it sends this ack to
+  * the leader so the leader can know whether the follower has successfully received the entry
+  **/
+  void acknowledgeAppendEntry(1: AppendEntryAcknowledgement ack)
 }