You are viewing a plain text version of this content. The canonical link for it is not included in this plain text version.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/03/11 03:40:02 UTC

[iotdb] branch expr_plus updated: support appendEntriesIndirect

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_plus
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_plus by this push:
     new 991b1b2  support appendEntriesIndirect
991b1b2 is described below

commit 991b1b2fe98824b00bc6373cb5ef7d0982af8d2c
Author: jt <jt...@163.com>
AuthorDate: Fri Mar 11 11:39:15 2022 +0800

    support appendEntriesIndirect
---
 cluster/distribute-dc.sh                           | 10 +++
 cluster/src/assembly/resources/sbin/expr-bench.sh  | 89 ++++++++++++++++++++++
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     | 11 ++-
 .../apache/iotdb/cluster/client/ClientManager.java | 11 +--
 .../iotdb/cluster/client/ClientPoolFactory.java    |  1 +
 .../iotdb/cluster/config/ClusterConstant.java      |  2 +-
 .../iotdb/cluster/coordinator/Coordinator.java     |  5 +-
 .../org/apache/iotdb/cluster/expr/ExprBench.java   | 35 ++++++++-
 .../iotdb/cluster/log/IndirectLogDispatcher.java   | 23 +++---
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  2 +-
 .../org/apache/iotdb/cluster/log/LogRelay.java     | 41 ++++++++--
 .../apache/iotdb/cluster/log/VotingLogList.java    | 13 +++-
 .../cluster/log/manage/CommittedEntryManager.java  |  4 +-
 .../iotdb/cluster/log/manage/RaftLogManager.java   | 17 ++---
 .../log/manage/UnCommittedEntryManager.java        |  4 +
 .../iotdb/cluster/query/ClusterPlanRouter.java     | 28 ++++++-
 .../handlers/caller/AppendNodeEntryHandler.java    | 13 ++++
 .../server/handlers/caller/HeartbeatHandler.java   |  2 +-
 .../cluster/server/member/DataGroupMember.java     |  4 +-
 .../cluster/server/member/MetaGroupMember.java     |  7 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 82 +++++++++++---------
 .../apache/iotdb/cluster/server/monitor/Timer.java |  3 +
 .../cluster/server/service/BaseAsyncService.java   | 12 ---
 .../cluster/server/service/BaseSyncService.java    | 10 ---
 .../cluster/server/service/MetaAsyncService.java   | 12 ---
 .../cluster/server/service/MetaSyncService.java    | 10 ---
 .../apache/iotdb/cluster/utils/ClusterUtils.java   |  4 +
 .../apache/iotdb/cluster/utils/PartitionUtils.java |  5 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 ++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  4 +-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |  1 +
 .../apache/iotdb/db/qp/physical/sys/DummyPlan.java | 29 ++++++-
 thrift-cluster/src/main/thrift/cluster.thrift      | 10 +--
 33 files changed, 368 insertions(+), 149 deletions(-)

