You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/10/06 01:53:56 UTC
[iotdb] branch expr_vgraft updated: enable out-of-order enqueue for NB-Raft
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_vgraft by this push:
new 7cb4e6d549 enable out-of-order enqueue for NB-Raft
7cb4e6d549 is described below
commit 7cb4e6d549cd4e31c7f0c6756269e9fa43080ec6
Author: Tian Jiang <jt...@163.com>
AuthorDate: Thu Oct 6 09:53:51 2022 +0800
enable out-of-order enqueue for NB-Raft
---
.../log/sequencing/SynchronousSequencer.java | 27 +++++++++++++-----
.../iotdb/cluster/server/member/RaftMember.java | 33 ++++++++++++++++------
2 files changed, 44 insertions(+), 16 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index 07ce3a1cc7..f44a10a03e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -51,9 +51,19 @@ public class SynchronousSequencer implements LogSequencer {
this.logManager = logManager;
}
+ private SendLogRequest enqueueEntry(SendLogRequest sendLogRequest) {
+ long startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
+
+ if (member.getAllNodes().size() > 1) {
+ member.getLogDispatcher().offer(sendLogRequest);
+ }
+ Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+ return sendLogRequest;
+ }
+
@Override
public SendLogRequest sequence(Log log) {
- SendLogRequest sendLogRequest;
+ SendLogRequest sendLogRequest = null;
long startTime =
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2.getOperationStartTime();
@@ -87,15 +97,13 @@ public class SynchronousSequencer implements LogSequencer {
startTime = Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
sendLogRequest = buildSendLogRequest(log);
+ log.setCreateTime(System.nanoTime());
Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.calOperationCostTimeFromStart(startTime);
- startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
- log.setCreateTime(System.nanoTime());
- if (member.getAllNodes().size() > 1) {
- member.getLogDispatcher().offer(sendLogRequest);
+ if (!(ClusterDescriptor.getInstance().getConfig().isUseFollowerSlidingWindow()
+ && ClusterDescriptor.getInstance().getConfig().isEnableWeakAcceptance())) {
+ sendLogRequest = enqueueEntry(sendLogRequest);
}
- Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
-
break;
}
try {
@@ -111,6 +119,11 @@ public class SynchronousSequencer implements LogSequencer {
}
}
+ if (ClusterDescriptor.getInstance().getConfig().isUseFollowerSlidingWindow()
+ && ClusterDescriptor.getInstance().getConfig().isEnableWeakAcceptance()) {
+ sendLogRequest = enqueueEntry(sendLogRequest);
+ }
+
return sendLogRequest;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 5c29f6e3f3..c89aa376ac 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1887,6 +1887,12 @@ public abstract class RaftMember implements RaftMemberMBean {
}
public AppendEntryRequest buildAppendEntryRequest(Log log, boolean serializeNow) {
+ AppendEntryRequest request = buildAppendEntryRequestBasic(log, serializeNow);
+ request = buildAppendEntryRequestExtended(request, log, serializeNow);
+ return request;
+ }
+
+ protected AppendEntryRequest buildAppendEntryRequestBasic(Log log, boolean serializeNow) {
AppendEntryRequest request = new AppendEntryRequest();
request.setTerm(term.get());
if (serializeNow) {
@@ -1895,25 +1901,34 @@ public abstract class RaftMember implements RaftMemberMBean {
log.setByteSize(byteBuffer.array().length);
request.entry = byteBuffer;
Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(start);
- request.setEntryHash(byteBuffer.hashCode());
}
- request.setLeader(getThisNode());
- // don't need lock because even if it's larger than the commitIndex when appending this log to
- // logManager, the follower can handle the larger commitIndex with no effect
- request.setLeaderCommit(logManager.getCommitLogIndex());
- request.setPrevLogIndex(log.getCurrLogIndex() - 1);
- request.setIsFromLeader(true);
- request.setLeaderSignature(KeyManager.INSTANCE.getNodeSignature());
-
try {
request.setPrevLogTerm(logManager.getTerm(log.getCurrLogIndex() - 1));
} catch (Exception e) {
logger.error("getTerm failed for newly append entries", e);
}
+ request.setLeader(getThisNode());
+ // don't need lock because even if it's larger than the commitIndex when appending this log to
+ // logManager, the follower can handle the larger commitIndex with no effect
+ request.setLeaderCommit(logManager.getCommitLogIndex());
+ request.setPrevLogIndex(log.getCurrLogIndex() - 1);
if (getHeader() != null) {
// data groups use header to find a particular DataGroupMember
request.setHeader(getHeader());
}
+
+ return request;
+ }
+
+ protected AppendEntryRequest buildAppendEntryRequestExtended(AppendEntryRequest request, Log log,
+ boolean serializeNow) {
+ request.setIsFromLeader(true);
+ if (ClusterDescriptor.getInstance().getConfig().isUseVGRaft()) {
+ request.setLeaderSignature(KeyManager.INSTANCE.getNodeSignature());
+ if (serializeNow) {
+ request.setEntryHash(request.entry.hashCode());
+ }
+ }
return request;
}