Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/11/01 06:21:38 UTC

[iotdb] branch expr updated: before add log sequencer

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr by this push:
     new f505f20  before add log sequencer
f505f20 is described below

commit f505f201b5b4d8a0906c2e1cfbda5e5da76ab855
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 1 14:21:07 2021 +0800

    before add log sequencer
---
 cluster/distribute-dc.sh                           |   2 +-
 .../org/apache/iotdb/cluster/expr/ExprBench.java   |   2 +
 .../org/apache/iotdb/cluster/expr/ExprMember.java  |   1 +
 .../apache/iotdb/cluster/expr/SequencerExpr.java   | 379 +++++++++++++++++++++
 .../apache/iotdb/cluster/expr/VotingLogList.java   |  32 +-
 .../apache/iotdb/cluster/log/LogDispatcher.java    |   3 +-
 .../org/apache/iotdb/cluster/log/VotingLog.java    |   2 +
 .../cluster/partition/slot/SlotPartitionTable.java |  16 +-
 .../handlers/caller/AppendNodeEntryHandler.java    |   4 +
 .../server/handlers/caller/ElectionHandler.java    |   2 +-
 .../server/heartbeat/MetaHeartbeatThread.java      |   9 +-
 .../cluster/server/member/DataGroupMember.java     |  10 +-
 .../cluster/server/member/MetaGroupMember.java     |  33 +-
 .../iotdb/cluster/server/member/RaftMember.java    |  81 +++--
 14 files changed, 516 insertions(+), 60 deletions(-)

diff --git a/cluster/distribute-dc.sh b/cluster/distribute-dc.sh
index 279f9ec..1eba6f4 100644
--- a/cluster/distribute-dc.sh
+++ b/cluster/distribute-dc.sh
@@ -1,6 +1,6 @@
 src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
 
-ips=(dc15 dc16 dc17 dc18)
+ips=(dc11 dc12 dc13 dc14 dc15 dc16 dc17 dc18)
 target_lib_path=/home/jt/iotdb_expr/lib
 
 for ip in ${ips[*]}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
index 86ec5e6..e3f7b57 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.expr;
 import org.apache.iotdb.cluster.client.sync.SyncClientFactory;
 import org.apache.iotdb.cluster.client.sync.SyncClientPool;
 import org.apache.iotdb.cluster.client.sync.SyncMetaClient.FactorySync;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
@@ -107,6 +108,7 @@ public class ExprBench {
   }
 
   public static void main(String[] args) {
+    ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(50000);
     Node target = new Node();
     target.setInternalIp(args[0]);
     target.setMetaPort(Integer.parseInt(args[1]));
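
The new first statement of main() raises the per-node client cap before any
client pool exists. Presumably the cap is read when a pool is constructed, so
the call has to precede pool creation; a self-contained sketch of that ordering
pitfall (Config and Pool are hypothetical stand-ins, not the IoTDB classes):

    class PoolCapOrderingSketch {
      static final class Config { int maxClientsPerNode = 100; }
      static final class Pool {
        final int cap;
        Pool(Config c) { this.cap = c.maxClientsPerNode; } // cap frozen at construction
      }

      public static void main(String[] args) {
        Config cfg = new Config();
        cfg.maxClientsPerNode = 50000;  // must run before any pool is built
        Pool pool = new Pool(cfg);
        System.out.println(pool.cap);   // 50000; reversing the order would print 100
      }
    }
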
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
index 9e07028..050230b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -205,6 +205,7 @@ public class ExprMember extends MetaGroupMember {
 
     // flush [0, flushPos)
     List<Log> logs = Arrays.asList(logWindow).subList(0, flushPos);
+    // logger.info("{}, Flushing {} into log manager", logManager.getLastLogIndex(), logs);
     long success =
         logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit, logs);
     if (success != -1) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/SequencerExpr.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/SequencerExpr.java
new file mode 100644
index 0000000..7dae6a7
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/SequencerExpr.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.LogExecutionException;
+import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogApplier;
+import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
+import org.apache.iotdb.cluster.log.LogParser;
+import org.apache.iotdb.cluster.log.VotingLog;
+import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
+import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.server.NodeCharacter;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.cluster.utils.IOUtils;
+import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequencerExpr extends MetaGroupMember {
+
+  private static final Logger logger = LoggerFactory.getLogger(SequencerExpr.class);
+
+  private int v2ThreadNum = 2000;
+  private int v3ThreadNum = 0;
+  private AtomicLong reqCnt = new AtomicLong();
+
+  private BlockingQueue<SendLogRequest> nonsequencedLogQueue =
+      new ArrayBlockingQueue<>(4096);
+
+  public SequencerExpr() {
+    LogApplier applier = new LogApplier() {
+      @Override
+      public void apply(Log log) {
+        log.setApplied(true);
+      }
+
+      @Override
+      public void close() {
+
+      }
+    };
+    logManager = new MetaSingleSnapshotLogManager(applier, this);
+
+    new Thread(this::sequenceLog).start();
+    new Thread(this::sequenceLog).start();
+    new Thread(this::sequenceLog).start();
+    new Thread(this::sequenceLog).start();
+    reportThread = Executors.newSingleThreadScheduledExecutor();
+    reportThread.scheduleAtFixedRate(
+        this::generateNodeReport, REPORT_INTERVAL_SEC, REPORT_INTERVAL_SEC, TimeUnit.SECONDS);
+  }
+
+  private TSStatus processPlanLocallyV2(PhysicalPlan plan) {
+    logger.debug("{}: Processing plan {}", getName(), plan);
+    // assign term and index to the new log and append it
+    SendLogRequest sendLogRequest;
+
+    Log log;
+    if (plan instanceof LogPlan) {
+      try {
+        log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+      } catch (UnknownLogTypeException e) {
+        logger.error("Cannot parse LogPlan {}", plan, e);
+        return StatusUtils.PARSE_LOG_ERROR;
+      }
+    } else {
+      log = new PhysicalPlanLog();
+      ((PhysicalPlanLog) log).setPlan(plan);
+    }
+
+    if (log.serialize().capacity() + Integer.BYTES
+        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+      logger.error(
+          "Log cannot fit into buffer, please increase raft_log_buffer_size"
+              + " or reduce the size of requests you send.");
+      return StatusUtils.INTERNAL_ERROR;
+    }
+
+    long startTime =
+        Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
+    synchronized (logManager) {
+      Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.calOperationCostTimeFromStart(
+          startTime);
+
+      plan.setIndex(logManager.getLastLogIndex() + 1);
+      log.setCurrLogTerm(getTerm().get());
+      log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+
+      startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
+      // just like processPlanLocally, we need to check the size of the log
+
+      // logDispatcher will serialize the log and set its size; that size is used afterwards
+      logManager.append(log);
+      Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
+
+      startTime = Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
+      sendLogRequest = buildSendLogRequest(log);
+      Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
+
+      startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+      log.setCreateTime(System.nanoTime());
+      votingLogList.insert(sendLogRequest.getVotingLog());
+      getLogDispatcher().offer(sendLogRequest);
+      Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+    }
+
+    try {
+      AppendLogResult appendLogResult =
+          waitAppendResult(
+              sendLogRequest.getVotingLog(),
+              sendLogRequest.getLeaderShipStale(),
+              sendLogRequest.getNewLeaderTerm(),
+              sendLogRequest.getQuorumSize());
+      Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
+          sendLogRequest.getVotingLog().getLog().getCreateTime());
+
+      switch (appendLogResult) {
+        case WEAK_ACCEPT:
+          // TODO: change to weak
+          Statistic.RAFT_WEAK_ACCEPT.add(1);
+          return StatusUtils.OK;
+        case OK:
+          logger.debug(MSG_LOG_IS_ACCEPTED, getName(), log);
+          startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
+          commitLog(log);
+          Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
+          return StatusUtils.OK;
+        case TIME_OUT:
+          logger.debug("{}: log {} timed out...", getName(), log);
+          break;
+        case LEADERSHIP_STALE:
+          // abort the appending, the new leader will fix the local logs by catch-up
+        default:
+          break;
+      }
+    } catch (Exception e) {
+      return handleLogExecutionException(log, IOUtils.getRootCause(e));
+    }
+    return StatusUtils.TIME_OUT;
+  }
+
+  public SendLogRequest enqueueSendLogRequest(Log log) {
+    VotingLog votingLog = buildVotingLog(log);
+    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
+    AtomicLong newLeaderTerm = new AtomicLong(term.get());
+
+    SendLogRequest sendLogRequest = new SendLogRequest(
+        votingLog, leaderShipStale, newLeaderTerm, null, allNodes.size() / 2);
+    try {
+      nonsequencedLogQueue.put(sendLogRequest);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    return sendLogRequest;
+  }
+
+  private void sequenceLog(List<SendLogRequest> sendLogRequests) {
+    long startTime;
+    synchronized (logManager) {
+      for (SendLogRequest sendLogRequest : sendLogRequests) {
+        Log log = sendLogRequest.getVotingLog().getLog();
+        log.setCurrLogTerm(getTerm().get());
+        log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+        if (log instanceof PhysicalPlanLog) {
+          ((PhysicalPlanLog) log).getPlan().setIndex(log.getCurrLogIndex());
+        }
+
+        startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
+        // just like processPlanLocally, we need to check the size of the log
+
+        // logDispatcher will serialize the log and set its size; that size is used afterwards
+        logManager.append(log);
+        Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
+
+        AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(log, false);
+        sendLogRequest.setAppendEntryRequest(appendEntryRequest);
+
+        startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+        log.setCreateTime(System.nanoTime());
+        votingLogList.insert(sendLogRequest.getVotingLog());
+        getLogDispatcher().offer(sendLogRequest);
+        Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+      }
+    }
+    sendLogRequests.clear();
+  }
+
+  private void sequenceLog() {
+    List<SendLogRequest> sendLogRequests = new ArrayList<>();
+    while (!Thread.interrupted()) {
+      try {
+        synchronized (nonsequencedLogQueue) {
+          SendLogRequest request = nonsequencedLogQueue.take();
+          sendLogRequests.add(request);
+          nonsequencedLogQueue.drainTo(sendLogRequests);
+        }
+
+        sequenceLog(sendLogRequests);
+      } catch (InterruptedException e) {
+        return;
+      }
+    }
+  }
+
+  private TSStatus processPlanLocallyV3(PhysicalPlan plan) {
+    logger.debug("{}: Processing plan {}", getName(), plan);
+    // assign term and index to the new log and append it
+    SendLogRequest sendLogRequest;
+
+    Log log;
+    if (plan instanceof LogPlan) {
+      try {
+        log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+      } catch (UnknownLogTypeException e) {
+        logger.error("Cannot parse LogPlan {}", plan, e);
+        return StatusUtils.PARSE_LOG_ERROR;
+      }
+    } else {
+      log = new PhysicalPlanLog();
+      ((PhysicalPlanLog) log).setPlan(plan);
+    }
+
+    if (log.serialize().capacity() + Integer.BYTES
+        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+      logger.error(
+          "Log cannot fit into buffer, please increase raft_log_buffer_size"
+              + " or reduce the size of requests you send.");
+      return StatusUtils.INTERNAL_ERROR;
+    }
+
+    long startTime;
+    sendLogRequest = enqueueSendLogRequest(log);
+
+    try {
+      AppendLogResult appendLogResult =
+          waitAppendResult(
+              sendLogRequest.getVotingLog(),
+              sendLogRequest.getLeaderShipStale(),
+              sendLogRequest.getNewLeaderTerm(),
+              sendLogRequest.getQuorumSize());
+      Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
+          sendLogRequest.getVotingLog().getLog().getCreateTime());
+
+      switch (appendLogResult) {
+        case WEAK_ACCEPT:
+          // TODO: change to weak
+          Statistic.RAFT_WEAK_ACCEPT.add(1);
+          return StatusUtils.OK;
+        case OK:
+          logger.debug(MSG_LOG_IS_ACCEPTED, getName(), log);
+          startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
+          commitLog(log);
+          Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
+          return StatusUtils.OK;
+        case TIME_OUT:
+          logger.debug("{}: log {} timed out...", getName(), log);
+          break;
+        case LEADERSHIP_STALE:
+          // abort the appending, the new leader will fix the local logs by catch-up
+        default:
+          break;
+      }
+    } catch (Exception e) {
+      return handleLogExecutionException(log, IOUtils.getRootCause(e));
+    }
+    return StatusUtils.TIME_OUT;
+  }
+
+  @Override
+  public Client getSyncClient(Node node) {
+    return new Client(null, null) {
+      @Override
+      public AppendEntryResult appendEntry(AppendEntryRequest request) {
+        return new AppendEntryResult().setStatus(Response.RESPONSE_STRONG_ACCEPT);
+      }
+
+      @Override
+      public AppendEntryResult appendEntries(AppendEntriesRequest request) {
+        return new AppendEntryResult().setStatus(Response.RESPONSE_STRONG_ACCEPT);
+      }
+    };
+  }
+
+  private void decentralizedSequencing() {
+    for (int i = 0; i < v2ThreadNum; i++) {
+      new Thread(() -> {
+        while (true) {
+          reqCnt.incrementAndGet();
+          DummyPlan dummyPlan = new DummyPlan();
+          processPlanLocallyV2(dummyPlan);
+        }
+      }).start();
+    }
+  }
+
+  private void centralizedSequencing() {
+    for (int i = 0; i < v3ThreadNum; i++) {
+      new Thread(() -> {
+        while (true) {
+          reqCnt.incrementAndGet();
+          DummyPlan dummyPlan = new DummyPlan();
+          processPlanLocallyV3(dummyPlan);
+        }
+      }).start();
+    }
+  }
+
+  private void startMonitor() {
+    new Thread(() -> {
+      long startTime = System.currentTimeMillis();
+      while (true) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        }
+        long consumedTime = System.currentTimeMillis() - startTime;
+        System.out.println(consumedTime + ", " + (reqCnt.get() * 1.0 / consumedTime * 1000L));
+      }
+    }).start();
+  }
+
+  public static void main(String[] args) {
+    RaftMember.USE_LOG_DISPATCHER = true;
+    ClusterDescriptor.getInstance().getConfig().setEnableRaftLogPersistence(false);
+    SequencerExpr sequencerExpr = new SequencerExpr();
+    sequencerExpr.setCharacter(NodeCharacter.LEADER);
+    PartitionGroup group = new PartitionGroup();
+    for (int i = 0; i < 3; i++) {
+      group.add(new Node().setNodeIdentifier(i).setMetaPort(i));
+    }
+    sequencerExpr.setAllNodes(group);
+    sequencerExpr.centralizedSequencing();
+    sequencerExpr.decentralizedSequencing();
+    sequencerExpr.startMonitor();
+  }
+}
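
In the file above, processPlanLocallyV3 hands requests to enqueueSendLogRequest,
and the four sequenceLog threads batch them: each thread blocks in take() for
the first request, drains whatever else is queued with drainTo(), and assigns
indices to the whole batch under the logManager lock (the synchronized block on
nonsequencedLogQueue keeps each take-plus-drain atomic). A minimal, runnable
sketch of that take-plus-drainTo batching pattern, with hypothetical names:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class BatchDrainSketch {
      private static final BlockingQueue<String> queue = new ArrayBlockingQueue<>(4096);

      public static void main(String[] args) throws InterruptedException {
        Thread consumer = new Thread(() -> {
          List<String> batch = new ArrayList<>();
          while (!Thread.interrupted()) {
            try {
              batch.add(queue.take()); // block until at least one element arrives
              queue.drainTo(batch);    // then grab everything else already queued
            } catch (InterruptedException e) {
              return;
            }
            System.out.println("sequenced a batch of " + batch.size());
            batch.clear();
          }
        });
        consumer.start();
        for (int i = 0; i < 10; i++) {
          queue.put("req-" + i);
        }
        Thread.sleep(100);
        consumer.interrupt();
      }
    }
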
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
index 149e726..28eb19d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
@@ -21,10 +21,14 @@ package org.apache.iotdb.cluster.expr;
 
 import org.apache.iotdb.cluster.log.VotingLog;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.List;
 
 public class VotingLogList {
+  private static final Logger logger = LoggerFactory.getLogger(VotingLogList.class);
 
   private List<VotingLog> logList = new ArrayList<>();
   private volatile long currTerm = -1;
@@ -58,28 +62,36 @@ public class VotingLogList {
    * @param acceptingNodeId id of the node that has strongly accepted entries up to (index, term)
    */
-  public synchronized void onStronglyAccept(long index, long term, int acceptingNodeId) {
+  public void onStronglyAccept(long index, long term, int acceptingNodeId) {
     int lastEntryIndexToCommit = -1;
-    for (int i = 0, logListSize = logList.size(); i < logListSize; i++) {
-      VotingLog votingLog = logList.get(i);
-      if (votingLog.getLog().getCurrLogIndex() <= index
-          && votingLog.getLog().getCurrLogTerm() == term) {
-        votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
-        if (votingLog.getStronglyAcceptedNodeIds().size() >= quorumSize) {
-          lastEntryIndexToCommit = i;
+
+    List<VotingLog> acceptedLogs;
+    synchronized (this) {
+      for (int i = 0, logListSize = logList.size(); i < logListSize; i++) {
+        VotingLog votingLog = logList.get(i);
+        if (votingLog.getLog().getCurrLogIndex() <= index
+            && votingLog.getLog().getCurrLogTerm() == term) {
+          votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
+          if (votingLog.getStronglyAcceptedNodeIds().size() >= quorumSize) {
+            lastEntryIndexToCommit = i;
+          }
+        } else if (votingLog.getLog().getCurrLogIndex() > index) {
+          break;
         }
       }
+
+      List<VotingLog> tmpAcceptedLogs = logList.subList(0, lastEntryIndexToCommit + 1);
+      acceptedLogs = new ArrayList<>(tmpAcceptedLogs);
+      tmpAcceptedLogs.clear();
     }
 
     if (lastEntryIndexToCommit != -1) {
-      List<VotingLog> acceptedLogs = logList.subList(0, lastEntryIndexToCommit + 1);
       for (VotingLog acceptedLog : acceptedLogs) {
         synchronized (acceptedLog) {
           acceptedLog.acceptedTime = System.nanoTime();
           acceptedLog.notifyAll();
         }
       }
-      acceptedLogs.clear();
     }
   }
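
The reworked onStronglyAccept shortens the critical section: entries that
reached quorum are copied out of logList while the monitor is held, and the
per-entry notifications happen after it is released. A minimal sketch of that
snapshot-then-notify shape (hypothetical names):

    import java.util.ArrayList;
    import java.util.List;

    class SnapshotThenNotifySketch {
      private final List<Object> pending = new ArrayList<>();

      void complete(int upTo) {
        List<Object> done;
        synchronized (this) {                   // short: only mutate the list here
          done = new ArrayList<>(pending.subList(0, upTo));
          pending.subList(0, upTo).clear();
        }
        for (Object o : done) {                 // wake waiters outside the list lock
          synchronized (o) {
            o.notifyAll();
          }
        }
      }
    }
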
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index d954ccf..6c3d2073 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.utils.TestOnly;
@@ -83,7 +84,7 @@ public class LogDispatcher {
 
   void createQueueAndBindingThreads() {
     for (Node node : member.getAllNodes()) {
-      if (!node.equals(member.getThisNode())) {
+      if (!ClusterUtils.isNodeEquals(node, member.getThisNode())) {
         nodeLogQueues.add(createQueueAndBindingThread(node));
       }
     }
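
ClusterUtils.isNodeEquals replaces Node.equals here; the Thrift-generated
equals compares every field, so two descriptors of the same endpoint that
differ in any other field (such as nodeIdentifier) fail to match. A hedged
sketch of what such a helper presumably checks; the real
ClusterUtils.isNodeEquals may compare different or additional fields (Node is
the Thrift type imported in this file):

    static boolean isNodeEquals(Node a, Node b) {
      if (a == b) {
        return true;
      }
      if (a == null || b == null) {
        return false;
      }
      // identity by endpoint only, ignoring identifier and other fields
      return a.getInternalIp().equals(b.getInternalIp())
          && a.getMetaPort() == b.getMetaPort();
    }
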
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index 555562a..d56f3f0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.log;
 
+import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -27,6 +28,7 @@ public class VotingLog {
   protected Set<Integer> stronglyAcceptedNodeIds;
   protected Set<Integer> weaklyAcceptedNodeIds;
   public long acceptedTime;
+  public volatile ByteBuffer serializedCache;
 
   public VotingLog(Log log, int groupSize) {
     this.log = log;
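
The new serializedCache field is a volatile ByteBuffer, presumably so a log
serialized once can be reused by every sender instead of being re-serialized
per follower. A minimal sketch of the cache-once, publish-via-volatile pattern
(the actual fill and read sites in LogDispatcher are assumptions here):

    import java.nio.ByteBuffer;

    class SerializedCacheSketch {
      volatile ByteBuffer serializedCache; // same shape as VotingLog's new field

      ByteBuffer serializedOnce() {
        ByteBuffer cached = serializedCache;              // volatile read
        if (cached == null) {
          cached = ByteBuffer.wrap("payload".getBytes()); // stand-in for real serialization
          serializedCache = cached;                       // publish to other sender threads
        }
        return cached.duplicate(); // duplicate() keeps read positions independent
      }
    }
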
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index 7a46baa..fcb70cb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -190,7 +190,7 @@ public class SlotPartitionTable implements PartitionTable {
   private List<PartitionGroup> getPartitionGroups(Node node) {
     List<PartitionGroup> ret = new ArrayList<>();
 
-    int nodeIndex = nodeRing.indexOf(node);
+    int nodeIndex = findNodeIndex(node);
     if (nodeIndex == -1) {
      logger.info("PartitionGroups is empty because this node has been removed from the cluster!");
       return ret;
@@ -215,7 +215,7 @@ public class SlotPartitionTable implements PartitionTable {
     PartitionGroup ret = new PartitionGroup(raftNode.getRaftId());
 
     // assuming the nodes are [1,2,3,4,5]
-    int nodeIndex = nodeRing.indexOf(raftNode.getNode());
+    int nodeIndex = findNodeIndex(raftNode.getNode());
     if (nodeIndex == -1) {
       logger.warn("Node {} is not in the cluster", raftNode.getNode());
       return null;
@@ -529,7 +529,7 @@ public class SlotPartitionTable implements PartitionTable {
         localGroups.remove(removedGroupIdx);
         // each node exactly joins replicationNum groups, so when a group is removed, the node
         // should join a new one
-        int thisNodeIdx = nodeRing.indexOf(thisNode);
+        int thisNodeIdx = findNodeIndex(thisNode);
 
         // check if this node is to be removed
         if (thisNodeIdx == -1) {
@@ -606,4 +606,14 @@ public class SlotPartitionTable implements PartitionTable {
   public RaftNode[] getSlotNodes() {
     return slotNodes;
   }
+
+  private int findNodeIndex(Node node) {
+    for (int i = 0; i < nodeRing.size(); i++) {
+      if (nodeRing.get(i).getInternalIp().equals(node.getInternalIp())
+          && nodeRing.get(i).getMetaPort() == node.getMetaPort()) {
+        return i;
+      }
+    }
+    return -1;
+  }
 }
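
findNodeIndex replaces nodeRing.indexOf(node), which relies on Node.equals and
therefore fails whenever any field of the two copies differs, even though they
describe the same endpoint; the helper matches on internal ip and meta port
only. A self-contained illustration of the difference (N is a hypothetical
stand-in, not the Thrift Node):

    import java.util.Arrays;
    import java.util.List;

    class NodeLookupSketch {
      // N does not override equals(), mimicking how Node.equals can fail to
      // match two copies of the same endpoint.
      static final class N {
        final String ip; final int port; final int id;
        N(String ip, int port, int id) { this.ip = ip; this.port = port; this.id = id; }
      }

      static int findNodeIndex(List<N> ring, N node) {
        for (int i = 0; i < ring.size(); i++) {
          if (ring.get(i).ip.equals(node.ip) && ring.get(i).port == node.port) {
            return i;
          }
        }
        return -1;
      }

      public static void main(String[] args) {
        List<N> ring = Arrays.asList(new N("10.0.0.1", 9003, 1));
        N sameEndpoint = new N("10.0.0.1", 9003, 42);
        System.out.println(ring.indexOf(sameEndpoint));        // -1: equals() mismatch
        System.out.println(findNodeIndex(ring, sameEndpoint)); // 0: endpoint match
      }
    }
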
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index f865ec8..73088d2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -114,6 +114,10 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       }
     } else if (resp == RESPONSE_WEAK_ACCEPT) {
       synchronized (log) {
+        if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size()
+            >= quorumSize) {
+          log.acceptedTime = System.nanoTime();
+        }
         log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
         log.notifyAll();
       }
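
This handler is the notifying half of a handshake whose waiting half is
RaftMember.waitAppendResult (changed later in this patch): the proposer sleeps
in log.wait() inside synchronized (log), and each response handler mutates the
acceptance sets and calls log.notifyAll() under the same monitor. A minimal
sketch of that vote-counting handshake (hypothetical names):

    import java.util.HashSet;
    import java.util.Set;

    class VoteWaitSketch {
      static final class Vote {
        final Set<Integer> accepted = new HashSet<>();
      }

      // handler side: record a vote and wake the waiting proposer
      static void onAccept(Vote vote, int nodeId) {
        synchronized (vote) {
          vote.accepted.add(nodeId);
          vote.notifyAll();
        }
      }

      // proposer side: wait until quorum is reached or the deadline passes
      static boolean await(Vote vote, int quorum, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        synchronized (vote) {
          while (vote.accepted.size() < quorum) {
            long left = deadline - System.currentTimeMillis();
            if (left <= 0) {
              return false;
            }
            vote.wait(left);
          }
        }
        return true;
      }
    }
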
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java
index 6190d20..cbb5a08 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ElectionHandler.java
@@ -96,8 +96,8 @@ public class ElectionHandler implements AsyncMethodCallback<Long> {
           // the election is valid
           electionValid.set(true);
           terminated.set(true);
-          raftMember.getTerm().notifyAll();
           raftMember.onElectionWins();
+          raftMember.getTerm().notifyAll();
           logger.info("{}: Election {} is won", memberName, currTerm);
         }
         // still need more votes
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
index 3eb510d..cb28a64 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
@@ -86,6 +86,7 @@ public class MetaHeartbeatThread extends HeartbeatThread {
     super.startElection();
 
     if (localMetaMember.getCharacter() == NodeCharacter.LEADER) {
+
       // A new raft leader needs to have at least one log in its term for committing logs with older
       // terms.
       // In the meta group, log frequency is very low. When the leader is changed while changing
@@ -93,7 +94,13 @@ public class MetaHeartbeatThread extends HeartbeatThread {
       // operation can be carried out in time.
       localMetaMember
           .getAppendLogThreadPool()
-          .submit(() -> localMetaMember.processPlanLocally(new DummyPlan()));
+          .submit(
+              () -> {
+                while (localMetaMember.getPartitionTable() == null) {
+                  // wait until partition table is ready
+                }
+                localMetaMember.processPlanLocally(new DummyPlan());
+              });
     }
   }
 }
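
Note that the loop body above is empty, so the submitted task busy-spins on an
append-log pool thread until the partition table appears. A hedged alternative
body for the lambda, polling with a short sleep instead of spinning; a sketch,
not the committed code:

    while (localMetaMember.getPartitionTable() == null) {
      try {
        Thread.sleep(10); // yield instead of spinning at full CPU
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
    localMetaMember.processPlanLocally(new DummyPlan());
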
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index e58d148..eadc017 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -266,7 +266,7 @@ public class DataGroupMember extends RaftMember {
   long checkElectorLogProgress(ElectionRequest electionRequest) {
     Node elector = electionRequest.getElector();
     // check if the node is in the group
-    if (!allNodes.contains(elector)) {
+    if (!containsNode(elector)) {
       logger.info(
           "{}: the elector {} is not in the data group {}, so reject this election.",
           name,
@@ -316,7 +316,7 @@ public class DataGroupMember extends RaftMember {
       logger.debug("{}: start to pre adding node {}", name, node);
     }
     synchronized (allNodes) {
-      if (allNodes.contains(node)) {
+      if (containsNode(node)) {
         return false;
       }
       int insertIndex = -1;
@@ -367,7 +367,7 @@ public class DataGroupMember extends RaftMember {
 
     synchronized (allNodes) {
       preAddNode(node);
-      if (allNodes.contains(node) && allNodes.size() > config.getReplicationNum()) {
+      if (containsNode(node) && allNodes.size() > config.getReplicationNum()) {
         // remove the last node because the group size is fixed to replication number
         Node removedNode = allNodes.remove(allNodes.size() - 1);
         peerMap.remove(removedNode);
@@ -895,7 +895,7 @@ public class DataGroupMember extends RaftMember {
       logger.debug("{}: start to pre remove node {}", name, removedNode);
     }
     synchronized (allNodes) {
-      if (allNodes.contains(removedNode) && allNodes.size() == config.getReplicationNum()) {
+      if (containsNode(removedNode) && allNodes.size() == config.getReplicationNum()) {
         // update the group if the deleted node was in it
         PartitionGroup newGroup = metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
         if (newGroup == null) {
@@ -933,7 +933,7 @@ public class DataGroupMember extends RaftMember {
 
     synchronized (allNodes) {
       preRemoveNode(removedNode);
-      if (allNodes.contains(removedNode)) {
+      if (containsNode(removedNode)) {
         // update the group if the deleted node was in it
         allNodes.remove(removedNode);
         peerMap.remove(removedNode);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 7919795..e30f867 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -169,7 +169,7 @@ public class MetaGroupMember extends RaftMember {
    * every "REPORT_INTERVAL_SEC" seconds, a reporter thread will print the status of all raft
    * members in this node
    */
-  private static final int REPORT_INTERVAL_SEC = 10;
+  protected static final int REPORT_INTERVAL_SEC = 10;
 
   /**
   * during snapshot, hardlinks of data files are created for downloading. Hardlinks will be
@@ -219,7 +219,7 @@ public class MetaGroupMember extends RaftMember {
    * a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread" will print the status
    * of all raft members in this node
    */
-  private ScheduledExecutorService reportThread;
+  protected ScheduledExecutorService reportThread;
 
   /**
    * containing configurations that should be kept the same cluster-wide, and must be checked before
@@ -397,7 +397,7 @@ public class MetaGroupMember extends RaftMember {
       if (node != null
           && (!node.getInternalIp().equals(thisNode.internalIp)
               || node.getMetaPort() != thisNode.getMetaPort())
-          && !allNodes.contains(node)) {
+          && !containsNode(node)) {
         // do not add the local node since it is added in the constructor
         allNodes.add(node);
       }
@@ -417,7 +417,7 @@ public class MetaGroupMember extends RaftMember {
         logger.debug("{}: adding a new node {} into {}", name, newNode, allNodes);
       }
 
-      if (!allNodes.contains(newNode)) {
+      if (!containsNode(newNode)) {
         registerNodeIdentifier(newNode, newNode.getNodeIdentifier());
         allNodes.add(newNode);
       }
@@ -473,7 +473,7 @@ public class MetaGroupMember extends RaftMember {
         TimeUnit.SECONDS);
   }
 
-  private void generateNodeReport() {
+  protected void generateNodeReport() {
     try {
       if (logger.isInfoEnabled()) {
         NodeReport report = genNodeReport();
@@ -619,7 +619,7 @@ public class MetaGroupMember extends RaftMember {
   long checkElectorLogProgress(ElectionRequest electionRequest) {
     Node elector = electionRequest.getElector();
     // check if the node is in the group
-    if (partitionTable != null && !allNodes.contains(elector)) {
+    if (partitionTable != null && !containsNode(elector)) {
       logger.info(
           "{}: the elector {} is not in the meta group {}, so reject this election.",
           name,
@@ -723,7 +723,20 @@ public class MetaGroupMember extends RaftMember {
     if (response.isSetFollowerIdentifier()) {
       // register the follower, the response.getFollower() contains the node information of the
       // receiver.
-      registerNodeIdentifier(response.getFollower(), response.getFollowerIdentifier());
+      Node localNode = null;
+      for (Node node : allNodes) {
+        if (node.getInternalIp().equals(response.getFollower().internalIp)
+            && node.getMetaPort() == response.getFollower().getMetaPort()) {
+          localNode = node;
+        }
+      }
+      if (localNode == null) {
+        logger.warn(
+            "Received a heartbeat response from a node that is not in the node list: {}",
+            response.getFollower());
+        return;
+      }
+      registerNodeIdentifier(localNode, response.getFollowerIdentifier());
       // if all nodes' ids are known, we can build the partition table
       if (allNodesIdKnown()) {
         // When the meta raft group is established, the follower reports its node information to the
@@ -902,7 +915,7 @@ public class MetaGroupMember extends RaftMember {
         break;
       }
     }
-    if (allNodes.contains(newNode)) {
+    if (containsNode(newNode)) {
       logger.debug("Node {} is already in the cluster", newNode);
       response.setRespNum((int) Response.RESPONSE_AGREE);
       synchronized (partitionTable) {
@@ -1741,7 +1754,7 @@ public class MetaGroupMember extends RaftMember {
         logger.debug("{}: Removing a node {} from {}", name, oldNode, allNodes);
       }
 
-      if (allNodes.contains(oldNode)) {
+      if (containsNode(oldNode)) {
         allNodes.remove(oldNode);
         idNodeMap.remove(oldNode.nodeIdentifier);
       }
@@ -1857,7 +1870,7 @@ public class MetaGroupMember extends RaftMember {
   private NodeReport genNodeReport() {
     NodeReport report = new NodeReport(thisNode);
     report.setMetaMemberReport(genMemberReport());
-    report.setDataMemberReportList(dataClusterServer.genMemberReports());
+    // report.setDataMemberReportList(dataClusterServer.genMemberReports());
     return report;
   }
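
The heartbeat fix above resolves the follower named in the response to the
Node instance actually stored in allNodes before registering its identifier;
setting the identifier on the transient RPC copy would leave the local list
untouched. A self-contained sketch of that resolve-to-canonical-instance step
(hypothetical names):

    import java.util.List;

    class CanonicalInstanceSketch {
      static final class Member {
        final String ip;
        final int port;
        int identifier; // mutable state lives on the locally stored instance
        Member(String ip, int port) { this.ip = ip; this.port = port; }
      }

      // resolve an incoming descriptor to the locally held instance before mutating it
      static Member findLocal(List<Member> all, String ip, int port) {
        for (Member m : all) {
          if (m.ip.equals(ip) && m.port == port) {
            return m;
          }
        }
        return null; // caller should log and ignore unknown endpoints
      }
    }
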
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 564eb98..41b5e50 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.cluster.utils.PlanSerializer;
 import org.apache.iotdb.cluster.utils.StatusUtils;
@@ -168,7 +169,7 @@ public abstract class RaftMember {
    * the current term of the node, this object also works as lock of some transactions of the member
    * like elections.
    */
-  AtomicLong term = new AtomicLong(0);
+  protected AtomicLong term = new AtomicLong(0);
 
   volatile NodeCharacter character = NodeCharacter.ELECTOR;
   AtomicReference<Node> leader = new AtomicReference<>(ClusterConstant.EMPTY_NODE);
@@ -787,6 +788,19 @@ public abstract class RaftMember {
   public void setAllNodes(PartitionGroup allNodes) {
     this.allNodes = allNodes;
     this.votingLogList = new VotingLogList(allNodes.size() / 2);
+
+    // update the reference of thisNode to keep consistency
+    boolean foundThisNode = false;
+    for (Node node : allNodes) {
+      if (ClusterUtils.isNodeEquals(node, thisNode)) {
+        thisNode = node;
+        foundThisNode = true;
+        break;
+      }
+    }
+    if (!foundThisNode) {
+      logger.error("{}: did not find this node {} in the raft group {}", name, thisNode, allNodes);
+    }
   }
 
   public Map<Node, Long> getLastCatchUpResponseTime() {
@@ -1192,10 +1206,9 @@ public abstract class RaftMember {
 
       startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
       log.setCreateTime(System.nanoTime());
+      votingLogList.insert(sendLogRequest.getVotingLog());
       getLogDispatcher().offer(sendLogRequest);
       Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
-
-      votingLogList.insert(sendLogRequest.getVotingLog());
     }
 
     try {
@@ -1586,7 +1599,7 @@ public abstract class RaftMember {
     return term;
   }
 
-  private synchronized LogDispatcher getLogDispatcher() {
+  protected synchronized LogDispatcher getLogDispatcher() {
     if (logDispatcher == null) {
       if (USE_INDIRECT_LOG_DISPATCHER) {
         logDispatcher = new IndirectLogDispatcher(this);
@@ -1602,23 +1615,24 @@ public abstract class RaftMember {
    * one follower tells the node that it is no longer a valid leader, or a timeout is triggered.
    */
   @SuppressWarnings({"java:S2445"}) // safe synchronized
-  private AppendLogResult waitAppendResult(
+  protected AppendLogResult waitAppendResult(
       VotingLog log, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm, int quorumSize) {
     // wait for the followers to vote
     long startTime = Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
     long nextTimeToPrint = 3000;
+
+    int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
+    int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+    int totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
     synchronized (log) {
       long waitStart = System.currentTimeMillis();
       long alreadyWait = 0;
-      int stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
-      int weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
       while (stronglyAcceptedNodeNum < quorumSize
+          && (!ENABLE_WEAK_ACCEPTANCE || (totalAccepted < quorumSize))
           && alreadyWait < RaftServer.getWriteOperationTimeoutMS()
-          && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)
-          && (!ENABLE_WEAK_ACCEPTANCE
-              || (stronglyAcceptedNodeNum + weaklyAcceptedNodeNum < quorumSize))) {
+          && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
         try {
-          log.wait(1);
+          log.wait(0);
           logger.debug("{} ends waiting", log);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
@@ -1636,6 +1650,7 @@ public abstract class RaftMember {
         }
         stronglyAcceptedNodeNum = log.getStronglyAcceptedNodeIds().size();
         weaklyAcceptedNodeNum = log.getWeaklyAcceptedNodeIds().size();
+        totalAccepted = stronglyAcceptedNodeNum + weaklyAcceptedNodeNum;
       }
 
       if (alreadyWait > 3000) {
@@ -1663,15 +1678,11 @@ public abstract class RaftMember {
     }
 
     // cannot get enough agreements within a certain amount of time
-    if (log.getStronglyAcceptedNodeIds().size() < quorumSize
-        && (log.getStronglyAcceptedNodeIds().size() + log.getWeaklyAcceptedNodeIds().size())
-            < quorumSize) {
+    if (stronglyAcceptedNodeNum < quorumSize && totalAccepted < quorumSize) {
       return AppendLogResult.TIME_OUT;
     }
 
-    if (log.getStronglyAcceptedNodeIds().size() < quorumSize
-        && (log.getStronglyAcceptedNodeIds().size() + log.getWeaklyAcceptedNodeIds().size())
-            >= quorumSize) {
+    if (stronglyAcceptedNodeNum < quorumSize && totalAccepted >= quorumSize) {
       return AppendLogResult.WEAK_ACCEPT;
     }
 
@@ -1680,17 +1691,21 @@ public abstract class RaftMember {
   }
 
   @SuppressWarnings("java:S2445")
-  void commitLog(Log log) throws LogExecutionException {
+  protected void commitLog(Log log) throws LogExecutionException {
     long startTime =
         Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.getOperationStartTime();
-    synchronized (logManager) {
-      Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
-          startTime);
-
-      startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
-      logManager.commitTo(log.getCurrLogIndex());
+    if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
+      synchronized (logManager) {
+        if (log.getCurrLogIndex() > logManager.getCommitLogIndex()) {
+          Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT.calOperationCostTimeFromStart(
+              startTime);
+
+          startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
+          logManager.commitTo(log.getCurrLogIndex());
+        }
+      }
+      Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
     }
-    Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
     if (ENABLE_COMMIT_RETURN) {
       return;
     }
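
commitLog now follows a check-lock-recheck shape: the unsynchronized pre-check
skips the logManager monitor entirely when the index is already committed, and
the test is repeated under the lock because another committer may have advanced
the index in the meantime. The shape in isolation (a sketch, not the IoTDB
code):

    class DoubleCheckedCommitSketch {
      // volatile so the unsynchronized pre-check reads a fresh value
      private volatile long commitIndex = 0;

      void commitTo(long index) {
        if (index > commitIndex) {       // cheap pre-check avoids the monitor
          synchronized (this) {
            if (index > commitIndex) {   // re-check: it may have advanced meanwhile
              commitIndex = index;
            }
          }
        }
      }
    }
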
@@ -1701,7 +1716,7 @@ public abstract class RaftMember {
       while (!log.isApplied()) {
         // wait until the log is applied
         try {
-          log.wait(5);
+          log.wait(0);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new LogExecutionException(e);
@@ -1742,7 +1757,7 @@ public abstract class RaftMember {
     return tsStatus;
   }
 
-  AppendEntryRequest buildAppendEntryRequest(Log log, boolean serializeNow) {
+  protected AppendEntryRequest buildAppendEntryRequest(Log log, boolean serializeNow) {
     AppendEntryRequest request = new AppendEntryRequest();
     request.setTerm(term.get());
     if (serializeNow) {
@@ -2249,7 +2264,7 @@ public abstract class RaftMember {
     return allNodes.getId();
   }
 
-  enum AppendLogResult {
+  protected enum AppendLogResult {
     OK,
     TIME_OUT,
     LEADERSHIP_STALE,
@@ -2293,4 +2308,14 @@ public abstract class RaftMember {
   public void setSkipElection(boolean skipElection) {
     this.skipElection = skipElection;
   }
+
+  protected boolean containsNode(Node node) {
+    for (Node localNode : allNodes) {
+      if (localNode.getInternalIp().equals(node.getInternalIp())
+          && localNode.getMetaPort() == node.getMetaPort()) {
+        return true;
+      }
+    }
+    return false;
+  }
 }