diff --git a/cluster/distribute-dc.sh b/cluster/distribute-dc.sh
new file mode 100644
index 0000000..1eba6f4
--- /dev/null
+++ b/cluster/distribute-dc.sh
@@ -0,0 +1,10 @@
+src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
+
+ips=(dc11 dc12 dc13 dc14 dc15 dc16 dc17 dc18)
+target_lib_path=/home/jt/iotdb_expr/lib
+
+for ip in ${ips[*]}
+  do
+    ssh jt@$ip "mkdir $target_lib_path"
+    scp -r $src_lib_path jt@$ip:$target_lib_path
+  done
\ No newline at end of file
diff --git a/cluster/src/assembly/resources/sbin/expr-bench.sh b/cluster/src/assembly/resources/sbin/expr-bench.sh
new file mode 100644
index 0000000..00c9411
--- /dev/null
+++ b/cluster/src/assembly/resources/sbin/expr-bench.sh
@@ -0,0 +1,89 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+echo ---------------------
+echo "Starting IoTDB (Cluster Mode)"
+echo ---------------------
+
+if [ -z "${IOTDB_HOME}" ]; then
+  export IOTDB_HOME="`dirname "$0"`/.."
+fi
+
+enable_printgc=false
+if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
+  enable_printgc=true;
+  shift
+fi
+
+if [ -f "$IOTDB_CONF/iotdb-env.sh" ]; then
+    if [ $enable_printgc == "true" ]; then
+      . "$IOTDB_CONF/iotdb-env.sh" "printgc"
+    else
+        . "$IOTDB_CONF/iotdb-env.sh"
+    fi
+elif [ -f "${IOTDB_HOME}/conf/iotdb-env.sh" ]; then
+    if [ $enable_printgc == "true" ]; then
+      . "${IOTDB_HOME}/conf/iotdb-env.sh" "printgc"
+    else
+      . "${IOTDB_HOME}/conf/iotdb-env.sh"
+    fi
+else
+    echo "can't find $IOTDB_CONF/iotdb-env.sh"
+fi
+
+if [ -n "$JAVA_HOME" ]; then
+    for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+        if [ -x "$java" ]; then
+            JAVA="$java"
+            break
+        fi
+    done
+else
+    JAVA=java
+fi
+
+if [ -z $JAVA ] ; then
+    echo Unable to find java executable. Check JAVA_HOME and PATH environment variables.  > /dev/stderr
+    exit 1;
+fi
+
+CLASSPATH=""
+for f in ${IOTDB_HOME}/lib/*.jar; do
+  CLASSPATH=${CLASSPATH}":"$f
+done
+classname=org.apache.iotdb.cluster.expr.ExprBench
+
+launch_service()
+{
+    iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback.xml"
+    iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}"
+    iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
+    iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
+    iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
+    iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
+    exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" $@
+    return $?
+}
+
+# Start up the service
+launch_service "$classname" $@
+
+exit $?
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 10b6df8..36c40d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -288,11 +288,14 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
   }
 
   private String clusterConfigCheck() {
-    try {
-      ClusterDescriptor.getInstance().replaceHostnameWithIp();
-    } catch (Exception e) {
-      return String.format("replace hostname with ip failed, %s", e.getMessage());
+    if (IoTDBDescriptor.getInstance().getConfig().isReplaceHostNameWithIp()) {
+      try {
+        ClusterDescriptor.getInstance().replaceHostnameWithIp();
+      } catch (Exception e) {
+        return String.format("replace hostname with ip failed, %s", e.getMessage());
+      }
     }
+
     ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
     // check the initial replicateNum and refuse to start when the replicateNum <= 0
     if (config.getReplicationNum() <= 0) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
index adc1574..bf918d6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
@@ -50,8 +50,6 @@ public class ClientManager implements IClientManager {
   private Map<ClientCategory, GenericKeyedObjectPool<Node, Client>> syncClientPoolMap;
   private ClientPoolFactory clientPoolFactory;
 
-  private Exception createStack;
-
   /**
    * {@link ClientManager.Type#RequestForwardClient} represents the clients used to forward external
    * client requests to proper node to handle such as query, insert request.
@@ -78,8 +76,6 @@ public class ClientManager implements IClientManager {
       syncClientPoolMap = Maps.newHashMap();
       constructSyncClientMap(type);
     }
-
-    this.createStack = new Exception();
   }
 
   private void constructAsyncClientMap(Type type) {
@@ -164,7 +160,7 @@ public class ClientManager implements IClientManager {
           "BorrowSyncClient invoke on unsupported mode or category: Node:{}, ClientCategory:{}, "
               + "isSyncMode:{}",
           node,
-          clientPoolFactory,
+          category,
           syncClientPoolMap != null);
     }
     return client;
@@ -200,7 +196,7 @@ public class ClientManager implements IClientManager {
           "BorrowSyncClient invoke on unsupported mode or category: Node:{}, ClientCategory:{}, "
               + "isSyncMode:{}",
           node,
-          clientPoolFactory,
+          category,
           syncClientPoolMap != null);
     }
     return client;
@@ -231,9 +227,6 @@ public class ClientManager implements IClientManager {
 
   @Override
   public void close() {
-    if (false) {
-      return;
-    }
     if (asyncClientPoolMap != null) {
       for (GenericKeyedObjectPool<Node, AsyncClient> value : asyncClientPoolMap.values()) {
         value.close();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientPoolFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientPoolFactory.java
index 0887992..89b8307 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientPoolFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientPoolFactory.java
@@ -54,6 +54,7 @@ public class ClientPoolFactory {
             : new TBinaryProtocol.Factory();
     poolConfig = new GenericKeyedObjectPoolConfig();
     poolConfig.setMaxTotalPerKey(maxConnectionForEachNode);
+    poolConfig.setMaxIdlePerKey(maxConnectionForEachNode);
     poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMS));
     poolConfig.setTestOnReturn(true);
     poolConfig.setTestOnBorrow(true);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
index cb892ae..85ba9ff 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
@@ -53,7 +53,7 @@ public class ClusterConstant {
    * every "REPORT_INTERVAL_SEC" seconds, a reporter thread will print the status of all raft
    * members in this node.
    */
