You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/01/05 02:26:14 UTC

[iotdb] branch expr updated: add async interface in LogDispatcher; fix tests

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr by this push:
     new 8d02321  add async interface in LogDispatcher; fix tests
8d02321 is described below

commit 8d02321cefd293730aca8440927ecbb694867695
Author: jt <jt...@163.com>
AuthorDate: Wed Jan 5 10:25:32 2022 +0800

    add async interface in LogDispatcher
    fix tests
---
 .../java/org/apache/iotdb/cluster/log/Log.java     |   4 +-
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  59 +++++--
 .../org/apache/iotdb/cluster/log/VotingLog.java    |   6 +
 .../log/sequencing/SynchronousSequencer.java       |   2 -
 .../iotdb/cluster/server/StoppedMemberManager.java |   6 +-
 .../handlers/caller/AppendGroupEntryHandler.java   | 162 -----------------
 .../handlers/caller/AppendNodeEntryHandler.java    |  28 +--
 .../cluster/server/member/DataGroupMember.java     |   7 +-
 .../iotdb/cluster/server/member/RaftMember.java    |  17 +-
 .../cluster/server/service/DataGroupEngine.java    |  12 +-
 .../cluster/client/ClientPoolFactoryTest.java      |   2 +-
 .../iotdb/cluster/common/TestMetaGroupMember.java  |  19 +-
 .../iotdb/cluster/log/LogDispatcherTest.java       |  36 ++++
 .../caller/AppendGroupEntryHandlerTest.java        | 196 ---------------------
 .../caller/AppendNodeEntryHandlerTest.java         |  35 ++--
 .../cluster/server/member/DataGroupMemberTest.java |   3 +-
 .../cluster/server/member/MetaGroupMemberTest.java |   5 +-
 17 files changed, 168 insertions(+), 431 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 59dd534..015b792 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -34,8 +34,8 @@ public abstract class Log implements Comparable<Log> {
 
   // make this configurable or adaptive
   private static final int DEFAULT_BUFFER_SIZE = 16 * 1024;
-  private long currLogIndex;
-  private long currLogTerm;
+  private long currLogIndex = -1;
+  private long currLogTerm = -1;
 
   // for async application
   private volatile boolean applied;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index ffcdb8f..2626d69 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -246,7 +246,8 @@ public class LogDispatcher {
     private BlockingQueue<SendLogRequest> logBlockingDeque;
     private List<SendLogRequest> currBatch = new ArrayList<>();
     private Peer peer;
-    Client client;
+    Client syncClient;
+    AsyncClient asyncClient;
 
     DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
       this.receiver = receiver;
@@ -255,7 +256,9 @@ public class LogDispatcher {
           member
               .getPeerMap()
               .computeIfAbsent(receiver, r -> new Peer(member.getLogManager().getLastLogIndex()));
-      client = member.getSyncClient(receiver);
+      if (!clusterConfig.isUseAsyncServer()) {
+        syncClient = member.getSyncClient(receiver);
+      }
     }
 
     @Override
@@ -320,19 +323,19 @@ public class LogDispatcher {
       }
       Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
 
-      if (client == null) {
-        client = member.getSyncClient(receiver);
+      if (syncClient == null) {
+        syncClient = member.getSyncClient(receiver);
       }
       AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
       startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
       try {
-        AppendEntryResult result = client.appendEntries(request);
+        AppendEntryResult result = syncClient.appendEntries(request);
         Timer.Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
         handler.onComplete(result);
       } catch (TException e) {
-        client.getInputProtocol().getTransport().close();
-        ClientUtils.putBackSyncClient(client);
-        client = member.getSyncClient(receiver);
+        syncClient.getInputProtocol().getTransport().close();
+        ClientUtils.putBackSyncClient(syncClient);
+        syncClient = member.getSyncClient(receiver);
         logger.warn("Failed logs: {}, first index: {}", logList, request.prevLogIndex + 1);
         handler.onError(e);
       }
@@ -411,7 +414,7 @@ public class LogDispatcher {
       }
     }
 
-    void sendLog(SendLogRequest logRequest) {
+    void sendLogSync(SendLogRequest logRequest) {
       AppendNodeEntryHandler handler =
           member.getAppendNodeEntryHandler(
               logRequest.getVotingLog(),
@@ -420,12 +423,12 @@ public class LogDispatcher {
               logRequest.newLeaderTerm,
               peer,
               logRequest.quorumSize);
-      // TODO add async interface
+
       int retries = 5;
       try {
         long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
         for (int i = 0; i < retries; i++) {
-          AppendEntryResult result = client.appendEntry(logRequest.appendEntryRequest);
+          AppendEntryResult result = syncClient.appendEntry(logRequest.appendEntryRequest);
           if (result.status == Response.RESPONSE_OUT_OF_WINDOW) {
             Thread.sleep(100);
           } else {
@@ -435,15 +438,43 @@ public class LogDispatcher {
           }
         }
       } catch (TException e) {
-        client.getInputProtocol().getTransport().close();
-        ClientUtils.putBackSyncClient(client);
-        client = member.getSyncClient(receiver);
+        syncClient.getInputProtocol().getTransport().close();
+        ClientUtils.putBackSyncClient(syncClient);
+        syncClient = member.getSyncClient(receiver);
         handler.onError(e);
       } catch (Exception e) {
         handler.onError(e);
       }
     }
 
+    private void sendLogAsync(SendLogRequest logRequest) {
+      AppendNodeEntryHandler handler =
+          member.getAppendNodeEntryHandler(
+              logRequest.getVotingLog(),
+              receiver,
+              logRequest.leaderShipStale,
+              logRequest.newLeaderTerm,
+              peer,
+              logRequest.quorumSize);
+
+      AsyncClient client = member.getAsyncClient(receiver);
+      if (client != null) {
+        try {
+          client.appendEntry(logRequest.appendEntryRequest, handler);
+        } catch (TException e) {
+          handler.onError(e);
+        }
+      }
+    }
+
+    void sendLog(SendLogRequest logRequest) {
+      if (clusterConfig.isUseAsyncServer()) {
+        sendLogAsync(logRequest);
+      } else {
+        sendLogSync(logRequest);
+      }
+    }
+
     class AppendEntriesHandler implements AsyncMethodCallback<AppendEntryResult> {
 
       private final List<AsyncMethodCallback<AppendEntryResult>> singleEntryHandlers;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index dc2d9b0..e271ead 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -26,11 +26,13 @@ public class VotingLog {
   protected Log log;
   protected Set<Integer> stronglyAcceptedNodeIds;
   protected Set<Integer> weaklyAcceptedNodeIds;
+  protected Set<Integer> failedNodeIds;
 
   public VotingLog(Log log, int groupSize) {
     this.log = log;
     stronglyAcceptedNodeIds = new HashSet<>(groupSize);
     weaklyAcceptedNodeIds = new HashSet<>(groupSize);
+    failedNodeIds = new HashSet<>(groupSize);
   }
 
   public Log getLog() {
@@ -61,4 +63,8 @@ public class VotingLog {
   public String toString() {
     return log.toString();
   }
+
+  public Set<Integer> getFailedNodeIds() {
+    return failedNodeIds;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index 1c2390a..a749960 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -31,8 +31,6 @@ import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 
 /**
  * SynchronizedSequencer performs sequencing by taking the monitor of a LogManager within the caller
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
index c8efe39..7ad56bf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
@@ -58,9 +58,11 @@ public class StoppedMemberManager {
 
   private Map<RaftNode, DataGroupMember> removedMemberMap = new HashMap<>();
   private DataGroupMember.Factory memberFactory;
+  private Node thisNode;
 
-  public StoppedMemberManager(Factory memberFactory) {
+  public StoppedMemberManager(Factory memberFactory, Node thisNode) {
     this.memberFactory = memberFactory;
+    this.thisNode = thisNode;
     recover();
   }
 
@@ -147,7 +149,7 @@ public class StoppedMemberManager {
       Node node = ClusterUtils.stringToNode(split[i]);
       partitionGroup.add(node);
     }
-    DataGroupMember member = memberFactory.create(partitionGroup);
+    DataGroupMember member = memberFactory.create(thisNode, partitionGroup);
     member.setReadOnly();
     removedMemberMap.put(partitionGroup.getHeader(), member);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
deleted file mode 100644
index b03cfaf..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandler.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.server.handlers.caller;
-
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.server.member.RaftMember;
-
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.iotdb.cluster.server.Response.RESPONSE_STRONG_ACCEPT;
-
-/**
- * AppendGroupEntryHandler checks if the log is successfully appended by the quorum or some node has
- * rejected it for some reason when one node has finished the AppendEntryRequest. The target of the
- * log is the data groups, the consistency can be reached as long as quorum data groups agree, even
- * if the actually agreed nodes can be less than quorum, because the same nodes may say "yes" for
- * multiple groups.
- */
-public class AppendGroupEntryHandler implements AsyncMethodCallback<AppendEntryResult> {
-
-  private static final Logger logger = LoggerFactory.getLogger(AppendGroupEntryHandler.class);
-
-  private RaftMember member;
-  private Log log;
-  // the number of nodes that accept the log in each group
-  // to succeed, each number should reach zero
-  // for example: assuming there are 4 nodes and 3 replicas, then the initial array will be:
-  // [2, 2, 2, 2]. And if node0 accepted the log, as node0 is in group 2,3,0, the array will be
-  // [1, 2, 1, 1].
-  private Set<Integer>[] groupReceivedNodeIds;
-  // the index of the node which the request sends log to, if the node accepts the log, all
-  // groups' counters the node is in should decrease
-  private int receiverNodeIndex;
-  private Node receiverNode;
-  // store the flag of leadership lost and the new leader's term
-  private AtomicBoolean leaderShipStale;
-  private AtomicLong newLeaderTerm;
-  private int quorumSize;
-  private int replicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
-
-  private AtomicInteger erroredNodeNum = new AtomicInteger(0);
-
-  public AppendGroupEntryHandler(
-      Set<Integer>[] groupReceivedNodeIds,
-      int receiverNodeIndex,
-      Node receiverNode,
-      AtomicBoolean leaderShipStale,
-      Log log,
-      AtomicLong newLeaderTerm,
-      RaftMember member,
-      int quorumSize) {
-    this.groupReceivedNodeIds = groupReceivedNodeIds;
-    this.receiverNodeIndex = receiverNodeIndex;
-    this.receiverNode = receiverNode;
-    this.leaderShipStale = leaderShipStale;
-    this.log = log;
-    this.newLeaderTerm = newLeaderTerm;
-    this.member = member;
-    this.quorumSize = quorumSize;
-  }
-
-  @Override
-  public void onComplete(AppendEntryResult response) {
-    if (leaderShipStale.get()) {
-      // someone has rejected this log because the leadership is stale
-      return;
-    }
-
-    long resp = response.status;
-
-    if (resp == RESPONSE_STRONG_ACCEPT) {
-      processAgreement();
-    } else if (resp > 0) {
-      // a response > 0 is the term fo the follower
-      synchronized (groupReceivedNodeIds) {
-        // the leader ship is stale, abort and wait for the new leader's heartbeat
-        long previousNewTerm = newLeaderTerm.get();
-        if (previousNewTerm < resp) {
-          newLeaderTerm.set(resp);
-        }
-        leaderShipStale.set(true);
-        groupReceivedNodeIds.notifyAll();
-      }
-    }
-    // rejected because the follower's logs are stale or the follower has no cluster info, just
-    // wait for the heartbeat to handle
-  }
-
-  /**
-   * Decrease all related counters of the receiver node. See the field "groupReceivedCounter" for an
-   * example. If all counters reach 0, wake the waiting thread to welcome the success.
-   */
-  private void processAgreement() {
-    synchronized (groupReceivedNodeIds) {
-      logger.debug("{}: Node {} has accepted log {}", member.getName(), receiverNode, log);
-      // this node is contained in REPLICATION_NUM groups, decrease the counters of these groups
-      for (int i = 0; i < replicationNum; i++) {
-        int nodeIndex = receiverNodeIndex - i;
-        if (nodeIndex < 0) {
-          nodeIndex += groupReceivedNodeIds.length;
-        }
-        groupReceivedNodeIds[nodeIndex].add(receiverNode.nodeIdentifier);
-      }
-
-      // examine if all groups has agreed
-      boolean allAgreed = true;
-      for (Set<Integer> receivedNodeIds : groupReceivedNodeIds) {
-        if (receivedNodeIds.size() < quorumSize) {
-          allAgreed = false;
-          break;
-        }
-      }
-      if (allAgreed) {
-        // wake up the parent thread to welcome the new node
-        groupReceivedNodeIds.notifyAll();
-      }
-    }
-  }
-
-  @Override
-  public void onError(Exception exception) {
-    logger.error(
-        "{}: Cannot send the add node request to node {}",
-        member.getName(),
-        receiverNode,
-        exception);
-    if (erroredNodeNum.incrementAndGet() >= replicationNum / 2) {
-      synchronized (groupReceivedNodeIds) {
-        logger.error(
-            "{}: Over half of the nodes failed, the request is rejected", member.getName());
-        groupReceivedNodeIds.notifyAll();
-      }
-    }
-  }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 5c6be20..24b7b20 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -35,6 +35,9 @@ import java.net.ConnectException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_LOG_MISMATCH;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_OUT_OF_WINDOW;
 import static org.apache.iotdb.cluster.server.Response.RESPONSE_STRONG_ACCEPT;
 import static org.apache.iotdb.cluster.server.Response.RESPONSE_WEAK_ACCEPT;
 
@@ -55,9 +58,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
   protected Node receiver;
   protected Peer peer;
   protected int quorumSize;
-  // initialized as the quorum size, and decrease by 1 each time when we receive a rejection or
-  // an exception, upon decreased to zero, the request will be early-aborted
-  private int failedDecreasingCounter;
 
   // nano start time when the send begins
   private long sendStart = Long.MIN_VALUE;
@@ -87,7 +87,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
 
     long resp = response.status;
 
-    if (resp == RESPONSE_STRONG_ACCEPT) {
+    if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) {
       synchronized (log) {
         log.getStronglyAcceptedNodeIds().add(receiver.nodeIdentifier);
         log.notifyAll();
@@ -124,8 +124,14 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       }
     } else {
       // e.g., Response.RESPONSE_LOG_MISMATCH
-      logger.debug(
-          "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
+      if (resp == RESPONSE_LOG_MISMATCH || resp == RESPONSE_OUT_OF_WINDOW) {
+        logger.debug(
+            "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
+      } else {
+        logger.warn(
+            "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
+      }
+
       onFail();
     }
     // rejected because the receiver's logs are stale or the receiver has no cluster info, just
@@ -149,8 +155,8 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
 
   private void onFail() {
     synchronized (log) {
-      failedDecreasingCounter--;
-      if (failedDecreasingCounter <= 0) {
+      log.getFailedNodeIds().add(receiver.nodeIdentifier);
+      if (log.getFailedNodeIds().size() > quorumSize) {
         // quorum members have failed, there is no need to wait for others
         log.getStronglyAcceptedNodeIds().add(Integer.MAX_VALUE);
         log.notifyAll();
@@ -182,13 +188,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     this.receiverTerm = receiverTerm;
   }
 
-  public int getQuorumSize() {
-    return quorumSize;
-  }
-
   public void setQuorumSize(int quorumSize) {
     this.quorumSize = quorumSize;
-    this.failedDecreasingCounter =
-        ClusterDescriptor.getInstance().getConfig().getReplicationNum() - quorumSize;
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index b2108be..533adf8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -197,7 +197,7 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
             : new BlockingLogAppender.Factory();
   }
 
-  DataGroupMember(TProtocolFactory factory, PartitionGroup nodes, MetaGroupMember metaGroupMember) {
+  DataGroupMember(Node thisNode, PartitionGroup nodes, MetaGroupMember metaGroupMember) {
     // The name is used in JMX, so we have to avoid to use "(" "," "=" ")"
     super(
         "Data-"
@@ -211,6 +211,7 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
             ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
             ClientManager.Type.DataGroupClient));
     this.metaGroupMember = metaGroupMember;
+    setThisNode(thisNode);
     setAllNodes(nodes);
     mbeanName =
         String.format(
@@ -335,8 +336,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
       this.metaGroupMember = metaGroupMember;
     }
 
-    public DataGroupMember create(PartitionGroup partitionGroup) {
-      return new DataGroupMember(protocolFactory, partitionGroup, metaGroupMember);
+    public DataGroupMember create(Node thisNode, PartitionGroup partitionGroup) {
+      return new DataGroupMember(thisNode, partitionGroup, metaGroupMember);
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index d71f5e3..4d69153 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -236,7 +236,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    * client manager that provides reusable Thrift clients to connect to other RaftMembers and
    * execute RPC requests. It will be initialized according to the implementation of the subclasses
    */
-  private ClientManager clientManager;
+  protected ClientManager clientManager;
   /**
    * when the commit progress is updated by a heartbeat, this object is notified so that we may know
    * if this node is up-to-date with the leader, and whether the given consistency is reached
@@ -359,6 +359,7 @@ public abstract class RaftMember implements RaftMemberMBean {
     if (heartBeatService == null) {
       return;
     }
+    clientManager.s
 
     heartBeatService.shutdownNow();
     catchUpService.shutdownNow();
@@ -1196,7 +1197,6 @@ public abstract class RaftMember implements RaftMemberMBean {
       long startTime;
       switch (appendLogResult) {
         case WEAK_ACCEPT:
-          // TODO: change to weak
           Statistic.RAFT_WEAK_ACCEPT.add(1);
           Statistic.LOG_DISPATCHER_FROM_CREATE_TO_OK.calOperationCostTimeFromStart(
               log.getCreateTime());
@@ -1674,14 +1674,16 @@ public abstract class RaftMember implements RaftMemberMBean {
     synchronized (log) {
       long waitStart = System.currentTimeMillis();
       long alreadyWait = 0;
-      while (stronglyAcceptedNodeNum < quorumSize
+      while (
+          log.getLog().getCurrLogIndex() == -1 ||
+          stronglyAcceptedNodeNum < quorumSize
           && (!(ENABLE_WEAK_ACCEPTANCE && canBeWeaklyAccepted(log.getLog()))
               || (totalAccepted < quorumSize)
               || votingLogList.size() > config.getMaxNumOfLogsInMem())
           && alreadyWait < ClusterConstant.getWriteOperationTimeoutMS()
           && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
         try {
-          log.wait(0);
+          log.wait(1);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           logger.warn("Unexpected interruption when sending a log", e);
@@ -1762,7 +1764,7 @@ public abstract class RaftMember implements RaftMemberMBean {
       while (!log.isApplied()) {
         // wait until the log is applied
         try {
-          log.wait(0);
+          log.wait(1);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new LogExecutionException(e);
@@ -2289,4 +2291,9 @@ public abstract class RaftMember implements RaftMemberMBean {
   public String getRaftGroupFullId() {
     return (getHeader() != null ? getHeader().node.nodeIdentifier : 0) + "#" + getRaftGroupId();
   }
+
+  @TestOnly
+  public void setClientManager(ClientManager clientManager) {
+    this.clientManager = clientManager;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
index 74bda7e..a36701b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupEngine.java
@@ -76,7 +76,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
 
   private DataGroupEngine() {
     dataMemberFactory = new DataGroupMember.Factory(protocolFactory, metaGroupMember);
-    stoppedMemberManager = new StoppedMemberManager(dataMemberFactory);
+    stoppedMemberManager = new StoppedMemberManager(dataMemberFactory, thisNode);
   }
 
   public static DataGroupEngine getInstance() {
@@ -96,7 +96,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
       DataGroupMember.Factory dataMemberFactory, MetaGroupMember metaGroupMember) {
     DataGroupEngine.metaGroupMember = metaGroupMember;
     this.dataMemberFactory = dataMemberFactory;
-    this.stoppedMemberManager = new StoppedMemberManager(dataMemberFactory);
+    this.stoppedMemberManager = new StoppedMemberManager(dataMemberFactory, thisNode);
   }
 
   @Override
@@ -242,7 +242,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
       }
       if (partitionGroup != null && partitionGroup.contains(thisNode)) {
         // the two nodes are in the same group, create a new data member
-        member = dataMemberFactory.create(partitionGroup);
+        member = dataMemberFactory.create(thisNode, partitionGroup);
         headerGroupMap.put(header, member);
         stoppedMemberManager.remove(header);
         logger.info("Created a member for header {}, group is {}", header, partitionGroup);
@@ -310,7 +310,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
         if (newGroup.contains(thisNode)) {
           RaftNode header = newGroup.getHeader();
           logger.info("Adding this node into a new group {}", newGroup);
-          DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup);
+          DataGroupMember dataGroupMember = dataMemberFactory.create(thisNode, newGroup);
           dataGroupMember = addDataGroupMember(dataGroupMember, header);
           dataGroupMember.pullNodeAdditionSnapshots(
               ((SlotPartitionTable) partitionTable).getNodeSlots(header), node);
@@ -385,7 +385,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
       if (prevMember == null || !prevMember.getAllNodes().equals(partitionGroup)) {
         logger.info("Building member of data group: {}", partitionGroup);
         // no previous member or member changed
-        DataGroupMember dataGroupMember = dataMemberFactory.create(partitionGroup);
+        DataGroupMember dataGroupMember = dataMemberFactory.create(thisNode, partitionGroup);
         // the previous member will be replaced here
         addDataGroupMember(dataGroupMember, header);
         dataGroupMember.setUnchanged(true);
@@ -455,7 +455,7 @@ public class DataGroupEngine implements IService, DataGroupEngineMBean {
         RaftNode header = group.getHeader();
         if (!headerGroupMap.containsKey(header)) {
           logger.info("{} should join a new group {}", thisNode, group);
-          DataGroupMember dataGroupMember = dataMemberFactory.create(group);
+          DataGroupMember dataGroupMember = dataMemberFactory.create(thisNode, group);
           addDataGroupMember(dataGroupMember, header);
         }
         // pull new slots from the removed node
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
index f1e313e..7c10e1a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
@@ -46,7 +46,7 @@ import java.util.NoSuchElementException;
 public class ClientPoolFactoryTest {
   private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
 
-  private long mockMaxWaitTimeoutMs = 10 * 1000L;
+  private long mockMaxWaitTimeoutMs = 1000L;
   private int mockMaxClientPerMember = 10;
 
   private int maxClientPerNodePerMember = clusterConfig.getMaxClientPerNodePerMember();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
index 761f5e5..8320ea7 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestMetaGroupMember.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.cluster.common;
 
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -27,13 +29,20 @@ public class TestMetaGroupMember extends MetaGroupMember {
 
   public TestMetaGroupMember() {
     super();
-    allNodes = new PartitionGroup();
-    thisNode = TestUtils.getNode(0);
-    for (int i = 0; i < 10; i++) {
-      allNodes.add(TestUtils.getNode(i));
-    }
+
     MetaSingleSnapshotLogManager manager =
         new MetaSingleSnapshotLogManager(new TestLogApplier(), this);
     setLogManager(manager);
+
+    PartitionGroup group = new PartitionGroup();
+    thisNode = TestUtils.getNode(0);
+    for (int i = 0; i < 10; i++) {
+      group.add(TestUtils.getNode(i));
+    }
+    setAllNodes(group);
+
+    this.clientManager = new ClientManager(
+        ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+        ClientManager.Type.MetaGroupClient);
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
index 229bdcc..1ff27b5 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogDispatcherTest.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.cluster.log;
 
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.client.ClientManager.Type;
 import org.apache.iotdb.cluster.common.TestAsyncClient;
 import org.apache.iotdb.cluster.common.TestMetaGroupMember;
 import org.apache.iotdb.cluster.common.TestSyncClient;
@@ -107,6 +110,38 @@ public class LogDispatcherTest {
           }
 
           @Override
+          public AsyncClient getAsyncClient(Node node) {
+            // Completes each handler exactly once; nodes in downNode get AppendEntryResult(-1).
+            return new TestAsyncClient() {
+              @Override
+              public void appendEntry(AppendEntryRequest request,
+                  AsyncMethodCallback<AppendEntryResult> resultHandler) throws TException {
+                try {
+                  resultHandler.onComplete(
+                      downNode.contains(node)
+                          ? new AppendEntryResult(-1)
+                          : mockedAppendEntry(request));
+                } catch (UnknownLogTypeException e) {
+                  throw new TException(e);
+                }
+              }
+
+              @Override
+              public void appendEntries(AppendEntriesRequest request,
+                  AsyncMethodCallback<AppendEntryResult> resultHandler) throws TException {
+                try {
+                  resultHandler.onComplete(
+                      downNode.contains(node)
+                          ? new AppendEntryResult(-1)
+                          : mockedAppendEntries(request));
+                } catch (UnknownLogTypeException e) {
+                  throw new TException(e);
+                }
+              }
+            };
+          }
+
+          @Override
           public Client getSyncClient(Node node) {
             return new TestSyncClient() {
               @Override
@@ -172,6 +207,7 @@ public class LogDispatcherTest {
     boolean useAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
     ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
     LogDispatcher dispatcher = new LogDispatcher(raftMember);
+    raftMember.setClientManager(new ClientManager(true, Type.MetaGroupClient));
     try {
       List<Log> logs = TestUtils.prepareTestLogs(10);
       for (Log log : logs) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java
deleted file mode 100644
index 14eac19..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendGroupEntryHandlerTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.server.handlers.caller;
-
-import org.apache.iotdb.cluster.common.TestException;
-import org.apache.iotdb.cluster.common.TestLog;
-import org.apache.iotdb.cluster.common.TestMetaGroupMember;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
-import org.apache.iotdb.cluster.server.Response;
-import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class AppendGroupEntryHandlerTest {
-
-  private int REPLICATION_NUM;
-  private int prevReplicationNum;
-  private RaftMember member;
-
-  @Before
-  public void setUp() {
-    prevReplicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
-    ClusterDescriptor.getInstance().getConfig().setReplicationNum(2);
-    REPLICATION_NUM = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
-    member = new TestMetaGroupMember();
-  }
-
-  @After
-  public void tearDown() throws IOException, StorageEngineException {
-    ClusterDescriptor.getInstance().getConfig().setReplicationNum(prevReplicationNum);
-    member.stop();
-    member.closeLogManager();
-    EnvironmentUtils.cleanAllDir();
-  }
-
-  @Test
-  public void testAgreement() throws InterruptedException {
-    Set<Integer>[] groupReceivedCounter = new Set[10];
-    for (int i = 0; i < 10; i++) {
-      groupReceivedCounter[i] = new HashSet<>();
-    }
-    AtomicBoolean leadershipStale = new AtomicBoolean(false);
-    AtomicLong newLeaderTerm = new AtomicLong(-1);
-    Log testLog = new TestLog();
-    synchronized (groupReceivedCounter) {
-      for (int i = 0; i < 10; i += 2) {
-        AppendGroupEntryHandler handler =
-            new AppendGroupEntryHandler(
-                groupReceivedCounter,
-                i,
-                TestUtils.getNode(i),
-                leadershipStale,
-                testLog,
-                newLeaderTerm,
-                member,
-                REPLICATION_NUM / 2);
-        new Thread(() -> handler.onComplete(new AppendEntryResult(Response.RESPONSE_AGREE)))
-            .start();
-      }
-      groupReceivedCounter.wait();
-    }
-    for (int i = 0; i < 10; i++) {
-      assertEquals(0, groupReceivedCounter[i]);
-    }
-    assertFalse(leadershipStale.get());
-    assertEquals(-1, newLeaderTerm.get());
-  }
-
-  @Test
-  public void testNoAgreement() throws InterruptedException {
-    Set<Integer>[] groupReceivedCounter = new Set[10];
-    for (int i = 0; i < 10; i++) {
-      groupReceivedCounter[i] = new HashSet<>();
-    }
-    AtomicBoolean leadershipStale = new AtomicBoolean(false);
-    AtomicLong newLeaderTerm = new AtomicLong(-1);
-    Log testLog = new TestLog();
-    synchronized (groupReceivedCounter) {
-      for (int i = 0; i < 5; i++) {
-        AppendGroupEntryHandler handler =
-            new AppendGroupEntryHandler(
-                groupReceivedCounter,
-                i,
-                TestUtils.getNode(i),
-                leadershipStale,
-                testLog,
-                newLeaderTerm,
-                member,
-                REPLICATION_NUM / 2);
-        handler.onComplete(new AppendEntryResult(Response.RESPONSE_AGREE));
-      }
-    }
-    for (int i = 0; i < 10; i++) {
-      if (i < 5) {
-        assertEquals(Math.max(0, REPLICATION_NUM - (5 - i)), groupReceivedCounter[i]);
-      } else {
-        assertEquals(Math.min(10 - i, REPLICATION_NUM), groupReceivedCounter[i]);
-      }
-    }
-    assertFalse(leadershipStale.get());
-    assertEquals(-1, newLeaderTerm.get());
-  }
-
-  @Test
-  public void testLeadershipStale() throws InterruptedException {
-    Set<Integer>[] groupReceivedCounter = new Set[10];
-    for (int i = 0; i < 10; i++) {
-      groupReceivedCounter[i] = new HashSet<>();
-    }
-    AtomicBoolean leadershipStale = new AtomicBoolean(false);
-    AtomicLong newLeaderTerm = new AtomicLong(-1);
-    Log testLog = new TestLog();
-    synchronized (groupReceivedCounter) {
-      AppendGroupEntryHandler handler =
-          new AppendGroupEntryHandler(
-              groupReceivedCounter,
-              0,
-              TestUtils.getNode(0),
-              leadershipStale,
-              testLog,
-              newLeaderTerm,
-              member,
-              REPLICATION_NUM / 2);
-      new Thread(() -> handler.onComplete(new AppendEntryResult(100L))).start();
-      groupReceivedCounter.wait();
-    }
-    for (int i = 0; i < 10; i++) {
-      assertEquals(REPLICATION_NUM / 2, groupReceivedCounter[i]);
-    }
-    assertTrue(leadershipStale.get());
-    assertEquals(100, newLeaderTerm.get());
-  }
-
-  @Test
-  public void testError() throws InterruptedException {
-    Set<Integer>[] groupReceivedCounter = new Set[10];
-    for (int i = 0; i < 10; i++) {
-      groupReceivedCounter[i] = new HashSet<>();
-    }
-    AtomicBoolean leadershipStale = new AtomicBoolean(false);
-    AtomicLong newLeaderTerm = new AtomicLong(-1);
-    Log testLog = new TestLog();
-
-    AppendGroupEntryHandler handler =
-        new AppendGroupEntryHandler(
-            groupReceivedCounter,
-            0,
-            TestUtils.getNode(0),
-            leadershipStale,
-            testLog,
-            newLeaderTerm,
-            member,
-            REPLICATION_NUM / 2);
-    handler.onError(new TestException());
-
-    for (int i = 0; i < 10; i++) {
-      assertEquals(REPLICATION_NUM / 2, groupReceivedCounter[i]);
-    }
-    assertFalse(leadershipStale.get());
-    assertEquals(-1, newLeaderTerm.get());
-  }
-}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index 20f7700..b46eded 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -72,21 +72,24 @@ public class AppendNodeEntryHandlerTest {
       ClusterDescriptor.getInstance().getConfig().setReplicationNum(10);
       VotingLog votingLog = new VotingLog(log, 10);
       Peer peer = new Peer(1);
-      synchronized (votingLog) {
-        for (int i = 0; i < 10; i++) {
-          AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
-          handler.setLeaderShipStale(leadershipStale);
-          handler.setLog(votingLog);
-          handler.setMember(member);
-          handler.setReceiverTerm(receiverTerm);
-          handler.setReceiver(TestUtils.getNode(i));
-          handler.setPeer(peer);
-          long resp = i >= 5 ? Response.RESPONSE_AGREE : Response.RESPONSE_LOG_MISMATCH;
-          AppendEntryResult result = new AppendEntryResult();
-          result.setStatus(resp);
-          new Thread(() -> handler.onComplete(result)).start();
+      for (int i = 0; i < 10; i++) {
+        AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
+        handler.setLeaderShipStale(leadershipStale);
+        handler.setLog(votingLog);
+        handler.setMember(member);
+        handler.setReceiverTerm(receiverTerm);
+        handler.setReceiver(TestUtils.getNode(i));
+        handler.setPeer(peer);
+        handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
+        long resp = i >= 5 ? Response.RESPONSE_AGREE : Response.RESPONSE_LOG_MISMATCH;
+        AppendEntryResult result = new AppendEntryResult();
+        result.setStatus(resp);
+        new Thread(() -> handler.onComplete(result)).start();
+      }
+      while (votingLog.getStronglyAcceptedNodeIds().size() < 5) {
+        synchronized (votingLog) {
+          votingLog.wait(1);
         }
-        votingLog.wait();
       }
       assertEquals(-1, receiverTerm.get());
       assertFalse(leadershipStale.get());
@@ -112,6 +115,7 @@ public class AppendNodeEntryHandlerTest {
       handler.setReceiverTerm(receiverTerm);
       handler.setReceiver(TestUtils.getNode(i));
       handler.setPeer(peer);
+      handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
       AppendEntryResult result = new AppendEntryResult();
       result.setStatus(Response.RESPONSE_AGREE);
       handler.onComplete(result);
@@ -138,6 +142,7 @@ public class AppendNodeEntryHandlerTest {
       handler.setReceiverTerm(receiverTerm);
       handler.setReceiver(TestUtils.getNode(0));
       handler.setPeer(peer);
+      handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
       new Thread(() -> handler.onComplete(new AppendEntryResult(100L))).start();
       votingLog.wait();
     }
@@ -164,11 +169,13 @@ public class AppendNodeEntryHandlerTest {
       handler.setReceiverTerm(receiverTerm);
       handler.setReceiver(TestUtils.getNode(0));
       handler.setPeer(peer);
+      handler.setQuorumSize(ClusterDescriptor.getInstance().getConfig().getReplicationNum() / 2);
       handler.onError(new TestException());
 
       assertEquals(-1, receiverTerm.get());
       assertFalse(leadershipStale.get());
       assertEquals(0, votingLog.getStronglyAcceptedNodeIds().size());
+
     } finally {
       ClusterDescriptor.getInstance().getConfig().setReplicationNum(replicationNum);
     }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index c3db432..b2f8e01 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -196,7 +196,7 @@ public class DataGroupMemberTest extends BaseMember {
 
   private DataGroupMember getDataGroupMember(Node node, PartitionGroup nodes) {
     DataGroupMember dataGroupMember =
-        new DataGroupMember(new Factory(), nodes, testMetaMember) {
+        new DataGroupMember(node, nodes, testMetaMember) {
           @Override
           public boolean syncLeader(CheckConsistency checkConsistency) {
             return true;
@@ -290,7 +290,6 @@ public class DataGroupMemberTest extends BaseMember {
           }
         };
     PartitionedSnapshotLogManager logManager = getLogManager(nodes, dataGroupMember);
-    dataGroupMember.setThisNode(node);
     dataGroupMember.setLogManager(logManager);
     dataGroupMember.setLeader(node);
     dataGroupMember.setCharacter(NodeCharacter.LEADER);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 8ca9a7b..4e4f82f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -85,7 +85,6 @@ import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
 import org.apache.iotdb.db.auth.entity.Role;
 import org.apache.iotdb.db.auth.entity.User;
 import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -189,7 +188,7 @@ public class MetaGroupMemberTest extends BaseMember {
         new DataGroupEngine(
             new DataGroupMember.Factory(new Factory(), testMetaMember) {
               @Override
-              public DataGroupMember create(PartitionGroup partitionGroup) {
+              public DataGroupMember create(Node thisNode, PartitionGroup partitionGroup) {
                 return getDataGroupMember(partitionGroup, TestUtils.getNode(0));
               }
             },
@@ -228,7 +227,7 @@ public class MetaGroupMemberTest extends BaseMember {
 
   private DataGroupMember getDataGroupMember(PartitionGroup group, Node node) {
     DataGroupMember dataGroupMember =
-        new DataGroupMember(new Factory(), group, testMetaMember) {
+        new DataGroupMember(node, group, testMetaMember) {
           @Override
           public boolean syncLeader(CheckConsistency checkConsistency) {
             return true;