You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/23 01:40:24 UTC
[iotdb] branch native_raft updated: remove uncommitted log number limit
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 27c0153f556 remove uncommitted log number limit
27c0153f556 is described below
commit 27c0153f5566ac471e5eb9d369744b091430c554
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue May 23 09:43:12 2023 +0800
remove uncommitted log number limit
---
.../protocol/log/dispatch/DispatcherGroup.java | 4 +-
.../log/sequencing/SynchronousSequencer.java | 60 ++++++++--------------
2 files changed, 23 insertions(+), 41 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
index 85159c80290..89209a56696 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -39,10 +39,10 @@ public class DispatcherGroup {
private final Peer peer;
private final BlockingQueue<VotingEntry> entryQueue;
private boolean nodeEnabled;
- private RateLimiter rateLimiter;
+ private volatile RateLimiter rateLimiter;
private final ExecutorService dispatcherThreadPool;
private final LogDispatcher logDispatcher;
- private boolean delayed;
+ private volatile boolean delayed;
private DynamicThreadGroup dynamicThreadGroup;
private String name;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
index 7c59049badb..4d1acdcb4e1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
-import java.util.concurrent.TimeUnit;
/**
* SynchronousSequencer performs sequencing by taking the monitor of a LogManager within the caller
@@ -51,47 +50,30 @@ public class SynchronousSequencer implements LogSequencer {
public VotingEntry sequence(Entry e) {
VotingEntry votingEntry = null;
- long startWaitingTime = System.currentTimeMillis();
+ // TODO: control the number of uncommitted entries while not letting the new leader be blocked
RaftLogManager logManager = member.getLogManager();
- while (true) {
- try {
- logManager.getLock().writeLock().lock();
- Entry lastEntry = logManager.getLastEntry();
- long lastIndex = lastEntry.getCurrLogIndex();
- long lastTerm = lastEntry.getCurrLogTerm();
- if ((lastEntry.getCurrLogIndex() - logManager.getCommitLogIndex()
- <= config.getUncommittedRaftLogNumForRejectThreshold())) {
- // if the log contains a physical plan which is not a LogPlan, assign the same index to
- // the plan so the state machine can be bridged with the consensus
- e.setCurrLogTerm(member.getStatus().getTerm().get());
- e.setCurrLogIndex(lastIndex + 1);
- e.setPrevTerm(lastTerm);
- e.setFromThisNode(true);
- e.createTime = System.nanoTime();
-
- // logDispatcher will serialize log, and set log size, and we will use the size after it
- logManager.append(Collections.singletonList(e), true);
-
- votingEntry = LogUtils.buildVotingLog(e, member);
-
- if (!(config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance())) {
- votingEntry = LogUtils.enqueueEntry(votingEntry, member);
- }
- break;
- }
- } finally {
- logManager.getLock().writeLock().unlock();
- }
+ try {
+ logManager.getLock().writeLock().lock();
+ Entry lastEntry = logManager.getLastEntry();
+ long lastIndex = lastEntry.getCurrLogIndex();
+ long lastTerm = lastEntry.getCurrLogTerm();
+
+ e.setCurrLogTerm(member.getStatus().getTerm().get());
+ e.setCurrLogIndex(lastIndex + 1);
+ e.setPrevTerm(lastTerm);
+ e.setFromThisNode(true);
+ e.createTime = System.nanoTime();
+
+ // logDispatcher will serialize the log and set its size; the size is used only after that
+ logManager.append(Collections.singletonList(e), true);
+
+ votingEntry = LogUtils.buildVotingLog(e, member);
- try {
- TimeUnit.MILLISECONDS.sleep(10);
- if (System.currentTimeMillis() - startWaitingTime
- > config.getMaxWaitingTimeWhenInsertBlocked()) {
- return null;
- }
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
+ if (!(config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance())) {
+ votingEntry = LogUtils.enqueueEntry(votingEntry, member);
}
+ } finally {
+ logManager.getLock().writeLock().unlock();
}
if (config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance()) {