You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text rendering).
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/08/27 01:40:03 UTC

[iotdb] branch expr updated: fix multiple issues

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr by this push:
     new f00f6c6  fix multiple issues
f00f6c6 is described below

commit f00f6c65fc999f72d4cc519a88701fa59f736ad5
Author: jt <jt...@163.com>
AuthorDate: Fri Aug 27 09:39:28 2021 +0800

    fix multiple issues
---
 cluster/distribute.sh                              |   8 +-
 cluster/src/assembly/resources/sbin/expr-bench.sh  |  16 ++++
 cluster/src/assembly/resources/sbin/expr-server.sh |  16 ++++
 .../org/apache/iotdb/cluster/expr/ExprMember.java  | 104 ++++++++++++---------
 .../apache/iotdb/cluster/expr/VotingLogList.java   |  20 ++--
 .../org/apache/iotdb/cluster/log/VotingLog.java    |  15 +++
 .../iotdb/cluster/log/catchup/CatchUpTask.java     |  11 +--
 .../handlers/caller/AppendNodeEntryHandler.java    |  74 +++++++--------
 .../server/handlers/caller/HeartbeatHandler.java   |   2 +-
 .../server/handlers/caller/LogCatchUpHandler.java  |   4 +-
 .../cluster/server/member/DataGroupMember.java     |   4 +-
 .../cluster/server/member/MetaGroupMember.java     |   7 +-
 .../iotdb/cluster/server/member/RaftMember.java    |  37 ++++++--
 .../apache/iotdb/cluster/server/monitor/Timer.java |   3 +-
 .../apache/iotdb/db/qp/physical/sys/ExprPlan.java  |   9 ++
 15 files changed, 211 insertions(+), 119 deletions(-)

