You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/23 01:40:24 UTC

[iotdb] branch native_raft updated: remove uncommitted log number limit

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 27c0153f556 remove uncommitted log number limit
27c0153f556 is described below

commit 27c0153f5566ac471e5eb9d369744b091430c554
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue May 23 09:43:12 2023 +0800

    remove uncommitted log number limit
---
 .../protocol/log/dispatch/DispatcherGroup.java     |  4 +-
 .../log/sequencing/SynchronousSequencer.java       | 60 ++++++++--------------
 2 files changed, 23 insertions(+), 41 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
index 85159c80290..89209a56696 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -39,10 +39,10 @@ public class DispatcherGroup {
   private final Peer peer;
   private final BlockingQueue<VotingEntry> entryQueue;
   private boolean nodeEnabled;
-  private RateLimiter rateLimiter;
+  private volatile RateLimiter rateLimiter;
   private final ExecutorService dispatcherThreadPool;
   private final LogDispatcher logDispatcher;
-  private boolean delayed;
+  private volatile boolean delayed;
   private DynamicThreadGroup dynamicThreadGroup;
   private String name;
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
index 7c59049badb..4d1acdcb4e1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
-import java.util.concurrent.TimeUnit;
 
 /**
  * SynchronizedSequencer performs sequencing by taking the monitor of a LogManager within the caller
@@ -51,47 +50,30 @@ public class SynchronousSequencer implements LogSequencer {
   public VotingEntry sequence(Entry e) {
     VotingEntry votingEntry = null;
 
-    long startWaitingTime = System.currentTimeMillis();
+    // TODO: control the number of uncommitted entries while not letting the new leader be blocked
     RaftLogManager logManager = member.getLogManager();
-    while (true) {
-      try {
-        logManager.getLock().writeLock().lock();
-        Entry lastEntry = logManager.getLastEntry();
-        long lastIndex = lastEntry.getCurrLogIndex();
-        long lastTerm = lastEntry.getCurrLogTerm();
-        if ((lastEntry.getCurrLogIndex() - logManager.getCommitLogIndex()
-            <= config.getUncommittedRaftLogNumForRejectThreshold())) {
-          // if the log contains a physical plan which is not a LogPlan, assign the same index to
-          // the plan so the state machine can be bridged with the consensus
-          e.setCurrLogTerm(member.getStatus().getTerm().get());
-          e.setCurrLogIndex(lastIndex + 1);
-          e.setPrevTerm(lastTerm);
-          e.setFromThisNode(true);
-          e.createTime = System.nanoTime();
-
-          // logDispatcher will serialize log, and set log size, and we will use the size after it
-          logManager.append(Collections.singletonList(e), true);
-
-          votingEntry = LogUtils.buildVotingLog(e, member);
-
-          if (!(config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance())) {
-            votingEntry = LogUtils.enqueueEntry(votingEntry, member);
-          }
-          break;
-        }
-      } finally {
-        logManager.getLock().writeLock().unlock();
-      }
+    try {
+      logManager.getLock().writeLock().lock();
+      Entry lastEntry = logManager.getLastEntry();
+      long lastIndex = lastEntry.getCurrLogIndex();
+      long lastTerm = lastEntry.getCurrLogTerm();
+
+      e.setCurrLogTerm(member.getStatus().getTerm().get());
+      e.setCurrLogIndex(lastIndex + 1);
+      e.setPrevTerm(lastTerm);
+      e.setFromThisNode(true);
+      e.createTime = System.nanoTime();
+
+      // logDispatcher will serialize log, and set log size, and we will use the size after it
+      logManager.append(Collections.singletonList(e), true);
+
+      votingEntry = LogUtils.buildVotingLog(e, member);
 
-      try {
-        TimeUnit.MILLISECONDS.sleep(10);
-        if (System.currentTimeMillis() - startWaitingTime
-            > config.getMaxWaitingTimeWhenInsertBlocked()) {
-          return null;
-        }
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
+      if (!(config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance())) {
+        votingEntry = LogUtils.enqueueEntry(votingEntry, member);
       }
+    } finally {
+      logManager.getLock().writeLock().unlock();
     }
 
     if (config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance()) {