You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/01 06:41:50 UTC
[iotdb] branch expr updated: extract log appender
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr by this push:
new bff6dcd extract log appender
bff6dcd is described below
commit bff6dcdd6b3c3346deab5110f195229ba6331e8d
Author: jt <jt...@163.com>
AuthorDate: Wed Dec 1 14:41:10 2021 +0800
extract log appender
---
.../cluster/log/appender/BlockingLogAppender.java | 182 +++++++++++++++++++++
.../iotdb/cluster/log/appender/LogAppender.java | 38 +++++
.../cluster/log/appender/LogAppenderFactory.java | 27 +++
.../appender/SlidingWindowLogAppender.java} | 39 +++--
.../iotdb/cluster/server/member/RaftMember.java | 165 ++++---------------
5 files changed, 299 insertions(+), 152 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
new file mode 100644
index 0000000..e93969c
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/BlockingLogAppender.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log.appender;
+
+import java.util.List;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BlockingLogAppender waits for a certain amount of time when it receives out-of-order entries
+ * (entries with indices larger than the local last entry's index + 1). If the local log is updated
+ * during the wait and the received entries become appendable, it appends them normally.
+ * Otherwise, a LOG_MISMATCH is reported to the leader.
+ */
+public class BlockingLogAppender implements LogAppender {
+
+ private static final Logger logger = LoggerFactory.getLogger(BlockingLogAppender.class);
+
+ private RaftMember member;
+ private RaftLogManager logManager;
+
+ public BlockingLogAppender(RaftMember member) {
+ this.member = member;
+ this.logManager = member.getLogManager();
+ }
+
+ /**
+ * Append a single "log" once checkPrevLogIndex confirms the local log has reached "prevLogIndex".
+ * The append itself is delegated to logManager.maybeAppend while holding the logManager lock.
+ *
+ * @return a result with RESPONSE_STRONG_ACCEPT and the local last log index/term when appended,
+ * or RESPONSE_LOG_MISMATCH when the previous log did not arrive or maybeAppend returns -1.
+ */
+ public AppendEntryResult appendEntry(
+ long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+ long resp = checkPrevLogIndex(prevLogIndex);
+ if (resp != Response.RESPONSE_AGREE) {
+ return new AppendEntryResult(resp).setHeader(member.getHeader());
+ }
+
+ long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+ long success;
+ AppendEntryResult result = new AppendEntryResult();
+ synchronized (logManager) {
+ success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
+ if (success != -1) {
+ result.setLastLogIndex(logManager.getLastLogIndex());
+ result.setLastLogTerm(logManager.getLastLogTerm());
+ }
+ }
+ Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+ if (success != -1) {
+ logger.debug("{} append a new log {}", member.getName(), log);
+ result.status = Response.RESPONSE_STRONG_ACCEPT;
+ } else {
+ // maybeAppend returned -1: the incoming log points to an illegal position, reject it
+ result.status = Response.RESPONSE_LOG_MISMATCH;
+ }
+ return result;
+ }
+
+ /**
+ * Wait until the local last log index reaches "prevLogIndex"; false on timeout or interruption.
+ */
+ private boolean waitForPrevLog(long prevLogIndex) {
+ long waitStart = System.currentTimeMillis();
+ long alreadyWait = 0;
+ Object logUpdateCondition = logManager.getLogUpdateCondition(prevLogIndex);
+ long lastLogIndex = logManager.getLastLogIndex();
+ Timer.Statistic.RAFT_RECEIVER_INDEX_DIFF.add(prevLogIndex - lastLogIndex);
+ while (lastLogIndex < prevLogIndex
+ && alreadyWait <= ClusterConstant.getWriteOperationTimeoutMS()) {
+ try {
+ // presumably notified when new logs are appended (TODO confirm); re-checks every 1 ms anyway
+ synchronized (logUpdateCondition) {
+ logUpdateCondition.wait(1);
+ }
+ lastLogIndex = logManager.getLastLogIndex();
+ if (lastLogIndex >= prevLogIndex) {
+ return true;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ alreadyWait = System.currentTimeMillis() - waitStart;
+ }
+
+ return alreadyWait <= ClusterConstant.getWriteOperationTimeoutMS();
+ }
+
+ protected long checkPrevLogIndex(long prevLogIndex) {
+ long lastLogIndex = logManager.getLastLogIndex();
+ long startTime = Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime();
+ if (lastLogIndex < prevLogIndex && !waitForPrevLog(prevLogIndex)) {
+ // the logs between the local last log and the incoming log did not arrive within the
+ // timeout (or the wait was interrupted); report a mismatch so the sender can fix it through
+ // catch-up. NOTE: this early return skips the RAFT_RECEIVER_WAIT_FOR_PREV_LOG timing below
+ return Response.RESPONSE_LOG_MISMATCH;
+ }
+ Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
+ return Response.RESPONSE_AGREE;
+ }
+
+ /**
+ * Append "logs" once checkPrevLogIndex confirms the local log has reached "prevLogIndex". The
+ * append itself is delegated to logManager.maybeAppend while holding the logManager lock.
+ *
+ * @param logs the entries to append; an empty list is answered with RESPONSE_AGREE immediately
+ * @return a result with RESPONSE_STRONG_ACCEPT and the local last log index/term when appended,
+ * or RESPONSE_LOG_MISMATCH when the previous log did not arrive or maybeAppend returns -1.
+ */
+ public AppendEntryResult appendEntries(
+ long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
+ logger.debug(
+ "{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",
+ member.getName(),
+ prevLogIndex,
+ prevLogTerm,
+ leaderCommit);
+ if (logs.isEmpty()) {
+ return new AppendEntryResult(Response.RESPONSE_AGREE).setHeader(member.getHeader());
+ }
+
+ long resp = checkPrevLogIndex(prevLogIndex);
+ if (resp != Response.RESPONSE_AGREE) {
+ return new AppendEntryResult(resp).setHeader(member.getHeader());
+ }
+
+ AppendEntryResult result = new AppendEntryResult();
+ synchronized (logManager) {
+ long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+ resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, logs);
+ Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+ if (resp != -1) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} append a new log list {}, commit to {}", member.getName(), logs,
+ leaderCommit);
+ }
+ result.status = Response.RESPONSE_STRONG_ACCEPT;
+ result.setLastLogIndex(logManager.getLastLogIndex());
+ result.setLastLogTerm(logManager.getLastLogTerm());
+ } else {
+ // maybeAppend returned -1: the incoming logs point to an illegal position, reject them
+ result.status = Response.RESPONSE_LOG_MISMATCH;
+ }
+ }
+ return result;
+ }
+
+ public static class Factory implements LogAppenderFactory {
+
+ @Override
+ public LogAppender create(RaftMember member) {
+ return new BlockingLogAppender(member);
+ }
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java
new file mode 100644
index 0000000..f997b06
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppender.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iotdb.cluster.log.appender;
+
+import java.util.List;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
+
+/**
+ * LogAppender appends newly incoming entries to the local log of a member, providing different
+ * policies for out-of-order entries and other cases.
+ */
+public interface LogAppender {
+
+ AppendEntryResult appendEntries(
+ long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs);
+
+ AppendEntryResult appendEntry(
+ long prevLogIndex, long prevLogTerm, long leaderCommit, Log log);
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppenderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppenderFactory.java
new file mode 100644
index 0000000..1f1e83c
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/LogAppenderFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iotdb.cluster.log.appender;
+
+import org.apache.iotdb.cluster.server.member.RaftMember;
+
+public interface LogAppenderFactory {
+ LogAppender create(RaftMember member);
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/SWRaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
similarity index 90%
rename from cluster/src/main/java/org/apache/iotdb/cluster/expr/SWRaftMember.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
index 52a99b9..928f960 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/SWRaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java
@@ -17,21 +17,20 @@
* under the License.
*/
-package org.apache.iotdb.cluster.expr;
+package org.apache.iotdb.cluster.log.appender;
-import org.apache.iotdb.cluster.client.ClientManager;
+import java.util.Arrays;
+import java.util.List;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
-import java.util.Arrays;
-import java.util.List;
-
-public abstract class SWRaftMember extends RaftMember {
+public class SlidingWindowLogAppender implements LogAppender {
private int windowCapacity =
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem() * 2;
@@ -40,13 +39,12 @@ public abstract class SWRaftMember extends RaftMember {
private long firstPosPrevIndex;
private long[] prevTerms = new long[windowCapacity];
- public SWRaftMember(String name, ClientManager clientManager) {
- super(name, clientManager);
- }
+ private RaftMember member;
+ private RaftLogManager logManager;
- @Override
- public void start() {
- super.start();
+ public SlidingWindowLogAppender(RaftMember member) {
+ this.member = member;
+ this.logManager = member.getLogManager();
this.firstPosPrevIndex = logManager.getLastLogIndex();
this.prevTerms[0] = logManager.getLastLogTerm();
}
@@ -135,10 +133,10 @@ public abstract class SWRaftMember extends RaftMember {
}
@Override
- protected AppendEntryResult appendEntries(
+ public AppendEntryResult appendEntries(
long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
if (logs.isEmpty()) {
- return new AppendEntryResult(Response.RESPONSE_AGREE).setHeader(getHeader());
+ return new AppendEntryResult(Response.RESPONSE_AGREE).setHeader(member.getHeader());
}
AppendEntryResult result = null;
@@ -158,9 +156,8 @@ public abstract class SWRaftMember extends RaftMember {
}
@Override
- protected AppendEntryResult appendEntry(
+ public AppendEntryResult appendEntry(
long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
-
long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
long appendedPos = 0;
@@ -191,7 +188,7 @@ public abstract class SWRaftMember extends RaftMember {
} else {
Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
result.setStatus(Response.RESPONSE_OUT_OF_WINDOW);
- result.setHeader(getHeader());
+ result.setHeader(member.getHeader());
return result;
}
}
@@ -203,4 +200,12 @@ public abstract class SWRaftMember extends RaftMember {
}
return result;
}
+
+ public static class Factory implements LogAppenderFactory {
+
+ @Override
+ public LogAppender create(RaftMember member) {
+ return new SlidingWindowLogAppender(member);
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 4c6bd8a..c822ad5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -41,6 +41,9 @@ import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
import org.apache.iotdb.cluster.log.LogParser;
import org.apache.iotdb.cluster.log.VotingLog;
import org.apache.iotdb.cluster.log.VotingLogList;
+import org.apache.iotdb.cluster.log.appender.BlockingLogAppender;
+import org.apache.iotdb.cluster.log.appender.LogAppender;
+import org.apache.iotdb.cluster.log.appender.LogAppenderFactory;
import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
@@ -134,8 +137,8 @@ public abstract class RaftMember implements RaftMemberMBean {
public static boolean USE_LOG_DISPATCHER = false;
public static boolean USE_INDIRECT_LOG_DISPATCHER = false;
public static boolean ENABLE_WEAK_ACCEPTANCE = true;
- public static boolean ENABLE_COMMIT_RETURN = false;
+ private static final LogAppenderFactory APPENDER_FACTORY = new BlockingLogAppender.Factory();
protected static final LogSequencerFactory SEQUENCER_FACTORY =
ClusterDescriptor.getInstance().getConfig().isUseAsyncSequencing()
? new Factory()
@@ -253,7 +256,7 @@ public abstract class RaftMember implements RaftMemberMBean {
* logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
* which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
*/
- private LogDispatcher logDispatcher;
+ private volatile LogDispatcher logDispatcher;
/** If this node can not be the leader, this parameter will be set true. */
private volatile boolean skipElection = false;
@@ -271,6 +274,8 @@ public abstract class RaftMember implements RaftMemberMBean {
protected LogSequencer logSequencer;
+ private volatile LogAppender logAppender;
+
protected RaftMember() {}
protected RaftMember(String name, ClientManager clientManager) {
@@ -329,6 +334,17 @@ public abstract class RaftMember implements RaftMemberMBean {
}
}
+ public LogAppender getLogAppender() {
+ if (logAppender == null) {
+ synchronized (this) {
+ if (logAppender == null) {
+ logAppender = APPENDER_FACTORY.create(this);
+ }
+ }
+ }
+ return logAppender;
+ }
+
/**
* Stop the heartbeat thread and the catch-up thread pool. Calling the method twice does not
* induce side effects.
@@ -548,7 +564,8 @@ public abstract class RaftMember implements RaftMemberMBean {
Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
AppendEntryResult result =
- appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
+ getLogAppender()
+ .appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
result.setHeader(request.getHeader());
logger.debug("{} AppendEntryRequest of {} completed with result {}", name, log, result.status);
@@ -646,7 +663,9 @@ public abstract class RaftMember implements RaftMemberMBean {
Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
- response = appendEntries(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
+ response = getLogAppender()
+ .appendEntries(request.prevLogIndex, request.prevLogTerm, request.leaderCommit,
+ logs);
if (logger.isDebugEnabled()) {
logger.debug(
"{} AppendEntriesRequest of log size {} completed with result {}",
@@ -1021,12 +1040,11 @@ public abstract class RaftMember implements RaftMemberMBean {
long waitedTime = 0;
long localAppliedId;
- if (fastFail) {
- if (leaderCommitId - logManager.getMaxHaveAppliedCommitIndex() > config.getMaxSyncLogLag()) {
- logger.info(
- "{}: The raft log of this member is too backward to provide service directly.", name);
- return false;
- }
+ if (fastFail
+ && leaderCommitId - logManager.getMaxHaveAppliedCommitIndex() > config.getMaxSyncLogLag()) {
+ logger.info(
+ "{}: The raft log of this member is too backward to provide service directly.", name);
+ return false;
}
while (waitedTime < ClusterConstant.getSyncLeaderMaxWaitMs()) {
@@ -1605,7 +1623,7 @@ public abstract class RaftMember implements RaftMemberMBean {
return term;
}
- public synchronized LogDispatcher getLogDispatcher() {
+ public LogDispatcher getLogDispatcher() {
if (logDispatcher == null) {
if (USE_INDIRECT_LOG_DISPATCHER) {
logDispatcher = new IndirectLogDispatcher(this);
@@ -1717,9 +1735,7 @@ public abstract class RaftMember implements RaftMemberMBean {
}
Statistic.RAFT_SENDER_EXIT_LOG_MANAGER.calOperationCostTimeFromStart(startTime);
}
- if (ENABLE_COMMIT_RETURN) {
- return;
- }
+
// when using async applier, the log here may not be applied. To return the execution
// result, we must wait until the log is applied.
startTime = Statistic.RAFT_SENDER_COMMIT_WAIT_LOG_APPLY.getOperationStartTime();
@@ -2130,127 +2146,6 @@ public abstract class RaftMember implements RaftMemberMBean {
}
/**
- * Find the local previous log of "log". If such log is found, discard all local logs behind it
- * and append "log" to it. Otherwise report a log mismatch.
- *
- * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
- */
- protected AppendEntryResult appendEntry(
- long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
- long resp = checkPrevLogIndex(prevLogIndex);
- if (resp != Response.RESPONSE_AGREE) {
- return new AppendEntryResult(resp).setHeader(getHeader());
- }
-
- long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
- long success;
- AppendEntryResult result = new AppendEntryResult();
- synchronized (logManager) {
- success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
- if (success != -1) {
- result.setLastLogIndex(logManager.getLastLogIndex());
- result.setLastLogTerm(logManager.getLastLogTerm());
- }
- }
- Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
- if (success != -1) {
- logger.debug("{} append a new log {}", name, log);
- result.status = Response.RESPONSE_STRONG_ACCEPT;
- } else {
- // the incoming log points to an illegal position, reject it
- result.status = Response.RESPONSE_LOG_MISMATCH;
- }
- return result;
- }
-
- /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
- private boolean waitForPrevLog(long prevLogIndex) {
- long waitStart = System.currentTimeMillis();
- long alreadyWait = 0;
- Object logUpdateCondition = logManager.getLogUpdateCondition(prevLogIndex);
- long lastLogIndex = logManager.getLastLogIndex();
- Timer.Statistic.RAFT_RECEIVER_INDEX_DIFF.add(prevLogIndex - lastLogIndex);
- while (lastLogIndex < prevLogIndex
- && alreadyWait <= ClusterConstant.getWriteOperationTimeoutMS()) {
- try {
- // each time new logs are appended, this will be notified
- synchronized (logUpdateCondition) {
- logUpdateCondition.wait(1);
- }
- lastLogIndex = logManager.getLastLogIndex();
- if (lastLogIndex >= prevLogIndex) {
- return true;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- }
- alreadyWait = System.currentTimeMillis() - waitStart;
- }
-
- return alreadyWait <= ClusterConstant.getWriteOperationTimeoutMS();
- }
-
- protected long checkPrevLogIndex(long prevLogIndex) {
- long lastLogIndex = logManager.getLastLogIndex();
- long startTime = Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime();
- if (lastLogIndex < prevLogIndex && !waitForPrevLog(prevLogIndex)) {
- // there are logs missing between the incoming log and the local last log, and such logs
- // did not come within a timeout, report a mismatch to the sender and it shall fix this
- // through catch-up
- return Response.RESPONSE_LOG_MISMATCH;
- }
- Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
- return Response.RESPONSE_AGREE;
- }
-
- /**
- * Find the local previous log of "log". If such log is found, discard all local logs behind it
- * and append "log" to it. Otherwise report a log mismatch.
- *
- * @param logs append logs
- * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
- */
- protected AppendEntryResult appendEntries(
- long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
- logger.debug(
- "{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",
- name,
- prevLogIndex,
- prevLogTerm,
- leaderCommit);
- if (logs.isEmpty()) {
- return new AppendEntryResult(Response.RESPONSE_AGREE).setHeader(getHeader());
- }
-
- long resp = checkPrevLogIndex(prevLogIndex);
- if (resp != Response.RESPONSE_AGREE) {
- return new AppendEntryResult(resp).setHeader(getHeader());
- }
-
- AppendEntryResult result = new AppendEntryResult();
- synchronized (logManager) {
- long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
- resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, logs);
- Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
- if (resp != -1) {
- if (logger.isDebugEnabled()) {
- logger.debug("{} append a new log list {}, commit to {}", name, logs, leaderCommit);
- }
- result.status = Response.RESPONSE_STRONG_ACCEPT;
- result.setLastLogIndex(logManager.getLastLogIndex());
- result.setLastLogTerm(logManager.getLastLogTerm());
- } else {
- // the incoming log points to an illegal position, reject it
- result.status = Response.RESPONSE_LOG_MISMATCH;
- }
- }
- return result;
- }
-
- /**
* Check the term of the AppendEntryRequest. The term checked is the term of the leader, not the
* term of the log. A new leader can still send logs of old leaders.
*