diff --git a/cluster/distribute.sh b/cluster/distribute.sh
index 7981466..8c70e27 100644
--- a/cluster/distribute.sh
+++ b/cluster/distribute.sh
@@ -1,7 +1,7 @@
-src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
+src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/*
 
-ips=(fit35 fit36 fit38 fit39)
-target_lib_path=/data/iotdb_expr/lib/
+ips=(fit36 fit38 fit39)
+target_lib_path=/data/iotdb_expr/lib
 
 for ip in ${ips[*]}
   do
@@ -10,7 +10,7 @@ for ip in ${ips[*]}
   done
 
 ips=(fit31 fit33 fit34)
-target_lib_path=/disk/iotdb_expr/lib/
+target_lib_path=/disk/iotdb_expr/lib
 
 for ip in ${ips[*]}
   do
diff --git a/cluster/src/assembly/resources/sbin/expr-bench.sh b/cluster/src/assembly/resources/sbin/expr-bench.sh
index d4fa092..dc2bc46 100644
--- a/cluster/src/assembly/resources/sbin/expr-bench.sh
+++ b/cluster/src/assembly/resources/sbin/expr-bench.sh
@@ -33,6 +33,22 @@ if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
   shift
 fi
 
+if [ -f "$IOTDB_CONF/iotdb-env.sh" ]; then
+    if [ $enable_printgc == "true" ]; then
+      . "$IOTDB_CONF/iotdb-env.sh" "printgc"
+    else
+        . "$IOTDB_CONF/iotdb-env.sh"
+    fi
+elif [ -f "${IOTDB_HOME}/conf/iotdb-env.sh" ]; then
+    if [ $enable_printgc == "true" ]; then
+      . "${IOTDB_HOME}/conf/iotdb-env.sh" "printgc"
+    else
+      . "${IOTDB_HOME}/conf/iotdb-env.sh"
+    fi
+else
+    echo "can't find $IOTDB_CONF/iotdb-env.sh"
+fi
+
 if [ -n "$JAVA_HOME" ]; then
     for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
         if [ -x "$java" ]; then
diff --git a/cluster/src/assembly/resources/sbin/expr-server.sh b/cluster/src/assembly/resources/sbin/expr-server.sh
index f323d0b..12d9dad 100644
--- a/cluster/src/assembly/resources/sbin/expr-server.sh
+++ b/cluster/src/assembly/resources/sbin/expr-server.sh
@@ -33,6 +33,22 @@ if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
   shift
 fi
 
+if [ -f "$IOTDB_CONF/iotdb-env.sh" ]; then
+    if [ $enable_printgc == "true" ]; then
+      . "$IOTDB_CONF/iotdb-env.sh" "printgc"
+    else
+        . "$IOTDB_CONF/iotdb-env.sh"
+    fi
+elif [ -f "${IOTDB_HOME}/conf/iotdb-env.sh" ]; then
+    if [ $enable_printgc == "true" ]; then
+      . "${IOTDB_HOME}/conf/iotdb-env.sh" "printgc"
+    else
+      . "${IOTDB_HOME}/conf/iotdb-env.sh"
+    fi
+else
+    echo "can't find $IOTDB_CONF/iotdb-env.sh"
+fi
+
 if [ -n "$JAVA_HOME" ]; then
     for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
         if [ -x "$java" ]; then
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
index 3c6189b..0a061db 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -20,9 +20,12 @@
 package org.apache.iotdb.cluster.expr;
 
 import java.nio.ByteBuffer;
-import java.sql.Time;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
@@ -42,39 +45,35 @@ import org.apache.iotdb.db.qp.physical.sys.ExprPlan;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ExprMember extends MetaGroupMember {
 
+  private static final Logger logger = LoggerFactory.getLogger(ExprMember.class);
+  private static final ExecutorService bypassPool = Executors.newCachedThreadPool();
   public static boolean bypassRaft = false;
   public static boolean useSlidingWindow = false;
 
-  private VotingLogList votingLogList;
-
   private int windowSize = 10000;
   private Log[] logWindow = new Log[windowSize];
-  private long[] prevIndices = new long[windowSize];
+  private long firstPosPrevIndex = 0;
   private long[] prevTerms = new long[windowSize];
 
-
   public ExprMember() {
   }
 
   public ExprMember(Node thisNode, List<Node> allNodes) {
     this.thisNode = thisNode;
     this.allNodes = allNodes;
-    this.votingLogList = new VotingLogList(allNodes.size()/2 + 1);
   }
 
   public ExprMember(TProtocolFactory factory,
       Node thisNode, Coordinator coordinator)
       throws QueryProcessException {
     super(factory, thisNode, coordinator);
-  }
-
-  @Override
-  public void setAllNodes(List<Node> allNodes) {
-    super.setAllNodes(allNodes);
-    this.votingLogList = new VotingLogList(allNodes.size()/2 + 1);
+    this.firstPosPrevIndex = logManager.getLastLogIndex();
+    this.prevTerms[0] = logManager.getLastLogTerm();
   }
 
   @Override
@@ -84,37 +83,50 @@ public class ExprMember extends MetaGroupMember {
 
   @Override
   public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
-    if (bypassRaft) {
-      if (plan instanceof ExprPlan && !((ExprPlan) plan).isNeedForward()) {
-        return StatusUtils.OK;
-      } else if (plan instanceof ExprPlan) {
-        ((ExprPlan) plan).setNeedForward(false);
-      }
+    try {
+      if (bypassRaft) {
+        int bufferSize = 4096;
+        if (plan instanceof ExprPlan && !((ExprPlan) plan).isNeedForward()) {
+          return StatusUtils.OK;
+        } else if (plan instanceof ExprPlan) {
+          ((ExprPlan) plan).setNeedForward(false);
+          bufferSize += ((ExprPlan) plan).getWorkload().length;
+        }
 
-      ExecutNonQueryReq req = new ExecutNonQueryReq();
-      ByteBuffer byteBuffer = ByteBuffer.allocate(128 * 1024);
-      plan.serialize(byteBuffer);
-      byteBuffer.flip();
-      req.setPlanBytes(byteBuffer);
-
-      for (Node node : getAllNodes()) {
-        if (!ClusterUtils.isNodeEquals(node, thisNode)) {
-          Client syncClient = getSyncClient(node);
-          try {
-            long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG
-                .getOperationStartTime();
-            syncClient.executeNonQueryPlan(req);
-            Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
-          } catch (TException e) {
-            ClientUtils.putBackSyncClient(syncClient);
-            return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
+        ExecutNonQueryReq req = new ExecutNonQueryReq();
+        ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
+        plan.serialize(byteBuffer);
+        byteBuffer.flip();
+        req.setPlanBytes(byteBuffer);
+        List<Future> futures = new ArrayList<>();
+        for (Node node : getAllNodes()) {
+          if (!ClusterUtils.isNodeEquals(node, thisNode)) {
+            futures.add(bypassPool.submit(() -> {
+              Client syncClient = getSyncClient(node);
+              try {
+                long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG
+                    .getOperationStartTime();
+                syncClient.executeNonQueryPlan(req);
+                Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
+              } catch (TException e) {
+                ClientUtils.putBackSyncClient(syncClient);
+                return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
+              }
+              ClientUtils.putBackSyncClient(syncClient);
+              return null;
+            }));
           }
-          ClientUtils.putBackSyncClient(syncClient);
         }
+        for (Future future : futures) {
+          future.get();
+        }
+        return StatusUtils.OK;
       }
-      return StatusUtils.OK;
+      return processNonPartitionedMetaPlan(plan);
+    } catch (Exception e) {
+      logger.error("Exception in processing plan", e);
+      return StatusUtils.INTERNAL_ERROR.deepCopy().setMessage(e.getMessage());
     }
-    return processNonPartitionedMetaPlan(plan);
   }
 
   /**
@@ -130,11 +142,10 @@ public class ExprMember extends MetaGroupMember {
 
   private void checkLogPrev(int pos) {
     // check the previous entry
-    long prevLogIndex = prevIndices[pos];
     long prevLogTerm = prevTerms[pos];
     if (pos > 0) {
       Log prev = logWindow[pos - 1];
-      if (prev != null && (prev.getCurrLogIndex() != prevLogIndex || prev.getCurrLogTerm() != prevLogTerm)) {
+      if (prev != null && prev.getCurrLogTerm() != prevLogTerm) {
         logWindow[pos - 1] = null;
       }
     }
@@ -145,9 +156,8 @@ public class ExprMember extends MetaGroupMember {
     Log log = logWindow[pos];
     boolean nextMismatch = false;
     if (pos < windowSize - 1) {
-      long nextPrevIndex = prevIndices[pos + 1];
       long nextPrevTerm = prevTerms[pos + 1];
-      if (!(nextPrevIndex != log.getCurrLogIndex() || nextPrevTerm != log.getCurrLogTerm())) {
+      if (nextPrevTerm != log.getCurrLogTerm()) {
         nextMismatch = true;
       }
     }
@@ -170,7 +180,7 @@ public class ExprMember extends MetaGroupMember {
    * @return
    */
   private long flushWindow(AppendEntryResult result, long leaderCommit) {
-    long windowPrevLogIndex = prevIndices[0];
+    long windowPrevLogIndex = firstPosPrevIndex;
     long windowPrevLogTerm = prevTerms[0];
 
     int flushPos = 0;
@@ -179,18 +189,21 @@ public class ExprMember extends MetaGroupMember {
         break;
       }
     }
