You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 06:41:50 UTC

[iotdb] branch expr updated: extract log appender

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr by this push:
     new bff6dcd  extract log appender
bff6dcd is described below

commit bff6dcdd6b3c3346deab5110f195229ba6331e8d
Author: jt <jt...@163.com>
AuthorDate: Wed Dec 1 14:41:10 2021 +0800

    extract log appender
---
 .../cluster/log/appender/BlockingLogAppender.java  | 182 +++++++++++++++++++++
 .../iotdb/cluster/log/appender/LogAppender.java    |  38 +++++
 .../cluster/log/appender/LogAppenderFactory.java   |  27 +++
 .../appender/SlidingWindowLogAppender.java}        |  39 +++--
 .../iotdb/cluster/server/member/RaftMember.java    | 165 ++++---------------
 5 files changed, 299 insertions(+), 152 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
new file mode 100644
index 0000000..e93969c
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log.appender;
+
+import java.util.List;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BlockingLogAppender waits for a limited time when it receives out-of-order entries (entries
+ * with indices larger than the local last entry's index + 1). If the local log catches up during
+ * the wait and the received entries become appendable, it appends them normally. Otherwise, a
+ * LOG_MISMATCH is reported to the leader.
+ */
+public class BlockingLogAppender implements LogAppender {
+
+  private static final Logger logger = LoggerFactory.getLogger(BlockingLogAppender.class);
+
+  private RaftMember member;
+  private RaftLogManager logManager;
+
+  public BlockingLogAppender(RaftMember member) {
+    this.member = member;
+    this.logManager = member.getLogManager();
+  }
+
+  /**
+   * Find the local previous log of "log". If such log is found, discard all local logs behind it
+   * and append "log" to it. Otherwise report a log mismatch.
+   *
+   * @return a result with status Response.RESPONSE_STRONG_ACCEPT when the log is appended, or
+   *     Response.RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   */
+  public AppendEntryResult appendEntry(
+      long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+    long resp = checkPrevLogIndex(prevLogIndex);
+    if (resp != Response.RESPONSE_AGREE) {
+      return new AppendEntryResult(resp).setHeader(member.getHeader());
+    }
+
+    long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+    long success;
+    AppendEntryResult result = new AppendEntryResult();
+    synchronized (logManager) {
+      success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
+      if (success != -1) {
+        result.setLastLogIndex(logManager.getLastLogIndex());
+        result.setLastLogTerm(logManager.getLastLogTerm());
+      }
+    }
+    Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+    if (success != -1) {
+      logger.debug("{} append a new log {}", member.getName(), log);
+      result.status = Response.RESPONSE_STRONG_ACCEPT;
+    } else {
+      // the incoming log points to an illegal position, reject it
+      result.status = Response.RESPONSE_LOG_MISMATCH;
+    }
+    return result;
+  }
+
+  /**
+   * Wait until all logs up to and including "prevLogIndex" arrive or a timeout is reached.
+   */
+  private boolean waitForPrevLog(long prevLogIndex) {
+    long waitStart = System.currentTimeMillis();
+    long alreadyWait = 0;
+    Object logUpdateCondition = logManager.getLogUpdateCondition(prevLogIndex);
+    long lastLogIndex = logManager.getLastLogIndex();
+    Timer.Statistic.RAFT_RECEIVER_INDEX_DIFF.add(prevLogIndex - lastLogIndex);
+    while (lastLogIndex < prevLogIndex
+        && alreadyWait <= ClusterConstant.getWriteOperationTimeoutMS()) {
+      try {
+        // each time new logs are appended, this will be notified
+        synchronized (logUpdateCondition) {
+          logUpdateCondition.wait(1);
+        }
+        lastLogIndex = logManager.getLastLogIndex();
+        if (lastLogIndex >= prevLogIndex) {
+          return true;
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      alreadyWait = System.currentTimeMillis() - waitStart;
+    }
+
+    return alreadyWait <= ClusterConstant.getWriteOperationTimeoutMS();
+  }
+
+  protected long checkPrevLogIndex(long prevLogIndex) {
+    long lastLogIndex = logManager.getLastLogIndex();
+    long startTime = Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime();
+    if (lastLogIndex < prevLogIndex && !waitForPrevLog(prevLogIndex)) {
+      // there are logs missing between the incoming log and the local last log, and such logs
+      // did not come within a timeout, report a mismatch to the sender and it shall fix this
+      // through catch-up
+      return Response.RESPONSE_LOG_MISMATCH;
+    }
+    Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
+    return Response.RESPONSE_AGREE;
+  }
+
+  /**
+   * Find the local previous log of "logs". If such log is found, discard all local logs behind it
+   * and append "logs" to it. Otherwise report a log mismatch.
+   *
+   * @param logs the logs to append; an empty list is answered with Response.RESPONSE_AGREE
+   * @return a result with status Response.RESPONSE_STRONG_ACCEPT when the logs are appended, or
+   *     Response.RESPONSE_LOG_MISMATCH if the previous log of "logs" is not found.
+   */
+  public AppendEntryResult appendEntries(
+      long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
+    logger.debug(
+        "{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",
+        member.getName(),
+        prevLogIndex,
+        prevLogTerm,
+        leaderCommit);
+    if (logs.isEmpty()) {
+      return new AppendEntryResult(Response.RESPONSE_AGREE).setHeader(member.getHeader());
+    }
+
+    long resp = checkPrevLogIndex(prevLogIndex);
+    if (resp != Response.RESPONSE_AGREE) {
+      return new AppendEntryResult(resp).setHeader(member.getHeader());
+    }
+
+    AppendEntryResult result = new AppendEntryResult();
+    synchronized (logManager) {
+      long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+      resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, logs);
+      Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+      if (resp != -1) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("{} append a new log list {}, commit to {}", member.getName(), logs,
+              leaderCommit);
+        }
+        result.status = Response.RESPONSE_STRONG_ACCEPT;
+        result.setLastLogIndex(logManager.getLastLogIndex());
+        result.setLastLogTerm(logManager.getLastLogTerm());
+      } else {
+        // the incoming log points to an illegal position, reject it
+        result.status = Response.RESPONSE_LOG_MISMATCH;
+      }
+    }
+    return result;
+  }
+
+  public static class Factory implements LogAppenderFactory {
+
+    @Override
+    public LogAppender create(RaftMember member) {
+      return new BlockingLogAppender(member);
+    }
+  }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java
new file mode 100644
index 0000000..f997b06
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iotdb.cluster.log.appender;
+
+import java.util.List;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
+
+/**
+ * LogAppender appends newly incoming entries to the local log of a member, providing different
+ * policies for out-of-order entries and other cases.
+ */
+public interface LogAppender {
+
+  AppendEntryResult appendEntries(
+      long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs);
+
+  AppendEntryResult appendEntry(
+      long prevLogIndex, long prevLogTerm, long leaderCommit, Log log);
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppenderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppenderFactory.java
new file mode 100644
index 0000000..1f1e83c
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppenderFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iotdb.cluster.log.appender;
+
+import org.apache.iotdb.cluster.server.member.RaftMember;
+
+public interface LogAppenderFactory {
+  LogAppender create(RaftMember member);
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/SWRaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
similarity index 90%
rename from cluster/src/main/java/org/apache/iotdb/cluster/expr/SWRaftMember.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
index 52a99b9..928f960 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/SWRaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
@@ -17,21 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.expr;
+package org.apache.iotdb.cluster.log.appender;
 
-import org.apache.iotdb.cluster.client.ClientManager;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 
-import java.util.Arrays;
-import java.util.List;
-
-public abstract class SWRaftMember extends RaftMember {
+public class SlidingWindowLogAppender implements LogAppender {
 
   private int windowCapacity =
       ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem() * 2;
@@ -40,13 +39,12 @@ public abstract class SWRaftMember extends RaftMember {
   private long firstPosPrevIndex;
   private long[] prevTerms = new long[windowCapacity];
 
-  public SWRaftMember(String name, ClientManager clientManager) {
-    super(name, clientManager);
-  }
+  private RaftMember member;
+  private RaftLogManager logManager;
 
-  @Override
-  public void start() {
-    super.start();
+  public SlidingWindowLogAppender(RaftMember member) {
+    this.member = member;
+    this.logManager = member.getLogManager();
     this.firstPosPrevIndex = logManager.getLastLogIndex();
     this.prevTerms[0] = logManager.getLastLogTerm();
   }
@@ -135,10 +133,10 @@ public abstract class SWRaftMember extends RaftMember {
   }
 
   @Override
-  protected AppendEntryResult appendEntries(
+  public AppendEntryResult appendEntries(
       long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
     if (logs.isEmpty()) {
-      return new AppendEntryResult(Response.RESPONSE_AGREE).setHeader(getHeader());
+      return new AppendEntryResult(Response.RESPONSE_AGREE).setHeader(member.getHeader());
     }
 
     AppendEntryResult result = null;
@@ -158,9 +156,8 @@ public abstract class SWRaftMember extends RaftMember {
   }
 
   @Override
-  protected AppendEntryResult appendEntry(
+  public AppendEntryResult appendEntry(
       long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
-
     long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
     long appendedPos = 0;
 
@@ -191,7 +188,7 @@ public abstract class SWRaftMember extends RaftMember {
       } else {
         Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
         result.setStatus(Response.RESPONSE_OUT_OF_WINDOW);
-        result.setHeader(getHeader());
+        result.setHeader(member.getHeader());
         return result;
       }
     }
@@ -203,4 +200,12 @@ public abstract class SWRaftMember extends RaftMember {
     }
     return result;
   }
+
+  public static class Factory implements LogAppenderFactory {
+
+    @Override
+    public LogAppender create(RaftMember member) {
+      return new SlidingWindowLogAppender(member);
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 4c6bd8a..c822ad5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -41,6 +41,9 @@ import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 import org.apache.iotdb.cluster.log.LogParser;
 import org.apache.iotdb.cluster.log.VotingLog;
 import org.apache.iotdb.cluster.log.VotingLogList;
+import org.apache.iotdb.cluster.log.appender.BlockingLogAppender;
+import org.apache.iotdb.cluster.log.appender.LogAppender;
+import org.apache.iotdb.cluster.log.appender.LogAppenderFactory;
 import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
@@ -134,8 +137,8 @@ public abstract class RaftMember implements RaftMemberMBean {
   public static boolean USE_LOG_DISPATCHER = false;
   public static boolean USE_INDIRECT_LOG_DISPATCHER = false;
   public static boolean ENABLE_WEAK_ACCEPTANCE = true;
-  public static boolean ENABLE_COMMIT_RETURN = false;
 
+  private static final LogAppenderFactory APPENDER_FACTORY = new BlockingLogAppender.Factory();
   protected static final LogSequencerFactory SEQUENCER_FACTORY =
       ClusterDescriptor.getInstance().getConfig().isUseAsyncSequencing()
           ? new Factory()
@@ -253,7 +256,7 @@ public abstract class RaftMember implements RaftMemberMBean {
    * logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
    * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
    */
-  private LogDispatcher logDispatcher;
+  private volatile LogDispatcher logDispatcher;
 
   /** If this node can not be the leader, this parameter will be set true. */
   private volatile boolean skipElection = false;
@@ -271,6 +274,8 @@ public abstract class RaftMember implements RaftMemberMBean {
 
   protected LogSequencer logSequencer;
 
+  private volatile LogAppender logAppender;
+
   protected RaftMember() {}
 
   protected RaftMember(String name, ClientManager clientManager) {
@@ -329,6 +334,17 @@ public abstract class RaftMember implements RaftMemberMBean {
     }
   }
 
+  public LogAppender getLogAppender() {
+    if (logAppender == null) {
+      synchronized (this) {
+        if (logAppender == null) {
+          logAppender = APPENDER_FACTORY.create(this);
+        }
+      }
+    }
+    return logAppender;
+  }
+
   /**
    * Stop the heartbeat thread and the catch-up thread pool. Calling the method twice does not
    * induce side effects.
@@ -548,7 +564,8 @@ public abstract class RaftMember implements RaftMemberMBean {
     Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
 
     AppendEntryResult result =
-        appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
+        getLogAppender()
+            .appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
     result.setHeader(request.getHeader());
 
     logger.debug("{} AppendEntryRequest of {} completed with result {}", name, log, result.status);
@@ -646,7 +663,9 @@ public abstract class RaftMember implements RaftMemberMBean {
 
     Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
 
-    response = appendEntries(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
+    response = getLogAppender()
+        .appendEntries(request.prevLogIndex, request.prevLogTerm, request.leaderCommit,
+            logs);
     if (logger.isDebugEnabled()) {
       logger.debug(
           "{} AppendEntriesRequest of log size {} completed with result {}",
@@ -1021,12 +1040,11 @@ public abstract class RaftMember implements RaftMemberMBean {
     long waitedTime = 0;
     long localAppliedId;
 
-    if (fastFail) {
-      if (leaderCommitId - logManager.getMaxHaveAppliedCommitIndex() > config.getMaxSyncLogLag()) {
-        logger.info(
-            "{}: The raft log of this member is too backward to provide service directly.", name);
-        return false;
-      }
+    if (fastFail
+        && leaderCommitId - logManager.getMaxHaveAppliedCommitIndex() > config.getMaxSyncLogLag()) {
+      logger.info(
+          "{}: The raft log of this member is too backward to provide service directly.", name);
+      return false;
     }
 
     while (waitedTime < ClusterConstant.getSyncLeaderMaxWaitMs()) {
@@ -1605,7 +1623,7 @@ public abstract class RaftMember implements RaftMemberMBean {
     return term;
   }
 
-  public synchronized LogDispatcher getLogDispatcher() {
+  public LogDispatcher getLogDispatcher() {
     if (logDispatcher == null) {
       if (USE_INDIRECT_LOG_DISPATCHER) {
         logDispatcher = new IndirectLogDispatcher(this);
@@ -1717,9 +1735,7 @@ public abstract class RaftMember implements RaftMemberMBean {
       }
       Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
     }
-    if (ENABLE_COMMIT_RETURN) {
-      return;
-    }
+
     // when using async applier, the log here may not be applied. To return the execution
     // result, we must wait until the log is applied.
     startTime = Statistic.RAFT_SENDER_COMMIT_WAIT_LOG_APPLY.getOperationStartTime();
@@ -2130,127 +2146,6 @@ public abstract class RaftMember implements RaftMemberMBean {
   }
 
   /**
-   * Find the local previous log of "log". If such log is found, discard all local logs behind it
-   * and append "log" to it. Otherwise report a log mismatch.
-   *
-   * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
-   */
-  protected AppendEntryResult appendEntry(
-      long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
-    long resp = checkPrevLogIndex(prevLogIndex);
-    if (resp != Response.RESPONSE_AGREE) {
-      return new AppendEntryResult(resp).setHeader(getHeader());
-    }
-
-    long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
-    long success;
-    AppendEntryResult result = new AppendEntryResult();
-    synchronized (logManager) {
-      success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
-      if (success != -1) {
-        result.setLastLogIndex(logManager.getLastLogIndex());
-        result.setLastLogTerm(logManager.getLastLogTerm());
-      }
-    }
-    Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
-    if (success != -1) {
-      logger.debug("{} append a new log {}", name, log);
-      result.status = Response.RESPONSE_STRONG_ACCEPT;
-    } else {
-      // the incoming log points to an illegal position, reject it
-      result.status = Response.RESPONSE_LOG_MISMATCH;
-    }
-    return result;
-  }
-
-  /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
-  private boolean waitForPrevLog(long prevLogIndex) {
-    long waitStart = System.currentTimeMillis();
-    long alreadyWait = 0;
-    Object logUpdateCondition = logManager.getLogUpdateCondition(prevLogIndex);
-    long lastLogIndex = logManager.getLastLogIndex();
-    Timer.Statistic.RAFT_RECEIVER_INDEX_DIFF.add(prevLogIndex - lastLogIndex);
-    while (lastLogIndex < prevLogIndex
-        && alreadyWait <= ClusterConstant.getWriteOperationTimeoutMS()) {
-      try {
-        // each time new logs are appended, this will be notified
-        synchronized (logUpdateCondition) {
-          logUpdateCondition.wait(1);
-        }
-        lastLogIndex = logManager.getLastLogIndex();
-        if (lastLogIndex >= prevLogIndex) {
-          return true;
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return false;
-      }
-      alreadyWait = System.currentTimeMillis() - waitStart;
-    }
-
-    return alreadyWait <= ClusterConstant.getWriteOperationTimeoutMS();
-  }
-
-  protected long checkPrevLogIndex(long prevLogIndex) {
-    long lastLogIndex = logManager.getLastLogIndex();
-    long startTime = Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime();
-    if (lastLogIndex < prevLogIndex && !waitForPrevLog(prevLogIndex)) {
-      // there are logs missing between the incoming log and the local last log, and such logs
-      // did not come within a timeout, report a mismatch to the sender and it shall fix this
-      // through catch-up
-      return Response.RESPONSE_LOG_MISMATCH;
-    }
-    Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
-    return Response.RESPONSE_AGREE;
-  }
-
-  /**
-   * Find the local previous log of "log". If such log is found, discard all local logs behind it
-   * and append "log" to it. Otherwise report a log mismatch.
-   *
-   * @param logs append logs
-   * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
-   *     .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
-   */
-  protected AppendEntryResult appendEntries(
-      long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
-    logger.debug(
-        "{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",
-        name,
-        prevLogIndex,
-        prevLogTerm,
-        leaderCommit);
-    if (logs.isEmpty()) {
-      return new AppendEntryResult(Response.RESPONSE_AGREE).setHeader(getHeader());
-    }
-
-    long resp = checkPrevLogIndex(prevLogIndex);
-    if (resp != Response.RESPONSE_AGREE) {
-      return new AppendEntryResult(resp).setHeader(getHeader());
-    }
-
-    AppendEntryResult result = new AppendEntryResult();
-    synchronized (logManager) {
-      long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
-      resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, logs);
-      Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
-      if (resp != -1) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("{} append a new log list {}, commit to {}", name, logs, leaderCommit);
-        }
-        result.status = Response.RESPONSE_STRONG_ACCEPT;
-        result.setLastLogIndex(logManager.getLastLogIndex());
-        result.setLastLogTerm(logManager.getLastLogTerm());
-      } else {
-        // the incoming log points to an illegal position, reject it
-        result.status = Response.RESPONSE_LOG_MISMATCH;
-      }
-    }
-    return result;
-  }
-
-  /**
    * Check the term of the AppendEntryRequest. The term checked is the term of the leader, not the
    * term of the log. A new leader can still send logs of old leaders.
    *