-  public static final int REPORT_INTERVAL_SEC = 10;
+  public static final int REPORT_INTERVAL_SEC = 2;
 
   /**
    * during snapshot, hardlinks of data files are created to for downloading. hardlinks will be
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 098b127..9d16670 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.ChangeMembershipException;
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
-import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.metadata.CMManager;
@@ -424,10 +423,10 @@ public class Coordinator {
       metaGroupMember.syncLeaderWithConsistencyCheck(true);
       try {
         planGroupMap = router.splitAndRoutePlan(plan);
-      } catch (MetadataException | UnknownLogTypeException ex) {
+      } catch (MetadataException ex) {
         // ignore
       }
-    } catch (MetadataException | UnknownLogTypeException e) {
+    } catch (MetadataException e) {
       logger.error("Cannot route plan {}", plan, e);
     }
     logger.debug("route plan {} with partitionGroup {}", plan, planGroupMap);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index 3a658d5..7673e11 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -26,12 +26,16 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
 
 import org.apache.thrift.TException;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
@@ -48,6 +52,8 @@ public class ExprBench {
   private Node target;
   private int maxRequestNum;
   private ExecutorService pool = Executors.newCachedThreadPool();
+  private List<Node> nodeList = new ArrayList<>();
+  private int raftFactor = 1;
 
   public ExprBench(Node target) {
     this.target = target;
@@ -57,8 +63,10 @@ public class ExprBench {
   public void benchmark() {
     long startTime = System.currentTimeMillis();
     for (int i = 0; i < threadNum; i++) {
+      int finalI = i;
       pool.submit(
           () -> {
+            Random random = new Random(123456L + finalI);
             Client client = null;
             try {
               client = clientPool.borrowSyncClient(target, ClientCategory.META);
@@ -69,13 +77,23 @@ public class ExprBench {
             DummyPlan plan = new DummyPlan();
             plan.setWorkload(new byte[workloadSize]);
             plan.setNeedForward(true);
+
             ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
-            plan.serialize(byteBuffer);
-            byteBuffer.flip();
-            request.setPlanBytes(byteBuffer);
+
             long currRequsetNum = -1;
             while (true) {
 
+              if (raftFactor > 0) {
+                Node node = nodeList.get(random.nextInt(nodeList.size()));
+                int raftId = random.nextInt(raftFactor);
+                plan.setGroupIdentifier(ClusterUtils.nodeToString(node) + "#" + raftId);
+              }
+              byteBuffer.clear();
+              plan.serialize(byteBuffer);
+              byteBuffer.flip();
+              request.planBytes = byteBuffer;
+              request.setPlanBytesIsSet(true);
+
               long reqLatency = System.nanoTime();
               try {
                 client.executeNonQueryPlan(request);
@@ -108,6 +126,7 @@ public class ExprBench {
             }
           });
     }
+    pool.shutdown();
   }
 
   public void setMaxRequestNum(int maxRequestNum) {
@@ -124,6 +143,16 @@ public class ExprBench {
     bench.threadNum = Integer.parseInt(args[3]);
     bench.workloadSize = Integer.parseInt(args[4]) * 1024;
     bench.printInterval = Integer.parseInt(args[5]);
+    String[] nodesSplit = args[6].split(",");
+    for (String s : nodesSplit) {
+      String[] nodeSplit = s.split(":");
+      Node node = new Node();
+      node.setInternalIp(nodeSplit[0]);
+      node.setMetaPort(Integer.parseInt(nodeSplit[1]));
+      bench.nodeList.add(node);
+    }
+    bench.raftFactor = Integer.parseInt(args[7]);
+
     bench.benchmark();
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
index cc05753..9c878df 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.cluster.log;
 
+import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Timer;
@@ -100,18 +102,15 @@ public class IndirectLogDispatcher extends LogDispatcher {
 
     @Override
     void sendLog(SendLogRequest logRequest) {
-      Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
-          logRequest.getVotingLog().getLog().getCreateTime());
-      member.sendLogToFollower(
-          logRequest.getVotingLog(),
-          receiver,
-          logRequest.getLeaderShipStale(),
-          logRequest.getNewLeaderTerm(),
-          logRequest.getAppendEntryRequest(),
-          logRequest.getQuorumSize(),
-          directToIndirectFollowerMap.get(receiver));
-      Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENT.calOperationCostTimeFromStart(
-          logRequest.getVotingLog().getLog().getCreateTime());
+      logRequest.getAppendEntryRequest().setSubReceivers(directToIndirectFollowerMap.get(receiver));
+      super.sendLog(logRequest);
+    }
+
+    @Override
+    protected AppendEntriesRequest prepareRequest(List<ByteBuffer> logList,
+        List<SendLogRequest> currBatch, int firstIndex) {
+      return super.prepareRequest(logList, currBatch, firstIndex)
+          .setSubReceivers(directToIndirectFollowerMap.get(receiver));
     }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index ca6e8ae..2410b2f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -362,7 +362,7 @@ public class LogDispatcher {
       }
     }
 
-    private AppendEntriesRequest prepareRequest(
+    protected AppendEntriesRequest prepareRequest(
         List<ByteBuffer> logList, List<SendLogRequest> currBatch, int firstIndex) {
       AppendEntriesRequest request = new AppendEntriesRequest();
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
index 3ed91f5..91339c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogRelay.java
@@ -20,9 +20,11 @@
 package org.apache.iotdb.cluster.log;
 
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -59,6 +61,10 @@ public class LogRelay {
   }
 
   public void offer(AppendEntryRequest request, List<Node> receivers) {
+    offer(new RelayEntry(request, receivers));
+  }
+
+  private void offer(RelayEntry entry) {
     synchronized (entryHeap) {
       while (entryHeap.size()
           > ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem()) {
@@ -68,11 +74,15 @@ public class LogRelay {
           Thread.currentThread().interrupt();
         }
       }
-      entryHeap.add(new RelayEntry(request, receivers));
+      entryHeap.add(entry);
       entryHeap.notifyAll();
     }
   }
 
+  public void offer(AppendEntriesRequest request, List<Node> receivers) {
+    offer(new RelayEntry(request, receivers));
+  }
+
   private class RelayThread implements Runnable {
 
     @Override
@@ -92,21 +102,37 @@ public class LogRelay {
           }
         }
 
-        raftMember.sendLogToSubFollowers(relayEntry.request, relayEntry.receivers);
+        raftMember.sendLogToSubFollowers(relayEntry.singleRequest, relayEntry.receivers);
+        Statistic.RAFT_SEND_RELAY.add(1);
       }
     }
   }
 
   public static class RelayEntry implements Comparable<RelayEntry> {
 
-    private AppendEntryRequest request;
+    private AppendEntryRequest singleRequest;
+    private AppendEntriesRequest batchRequest;
     private List<Node> receivers;
 
     public RelayEntry(AppendEntryRequest request, List<Node> receivers) {
-      this.request = request;
+      this.singleRequest = request;
+      this.receivers = receivers;
+    }
+
+    public RelayEntry(AppendEntriesRequest request, List<Node> receivers) {
+      this.batchRequest = request;
       this.receivers = receivers;
     }
 
+    public long getIndex() {
+      if (singleRequest != null) {
+        return singleRequest.prevLogIndex;
+      } else if (batchRequest != null) {
+        return batchRequest.prevLogIndex;
+      }
+      return 0;
+    }
+
     @Override
     public boolean equals(Object o) {
       if (this == o) {
@@ -116,17 +142,18 @@ public class LogRelay {
         return false;
       }
       RelayEntry that = (RelayEntry) o;
-      return Objects.equals(request, that.request);
+      return Objects.equals(singleRequest, that.singleRequest) && Objects.equals(batchRequest,
+          that.batchRequest);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(request);
+      return Objects.hash(singleRequest);
     }
 
     @Override
     public int compareTo(RelayEntry o) {
-      return Long.compare(this.request.prevLogIndex, o.request.prevLogIndex);
+      return Long.compare(this.getIndex(), o.getIndex());
     }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
index f5cce6c..59bc40b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java
@@ -19,6 +19,10 @@
 
 package org.apache.iotdb.cluster.log;
 
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.tsfile.utils.Pair;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -27,9 +31,11 @@ public class VotingLogList {
   private List<VotingLog> logList = new ArrayList<>();
   private volatile long currTerm = -1;
   private int quorumSize;
+  private RaftMember member;
 
-  public VotingLogList(int quorumSize) {
+  public VotingLogList(int quorumSize, RaftMember member) {
     this.quorumSize = quorumSize;
+    this.member = member;
   }
 
   /**
@@ -92,6 +98,11 @@ public class VotingLogList {
           acceptedLog.acceptedTime.set(System.nanoTime());
           acceptedLog.notifyAll();
         }
+        if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
+          member.removeAppendLogHandler(
+              new Pair<>(
+                  acceptedLog.getLog().getCurrLogIndex(), acceptedLog.getLog().getCurrLogTerm()));
+        }
       }
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
index 77be5b3..85b618d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
@@ -79,8 +79,8 @@ public class CommittedEntryManager {
       logger.info("requested snapshot is older than the existing snapshot");
       return;
     }
-    entries.clear();
-    entries.add(new EmptyContentLog(snapshot.getLastLogIndex(), snapshot.getLastLogTerm()));
+    entries.subList(1, entries.size()).clear();
+    entries.set(0, new EmptyContentLog(snapshot.getLastLogIndex(), snapshot.getLastLogTerm()));
   }
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index b75f1e7..4b23092 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -325,7 +325,7 @@ public abstract class RaftLogManager {
   public long getCommitLogTerm() {
     long term = -1;
     try {
-      term = getTerm(getCommitLogIndex());
+      term = getCommittedEntryManager().getLastTerm();
     } catch (Exception e) {
       logger.error("{}: unexpected error when getting the last term : {}", name, e.getMessage());
     }
@@ -519,19 +519,18 @@ public abstract class RaftLogManager {
         name,
         snapshot.getLastLogIndex(),
         snapshot.getLastLogTerm());
-    try {
-      getCommittedEntryManager().compactEntries(snapshot.getLastLogIndex());
-      getStableEntryManager().removeCompactedEntries(snapshot.getLastLogIndex());
-    } catch (EntryUnavailableException e) {
-      getCommittedEntryManager().applyingSnapshot(snapshot);
-      getUnCommittedEntryManager().applyingSnapshot(snapshot);
-    }
+
+    getUnCommittedEntryManager().applyingSnapshot(snapshot);
+    getCommittedEntryManager().applyingSnapshot(snapshot);
+
     if (this.commitIndex < snapshot.getLastLogIndex()) {
       this.commitIndex = snapshot.getLastLogIndex();
     }
 
     // as the follower receives a snapshot, the logs persisted is not complete, so remove them
-    getStableEntryManager().clearAllLogs(commitIndex);
+    if (ClusterDescriptor.getInstance().getConfig().isEnableRaftLogPersistence()) {
+      getStableEntryManager().clearAllLogs(commitIndex);
+    }
 
     synchronized (changeApplyCommitIndexCond) {
       if (this.maxHaveAppliedCommitIndex < snapshot.getLastLogIndex()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
index 80bf2f8..1c63bff 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
@@ -288,4 +288,8 @@ public class UnCommittedEntryManager {
   List<Log> getAllEntries() {
     return entries;
   }
+
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index 8b1ad59..5c56677 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.cluster.query;
 
-import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
@@ -27,6 +26,8 @@ import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.PartitionUtils;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -42,6 +43,7 @@ import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
 import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
 import org.apache.iotdb.db.service.IoTDB;
@@ -117,7 +119,7 @@ public class ClusterPlanRouter {
   }
 
   public Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(PhysicalPlan plan)
-      throws UnsupportedPlanException, MetadataException, UnknownLogTypeException {
+      throws UnsupportedPlanException, MetadataException {
     if (plan instanceof InsertRowsPlan) {
       return splitAndRoutePlan((InsertRowsPlan) plan);
     } else if (plan instanceof InsertTabletPlan) {
@@ -136,6 +138,8 @@ public class ClusterPlanRouter {
       return splitAndRoutePlan((AlterTimeSeriesPlan) plan);
     } else if (plan instanceof CreateMultiTimeSeriesPlan) {
       return splitAndRoutePlan((CreateMultiTimeSeriesPlan) plan);
+    } else if (plan instanceof DummyPlan) {
+      return splitAndRoutePlan((DummyPlan) plan);
     }
     // the if clause can be removed after the program is stable
     if (PartitionUtils.isLocalNonQueryPlan(plan)) {
@@ -172,6 +176,26 @@ public class ClusterPlanRouter {
     return result;
   }
 
+  private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(DummyPlan plan) {
+    List<Node> allNodes = partitionTable.getAllNodes();
+    String groupIdentifier = plan.getGroupIdentifier();
+    String[] split = groupIdentifier.split(DummyPlan.GROUP_ID_SEPARATOR);
+    Node node = ClusterUtils.parseNode(split[0]);
+    int raftId = Integer.parseInt(split[1]);
+    Node innerNode = null;
+    for (Node n : allNodes) {
+      if (ClusterUtils.isNodeEquals(node, n)) {
+        innerNode = n;
+        break;
+      }
+    }
+    if (innerNode == null) {
+      return null;
+    }
+    RaftNode raftNode = new RaftNode(innerNode, raftId);
+    return Collections.singletonMap(plan, partitionTable.getPartitionGroup(raftNode));
+  }
+
   private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertRowPlan plan)
       throws MetadataException {
     PartitionGroup partitionGroup =
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 9cae7e8..7e703a6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
@@ -113,12 +114,20 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       synchronized (log) {
         log.notifyAll();
       }
+      if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
+        member.removeAppendLogHandler(
+            new Pair<>(log.getLog().getCurrLogIndex(), log.getLog().getCurrLogTerm()));
+      }
     } else if (resp == RESPONSE_WEAK_ACCEPT) {
       synchronized (log) {
         log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
         if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
             >= quorumSize) {
           log.acceptedTime.set(System.nanoTime());
+          if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
+            member.removeAppendLogHandler(
+                new Pair<>(log.getLog().getCurrLogIndex(), log.getLog().getCurrLogTerm()));
+          }
         }
         log.notifyAll();
       }
@@ -160,6 +169,10 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
         // quorum members have failed, there is no need to wait for others
         log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
         log.notifyAll();
+        if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
+          member.removeAppendLogHandler(
+              new Pair<>(log.getLog().getCurrLogIndex(), log.getLog().getCurrLogTerm()));
+        }
       }
     }
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index 6eb735d..af4fbc5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -119,7 +119,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
       if (lastLogIdx == peer.getLastHeartBeatIndex()) {
         // the follower's lastLogIndex is unchanged, increase inconsistent counter
         int inconsistentNum = peer.incInconsistentHeartbeatNum();
-        if (inconsistentNum >= 5) {
+        if (inconsistentNum >= 10) {
           logger.info(
               "{}: catching up node {}, index-term: {}-{}/{}-{}, peer match index {}",
               memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 2a80781..c43401a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -406,7 +406,7 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
 
         if (removedNode.equals(leader.get()) && !removedNode.equals(thisNode)) {
           // if the leader is removed, also start an election immediately
-          synchronized (term) {
+          synchronized (logManager) {
             setCharacter(NodeCharacter.ELECTOR);
             setLeader(null);
           }
@@ -962,7 +962,7 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
         peerMap.remove(removedNode);
         if (removedNode.equals(leader.get())) {
           // if the leader is removed, also start an election immediately
-          synchronized (term) {
+          synchronized (logManager) {
             setCharacter(NodeCharacter.ELECTOR);
             setLeader(null);
           }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 841b32b..e07452c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1317,8 +1317,9 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
       result = processNonPartitionedMetaPlan(plan);
     } else {
       // do nothing
-      logger.warn("receive a plan {} could not be processed in local", plan);
-      result = StatusUtils.UNSUPPORTED_OPERATION;
+      //      logger.warn("receive a plan {} could not be processed in local", plan);
+      //      result = StatusUtils.UNSUPPORTED_OPERATION;
+      result = coordinator.executeNonQueryPlan(plan);
     }
     Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime);
     return result;
@@ -1708,7 +1709,7 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
 
       // the leader is removed, start the next election ASAP
       if (oldNode.equals(leader.get()) && !oldNode.equals(thisNode)) {
-        synchronized (term) {
+        synchronized (logManager) {
           setCharacter(NodeCharacter.ELECTOR);
           setLeader(null);
         }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index a28f220..0c227c5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.server.member;
 
+import java.nio.Buffer;
 import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.client.ClientCategory;
 import org.apache.iotdb.cluster.client.ClientManager;
@@ -96,6 +97,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
 import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -436,7 +438,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    */
   public HeartBeatResponse processHeartbeatRequest(HeartBeatRequest request) {
     logger.trace("{} received a heartbeat", name);
-    synchronized (term) {
+    synchronized (logManager) {
       long thisTerm = term.get();
       long leaderTerm = request.getTerm();
       HeartBeatResponse response = new HeartBeatResponse();
@@ -520,7 +522,7 @@ public abstract class RaftMember implements RaftMemberMBean {
       logger.debug(
           "{}: start to handle request from elector {}", name, electionRequest.getElector());
     }
-    synchronized (term) {
+    synchronized (logManager) {
       long currentTerm = term.get();
       long response =
           checkElectorTerm(currentTerm, electionRequest.getTerm(), electionRequest.getElector());
@@ -576,6 +578,15 @@ public abstract class RaftMember implements RaftMemberMBean {
    * finally see if we can find a position to append the log.
    */
   public AppendEntryResult appendEntry(AppendEntryRequest request) throws UnknownLogTypeException {
+    AppendEntryResult result = appendEntryInternal(request);
+    if (request.isSetSubReceivers()) {
+      request.entry.rewind();
+      logRelay.offer(request, request.subReceivers);
+    }
+    return result;
+  }
+
+  private AppendEntryResult appendEntryInternal(AppendEntryRequest request) throws UnknownLogTypeException {
     logger.debug("{} received an AppendEntryRequest: {}", name, request);
     // the term checked here is that of the leader, not that of the log
     long checkResult = checkRequestTerm(request.term, request.leader);
@@ -598,6 +609,7 @@ public abstract class RaftMember implements RaftMemberMBean {
 
     if (!request.isFromLeader) {
       appendAckLeader(request.leader, log, result.status);
+      Statistic.RAFT_SEND_RELAY_ACK.add(1);
     }
 
     return result;
@@ -661,6 +673,17 @@ public abstract class RaftMember implements RaftMemberMBean {
   /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
   public AppendEntryResult appendEntries(AppendEntriesRequest request)
       throws UnknownLogTypeException {
+    AppendEntryResult result = appendEntriesInternal(request);
+    if (request.isSetSubReceivers()) {
+      request.entries.forEach(Buffer::rewind);
+      logRelay.offer(request, request.subReceivers);
+    }
+    return result;
+  }
+
+  /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
+  private AppendEntryResult appendEntriesInternal(AppendEntriesRequest request)
+      throws UnknownLogTypeException {
     logger.debug("{} received an AppendEntriesRequest", name);
 
     // the term checked here is that of the leader, not that of the log
@@ -716,18 +739,13 @@ public abstract class RaftMember implements RaftMemberMBean {
       AtomicLong newLeaderTerm,
       AppendEntryRequest request,
       Peer peer,
-      int quorumSize,
-      List<Node> indirectReceivers) {
+      int quorumSize) {
     AsyncClient client = getSendLogAsyncClient(node);
     if (client != null) {
       AppendNodeEntryHandler handler =
           getAppendNodeEntryHandler(log, node, leaderShipStale, newLeaderTerm, peer, quorumSize);
       try {
-        if (indirectReceivers == null || indirectReceivers.isEmpty()) {
-          client.appendEntry(request, handler);
-        } else {
-          client.appendEntryIndirect(request, indirectReceivers, handler);
-        }
+        client.appendEntry(request, handler);
         logger.debug("{} sending a log to {}: {}", name, node, log);
       } catch (Exception e) {
         logger.warn("{} cannot append log to node {}", name, node, e);
@@ -796,7 +814,7 @@ public abstract class RaftMember implements RaftMemberMBean {
       return;
     }
 
-    this.votingLogList = new VotingLogList(allNodes.size() / 2);
+    this.votingLogList = new VotingLogList(allNodes.size() / 2, this);
 
     // update the reference of thisNode to keep consistency
     boolean foundThisNode = false;
@@ -1686,7 +1704,8 @@ public abstract class RaftMember implements RaftMemberMBean {
       return false;
     }
     PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log;
-    return physicalPlanLog.getPlan() instanceof InsertPlan;
+    return physicalPlanLog.getPlan() instanceof InsertPlan
+        || physicalPlanLog.getPlan() instanceof DummyPlan;
   }
 
   /**
@@ -1904,7 +1923,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    *     elector.
    */
   public void stepDown(long newTerm, boolean fromLeader) {
-    synchronized (term) {
+    synchronized (logManager) {
       long currTerm = term.get();
       // confirm that the heartbeat of the new leader hasn't come
       if (currTerm < newTerm) {
@@ -1922,6 +1941,10 @@ public abstract class RaftMember implements RaftMemberMBean {
         setCharacter(NodeCharacter.FOLLOWER);
         lastHeartbeatReceivedTime = System.currentTimeMillis();
       }
+
+      if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
+        sentLogHandlers.clear();
+      }
     }
   }
 
@@ -2093,17 +2116,6 @@ public abstract class RaftMember implements RaftMemberMBean {
     return waitAppendResult(log, leaderShipStale, newLeaderTerm, quorumSize);
   }
 
-  public void sendLogToFollower(
-      VotingLog log,
-      Node node,
-      AtomicBoolean leaderShipStale,
-      AtomicLong newLeaderTerm,
-      AppendEntryRequest request,
-      int quorumSize) {
-    sendLogToFollower(
-        log, node, leaderShipStale, newLeaderTerm, request, quorumSize, Collections.emptyList());
-  }
-
   /** Send "log" to "node". */
   public void sendLogToFollower(
       VotingLog log,
@@ -2111,8 +2123,7 @@ public abstract class RaftMember implements RaftMemberMBean {
       AtomicBoolean leaderShipStale,
       AtomicLong newLeaderTerm,
       AppendEntryRequest request,
-      int quorumSize,
-      List<Node> indirectReceivers) {
+      int quorumSize) {
     if (node.equals(thisNode)) {
       return;
     }
@@ -2134,10 +2145,10 @@ public abstract class RaftMember implements RaftMemberMBean {
 
     if (config.isUseAsyncServer()) {
       sendLogAsync(
-          log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize, indirectReceivers);
+          log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
     } else {
       sendLogSync(
-          log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize, indirectReceivers);
+          log, node, leaderShipStale, newLeaderTerm, request, peer, quorumSize);
     }
   }
 
@@ -2176,8 +2187,7 @@ public abstract class RaftMember implements RaftMemberMBean {
       AtomicLong newLeaderTerm,
       AppendEntryRequest request,
       Peer peer,
-      int quorumSize,
-      List<Node> indirectReceivers) {
+      int quorumSize) {
     Client client = getSyncClient(node);
     if (client != null) {
       AppendNodeEntryHandler handler =
@@ -2185,12 +2195,7 @@ public abstract class RaftMember implements RaftMemberMBean {
       try {
         logger.debug("{} sending a log to {}: {}", name, node, log);
         long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
-        AppendEntryResult result;
-        if (indirectReceivers == null || indirectReceivers.isEmpty()) {
-          result = client.appendEntry(request);
-        } else {
-          result = client.appendEntryIndirect(request, indirectReceivers);
-        }
+        AppendEntryResult result = client.appendEntry(request);
         Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
 
         handler.onComplete(result);
@@ -2222,6 +2227,10 @@ public abstract class RaftMember implements RaftMemberMBean {
     handler.setPeer(peer);
     handler.setReceiverTerm(newLeaderTerm);
     handler.setQuorumSize(quorumSize);
+    if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) {
+      registerAppendLogHandler(
+          new Pair<>(log.getLog().getCurrLogIndex(), log.getLog().getCurrLogTerm()), handler);
+    }
     return handler;
   }
 
@@ -2243,7 +2252,7 @@ public abstract class RaftMember implements RaftMemberMBean {
   private long checkRequestTerm(long leaderTerm, Node leader) {
     long localTerm;
 
-    synchronized (term) {
+    synchronized (logManager) {
       // if the request comes before the heartbeat arrives, the local term may be smaller than the
       // leader term
       localTerm = term.get();
@@ -2288,6 +2297,7 @@ public abstract class RaftMember implements RaftMemberMBean {
         sentLogHandlers.get(new Pair<>(ack.lastLogIndex, ack.lastLogTerm));
     if (appendNodeEntryHandler != null) {
       appendNodeEntryHandler.onComplete(ack);
+      Statistic.RAFT_RECEIVE_RELAY_ACK.add(1);
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 05f262c..4169935 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -284,6 +284,9 @@ public class Timer {
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     RAFT_WINDOW_LENGTH(RAFT_MEMBER_RECEIVER, "window length", 1, true, ROOT),
+    RAFT_SEND_RELAY(RAFT_MEMBER_RECEIVER, "send relay entries", 1, true, ROOT),
+    RAFT_SEND_RELAY_ACK(RAFT_MEMBER_RECEIVER, "send relay ack", 1, true, ROOT),
+    RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
     RAFT_WAIT_AFTER_ACCEPTED(RAFT_MEMBER_SENDER, "wait after accepted", TIME_SCALE, true, ROOT),
     RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT);
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index b7987f6..a521bb3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -185,16 +185,4 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
     member.acknowledgeAppendLog(ack);
     resultHandler.onComplete(null);
   }
-
-  @Override
-  public void appendEntryIndirect(
-      AppendEntryRequest request,
-      List<Node> subReceivers,
-      AsyncMethodCallback<AppendEntryResult> resultHandler) {
-    try {
-      resultHandler.onComplete(member.appendEntryIndirect(request, subReceivers));
-    } catch (UnknownLogTypeException e) {
-      resultHandler.onError(e);
-    }
-  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index b812404..a8f2704 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -165,14 +165,4 @@ public abstract class BaseSyncService implements RaftService.Iface {
   public void acknowledgeAppendEntry(AppendEntryResult ack) {
     member.acknowledgeAppendLog(ack);
   }
-
-  @Override
-  public AppendEntryResult appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
-      throws TException {
-    try {
-      return member.appendEntryIndirect(request, subReceivers);
-    } catch (UnknownLogTypeException e) {
-      throw new TException(e);
-    }
-  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 5ef57e8..32cba96 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -260,18 +260,6 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
   }
 
   @Override
-  public void appendEntryIndirect(
-      AppendEntryRequest request,
-      List<Node> subReceivers,
-      AsyncMethodCallback<AppendEntryResult> resultHandler) {
-    try {
-      resultHandler.onComplete(metaGroupMember.appendEntryIndirect(request, subReceivers));
-    } catch (UnknownLogTypeException e) {
-      resultHandler.onError(e);
-    }
-  }
-
-  @Override
   public void acknowledgeAppendEntry(
       AppendEntryResult ack, AsyncMethodCallback<Void> resultHandler) {
     metaGroupMember.acknowledgeAppendLog(ack);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 93a0ba6..aae1cb7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -255,16 +255,6 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
   }
 
   @Override
-  public AppendEntryResult appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
-      throws TException {
-    try {
-      return metaGroupMember.appendEntryIndirect(request, subReceivers);
-    } catch (UnknownLogTypeException e) {
-      throw new TException(e);
-    }
-  }
-
-  @Override
   public void acknowledgeAppendEntry(AppendEntryResult ack) {
     metaGroupMember.acknowledgeAppendLog(ack);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index 49cfdbc..5b73575 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -384,4 +384,8 @@ public class ClusterUtils {
     ClusterNode clusterNode2 = new ClusterNode(node2);
     return clusterNode1.equals(clusterNode2);
   }
+
+  public static String nodeToString(Node node) {
+    return node.getInternalIp() + ":" + node.getMetaPort();
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index 0282b9d..2e0b000 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
+import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.qp.physical.sys.KillQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
@@ -103,7 +104,9 @@ public class PartitionUtils {
         || plan instanceof CreateFunctionPlan
         || plan instanceof DropFunctionPlan
         || plan instanceof CreateSnapshotPlan
-        || plan instanceof SetSystemModePlan;
+        || plan instanceof SetSystemModePlan
+        || (plan instanceof DummyPlan
+            && DummyPlan.META_GROUP_ID.equals(((DummyPlan) plan).getGroupIdentifier()));
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4842bb2..8f68b46 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -802,6 +802,11 @@ public class IoTDBConfig {
    */
   private boolean enableIDTableLogFile = false;
 
+  /**
+   * Avoids the situation where the hostname is used in some places while the IP is used in others.
+   */
+  private boolean replaceHostNameWithIp = false;
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -2515,4 +2520,12 @@ public class IoTDBConfig {
   public void setEnableIDTableLogFile(boolean enableIDTableLogFile) {
     this.enableIDTableLogFile = enableIDTableLogFile;
   }
+
+  public boolean isReplaceHostNameWithIp() {
+    return replaceHostNameWithIp;
+  }
+
+  public void setReplaceHostNameWithIp(boolean replaceHostNameWithIp) {
+    this.replaceHostNameWithIp = replaceHostNameWithIp;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 849fddb..bb062a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -136,7 +136,9 @@ public class IoTDBDescriptor {
                   "enable_monitor_series_write", Boolean.toString(conf.isEnableStatMonitor()))));
 
       conf.setRpcAddress(properties.getProperty("rpc_address", conf.getRpcAddress()));
-      replaceHostnameWithIP();
+      if (conf.isReplaceHostNameWithIp()) {
+        replaceHostnameWithIP();
+      }
 
       conf.setRpcThriftCompressionEnable(
           Boolean.parseBoolean(
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 0b4770b..918a5a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -539,6 +539,7 @@ public abstract class PhysicalPlan {
     DROP_FUNCTION,
     SELECT_INTO,
     DUMMY,
+    DUMMY_CLUSTER,
     SET_SYSTEM_MODE,
     UNSET_TEMPLATE,
     APPEND_TEMPLATE,
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java
index 42e701f..ff93076 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DummyPlan.java
@@ -32,8 +32,12 @@ import java.util.List;
 
 public class DummyPlan extends PhysicalPlan {
 
-  private byte[] workload;
-  private boolean needForward;
+  public static final String GROUP_ID_SEPARATOR = "#";
+  public static final String META_GROUP_ID = "meta";
+
+  protected byte[] workload;
+  protected boolean needForward;
+  protected String groupIdentifier = META_GROUP_ID;
 
   public DummyPlan() {
     super(OperatorType.EMPTY);
@@ -52,6 +56,10 @@ public class DummyPlan extends PhysicalPlan {
       stream.write(workload);
     }
     stream.write(needForward ? 1 : 0);
+
+    byte[] bytes = groupIdentifier.getBytes();
+    stream.writeInt(bytes.length);
+    stream.write(bytes);
   }
 
   @Override
@@ -62,6 +70,10 @@ public class DummyPlan extends PhysicalPlan {
       buffer.put(workload);
     }
     buffer.put(needForward ? (byte) 1 : 0);
+
+    byte[] bytes = groupIdentifier.getBytes();
+    buffer.putInt(bytes.length);
+    buffer.put(bytes);
   }
 
   @Override
@@ -70,6 +82,11 @@ public class DummyPlan extends PhysicalPlan {
     workload = new byte[size];
     buffer.get(workload);
     needForward = buffer.get() == 1;
+
+    int groupIdSize = buffer.getInt();
+    byte[] bytes = new byte[groupIdSize];
+    buffer.get(bytes);
+    groupIdentifier = new String(bytes);
   }
 
   public void setWorkload(byte[] workload) {
@@ -88,6 +105,14 @@ public class DummyPlan extends PhysicalPlan {
     return workload;
   }
 
+  public String getGroupIdentifier() {
+    return groupIdentifier;
+  }
+
+  public void setGroupIdentifier(String groupIdentifier) {
+    this.groupIdentifier = groupIdentifier;
+  }
+
   @Override
   public String toString() {
     return "ExprPlan";
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index df93e31..ea8edf7 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -92,6 +92,7 @@ struct AppendEntryRequest {
   // true if the request is sent from the leader, and the reiceiver just responds to the sender;
   // otherwise, the reiceiver should also notify the leader
   8: optional bool isFromLeader
+  9: optional list<Node> subReceivers
 }
 
 
@@ -107,6 +108,10 @@ struct AppendEntriesRequest {
   // because a data server may play many data groups members, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   7: optional RaftNode header
+  // true if the request is sent from the leader, and the receiver just responds to the sender;
+  // otherwise, the receiver should also notify the leader
+  8: optional bool isFromLeader
+  9: optional list<Node> subReceivers
 }
 
 struct AddNodeResponse {
@@ -325,11 +330,6 @@ service RaftService {
   **/
   AppendEntryResult appendEntry(1:AppendEntryRequest request)
 
-  /**
-  * Like appendEntry, but the receiver should forward the request to nodes in subReceivers.
-  **/
-  AppendEntryResult appendEntryIndirect(1:AppendEntryRequest request, 2:list<Node> subReceivers)
-
   void sendSnapshot(1:SendSnapshotRequest request)
 
   /**