+
     // flush [0, flushPos)
     List<Log> logs = Arrays.asList(logWindow).subList(0, flushPos);
     long success = logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit,
         logs);
     if (success != -1) {
       System.arraycopy(logWindow, flushPos, logWindow, 0, windowSize - flushPos);
+      System.arraycopy(prevTerms, flushPos, prevTerms, 0, windowSize - flushPos);
       for (int i = 1; i <= flushPos; i++) {
         logWindow[windowSize - i] = null;
       }
     }
+    firstPosPrevIndex = logManager.getLastLogIndex();
     result.status = Response.RESPONSE_STRONG_ACCEPT;
-    result.setLastLogIndex(logManager.getLastLogIndex());
+    result.setLastLogIndex(firstPosPrevIndex);
     result.setLastLogTerm(logManager.getLastLogTerm());
     return success;
   }
@@ -200,6 +213,7 @@ public class ExprMember extends MetaGroupMember {
     if (!useSlidingWindow) {
       return super.appendEntry(prevLogIndex, prevLogTerm, leaderCommit, log);
     }
+
     long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
     long appendedPos = 0;
 
@@ -215,10 +229,8 @@ public class ExprMember extends MetaGroupMember {
       } else if (windowPos < windowSize) {
         // the new entry falls into the window
         logWindow[windowPos] = log;
-        prevIndices[windowPos] = prevLogIndex;
         prevTerms[windowPos] = prevLogTerm;
         checkLog(windowPos);
-
         if (windowPos == 0) {
           appendedPos = flushWindow(result, leaderCommit);
         } else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
index 695d37b..99f456a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.cluster.log.VotingLog;
 
 public class VotingLogList {
 
-  private List<ExprVotingLog> logList = new ArrayList<>();
+  private List<VotingLog> logList = new ArrayList<>();
   private volatile long currTerm = -1;
   private int quorumSize;
 
@@ -39,7 +39,7 @@ public class VotingLogList {
    *
    * @param log
    */
-  public synchronized void insert(ExprVotingLog log) {
+  public synchronized void insert(VotingLog log) {
     if (log.getLog().getCurrLogTerm() != currTerm) {
       logList.clear();
       currTerm = log.getLog().getCurrLogTerm();
@@ -58,10 +58,10 @@ public class VotingLogList {
    * @param acceptingNodeId
    * @return the lastly removed entry if any.
    */
-  public synchronized VotingLog onStronglyAccept(long index, long term, int acceptingNodeId) {
+  public synchronized void onStronglyAccept(long index, long term, int acceptingNodeId) {
     int lastEntryIndexToCommit = -1;
     for (int i = 0, logListSize = logList.size(); i < logListSize; i++) {
-      ExprVotingLog votingLog = logList.get(i);
+      VotingLog votingLog = logList.get(i);
       if (votingLog.getLog().getCurrLogIndex() <= index
           && votingLog.getLog().getCurrLogTerm() == term) {
         votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
@@ -71,13 +71,15 @@ public class VotingLogList {
       }
     }
 
-    VotingLog ret = null;
     if (lastEntryIndexToCommit != -1) {
-      ret = logList.get(lastEntryIndexToCommit);
-      logList.subList(0, lastEntryIndexToCommit + 1).clear();
+      List<VotingLog> acceptedLogs = logList.subList(0, lastEntryIndexToCommit + 1);
+      for (VotingLog acceptedLog : acceptedLogs) {
+        synchronized (acceptedLog) {
+          acceptedLog.notifyAll();
+        }
+      }
+      acceptedLogs.clear();
     }
-
-    return ret;
   }
 
   public synchronized void clear() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index da9f4af..dc2d9b0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -25,10 +25,12 @@ import java.util.Set;
 public class VotingLog {
   protected Log log;
   protected Set<Integer> stronglyAcceptedNodeIds;
+  protected Set<Integer> weaklyAcceptedNodeIds;
 
   public VotingLog(Log log, int groupSize) {
     this.log = log;
     stronglyAcceptedNodeIds = new HashSet<>(groupSize);
+    weaklyAcceptedNodeIds = new HashSet<>(groupSize);
   }
 
   public Log getLog() {
@@ -46,4 +48,17 @@ public class VotingLog {
   public void setStronglyAcceptedNodeIds(Set<Integer> stronglyAcceptedNodeIds) {
     this.stronglyAcceptedNodeIds = stronglyAcceptedNodeIds;
   }
+
+  public Set<Integer> getWeaklyAcceptedNodeIds() {
+    return weaklyAcceptedNodeIds;
+  }
+
+  public void setWeaklyAcceptedNodeIds(Set<Integer> weaklyAcceptedNodeIds) {
+    this.weaklyAcceptedNodeIds = weaklyAcceptedNodeIds;
+  }
+
+  @Override
+  public String toString() {
+    return log.toString();
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
index 1472b12..2df2adc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
@@ -83,12 +83,11 @@ public class CatchUpTask implements Runnable {
     long localFirstIndex = 0;
     try {
       // to avoid snapshot catch up when index is volatile
-      synchronized (raftMember.getLogManager()) {
-        localFirstIndex = raftMember.getLogManager().getFirstIndex();
-        lo = Math.max(localFirstIndex, peer.getMatchIndex() + 1);
-        hi = raftMember.getLogManager().getLastLogIndex() + 1;
-        logs = raftMember.getLogManager().getEntries(lo, hi);
-      }
+      localFirstIndex = raftMember.getLogManager().getFirstIndex();
+      lo = Math.max(localFirstIndex, peer.getMatchIndex() + 1);
+      hi = raftMember.getLogManager().getLastLogIndex() + 1;
+      logs = raftMember.getLogManager().getEntries(lo, hi);
+
       // this may result from peer's match index being changed concurrently, making the peer
       // actually catch up now
       if (logger.isInfoEnabled()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index bf87703..aa817d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.cluster.server.handlers.caller;
 
 import static org.apache.iotdb.cluster.server.Response.RESPONSE_STRONG_ACCEPT;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_WEAK_ACCEPT;
 
 import java.net.ConnectException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -80,50 +81,43 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       // someone has rejected this log because the leadership is stale
       return;
     }
+
     long resp = response.status;
-    synchronized (log) {
-      if (resp == RESPONSE_STRONG_ACCEPT) {
-        log.getStronglyAcceptedNodeIds().add(receiver.nodeIdentifier);
-        int remaining = quorumSize - log.getStronglyAcceptedNodeIds().size();
-        logger.debug(
-            "{}: Received an agreement from {} for {}, remaining votes to succeed: {}",
-            member.getName(),
-            receiver,
-            log,
-            remaining);
-        if (remaining == 0) {
-          logger.debug(
-              "{}: Log [{}] {} is accepted by the quorum",
-              member.getName(),
-              log.getLog(),
-              log);
-          log.notifyAll();
-        }
-        peer.setMatchIndex(Math.max(log.getLog().getCurrLogIndex(), peer.getMatchIndex()));
-      } else if (resp > 0) {
-        // a response > 0 is the follower's term
-        // the leader ship is stale, wait for the new leader's heartbeat
-        long prevReceiverTerm = receiverTerm.get();
-        logger.debug(
-            "{}: Received a rejection from {} because term is stale: {}/{}",
-            member.getName(),
-            receiver,
-            prevReceiverTerm,
-            resp);
-        if (resp > prevReceiverTerm) {
-          receiverTerm.set(resp);
-        }
-        leaderShipStale.set(true);
+
+    if (resp == RESPONSE_STRONG_ACCEPT) {
+      member.getVotingLogList().onStronglyAccept(log.getLog().getCurrLogIndex(),
+          log.getLog().getCurrLogTerm(), receiver.nodeIdentifier);
+      peer.setMatchIndex(Math.max(log.getLog().getCurrLogIndex(), peer.getMatchIndex()));
+    } else if (resp > 0) {
+      // a response > 0 is the follower's term
+      // the leader ship is stale, wait for the new leader's heartbeat
+      long prevReceiverTerm = receiverTerm.get();
+      logger.debug(
+          "{}: Received a rejection from {} because term is stale: {}/{}",
+          member.getName(),
+          receiver,
+          prevReceiverTerm,
+          resp);
+      if (resp > prevReceiverTerm) {
+        receiverTerm.set(resp);
+      }
+      leaderShipStale.set(true);
+      synchronized (log) {
+        log.notifyAll();
+      }
+    } else if (resp == RESPONSE_WEAK_ACCEPT) {
+      synchronized (log) {
+        log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
         log.notifyAll();
-      } else {
-        // e.g., Response.RESPONSE_LOG_MISMATCH
-        logger.debug(
-            "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
-        onFail();
       }
-      // rejected because the receiver's logs are stale or the receiver has no cluster info, just
-      // wait for the heartbeat to handle
+    } else {
+      // e.g., Response.RESPONSE_LOG_MISMATCH
+      logger.debug(
+          "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
+      onFail();
     }
+    // rejected because the receiver's logs are stale or the receiver has no cluster info, just
+    // wait for the heartbeat to handle
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index 6d95e06..de0b83f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -111,7 +111,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
       if (lastLogIdx == peer.getLastHeartBeatIndex()) {
         // the follower's lastLogIndex is unchanged, increase inconsistent counter
         int inconsistentNum = peer.incInconsistentHeartbeatNum();
-        if (inconsistentNum >= 10000) {
+        if (inconsistentNum >= 5) {
           logger.info(
               "{}: catching up node {}, index-term: {}-{}/{}-{}, peer match index {}",
               memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
index eeda328..62933a7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
@@ -32,6 +32,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
 import static org.apache.iotdb.cluster.server.Response.RESPONSE_LOG_MISMATCH;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_STRONG_ACCEPT;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_WEAK_ACCEPT;
 
 /**
  * LogCatchUpHandler checks the result of appending a log in a catch-up task and decides to abort
@@ -51,7 +53,7 @@ public class LogCatchUpHandler implements AsyncMethodCallback<AppendEntryResult>
   public void onComplete(AppendEntryResult response) {
     logger.debug("{}: Received a catch-up result of {} from {}", memberName, log, follower);
     long resp = response.status;
-    if (resp == RESPONSE_AGREE) {
+    if (resp == RESPONSE_AGREE || resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_WEAK_ACCEPT) {
       synchronized (appendSucceed) {
         appendSucceed.set(true);
         appendSucceed.notifyAll();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 49acf81..484f5f6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -171,7 +171,7 @@ public class DataGroupMember extends RaftMember {
         new AsyncClientPool(new SingleManagerFactory(factory)));
     this.thisNode = thisNode;
     this.metaGroupMember = metaGroupMember;
-    allNodes = nodes;
+    setAllNodes(nodes);
     setQueryManager(new ClusterQueryManager());
     slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir());
     LogApplier applier = new DataLogApplier(metaGroupMember, this);
@@ -792,7 +792,7 @@ public class DataGroupMember extends RaftMember {
     synchronized (allNodes) {
       if (allNodes.contains(removedNode)) {
         // update the group if the deleted node was in it
-        allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
+        setAllNodes(metaGroupMember.getPartitionTable().getHeaderGroup(getHeader()));
         initPeerMap();
         if (removedNode.equals(leader.get())) {
           // if the leader is removed, also start an election immediately
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 6bffeda..061a5d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
 import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.expr.VotingLogList;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
@@ -268,7 +269,7 @@ public class MetaGroupMember extends RaftMember {
         new SyncClientPool(new SyncMetaClient.FactorySync(factory)),
         new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory)),
         new SyncClientPool(new SyncMetaHeartbeatClient.FactorySync(factory)));
-    allNodes = new ArrayList<>();
+    setAllNodes(new ArrayList<>());
     initPeerMap();
 
     dataClientProvider = new DataClientProvider(factory);
@@ -685,7 +686,7 @@ public class MetaGroupMember extends RaftMember {
   }
 
   private void updateNodeList(Collection<Node> nodes) {
-    allNodes = new ArrayList<>(nodes);
+    setAllNodes(new ArrayList<>(nodes));
     initPeerMap();
     logger.info("All nodes in the partition table: {}", allNodes);
     initIdNodeMap();
@@ -714,7 +715,7 @@ public class MetaGroupMember extends RaftMember {
         // leader through the first heartbeat. After the leader knows the node information of all
         // nodes, it can replace the incomplete node information previously saved locally, and build
         // partitionTable to send it to other followers.
-        allNodes = new ArrayList<>(idNodeMap.values());
+        setAllNodes(new ArrayList<>(idNodeMap.values()));
         if (partitionTable == null) {
           partitionTable = new SlotPartitionTable(allNodes, thisNode);
           logger.info("Partition table is set up");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index f78e3d9..b2a8442 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.expr.VotingLogList;
 import org.apache.iotdb.cluster.log.CommitLogCallback;
 import org.apache.iotdb.cluster.log.CommitLogTask;
 import org.apache.iotdb.cluster.log.HardState;
@@ -261,6 +262,8 @@ public abstract class RaftMember {
    */
   protected Map<Pair<Long, Long>, AppendNodeEntryHandler> sentLogHandlers = new ConcurrentHashMap<>();
 
+  protected VotingLogList votingLogList;
+
   protected RaftMember() {
   }
 
@@ -772,6 +775,7 @@ public abstract class RaftMember {
 
   public void setAllNodes(List<Node> allNodes) {
     this.allNodes = allNodes;
+    this.votingLogList = new VotingLogList(allNodes.size()/2 + 1);
   }
 
   public Map<Node, Long> getLastCatchUpResponseTime() {
@@ -1132,6 +1136,8 @@ public abstract class RaftMember {
       log.setCreateTime(System.nanoTime());
       getLogDispatcher().offer(sendLogRequest);
       Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+
+      votingLogList.insert(sendLogRequest.getVotingLog());
     }
 
     try {
@@ -1144,6 +1150,10 @@ public abstract class RaftMember {
       Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
           sendLogRequest.getVotingLog().getLog().getCreateTime());
       switch (appendLogResult) {
+        case WEAK_ACCEPT:
+          // TODO: change to weak
+          Statistic.RAFT_WEAK_ACCEPT.add(1);
+          return StatusUtils.OK;
         case OK:
           logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
           startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
@@ -1541,9 +1551,10 @@ public abstract class RaftMember {
     synchronized (log) {
       long waitStart = System.currentTimeMillis();
       long alreadyWait = 0;
-      while (log.getStronglyAcceptedNodeIds().size() >= quorumSize
+      while (log.getStronglyAcceptedNodeIds().size() < quorumSize
           && alreadyWait < RaftServer.getWriteOperationTimeoutMS()
-          && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
+          && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)
+          && log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size() < quorumSize) {
         try {
           log.wait(RaftServer.getWriteOperationTimeoutMS());
         } catch (InterruptedException e) {
@@ -1566,10 +1577,16 @@ public abstract class RaftMember {
     }
 
     // cannot get enough agreements within a certain amount of time
-    if (log.getStronglyAcceptedNodeIds().size() < quorumSize) {
+    if (log.getStronglyAcceptedNodeIds().size() < quorumSize
+        && (log.getStronglyAcceptedNodeIds().size() + log.getWeaklyAcceptedNodeIds().size()) < quorumSize) {
       return AppendLogResult.TIME_OUT;
     }
 
+    if (log.getStronglyAcceptedNodeIds().size() < quorumSize
+        && (log.getStronglyAcceptedNodeIds().size() + log.getWeaklyAcceptedNodeIds().size()) >= quorumSize) {
+      return AppendLogResult.WEAK_ACCEPT;
+    }
+
     // voteCounter has counted down to zero
     return AppendLogResult.OK;
   }
@@ -1647,6 +1664,7 @@ public abstract class RaftMember {
     // logManager, the follower can handle the larger commitIndex with no effect
     request.setLeaderCommit(logManager.getCommitLogIndex());
     request.setPrevLogIndex(log.getCurrLogIndex() - 1);
+    request.setIsFromLeader(true);
     try {
       request.setPrevLogTerm(logManager.getTerm(log.getCurrLogIndex() - 1));
     } catch (Exception e) {
@@ -1913,7 +1931,7 @@ public abstract class RaftMember {
         && alreadyWait <= RaftServer.getWriteOperationTimeoutMS()) {
       synchronized (peer) {
         try {
-          peer.wait(RaftServer.getWriteOperationTimeoutMS());
+          peer.wait(1000);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           logger.warn("Waiting for peer to catch up interrupted");
@@ -1939,12 +1957,14 @@ public abstract class RaftMember {
               , quorumSize);
       try {
         logger.debug("{} sending a log to {}: {}", name, node, log);
+        long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
         AppendEntryResult result;
         if (indirectReceivers == null || indirectReceivers.isEmpty()) {
           result = client.appendEntry(request);
         } else {
           result = client.appendEntryIndirect(request, indirectReceivers);
         }
+        Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
 
         handler.onComplete(result);
       } catch (TException e) {
@@ -2051,11 +2071,11 @@ public abstract class RaftMember {
   protected long checkPrevLogIndex(long prevLogIndex) {
     long lastLogIndex = logManager.getLastLogIndex();
     long startTime = Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime();
+    Timer.Statistic.RAFT_RECEIVER_INDEX_DIFF.add(prevLogIndex - lastLogIndex);
     if (lastLogIndex < prevLogIndex && !waitForPrevLog(prevLogIndex)) {
       // there are logs missing between the incoming log and the local last log, and such logs
       // did not come within a timeout, report a mismatch to the sender and it shall fix this
       // through catch-up
-      Timer.Statistic.RAFT_RECEIVER_INDEX_DIFF.add(prevLogIndex - lastLogIndex);
       return Response.RESPONSE_LOG_MISMATCH;
     }
     Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
@@ -2143,7 +2163,8 @@ public abstract class RaftMember {
   enum AppendLogResult {
     OK,
     TIME_OUT,
-    LEADERSHIP_STALE
+    LEADERSHIP_STALE,
+    WEAK_ACCEPT
   }
 
   /**
@@ -2167,4 +2188,8 @@ public abstract class RaftMember {
   public void removeAppendLogHandler(Pair<Long, Long> indexTerm) {
     sentLogHandlers.remove(indexTerm);
   }
+
+  public VotingLogList getVotingLogList() {
+    return votingLogList;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index f4ac980..7ecca9e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -237,7 +237,8 @@ public class Timer {
         "from create to end",
         TIME_SCALE,
         true,
-        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP);
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT);
 
     String className;
     String blockName;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java
index 2a1e16b..e3cb3a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java
@@ -81,4 +81,13 @@ public class ExprPlan extends PhysicalPlan {
   public void setNeedForward(boolean needForward) {
     this.needForward = needForward;
   }
+
+  public byte[] getWorkload() {
+    return workload;
+  }
+
+  @Override
+  public String toString() {
+    return "ExprPlan";
+  }
 }