You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/06/01 11:08:53 UTC

[iotdb] branch expr created (now 438cd8c)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 438cd8c  implement server interfaces

This branch includes the following new commits:

     new 4aa5448  temp save
     new 438cd8c  implement server interfaces

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 02/02: implement server interfaces

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 438cd8cdebdbf3390795c089bf53656d178a7cef
Author: jt <jt...@163.com>
AuthorDate: Thu May 27 17:17:41 2021 +0800

    implement server interfaces
---
 .../iotdb/cluster/server/DataClusterServer.java    | 27 ++++++++++++++++++++++
 .../iotdb/cluster/server/MetaClusterServer.java    | 25 ++++++++++++++++++++
 2 files changed, 52 insertions(+)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index d88d1a3..69469b0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
@@ -963,4 +964,30 @@ public class DataClusterServer extends RaftServer
     getDataAsyncService(thisNode, resultHandler, hardLinkPath)
         .removeHardLink(hardLinkPath, resultHandler);
   }
+
+  @Override
+  public long appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
+      throws TException {
+    return getDataSyncService(thisNode)
+        .appendEntryIndirect(request, subReceivers);
+  }
+
+  @Override
+  public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack) {
+    getDataSyncService(thisNode).acknowledgeAppendEntry(ack);
+  }
+
+  @Override
+  public void appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers,
+      AsyncMethodCallback<Long> resultHandler) {
+    getDataAsyncService(thisNode, resultHandler, request)
+        .appendEntryIndirect(request, subReceivers, resultHandler);
+  }
+
+  @Override
+  public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack,
+      AsyncMethodCallback<Void> resultHandler) {
+    getDataAsyncService(thisNode, resultHandler, ack)
+        .acknowledgeAppendEntry(ack, resultHandler);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index ddc64cc..f18d920 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.cluster.server;
 
+import java.util.List;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
 import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
@@ -26,6 +27,7 @@ import org.apache.iotdb.cluster.metadata.CMManager;
 import org.apache.iotdb.cluster.metadata.MetaPuller;
 import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
@@ -370,4 +372,27 @@ public class MetaClusterServer extends RaftServer
   public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
     asyncService.handshake(sender, resultHandler);
   }
+
+  @Override
+  public long appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
+      throws TException {
+    return syncService.appendEntryIndirect(request, subReceivers);
+  }
+
+  @Override
+  public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack) {
+    syncService.acknowledgeAppendEntry(ack);
+  }
+
+  @Override
+  public void appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers,
+      AsyncMethodCallback<Long> resultHandler) {
+    asyncService.appendEntryIndirect(request, subReceivers, resultHandler);
+  }
+
+  @Override
+  public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack,
+      AsyncMethodCallback<Void> resultHandler) {
+    asyncService.acknowledgeAppendEntry(ack, resultHandler);
+  }
 }

[iotdb] 01/02: temp save

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4aa5448b1eceb0e499266678d3677787e0d9a010
Author: jt <jt...@163.com>
AuthorDate: Tue May 25 09:40:24 2021 +0800

    temp save
---
 cluster/distribute.sh                              |  19 ++
 cluster/src/assembly/resources/sbin/expr-bench.sh  |  73 ++++++
 cluster/src/assembly/resources/sbin/expr-server.sh |  73 ++++++
 .../apache/iotdb/cluster/config/ClusterConfig.java |  10 +
 .../org/apache/iotdb/cluster/expr/ExprBench.java   | 103 ++++++++
 .../ExprLogDispatcher.java}                        | 217 ++++-------------
 .../org/apache/iotdb/cluster/expr/ExprMember.java  | 151 ++++++++++++
 .../org/apache/iotdb/cluster/expr/ExprServer.java  |  90 +++++++
 .../iotdb/cluster/log/IndirectLogDispatcher.java   | 103 ++++++++
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  35 ++-
 .../iotdb/cluster/log/manage/RaftLogManager.java   |  33 +++
 .../log/manage/UnCommittedEntryManager.java        |  12 +-
 .../serializable/SyncLogDequeSerializer.java       |   2 +-
 .../iotdb/cluster/server/DataClusterServer.java    |   8 +-
 .../iotdb/cluster/server/MetaClusterServer.java    |  24 +-
 .../apache/iotdb/cluster/server/RaftServer.java    |  14 +-
 .../server/handlers/caller/HeartbeatHandler.java   |   2 +-
 .../handlers/forwarder/IndirectAppendHandler.java  |  49 ++++
 .../cluster/server/heartbeat/HeartbeatThread.java  |   3 +-
 .../server/heartbeat/MetaHeartbeatServer.java      |   4 +-
 .../server/heartbeat/MetaHeartbeatThread.java      |  10 +-
 .../cluster/server/member/MetaGroupMember.java     |  26 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 264 ++++++++++++++++-----
 .../cluster/server/service/BaseAsyncService.java   |  19 ++
 .../cluster/server/service/BaseSyncService.java    |  52 ++--
 .../apache/iotdb/cluster/utils/ClientUtils.java    |   2 +
 .../apache/iotdb/cluster/utils/ClusterNode.java    |   2 +-
 .../apache/iotdb/cluster/utils/ClusterUtils.java   |  12 +
 pom.xml                                            |  52 ++--
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   2 +
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   1 +
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   7 +-
 .../apache/iotdb/db/qp/physical/sys/ExprPlan.java  |  84 +++++++
 thrift-cluster/src/main/thrift/cluster.thrift      |  21 ++
 34 files changed, 1239 insertions(+), 340 deletions(-)

diff --git a/cluster/distribute.sh b/cluster/distribute.sh
new file mode 100644
index 0000000..98758da
--- /dev/null
+++ b/cluster/distribute.sh
@@ -0,0 +1,19 @@
+src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
+
+ips=(fit36 fit38 fit39)
+target_lib_path=/data/iotdb_expr/lib/
+
+for ip in ${ips[*]}
+  do
+    ssh fit@$ip "mkdir $target_lib_path"
+    scp -r $src_lib_path fit@$ip:$target_lib_path
+  done
+
+ips=(fit31 fit32 fit33 fit34)
+target_lib_path=/disk/iotdb_expr/lib/
+
+for ip in ${ips[*]}
+  do
+    ssh fit@$ip "mkdir $target_lib_path"
+    scp -r $src_lib_path fit@$ip:$target_lib_path
+  done
\ No newline at end of file
diff --git a/cluster/src/assembly/resources/sbin/expr-bench.sh b/cluster/src/assembly/resources/sbin/expr-bench.sh
new file mode 100644
index 0000000..d4fa092
--- /dev/null
+++ b/cluster/src/assembly/resources/sbin/expr-bench.sh
@@ -0,0 +1,73 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+echo ---------------------
+echo "Starting IoTDB (Cluster Mode)"
+echo ---------------------
+
+if [ -z "${IOTDB_HOME}" ]; then
+  export IOTDB_HOME="`dirname "$0"`/.."
+fi
+
+enable_printgc=false
+if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
+  enable_printgc=true;
+  shift
+fi
+
+if [ -n "$JAVA_HOME" ]; then
+    for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+        if [ -x "$java" ]; then
+            JAVA="$java"
+            break
+        fi
+    done
+else
+    JAVA=java
+fi
+
+if [ -z $JAVA ] ; then
+    echo Unable to find java executable. Check JAVA_HOME and PATH environment variables.  > /dev/stderr
+    exit 1;
+fi
+
+CLASSPATH=""
+for f in ${IOTDB_HOME}/lib/*.jar; do
+  CLASSPATH=${CLASSPATH}":"$f
+done
+classname=org.apache.iotdb.cluster.expr.ExprBench
+
+launch_service()
+{
+	iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback.xml"
+	iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}"
+	iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
+	iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
+	iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
+	iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
+	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" $@
+	return $?
+}
+
+# Start up the service
+launch_service "$classname" $@
+
+exit $?
diff --git a/cluster/src/assembly/resources/sbin/expr-server.sh b/cluster/src/assembly/resources/sbin/expr-server.sh
new file mode 100644
index 0000000..f323d0b
--- /dev/null
+++ b/cluster/src/assembly/resources/sbin/expr-server.sh
@@ -0,0 +1,73 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+echo ---------------------
+echo "Starting IoTDB (Cluster Mode)"
+echo ---------------------
+
+if [ -z "${IOTDB_HOME}" ]; then
+  export IOTDB_HOME="`dirname "$0"`/.."
+fi
+
+enable_printgc=false
+if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
+  enable_printgc=true;
+  shift
+fi
+
+if [ -n "$JAVA_HOME" ]; then
+    for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+        if [ -x "$java" ]; then
+            JAVA="$java"
+            break
+        fi
+    done
+else
+    JAVA=java
+fi
+
+if [ -z $JAVA ] ; then
+    echo Unable to find java executable. Check JAVA_HOME and PATH environment variables.  > /dev/stderr
+    exit 1;
+fi
+
+CLASSPATH=""
+for f in ${IOTDB_HOME}/lib/*.jar; do
+  CLASSPATH=${CLASSPATH}":"$f
+done
+classname=org.apache.iotdb.cluster.expr.ExprServer
+
+launch_service()
+{
+	iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback.xml"
+	iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}"
+	iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
+	iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
+	iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
+	iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
+	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" $@
+	return $?
+}
+
+# Start up the service
+launch_service "$classname" $@
+
+exit $?
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 7d3e42a..540201c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -168,6 +168,8 @@ public class ClusterConfig {
 
   private boolean openServerRpcPort = false;
 
+  private boolean useIndirectBroadcasting = false;
+
   /**
    * create a clusterConfig class. The internalIP will be set according to the server's hostname. If
    * there is something error for getting the ip of the hostname, then set the internalIp as
@@ -183,6 +185,14 @@ public class ClusterConfig {
     seedNodeUrls = Arrays.asList(String.format("%s:%d", internalIp, internalMetaPort));
   }
 
+  public boolean isUseIndirectBroadcasting() {
+    return useIndirectBroadcasting;
+  }
+
+  public void setUseIndirectBroadcasting(boolean useIndirectBroadcasting) {
+    this.useIndirectBroadcasting = useIndirectBroadcasting;
+  }
+
   public int getSelectorNumOfClientPool() {
     return selectorNumOfClientPool;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
new file mode 100644
index 0000000..b625f2c
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.cluster.client.sync.SyncClientFactory;
+import org.apache.iotdb.cluster.client.sync.SyncClientPool;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient.FactorySync;
+import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.db.qp.physical.sys.ExprPlan;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+
+public class ExprBench {
+
+  private AtomicLong requestCounter = new AtomicLong();
+  private int threadNum = 64;
+  private int workloadSize = 64 * 1024;
+  private SyncClientPool clientPool;
+  private Node target;
+  private int maxRequestNum;
+
+  public ExprBench(Node target) {
+    this.target = target;
+    SyncClientFactory factory = new FactorySync(new Factory());
+    clientPool = new SyncClientPool(factory);
+  }
+
+  public void benchmark() {
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < threadNum; i++) {
+      new Thread(
+          () -> {
+            Client client = clientPool.getClient(target);
+            ExecutNonQueryReq request = new ExecutNonQueryReq();
+            ExprPlan plan = new ExprPlan();
+            plan.setWorkload(new byte[workloadSize]);
+            plan.setNeedForward(true);
+            ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
+            plan.serialize(byteBuffer);
+            byteBuffer.flip();
+            request.setPlanBytes(byteBuffer);
+            long currRequsetNum = -1;
+            while (true) {
+
+              try {
+                client.executeNonQueryPlan(request);
+                currRequsetNum = requestCounter.incrementAndGet();
+              } catch (TException e) {
+                e.printStackTrace();
+              }
+
+              if (currRequsetNum % 1000 == 0) {
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                System.out.println(String.format("%d %d %f(%f)", elapsedTime,
+                    currRequsetNum,
+                    (currRequsetNum + 0.0) / elapsedTime,
+                    currRequsetNum * workloadSize / (1024.0*1024.0)  / elapsedTime));
+              }
+
+              if (currRequsetNum >= maxRequestNum) {
+                break;
+              }
+            }
+          })
+          .start();
+    }
+  }
+
+  public void setMaxRequestNum(int maxRequestNum) {
+    this.maxRequestNum = maxRequestNum;
+  }
+
+  public static void main(String[] args) {
+    Node target = new Node();
+    target.setInternalIp(args[0]);
+    target.setMetaPort(Integer.parseInt(args[1]));
+    ExprBench bench = new ExprBench(target);
+    bench.maxRequestNum = Integer.parseInt(args[2]);
+    bench.threadNum = Integer.parseInt(args[3]);
+    bench.benchmark();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java
similarity index 63%
copy from cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java
index a600c4a..4b302a7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java
@@ -17,9 +17,27 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.log;
+package org.apache.iotdb.cluster.expr;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.cluster.client.sync.SyncClientPool;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient.FactorySync;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -33,26 +51,13 @@ import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.utils.TestOnly;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
  * followers and send the logs in an ordered manner so that the followers will not wait for previous
@@ -60,27 +65,25 @@ import java.util.concurrent.atomic.AtomicLong;
  * follower A, the actual reach order may be log3, log2, and log1. According to the protocol, log3
  * and log2 must halt until log1 reaches, as a result, the total delay may increase significantly.
  */
-public class LogDispatcher {
+public class ExprLogDispatcher {
 
-  private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
-  private RaftMember member;
-  private boolean useBatchInLogCatchUp =
-      ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp();
+  private static final Logger logger = LoggerFactory.getLogger(ExprLogDispatcher.class);
   private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
   private ExecutorService executorService;
   private static ExecutorService serializationService =
       Executors.newFixedThreadPool(
           Runtime.getRuntime().availableProcessors(),
           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build());
+  private SyncClientPool clientPool;
+  private Node leader;
 
-  public LogDispatcher(RaftMember member) {
-    this.member = member;
+  public ExprLogDispatcher(List<Node> nodes, Node leader) {
     executorService = Executors.newCachedThreadPool();
-    for (Node node : member.getAllNodes()) {
-      if (!node.equals(member.getThisNode())) {
-        nodeLogQueues.add(createQueueAndBindingThread(node));
-      }
+    for (Node node : nodes) {
+      nodeLogQueues.add(createQueueAndBindingThread(node));
     }
+    clientPool = new SyncClientPool(new FactorySync(new Factory()));
+    this.leader = leader;
   }
 
   @TestOnly
@@ -114,15 +117,9 @@ public class LogDispatcher {
           addSucceeded = nodeLogQueue.add(log);
         }
 
-        if (!addSucceeded) {
-          logger.debug(
-              "Log queue[{}] of {} is full, ignore the log to this node", i, member.getName());
-        } else {
+        if (addSucceeded) {
           log.setEnqueueTime(System.nanoTime());
         }
-      } catch (IllegalStateException e) {
-        logger.debug(
-            "Log queue[{}] of {} is full, ignore the log to this node", i, member.getName());
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
@@ -219,30 +216,26 @@ public class LogDispatcher {
 
   class DispatcherThread implements Runnable {
 
-    private Node receiver;
+    private Node node;
+    private Client client;
     private BlockingQueue<SendLogRequest> logBlockingDeque;
     private List<SendLogRequest> currBatch = new ArrayList<>();
-    private Peer peer;
 
-    DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
-      this.receiver = receiver;
+    DispatcherThread(Node node, BlockingQueue<SendLogRequest> logBlockingDeque) {
+      this.client = clientPool.getClient(node);
       this.logBlockingDeque = logBlockingDeque;
-      this.peer =
-          member
-              .getPeerMap()
-              .computeIfAbsent(receiver, r -> new Peer(member.getLogManager().getLastLogIndex()));
     }
 
     @Override
     public void run() {
-      Thread.currentThread().setName("LogDispatcher-" + member.getName() + "-" + receiver);
+      Thread.currentThread().setName("LogDispatcher-" + node);
       try {
         while (!Thread.interrupted()) {
           SendLogRequest poll = logBlockingDeque.take();
           currBatch.add(poll);
           logBlockingDeque.drainTo(currBatch);
           if (logger.isDebugEnabled()) {
-            logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
+            logger.debug("Sending {} logs to {}", currBatch.size(), node);
           }
           for (SendLogRequest request : currBatch) {
             request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
@@ -258,59 +251,25 @@ public class LogDispatcher {
       logger.info("Dispatcher exits");
     }
 
-    private void appendEntriesAsync(
-        List<ByteBuffer> logList, AppendEntriesRequest request, List<SendLogRequest> currBatch)
-        throws TException {
-      AsyncMethodCallback<Long> handler = new AppendEntriesHandler(currBatch);
-      AsyncClient client = member.getSendLogAsyncClient(receiver);
-      if (logger.isDebugEnabled()) {
-        logger.debug(
-            "{}: append entries {} with {} logs", member.getName(), receiver, logList.size());
-      }
-      if (client != null) {
-        client.appendEntries(request, handler);
-      }
-    }
-
     private void appendEntriesSync(
         List<ByteBuffer> logList, AppendEntriesRequest request, List<SendLogRequest> currBatch) {
 
       long startTime = Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime();
-      if (!member.waitForPrevLog(peer, currBatch.get(0).getLog())) {
-        logger.warn(
-            "{}: node {} timed out when appending {}",
-            member.getName(),
-            receiver,
-            currBatch.get(0).getLog());
-        return;
-      }
       Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
 
-      Client client = member.getSyncClient(receiver);
-      if (client == null) {
-        logger.error("No available client for {}", receiver);
-        return;
-      }
-      AsyncMethodCallback<Long> handler = new AppendEntriesHandler(currBatch);
       startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
       try {
         long result = client.appendEntries(request);
         Timer.Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
         if (result != -1 && logger.isInfoEnabled()) {
           logger.info(
-              "{}: Append {} logs to {}, resp: {}",
-              member.getName(),
+              "Append {} logs to {}, resp: {}",
               logList.size(),
-              receiver,
+              node,
               result);
         }
-        handler.onComplete(result);
       } catch (TException e) {
-        client.getInputProtocol().getTransport().close();
-        handler.onError(e);
         logger.warn("Failed logs: {}, first index: {}", logList, request.prevLogIndex + 1);
-      } finally {
-        ClientUtils.putBackSyncClient(client);
       }
     }
 
@@ -318,15 +277,8 @@ public class LogDispatcher {
         List<ByteBuffer> logList, List<SendLogRequest> currBatch, int firstIndex) {
       AppendEntriesRequest request = new AppendEntriesRequest();
 
-      if (member.getHeader() != null) {
-        request.setHeader(member.getHeader());
-      }
-      request.setLeader(member.getThisNode());
-      request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
-
-      synchronized (member.getTerm()) {
-        request.setTerm(member.getTerm().get());
-      }
+      request.setLeader(leader);
+      request.setTerm(1);
 
       request.setEntries(logList);
       // set index for raft
@@ -339,7 +291,7 @@ public class LogDispatcher {
       return request;
     }
 
-    private void sendLogs(List<SendLogRequest> currBatch) throws TException {
+    private void sendLogs(List<SendLogRequest> currBatch) {
       int logIndex = 0;
       logger.debug(
           "send logs from index {} to {}",
@@ -362,11 +314,7 @@ public class LogDispatcher {
         }
 
         AppendEntriesRequest appendEntriesRequest = prepareRequest(logList, currBatch, prevIndex);
-        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-          appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
-        } else {
-          appendEntriesSync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
-        }
+        appendEntriesSync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
         for (; prevIndex < logIndex; prevIndex++) {
           Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
               currBatch.get(prevIndex).getLog().getCreateTime());
@@ -374,84 +322,9 @@ public class LogDispatcher {
       }
     }
 
-    private void sendBatchLogs(List<SendLogRequest> currBatch) throws TException {
-      if (currBatch.size() > 1) {
-        if (useBatchInLogCatchUp) {
-          sendLogs(currBatch);
-        } else {
-          for (SendLogRequest batch : currBatch) {
-            sendLog(batch);
-          }
-        }
-      } else {
-        sendLog(currBatch.get(0));
-      }
-    }
-
-    private void sendLog(SendLogRequest logRequest) {
-      Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
-          logRequest.getLog().getCreateTime());
-      member.sendLogToFollower(
-          logRequest.getLog(),
-          logRequest.getVoteCounter(),
-          receiver,
-          logRequest.getLeaderShipStale(),
-          logRequest.getNewLeaderTerm(),
-          logRequest.getAppendEntryRequest());
-      Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
-          logRequest.getLog().getCreateTime());
+    private void sendBatchLogs(List<SendLogRequest> currBatch) {
+      sendLogs(currBatch);
     }
 
-    class AppendEntriesHandler implements AsyncMethodCallback<Long> {
-
-      private final List<AsyncMethodCallback<Long>> singleEntryHandlers;
-
-      private AppendEntriesHandler(List<SendLogRequest> batch) {
-        singleEntryHandlers = new ArrayList<>(batch.size());
-        for (SendLogRequest sendLogRequest : batch) {
-          AppendNodeEntryHandler handler =
-              getAppendNodeEntryHandler(
-                  sendLogRequest.getLog(),
-                  sendLogRequest.getVoteCounter(),
-                  receiver,
-                  sendLogRequest.getLeaderShipStale(),
-                  sendLogRequest.getNewLeaderTerm(),
-                  peer);
-          singleEntryHandlers.add(handler);
-        }
-      }
-
-      @Override
-      public void onComplete(Long aLong) {
-        for (AsyncMethodCallback<Long> singleEntryHandler : singleEntryHandlers) {
-          singleEntryHandler.onComplete(aLong);
-        }
-      }
-
-      @Override
-      public void onError(Exception e) {
-        for (AsyncMethodCallback<Long> singleEntryHandler : singleEntryHandlers) {
-          singleEntryHandler.onError(e);
-        }
-      }
-
-      private AppendNodeEntryHandler getAppendNodeEntryHandler(
-          Log log,
-          AtomicInteger voteCounter,
-          Node node,
-          AtomicBoolean leaderShipStale,
-          AtomicLong newLeaderTerm,
-          Peer peer) {
-        AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
-        handler.setReceiver(node);
-        handler.setVoteCounter(voteCounter);
-        handler.setLeaderShipStale(leaderShipStale);
-        handler.setLog(log);
-        handler.setMember(member);
-        handler.setPeer(peer);
-        handler.setReceiverTerm(newLeaderTerm);
-        return handler;
-      }
-    }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
new file mode 100644
index 0000000..efa63c2
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.ExprPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+public class ExprMember extends MetaGroupMember {
+
+  public ExprMember() {
+  }
+
+  public ExprMember(Node thisNode, List<Node> allNodes) {
+    this.thisNode = thisNode;
+    this.allNodes = allNodes;
+  }
+
+  public ExprMember(TProtocolFactory factory,
+      Node thisNode, Coordinator coordinator)
+      throws QueryProcessException {
+    super(factory, thisNode, coordinator);
+  }
+
+  private int windowSize = 10000;
+  private Log[] logWindow = new Log[windowSize];
+  private long windowPrevLogIndex;
+  private long windowPrevLogTerm;
+  private long windowTerm;
+
+  @Override
+  protected synchronized void startSubServers() {
+
+  }
+
+  @Override
+  public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+    if (false) {
+      if (plan instanceof ExprPlan && !((ExprPlan) plan).isNeedForward()) {
+        return StatusUtils.OK;
+      } else if (plan instanceof ExprPlan) {
+        ((ExprPlan) plan).setNeedForward(false);
+      }
+
+      ExecutNonQueryReq req = new ExecutNonQueryReq();
+      ByteBuffer byteBuffer = ByteBuffer.allocate(128 * 1024);
+      plan.serialize(byteBuffer);
+      byteBuffer.flip();
+      req.setPlanBytes(byteBuffer);
+
+      for (Node node : getAllNodes()) {
+        if (!ClusterUtils.isNodeEquals(node, thisNode)) {
+          Client syncClient = getSyncClient(node);
+          try {
+            syncClient.executeNonQueryPlan(req);
+          } catch (TException e) {
+            ClientUtils.putBackSyncClient(syncClient);
+            return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
+          }
+          ClientUtils.putBackSyncClient(syncClient);
+        }
+      }
+      return StatusUtils.OK;
+    }
+    return processNonPartitionedMetaPlan(plan);
+  }
+
+  protected long appendEntry1(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+    long resp;
+    long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+    long success = 0;
+
+    synchronized (logManager) {
+      long windowPos = log.getCurrLogIndex() - logManager.getLastLogIndex() - 1;
+      if (windowPos < 0) {
+        success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
+      } else if (windowPos < windowSize) {
+        logWindow[(int) windowPos] = log;
+        if (windowPos == 0) {
+          windowPrevLogIndex = prevLogIndex;
+          windowPrevLogTerm = prevLogTerm;
+
+          int flushPos = 0;
+          for (; flushPos < windowSize; flushPos++) {
+            if (logWindow[flushPos] == null) {
+              break;
+            }
+          }
+          // flush [0, flushPos)
+          List<Log> logs = Arrays.asList(logWindow).subList(0, flushPos);
+          success = logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit,
+              logs);
+          if (success != -1) {
+            System.arraycopy(logWindow, flushPos, logWindow, 0, windowSize - flushPos);
+            for (int i = 1; i <= flushPos; i++) {
+              logWindow[windowSize - i] = null;
+            }
+          } else {
+            System.out.println("not success");
+          }
+        }
+      } else {
+        return Response.RESPONSE_LOG_MISMATCH;
+      }
+    }
+
+    Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+    if (success != -1) {
+      resp = Response.RESPONSE_AGREE;
+    } else {
+      // the incoming log points to an illegal position, reject it
+      resp = Response.RESPONSE_LOG_MISMATCH;
+    }
+    return resp;
+  }
+
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
new file mode 100644
index 0000000..338755e
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr;
+
+import io.moquette.broker.config.IConfig;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
+import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.log.LogDispatcher;
+import org.apache.iotdb.cluster.server.MetaClusterServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.tsfile.read.filter.operator.In;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class ExprServer extends MetaClusterServer {
+
+  protected ExprServer() throws QueryProcessException {
+    super();
+  }
+
+  @Override
+  protected MetaGroupMember createMetaGroupMember() throws QueryProcessException {
+    return new ExprMember(new Factory(), thisNode, coordinator);
+  }
+
+  @Override
+  protected String getClientThreadPrefix() {
+    return "FollowerClient";
+  }
+
+  @Override
+  protected String getServerClientName() {
+    return "FollowerServer";
+  }
+
+  @Override
+  protected TServerTransport getServerSocket() throws TTransportException {
+    return new TServerSocket(
+        new InetSocketAddress(thisNode.getInternalIp(), thisNode.getMetaPort()));
+  }
+
+
+  public static void main(String[] args)
+      throws StartupException, TTransportException, QueryProcessException, ConfigInconsistentException, StartUpCheckFailureException {
+
+    String[] nodeStrings = args[0].split(":");
+    String ip = nodeStrings[0];
+    int port = Integer.parseInt(nodeStrings[1]);
+    String[] allNodeStr = args[1].split(",");
+
+    int dispatcherThreadNum = Integer.parseInt(args[2]);
+
+    ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(Arrays.asList(allNodeStr));
+    ClusterDescriptor.getInstance().getConfig().setInternalMetaPort(port);
+    ClusterDescriptor.getInstance().getConfig().setInternalIp(ip);
+    ClusterDescriptor.getInstance().getConfig().setEnableRaftLogPersistence(false);
+    ClusterDescriptor.getInstance().getConfig().setUseBatchInLogCatchUp(false);
+    RaftMember.USE_LOG_DISPATCHER = true;
+    LogDispatcher.bindingThreadNum = dispatcherThreadNum;
+
+    ExprServer server = new ExprServer();
+    server.start();
+    server.buildCluster();
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
new file mode 100644
index 0000000..75f1b16
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import org.apache.iotdb.cluster.log.LogDispatcher.DispatcherThread;
+import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+
+/**
+ * IndirectLogDispatcher sends entries only to a pre-selected subset of followers instead of all
+ * followers and let the selected followers to broadcast the log to other followers.
+ */
+public class IndirectLogDispatcher extends LogDispatcher {
+
+  private Map<Node, List<Node>> directToIndirectFollowerMap = new HashMap<>();
+
+  public IndirectLogDispatcher(RaftMember member) {
+    super(member);
+    recalculateDirectFollowerMap();
+  }
+
+  @Override
+  LogDispatcher.DispatcherThread newDispatcherThread(Node node,
+      BlockingQueue<SendLogRequest> logBlockingQueue) {
+    return new DispatcherThread(node, logBlockingQueue);
+  }
+
+  @Override
+  void createQueueAndBindingThreads() {
+    for (Node node : directToIndirectFollowerMap.keySet()) {
+      nodeLogQueues.add(createQueueAndBindingThread(node));
+    }
+  }
+
+  public void recalculateDirectFollowerMap() {
+    List<Node> allNodes = new ArrayList<>(member.getAllNodes());
+    allNodes.removeIf(n -> ClusterUtils.isNodeEquals(n, member.getThisNode()));
+    QueryCoordinator instance = QueryCoordinator.getINSTANCE();
+    List<Node> orderedNodes = instance.reorderNodes(allNodes);
+
+    synchronized (this) {
+      directToIndirectFollowerMap.clear();
+      for (int i = 0, j = orderedNodes.size() - 1; i <= j; i++, j--) {
+        if (i != j) {
+          directToIndirectFollowerMap.put(orderedNodes.get(i),
+              Collections.singletonList(orderedNodes.get(j)));
+        } else {
+          directToIndirectFollowerMap.put(orderedNodes.get(i), Collections.emptyList());
+        }
+      }
+    }
+  }
+
+  class DispatcherThread extends LogDispatcher.DispatcherThread {
+
+    DispatcherThread(Node receiver,
+        BlockingQueue<SendLogRequest> logBlockingDeque) {
+      super(receiver, logBlockingDeque);
+    }
+
+    @Override
+    void sendLog(SendLogRequest logRequest) {
+      Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
+          logRequest.getLog().getCreateTime());
+      member.sendLogToFollower(
+          logRequest.getLog(),
+          logRequest.getVoteCounter(),
+          receiver,
+          logRequest.getLeaderShipStale(),
+          logRequest.getNewLeaderTerm(),
+          logRequest.getAppendEntryRequest(), directToIndirectFollowerMap.get(receiver));
+      Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
+          logRequest.getLog().getCreateTime());
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index a600c4a..711b511 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.log;
 
+import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -63,19 +64,24 @@ import java.util.concurrent.atomic.AtomicLong;
 public class LogDispatcher {
 
   private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
-  private RaftMember member;
-  private boolean useBatchInLogCatchUp =
-      ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp();
-  private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
+  RaftMember member;
+  private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
+  private boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
+  List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
   private ExecutorService executorService;
   private static ExecutorService serializationService =
       Executors.newFixedThreadPool(
           Runtime.getRuntime().availableProcessors(),
           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build());
+  public static int bindingThreadNum = 1;
 
   public LogDispatcher(RaftMember member) {
     this.member = member;
     executorService = Executors.newCachedThreadPool();
+    createQueueAndBindingThreads();
+  }
+
+  void createQueueAndBindingThreads() {
     for (Node node : member.getAllNodes()) {
       if (!node.equals(member.getThisNode())) {
         nodeLogQueues.add(createQueueAndBindingThread(node));
@@ -129,17 +135,20 @@ public class LogDispatcher {
     }
   }
 
-  private BlockingQueue<SendLogRequest> createQueueAndBindingThread(Node node) {
+  BlockingQueue<SendLogRequest> createQueueAndBindingThread(Node node) {
     BlockingQueue<SendLogRequest> logBlockingQueue =
         new ArrayBlockingQueue<>(
             ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
-    int bindingThreadNum = 1;
     for (int i = 0; i < bindingThreadNum; i++) {
-      executorService.submit(new DispatcherThread(node, logBlockingQueue));
+      executorService.submit(newDispatcherThread(node, logBlockingQueue));
     }
     return logBlockingQueue;
   }
 
+  DispatcherThread newDispatcherThread(Node node, BlockingQueue<SendLogRequest> logBlockingQueue) {
+    return new DispatcherThread(node, logBlockingQueue);
+  }
+
   public static class SendLogRequest {
 
     private Log log;
@@ -219,7 +228,7 @@ public class LogDispatcher {
 
   class DispatcherThread implements Runnable {
 
-    private Node receiver;
+    Node receiver;
     private BlockingQueue<SendLogRequest> logBlockingDeque;
     private List<SendLogRequest> currBatch = new ArrayList<>();
     private Peer peer;
@@ -238,9 +247,11 @@ public class LogDispatcher {
       Thread.currentThread().setName("LogDispatcher-" + member.getName() + "-" + receiver);
       try {
         while (!Thread.interrupted()) {
-          SendLogRequest poll = logBlockingDeque.take();
-          currBatch.add(poll);
-          logBlockingDeque.drainTo(currBatch);
+          synchronized (logBlockingDeque) {
+            SendLogRequest poll = logBlockingDeque.take();
+            currBatch.add(poll);
+            logBlockingDeque.drainTo(currBatch);
+          }
           if (logger.isDebugEnabled()) {
             logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
           }
@@ -388,7 +399,7 @@ public class LogDispatcher {
       }
     }
 
-    private void sendLog(SendLogRequest logRequest) {
+    void sendLog(SendLogRequest logRequest) {
       Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
           logRequest.getLog().getCreateTime());
       member.sendLogToFollower(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 446eefc..eae40e1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -420,6 +420,25 @@ public abstract class RaftLogManager {
     return -1;
   }
 
+  public long append(long lastIndex, long lastTerm, long leaderCommit, Log entry) {
+    long newLastIndex = lastIndex + 1;
+    if (entry.getCurrLogIndex() <= commitIndex) {
+      logger.debug(
+          "{}: entry {} conflict with committed entry [commitIndex({})]",
+          name,
+          entry.getCurrLogIndex(),
+          commitIndex);
+    } else {
+      append(entry);
+    }
+    try {
+      commitTo(Math.min(leaderCommit, newLastIndex));
+    } catch (LogExecutionException e) {
+      // exceptions are ignored on follower side
+    }
+    return newLastIndex;
+  }
+
   /**
    * Used by leader node or MaybeAppend to directly append to unCommittedEntryManager. Note that the
    * caller should ensure entries[0].index > committed.
@@ -466,6 +485,20 @@ public abstract class RaftLogManager {
     return getLastLogIndex();
   }
 
+  public long appendDirectly(Log entry) {
+    long after = entry.getCurrLogIndex();
+    if (after <= commitIndex) {
+      logger.error("{}: after({}) is out of range [commitIndex({})]", name, after, commitIndex);
+      return -1;
+    }
+    getUnCommittedEntryManager().truncateAndAppend(entry);
+    Object logUpdateCondition = getLogUpdateCondition(entry.getCurrLogIndex());
+    synchronized (logUpdateCondition) {
+      logUpdateCondition.notifyAll();
+    }
+    return getLastLogIndex();
+  }
+
   /**
    * Used by leader node to try to commit entries.
    *
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
index a3026c9..c85e709 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
@@ -185,10 +185,10 @@ public class UnCommittedEntryManager {
    * @param appendingEntry request entry
    */
   void truncateAndAppend(Log appendingEntry) {
-    if (maybeTerm(appendingEntry.getCurrLogIndex()) == appendingEntry.getCurrLogTerm()) {
-      // skip existing entry
-      return;
-    }
+//    if (maybeTerm(appendingEntry.getCurrLogIndex()) == appendingEntry.getCurrLogTerm()) {
+//      // skip existing entry
+//      return;
+//    }
 
     long after = appendingEntry.getCurrLogIndex();
     long len = after - offset;
@@ -203,8 +203,8 @@ public class UnCommittedEntryManager {
     } else {
       // clear conflict entries
       // then append
-      logger.info(
-          "truncate the entries after index {}, append a new entry {}", after, appendingEntry);
+//      logger.info(
+//          "truncate the entries after index {}, append a new entry {}", after, appendingEntry);
       int truncateIndex = (int) (after - offset);
       if (truncateIndex < entries.size()) {
         entries.subList(truncateIndex, entries.size()).clear();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
index c81e6d0..a945a92 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
@@ -431,7 +431,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   @Override
   public void setHardStateAndFlush(HardState state) {
     this.state = state;
-    serializeMeta(meta);
+    // serializeMeta(meta);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index e4c81f8..d88d1a3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -521,7 +521,7 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
-  TProcessor getProcessor() {
+  protected TProcessor getProcessor() {
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
       return new AsyncProcessor<>(this);
     } else {
@@ -530,7 +530,7 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
-  TServerTransport getServerSocket() throws TTransportException {
+  protected TServerTransport getServerSocket() throws TTransportException {
     logger.info(
         "[{}] Cluster node will listen {}:{}",
         getServerClientName(),
@@ -547,12 +547,12 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
-  String getClientThreadPrefix() {
+  protected String getClientThreadPrefix() {
     return "DataClientThread-";
   }
 
   @Override
-  String getServerClientName() {
+  protected String getServerClientName() {
     return "DataServerThread-";
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index 12e286f..ddc64cc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -74,27 +74,31 @@ public class MetaClusterServer extends RaftServer
   private static Logger logger = LoggerFactory.getLogger(MetaClusterServer.class);
 
   // each node only contains one MetaGroupMember
-  private MetaGroupMember member;
-  private Coordinator coordinator;
+  protected MetaGroupMember member;
+  protected Coordinator coordinator;
   // the single-node IoTDB instance
   private IoTDB ioTDB;
   // to register the ClusterMonitor that helps monitoring the cluster
   private RegisterManager registerManager = new RegisterManager();
-  private MetaAsyncService asyncService;
-  private MetaSyncService syncService;
-  private MetaHeartbeatServer metaHeartbeatServer;
+  protected MetaAsyncService asyncService;
+  protected MetaSyncService syncService;
+  protected MetaHeartbeatServer metaHeartbeatServer;
 
   public MetaClusterServer() throws QueryProcessException {
     super();
     metaHeartbeatServer = new MetaHeartbeatServer(thisNode, this);
     coordinator = new Coordinator();
-    member = new MetaGroupMember(protocolFactory, thisNode, coordinator);
+    member = createMetaGroupMember();
     coordinator.setMetaGroupMember(member);
     asyncService = new MetaAsyncService(member);
     syncService = new MetaSyncService(member);
     MetaPuller.getInstance().init(member);
   }
 
+  protected MetaGroupMember createMetaGroupMember() throws QueryProcessException {
+    return new MetaGroupMember(protocolFactory, thisNode, coordinator);
+  }
+
   /**
    * Besides the standard RaftServer start-up, the IoTDB instance, a MetaGroupMember and the
    * ClusterMonitor are also started.
@@ -150,7 +154,7 @@ public class MetaClusterServer extends RaftServer
    * @throws TTransportException if create the socket fails
    */
   @Override
-  TServerTransport getServerSocket() throws TTransportException {
+  protected TServerTransport getServerSocket() throws TTransportException {
     logger.info(
         "[{}] Cluster node will listen {}:{}",
         getServerClientName(),
@@ -167,17 +171,17 @@ public class MetaClusterServer extends RaftServer
   }
 
   @Override
-  String getClientThreadPrefix() {
+  protected String getClientThreadPrefix() {
     return "MetaClientThread-";
   }
 
   @Override
-  String getServerClientName() {
+  protected String getServerClientName() {
     return "MetaServerThread-";
   }
 
   @Override
-  TProcessor getProcessor() {
+  protected TProcessor getProcessor() {
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
       return new AsyncProcessor<>(this);
     } else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index 09956d2..405c6e2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
@@ -71,9 +71,9 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
   private TServerTransport socket;
   // RPC processing server
   private TServer poolServer;
-  Node thisNode;
+  protected Node thisNode;
 
-  TProtocolFactory protocolFactory =
+  protected TProtocolFactory protocolFactory =
       config.isRpcThriftCompressionEnabled()
           ? new TCompactProtocol.Factory()
           : new TBinaryProtocol.Factory();
@@ -92,7 +92,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
     thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
   }
 
-  RaftServer(Node thisNode) {
+  protected RaftServer(Node thisNode) {
     this.thisNode = thisNode;
   }
 
@@ -167,14 +167,14 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
    * @return An AsyncProcessor that contains the extended interfaces of a non-abstract subclass of
    *     RaftService (DataService or MetaService).
    */
-  abstract TProcessor getProcessor();
+  protected abstract TProcessor getProcessor();
 
   /**
    * @return A socket that will be used to establish a thrift server to listen to RPC requests.
    *     DataServer and MetaServer use different port, so this is to be determined.
    * @throws TTransportException
    */
-  abstract TServerTransport getServerSocket() throws TTransportException;
+  protected abstract TServerTransport getServerSocket() throws TTransportException;
 
   /**
    * Each thrift RPC request will be processed in a separate thread and this will return the name
@@ -183,7 +183,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
    *
    * @return name prefix of RPC processing threads.
    */
-  abstract String getClientThreadPrefix();
+  protected abstract String getClientThreadPrefix();
 
   /**
    * The thrift server will be run in a separate thread, and this will be its name. It help you
@@ -191,7 +191,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
    *
    * @return The name of the thread running the thrift server.
    */
-  abstract String getServerClientName();
+  protected abstract String getServerClientName();
 
   private TServer createAsyncServer() throws TTransportException {
     socket = getServerSocket();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index de0b83f..6d95e06 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -111,7 +111,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
       if (lastLogIdx == peer.getLastHeartBeatIndex()) {
         // the follower's lastLogIndex is unchanged, increase inconsistent counter
         int inconsistentNum = peer.incInconsistentHeartbeatNum();
-        if (inconsistentNum >= 5) {
+        if (inconsistentNum >= 10000) {
           logger.info(
               "{}: catching up node {}, index-term: {}-{}/{}-{}, peer match index {}",
               memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
new file mode 100644
index 0000000..71903d4
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.handlers.forwarder;
+
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IndirectAppendHandler implements AsyncMethodCallback<Long> {
+
+  private static final Logger logger = LoggerFactory.getLogger(IndirectAppendHandler.class);
+  private Node receiver;
+  private AppendEntryRequest request;
+
+  public IndirectAppendHandler(Node receiver,
+      AppendEntryRequest request) {
+    this.receiver = receiver;
+    this.request = request;
+  }
+
+  @Override
+  public void onComplete(Long response) {
+    // ignore response from indirect appender
+  }
+
+  @Override
+  public void onError(Exception exception) {
+    logger.warn("Cannot send request {} to {}", request, receiver, exception);
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index 0459fef..50a46bb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.server.handlers.caller.HeartbeatHandler;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 
+import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -152,7 +153,7 @@ public class HeartbeatThread implements Runnable {
     synchronized (nodes) {
       // avoid concurrent modification
       for (Node node : nodes) {
-        if (node.equals(localMember.getThisNode())) {
+        if (ClusterUtils.isNodeEquals(node, localMember.getThisNode())) {
           continue;
         }
         if (Thread.currentThread().isInterrupted()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
index ed99c3d..e744fdb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
@@ -74,8 +74,8 @@ public class MetaHeartbeatServer extends HeartbeatServer {
     } else {
       return new TServerSocket(
           new InetSocketAddress(
-              config.getInternalIp(),
-              config.getInternalMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET));
+              metaClusterServer.getMember().getThisNode().getInternalIp(),
+              metaClusterServer.getMember().getThisNode().getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET));
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
index 002c4fe..2c5cf35 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
@@ -40,7 +40,15 @@ public class MetaHeartbeatThread extends HeartbeatThread {
     request.setRequireIdentifier(!node.isSetNodeIdentifier());
     synchronized (localMetaMember.getIdConflictNodes()) {
       request.unsetRegenerateIdentifier();
-      if (localMetaMember.getIdConflictNodes().contains(node)) {
+      boolean hasIdConflict = false;
+      for (Node idConflictNode : localMetaMember.getIdConflictNodes()) {
+        if (idConflictNode.getInternalIp().equals(node.internalIp)
+            && idConflictNode.getMetaPort() == node.metaPort) {
+          hasIdConflict = true;
+          break;
+        }
+      }
+      if (hasIdConflict) {
         request.setRegenerateIdentifier(true);
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index eb10d44..056a485 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.server.member;
 
+import java.util.Comparator;
 import org.apache.iotdb.cluster.client.DataClientProvider;
 import org.apache.iotdb.cluster.client.async.AsyncClientPool;
 import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
@@ -1021,21 +1022,26 @@ public class MetaGroupMember extends RaftMember {
     ExecutorService pool = new ScheduledThreadPoolExecutor(getAllNodes().size() - 1);
     for (Node seedNode : getAllNodes()) {
       Node thisNode = getThisNode();
-      if (seedNode.equals(thisNode)) {
+      if (ClusterUtils.isNodeEquals(thisNode, seedNode)) {
         continue;
       }
       pool.submit(
           () -> {
-            CheckStatusResponse response = checkStatus(seedNode);
-            if (response != null) {
-              // check the response
-              ClusterUtils.examineCheckStatusResponse(
-                  response, consistentNum, inconsistentNum, seedNode);
-            } else {
-              logger.warn(
-                  "Start up exception. Cannot connect to node {}. Try again in next turn.",
-                  seedNode);
+            try {
+              CheckStatusResponse response = checkStatus(seedNode);
+              if (response != null) {
+                // check the response
+                ClusterUtils.examineCheckStatusResponse(
+                    response, consistentNum, inconsistentNum, seedNode);
+              } else {
+                logger.warn(
+                    "Start up exception. Cannot connect to node {}. Try again in next turn.",
+                    seedNode);
+              }
+            } catch (Exception e) {
+              logger.error("Start up exception:", e);
             }
+
           });
     }
     pool.shutdown();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index f2187f9..daa3f4e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.server.member;
 
+import java.util.Collections;
 import org.apache.iotdb.cluster.client.async.AsyncClientPool;
 import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
 import org.apache.iotdb.cluster.client.sync.SyncClientPool;
@@ -39,6 +40,7 @@ import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
@@ -53,6 +55,7 @@ import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.server.handlers.forwarder.IndirectAppendHandler;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
@@ -77,7 +80,9 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.TException;
+import org.checkerframework.checker.units.qual.A;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -107,12 +112,14 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
+ * RaftMember process the common raft logic like leader election, log appending, catch-up and so
+ * on.
  */
 @SuppressWarnings("java:S3077") // reference volatile is enough
 public abstract class RaftMember {
+
   private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
-  public static final boolean USE_LOG_DISPATCHER = false;
+  public static boolean USE_LOG_DISPATCHER = false;
 
   private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time out";
   private static final String MSG_FORWARD_ERROR =
@@ -132,19 +139,29 @@ public abstract class RaftMember {
    * on this may be woken.
    */
   private final Object waitLeaderCondition = new Object();
-  /** the lock is to make sure that only one thread can apply snapshot at the same time */
+  /**
+   * the lock is to make sure that only one thread can apply snapshot at the same time
+   */
   private final Object snapshotApplyLock = new Object();
 
   protected Node thisNode = ClusterConstant.EMPTY_NODE;
-  /** the nodes that belong to the same raft group as thisNode. */
+  /**
+   * the nodes that belong to the same raft group as thisNode.
+   */
   protected List<Node> allNodes;
 
   ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
-  /** the name of the member, to distinguish several members in the logs. */
+  /**
+   * the name of the member, to distinguish several members in the logs.
+   */
   String name;
-  /** to choose nodes to send request of joining cluster randomly. */
+  /**
+   * to choose nodes to send request of joining cluster randomly.
+   */
   Random random = new Random();
-  /** when the node is a leader, this map is used to track log progress of each follower. */
+  /**
+   * when the node is a leader, this map is used to track log progress of each follower.
+   */
   Map<Node, Peer> peerMap;
   /**
    * the current term of the node, this object also works as lock of some transactions of the member
@@ -165,8 +182,10 @@ public abstract class RaftMember {
    * offline.
    */
   volatile long lastHeartbeatReceivedTime;
-  /** the raft logs are all stored and maintained in the log manager */
-  RaftLogManager logManager;
+  /**
+   * the raft logs are all stored and maintained in the log manager
+   */
+  protected RaftLogManager logManager;
   /**
    * the single thread pool that runs the heartbeat thread, which send heartbeats to the follower
    * when this node is a leader, or start elections when this node is an elector.
@@ -183,7 +202,9 @@ public abstract class RaftMember {
    * member by comparing it with the current last log index.
    */
   long lastReportedLogIndex;
-  /** the thread pool that runs catch-up tasks */
+  /**
+   * the thread pool that runs catch-up tasks
+   */
   private ExecutorService catchUpService;
   /**
    * lastCatchUpResponseTime records when is the latest response of each node's catch-up. There
@@ -219,20 +240,30 @@ public abstract class RaftMember {
    * one slow node.
    */
   private ExecutorService serialToParallelPool;
-  /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
+  /**
+   * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread
+   */
   private ExecutorService commitLogPool;
   /**
    * logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
-   * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
+   * which avoids the followers receiving out-of-order logs, forcing them to wait for previous
+   * logs.
    */
   private LogDispatcher logDispatcher;
 
   /**
-   * localExecutor is used to directly execute plans like load configuration in the underlying IoTDB
+   * localExecutor is used to directly execute plans like load configuration in the underlying
+   * IoTDB
    */
   protected PlanExecutor localExecutor;
 
-  protected RaftMember() {}
+  /**
+   * (logIndex, logTerm) -> append handler
+   */
+  protected Map<Pair<Long, Long>, AppendNodeEntryHandler> sentLogHandlers = new ConcurrentHashMap<>();
+
+  protected RaftMember() {
+  }
 
   protected RaftMember(
       String name,
@@ -527,10 +558,68 @@ public abstract class RaftMember {
     long result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
     logger.debug("{} AppendEntryRequest of {} completed with result {}", name, log, result);
 
+    if (!request.isFromLeader) {
+      appendAckLeader(request.leader, log, result);
+    }
+
     return result;
   }
 
-  /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
+  private void appendAckLeader(Node leader, Log log, long response) {
+    AppendEntryAcknowledgement appendEntryAcknowledgement = new AppendEntryAcknowledgement();
+    appendEntryAcknowledgement.index = log.getCurrLogIndex();
+    appendEntryAcknowledgement.term = log.getCurrLogTerm();
+    appendEntryAcknowledgement.response = response;
+
+    Client syncClient = null;
+    try {
+      if (config.isUseAsyncServer()) {
+        GenericHandler<Void> handler = new GenericHandler<>(leader, null);
+        getAsyncClient(leader).acknowledgeAppendEntry(appendEntryAcknowledgement, handler);
+      } else {
+        syncClient = getSyncClient(leader);
+        syncClient.acknowledgeAppendEntry(appendEntryAcknowledgement);
+      }
+    } catch (TException e) {
+      logger.warn("Cannot send ack of {} to leader {}", log, leader, e);
+    } finally {
+      if (syncClient != null) {
+        ClientUtils.putBackSyncClient(syncClient);
+      }
+    }
+  }
+
+  public long appendEntryIndirect(AppendEntryRequest request, List<Node> subFollowers) throws UnknownLogTypeException {
+    long result = appendEntry(request);
+    appendLogThreadPool.submit(() -> sendLogToSubFollowers(request, subFollowers));
+    return result;
+  }
+
+  private void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) {
+    request.setIsFromLeader(false);
+    for (Node subFollower : subFollowers) {
+      Client syncClient = null;
+      try {
+        if (config.isUseAsyncServer()) {
+          getAsyncClient(subFollower).appendEntry(request, new IndirectAppendHandler(subFollower,
+              request));
+        } else {
+          syncClient = getSyncClient(subFollower);
+          syncClient.appendEntry(request);
+        }
+      } catch (TException e) {
+        logger.error("Cannot send {} to {}", request, subFollower, e);
+      } finally {
+        if (syncClient != null) {
+          ClientUtils.putBackSyncClient(syncClient);
+        }
+      }
+    }
+  }
+
+  /**
+   * Similar to appendEntry, while the incoming load is batch of logs instead of a single log.
+   */
   public long appendEntries(AppendEntriesRequest request) throws UnknownLogTypeException {
     logger.debug("{} received an AppendEntriesRequest", name);
 
@@ -603,13 +692,17 @@ public abstract class RaftMember {
       AtomicBoolean leaderShipStale,
       AtomicLong newLeaderTerm,
       AppendEntryRequest request,
-      Peer peer) {
+      Peer peer, List<Node> indirectReceivers) {
     AsyncClient client = getSendLogAsyncClient(node);
     if (client != null) {
       AppendNodeEntryHandler handler =
           getAppendNodeEntryHandler(log, voteCounter, node, leaderShipStale, newLeaderTerm, peer);
       try {
-        client.appendEntry(request, handler);
+        if (indirectReceivers == null || indirectReceivers.isEmpty()) {
+          client.appendEntry(request, handler);
+        } else {
+          client.appendEntryIndirect(request, indirectReceivers, handler);
+        }
         logger.debug("{} sending a log to {}: {}", name, node, log);
       } catch (Exception e) {
         logger.warn("{} cannot append log to node {}", name, node, e);
@@ -680,16 +773,22 @@ public abstract class RaftMember {
     return lastCatchUpResponseTime;
   }
 
-  /** Sub-classes will add their own process of HeartBeatResponse in this method. */
-  public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {}
+  /**
+   * Sub-classes will add their own process of HeartBeatResponse in this method.
+   */
+  public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {
+  }
 
-  /** The actions performed when the node wins in an election (becoming a leader). */
-  public void onElectionWins() {}
+  /**
+   * The actions performed when the node wins in an election (becoming a leader).
+   */
+  public void onElectionWins() {
+  }
 
   /**
    * Update the followers' log by sending logs whose index >= followerLastMatchedLogIndex to the
-   * follower. If some of the required logs are removed, also send the snapshot. <br>
-   * notice that if a part of data is in the snapshot, then it is not in the logs.
+   * follower. If some of the required logs are removed, also send the snapshot. <br> notice that if
+   * a part of data is in the snapshot, then it is not in the logs.
    */
   public void catchUp(Node follower, long lastLogIdx) {
     // for one follower, there is at most one ongoing catch-up, so the same data will not be sent
@@ -744,7 +843,7 @@ public abstract class RaftMember {
    * @param plan a non-query plan.
    * @return A TSStatus indicating the execution result.
    */
-  abstract TSStatus executeNonQueryPlan(PhysicalPlan plan);
+  protected abstract TSStatus executeNonQueryPlan(PhysicalPlan plan);
 
   /**
    * according to the consistency configuration, decide whether to execute syncLeader or not and
@@ -775,7 +874,9 @@ public abstract class RaftMember {
     }
   }
 
-  /** call back after syncLeader */
+  /**
+   * call back after syncLeader
+   */
   public interface CheckConsistency {
 
     /**
@@ -784,7 +885,7 @@ public abstract class RaftMember {
      * @param leaderCommitId leader commit id
      * @param localAppliedId local applied id
      * @throws CheckConsistencyException maybe throw CheckConsistencyException, which is defined in
-     *     implements.
+     *                                   implements.
      */
     void postCheckConsistency(long leaderCommitId, long localAppliedId)
         throws CheckConsistencyException;
@@ -793,8 +894,7 @@ public abstract class RaftMember {
   public static class MidCheckConsistency implements CheckConsistency {
 
     /**
-     * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw
-     * CHECK_MID_CONSISTENCY_EXCEPTION
+     * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw CHECK_MID_CONSISTENCY_EXCEPTION
      *
      * @param leaderCommitId leader commit id
      * @param localAppliedId local applied id
@@ -806,7 +906,7 @@ public abstract class RaftMember {
       if (leaderCommitId == Long.MAX_VALUE
           || leaderCommitId == Long.MIN_VALUE
           || leaderCommitId - localAppliedId
-              > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
+          > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
         throw CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION;
       }
     }
@@ -839,7 +939,7 @@ public abstract class RaftMember {
    * @param checkConsistency check after syncleader
    * @return true if the node has caught up, false otherwise
    * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
-   *     value after timeout
+   *                                   value after timeout
    */
   public boolean syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException {
     if (character == NodeCharacter.LEADER) {
@@ -858,7 +958,9 @@ public abstract class RaftMember {
     return waitUntilCatchUp(checkConsistency);
   }
 
-  /** Wait until the leader of this node becomes known or time out. */
+  /**
+   * Wait until the leader of this node becomes known or time out.
+   */
   public void waitLeader() {
     long startTime = System.currentTimeMillis();
     while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) {
@@ -885,7 +987,7 @@ public abstract class RaftMember {
    *
    * @return true if this node has caught up before timeout, false otherwise
    * @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
-   *     value after timeout
+   *                                   value after timeout
    */
   protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
       throws CheckConsistencyException {
@@ -957,7 +1059,7 @@ public abstract class RaftMember {
    * call this method. Will commit the log locally and send it to followers
    *
    * @return OK if over half of the followers accept the log or null if the leadership is lost
-   *     during the appending
+   * during the appending
    */
   TSStatus processPlanLocally(PhysicalPlan plan) {
     if (USE_LOG_DISPATCHER) {
@@ -1156,7 +1258,9 @@ public abstract class RaftMember {
     return peerMap;
   }
 
-  /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
+  /**
+   * @return true if there is a log whose index is "index" and term is "term", false otherwise
+   */
   public boolean matchLog(long index, long term) {
     boolean matched = logManager.matchTerm(term, index);
     logger.debug("Log {}-{} matched: {}", index, term, matched);
@@ -1171,15 +1275,18 @@ public abstract class RaftMember {
     return syncLock;
   }
 
-  /** Sub-classes will add their own process of HeartBeatRequest in this method. */
-  void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {}
+  /**
+   * Sub-classes will add their own process of HeartBeatRequest in this method.
+   */
+  void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {
+  }
 
   /**
    * Verify the validity of an ElectionRequest, and make itself a follower of the elector if the
    * request is valid.
    *
    * @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
-   *     smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+   * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
    */
   long checkElectorLogProgress(ElectionRequest electionRequest) {
 
@@ -1222,7 +1329,7 @@ public abstract class RaftMember {
    * lastLogIndex is smaller than the voter's Otherwise accept the election.
    *
    * @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
-   *     smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+   * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
    */
   long checkLogProgress(long lastLogIndex, long lastLogTerm) {
     long response;
@@ -1239,10 +1346,10 @@ public abstract class RaftMember {
   /**
    * Forward a non-query plan to a node using the default client.
    *
-   * @param plan a non-query plan
-   * @param node cannot be the local node
+   * @param plan   a non-query plan
+   * @param node   cannot be the local node
    * @param header must be set for data group communication, set to null for meta group
-   *     communication
+   *               communication
    * @return a TSStatus indicating if the forwarding is successful.
    */
   public TSStatus forwardPlan(PhysicalPlan plan, Node node, Node header) {
@@ -1273,7 +1380,7 @@ public abstract class RaftMember {
   /**
    * Forward a non-query plan to "receiver" using "client".
    *
-   * @param plan a non-query plan
+   * @param plan   a non-query plan
    * @param header to determine which DataGroupMember of "receiver" will process the request.
    * @return a TSStatus indicating if the forwarding is successful.
    */
@@ -1354,7 +1461,7 @@ public abstract class RaftMember {
    * Get an asynchronous thrift client of the given node.
    *
    * @return an asynchronous thrift client or null if the caller tries to connect the local node or
-   *     the node cannot be reached.
+   * the node cannot be reached.
    */
   public AsyncClient getAsyncClient(Node node) {
     return getAsyncClient(node, asyncClientPool, true);
@@ -1541,7 +1648,7 @@ public abstract class RaftMember {
    * heartbeat timer.
    *
    * @param fromLeader true if the request is from a leader, false if the request is from an
-   *     elector.
+   *                   elector.
    */
   public void stepDown(long newTerm, boolean fromLeader) {
     synchronized (term) {
@@ -1572,7 +1679,9 @@ public abstract class RaftMember {
     this.thisNode = thisNode;
   }
 
-  /** @return the header of the data raft group or null if this is in a meta group. */
+  /**
+   * @return the header of the data raft group or null if this is in a meta group.
+   */
   public Node getHeader() {
     return null;
   }
@@ -1653,7 +1762,7 @@ public abstract class RaftMember {
    * success.
    *
    * @param requiredQuorum the number of votes needed to make the log valid, when requiredQuorum <=
-   *     0, half of the cluster size will be used.
+   *                       0, half of the cluster size will be used.
    * @return an AppendLogResult
    */
   private AppendLogResult sendLogToFollowers(Log log, int requiredQuorum) {
@@ -1722,7 +1831,6 @@ public abstract class RaftMember {
     return waitAppendResult(voteCounter, leaderShipStale, newLeaderTerm);
   }
 
-  /** Send "log" to "node". */
   public void sendLogToFollower(
       Log log,
       AtomicInteger voteCounter,
@@ -1730,6 +1838,21 @@ public abstract class RaftMember {
       AtomicBoolean leaderShipStale,
       AtomicLong newLeaderTerm,
       AppendEntryRequest request) {
+    sendLogToFollower(log, voteCounter, node, leaderShipStale, newLeaderTerm, request,
+        Collections.emptyList());
+  }
+
+  /**
+   * Send "log" to "node".
+   */
+  public void sendLogToFollower(
+      Log log,
+      AtomicInteger voteCounter,
+      Node node,
+      AtomicBoolean leaderShipStale,
+      AtomicLong newLeaderTerm,
+      AppendEntryRequest request,
+      List<Node> indirectReceivers) {
     if (node.equals(thisNode)) {
       return;
     }
@@ -1750,9 +1873,9 @@ public abstract class RaftMember {
     }
 
     if (config.isUseAsyncServer()) {
-      sendLogAsync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer);
+      sendLogAsync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer, indirectReceivers);
     } else {
-      sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer);
+      sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer, indirectReceivers);
     }
   }
 
@@ -1791,14 +1914,20 @@ public abstract class RaftMember {
       AtomicBoolean leaderShipStale,
       AtomicLong newLeaderTerm,
       AppendEntryRequest request,
-      Peer peer) {
+      Peer peer, List<Node> indirectReceivers) {
     Client client = getSyncClient(node);
     if (client != null) {
       AppendNodeEntryHandler handler =
           getAppendNodeEntryHandler(log, voteCounter, node, leaderShipStale, newLeaderTerm, peer);
       try {
         logger.debug("{} sending a log to {}: {}", name, node, log);
-        long result = client.appendEntry(request);
+        long result;
+        if (indirectReceivers == null || indirectReceivers.isEmpty()) {
+          result = client.appendEntry(request);
+        } else {
+          result = client.appendEntryIndirect(request, indirectReceivers);
+        }
+
         handler.onComplete(result);
       } catch (TException e) {
         client.getInputProtocol().getTransport().close();
@@ -1843,9 +1972,9 @@ public abstract class RaftMember {
    * and append "log" to it. Otherwise report a log mismatch.
    *
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
-  private long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+  protected long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
     long resp = checkPrevLogIndex(prevLogIndex);
     if (resp != Response.RESPONSE_AGREE) {
       return resp;
@@ -1867,7 +1996,9 @@ public abstract class RaftMember {
     return resp;
   }
 
-  /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
+  /**
+   * Wait until all logs before "prevLogIndex" arrive or a timeout is reached.
+   */
   private boolean waitForPrevLog(long prevLogIndex) {
     long waitStart = System.currentTimeMillis();
     long alreadyWait = 0;
@@ -1893,7 +2024,7 @@ public abstract class RaftMember {
     return alreadyWait <= RaftServer.getWriteOperationTimeoutMS();
   }
 
-  private long checkPrevLogIndex(long prevLogIndex) {
+  protected long checkPrevLogIndex(long prevLogIndex) {
     long lastLogIndex = logManager.getLastLogIndex();
     long startTime = Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime();
     if (lastLogIndex < prevLogIndex && !waitForPrevLog(prevLogIndex)) {
@@ -1913,7 +2044,7 @@ public abstract class RaftMember {
    *
    * @param logs append logs
    * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
    */
   private long appendEntries(
       long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
@@ -1987,4 +2118,25 @@ public abstract class RaftMember {
     TIME_OUT,
     LEADERSHIP_STALE
   }
+
+  /**
+   * Process the result from an indirect receiver of an entry.
+   * @param ack acknowledgement from an indirect receiver.
+   */
+  public void acknowledgeAppendLog(AppendEntryAcknowledgement ack) {
+    AppendNodeEntryHandler appendNodeEntryHandler = sentLogHandlers
+        .get(new Pair<>(ack.index, ack.term));
+    if (appendNodeEntryHandler != null) {
+      appendNodeEntryHandler.onComplete(ack.response);
+    }
+  }
+
+  public void registerAppendLogHandler(Pair<Long, Long> indexTerm,
+      AppendNodeEntryHandler appendNodeEntryHandler) {
+    sentLogHandlers.put(indexTerm, appendNodeEntryHandler);
+  }
+
+  public void removeAppendLogHandler(Pair<Long, Long> indexTerm) {
+    sentLogHandlers.remove(indexTerm);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 8673078..eecf26e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -19,9 +19,11 @@
 
 package org.apache.iotdb.cluster.server.service;
 
+import java.util.List;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
@@ -173,4 +175,21 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
       resultHandler.onError(e);
     }
   }
+
+  @Override
+  public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack,
+      AsyncMethodCallback<Void> resultHandler) {
+    member.acknowledgeAppendLog(ack);
+    resultHandler.onComplete(null);
+  }
+
+  @Override
+  public void appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers,
+      AsyncMethodCallback<Long> resultHandler) {
+    try {
+      resultHandler.onComplete(member.appendEntryIndirect(request, subReceivers));
+    } catch (UnknownLogTypeException e) {
+      resultHandler.onError(e);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index ce200ab..181ad8a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -19,9 +19,16 @@
 
 package org.apache.iotdb.cluster.server.service;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.List;
 import org.apache.iotdb.cluster.exception.LeaderUnknownException;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
@@ -31,30 +38,21 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
-import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.IOUtils;
-import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-
 public abstract class BaseSyncService implements RaftService.Iface {
 
   private static final Logger logger = LoggerFactory.getLogger(BaseSyncService.class);
   RaftMember member;
   String name;
 
-  BaseSyncService(RaftMember member) {
+  protected BaseSyncService(RaftMember member) {
     this.member = member;
     this.name = member.getName();
   }
@@ -153,29 +151,25 @@ public abstract class BaseSyncService implements RaftService.Iface {
 
   @Override
   public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException {
-    if (member.getCharacter() != NodeCharacter.LEADER) {
-      // forward the plan to the leader
-      Client client = member.getSyncClient(member.getLeader());
-      if (client != null) {
-        TSStatus status;
-        try {
-          status = client.executeNonQueryPlan(request);
-        } catch (TException e) {
-          client.getInputProtocol().getTransport().close();
-          throw e;
-        } finally {
-          ClientUtils.putBackSyncClient(client);
-        }
-        return status;
-      } else {
-        return StatusUtils.NO_LEADER;
-      }
-    }
-
     try {
       return member.executeNonQueryPlan(request);
     } catch (Exception e) {
       throw new TException(e);
     }
   }
+
+  @Override
+  public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack) {
+    member.acknowledgeAppendLog(ack);
+  }
+
+  @Override
+  public long appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
+      throws TException {
+    try {
+      return member.appendEntryIndirect(request, subReceivers);
+    } catch (UnknownLogTypeException e) {
+      throw new TException(e);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
index 255cb4b..0de64a6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.utils;
 
+import java.util.Objects;
 import org.apache.iotdb.cluster.client.async.AsyncDataClient;
 import org.apache.iotdb.cluster.client.async.AsyncDataHeartbeatClient;
 import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
@@ -27,6 +28,7 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
 import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
 import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
 import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java
index be87eb1..905019f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java
@@ -58,7 +58,7 @@ public class ClusterNode extends Node {
         && this.dataPort == that.dataPort
         && this.metaPort == that.metaPort
         && this.clientPort == that.clientPort
-        && this.clientIp.equals(that.clientIp);
+        && Objects.equals(this.clientIp, that.clientIp);
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index 4c64587..ef656fe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -175,6 +175,18 @@ public class ClusterUtils {
     }
   }
 
+  public static boolean isNodeEquals(Node node1, Node node2) {
+    if (node1 == node2) {
+      return true;
+    }
+    if (node1 == null || node2 == null) {
+      return false;
+    }
+    return Objects.equals(node1.internalIp, node2.internalIp) && Objects.equals(node1.metaPort,
+        node2.metaPort);
+  }
+
+
   private static boolean seedNodesContains(List<Node> seedNodeList, List<Node> subSeedNodeList) {
     // Because identifier is not compared here, List.contains() is not suitable
     if (subSeedNodeList == null) {
diff --git a/pom.xml b/pom.xml
index aed72ae..879a584 100644
--- a/pom.xml
+++ b/pom.xml
@@ -621,32 +621,32 @@
                         </excludes>
                     </configuration>
                 </plugin>
-                <plugin>
-                    <groupId>com.diffplug.spotless</groupId>
-                    <artifactId>spotless-maven-plugin</artifactId>
-                    <version>${spotless.version}</version>
-                    <configuration>
-                        <java>
-                            <googleJavaFormat>
-                                <version>1.7</version>
-                                <style>GOOGLE</style>
-                            </googleJavaFormat>
-                            <importOrder>
-                                <order>org.apache.iotdb,,javax,java,\#</order>
-                            </importOrder>
-                            <removeUnusedImports/>
-                        </java>
-                    </configuration>
-                    <executions>
-                        <execution>
-                            <id>spotless-check</id>
-                            <phase>validate</phase>
-                            <goals>
-                                <goal>check</goal>
-                            </goals>
-                        </execution>
-                    </executions>
-                </plugin>
+                <!--                <plugin>-->
+                <!--                    <groupId>com.diffplug.spotless</groupId>-->
+                <!--                    <artifactId>spotless-maven-plugin</artifactId>-->
+                <!--                    <version>${spotless.version}</version>-->
+                <!--                    <configuration>-->
+                <!--                        <java>-->
+                <!--                            <googleJavaFormat>-->
+                <!--                                <version>1.7</version>-->
+                <!--                                <style>GOOGLE</style>-->
+                <!--                            </googleJavaFormat>-->
+                <!--                            <importOrder>-->
+                <!--                                <order>org.apache.iotdb,,javax,java,\#</order>-->
+                <!--                            </importOrder>-->
+                <!--                            <removeUnusedImports/>-->
+                <!--                        </java>-->
+                <!--                    </configuration>-->
+                <!--                    <executions>-->
+                <!--                        <execution>-->
+                <!--                            <id>spotless-check</id>-->
+                <!--                            <phase>validate</phase>-->
+                <!--                            <goals>-->
+                <!--                                <goal>check</goal>-->
+                <!--                            </goals>-->
+                <!--                        </execution>-->
+                <!--                    </executions>-->
+                <!--                </plugin>-->
             </plugins>
         </pluginManagement>
         <plugins>
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index d4d08c1..0ef0033 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -349,6 +349,8 @@ public class PlanExecutor implements IPlanExecutor {
         return createDeviceTemplate((CreateTemplatePlan) plan);
       case SET_DEVICE_TEMPLATE:
         return setDeviceTemplate((SetDeviceTemplatePlan) plan);
+      case EMPTY:
+        return true;
       default:
         throw new UnsupportedOperationException(
             String.format("operation %s is not supported", plan.getOperatorType()));
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index e1fb10f..fddda60 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -157,5 +157,6 @@ public abstract class Operator {
     CREATE_TEMPLATE,
     SET_DEVICE_TEMPLATE,
     SET_USING_DEVICE_TEMPLATE,
+    EMPTY,
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 4c5b1a1..457bd22 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.ExprPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
 import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
@@ -368,6 +369,9 @@ public abstract class PhysicalPlan {
         case AUTO_CREATE_DEVICE_MNODE:
           plan = new AutoCreateDeviceMNodePlan();
           break;
+        case EXPR:
+          plan = new ExprPlan();
+          break;
         default:
           throw new IOException("unrecognized log type " + type);
       }
@@ -422,7 +426,8 @@ public abstract class PhysicalPlan {
     CREATE_TRIGGER,
     DROP_TRIGGER,
     START_TRIGGER,
-    STOP_TRIGGER
+    STOP_TRIGGER,
+    EXPR
   }
 
   public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java
new file mode 100644
index 0000000..2a1e16b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+public class ExprPlan extends PhysicalPlan {
+
+  private byte[] workload;
+  private boolean needForward;
+
+  public ExprPlan() {
+    super(false, OperatorType.EMPTY);
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return null;
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.write((byte) PhysicalPlanType.EXPR.ordinal());
+    stream.writeInt(workload == null ? 0 : workload.length);
+    if (workload != null) {
+      stream.write(workload);
+    }
+    stream.write(needForward ? 1 : 0);
+  }
+
+  @Override
+  public void serialize(ByteBuffer buffer) {
+    buffer.put((byte) PhysicalPlanType.EXPR.ordinal());
+    buffer.putInt(workload == null ? 0 : workload.length);
+    if (workload != null) {
+      buffer.put(workload);
+    }
+    buffer.put(needForward ? (byte) 1 : 0);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer buffer) throws IllegalPathException {
+    int size = buffer.getInt();
+    workload = new byte[size];
+    buffer.get(workload);
+    needForward = buffer.get() == 1;
+  }
+
+  public void setWorkload(byte[] workload) {
+    this.workload = workload;
+  }
+
+  public boolean isNeedForward() {
+    return needForward;
+  }
+
+  public void setNeedForward(boolean needForward) {
+    this.needForward = needForward;
+  }
+}
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index f23130e..449e843 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -90,6 +90,16 @@ struct AppendEntryRequest {
   // because a data server may play many data groups members, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   7: optional Node header
+  // true if the request is sent from the leader, and the reiceiver just responds to the sender;
+  // otherwise, the reiceiver should also notify the leader
+  8: optional bool isFromLeader
+}
+
+
+struct AppendEntryAcknowledgement {
+  1: required long term
+  2: required long index
+  3: required long response
 }
 
 // leader -> follower
@@ -306,6 +316,11 @@ service RaftService {
   **/
   long appendEntry(1:AppendEntryRequest request)
 
+  /**
+  * Like appendEntry, but the receiver should forward the request to nodes in subReceivers.
+  **/
+  long appendEntryIndirect(1:AppendEntryRequest request, 2:list<Node> subReceivers)
+
   void sendSnapshot(1:SendSnapshotRequest request)
 
   /**
@@ -337,6 +352,12 @@ service RaftService {
   * interface to notify the leader to remove the associated hardlink.
   **/
   void removeHardLink(1: string hardLinkPath)
+
+  /**
+  * when a follower reiceives an AppendEntryRequest from a non-leader node, it sends this ack to
+  * the leader so the leader can know whether it has successfully received the entry
+  **/
+  void acknowledgeAppendEntry(1: AppendEntryAcknowledgement ack)
 }