You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/02/26 06:00:39 UTC

[ratis] branch master updated: RATIS-1534. SegmentedRaftLogWorker should enforce a minimum time interval between flush calls (#611)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ab82638  RATIS-1534. SegmentedRaftLogWorker should enforce a minimum time interval between flush calls (#611)
ab82638 is described below

commit ab82638e785d664792c82efe125c2b81b4424d64
Author: 诚夏 <Si...@users.noreply.github.com>
AuthorDate: Sat Feb 26 14:00:35 2022 +0800

    RATIS-1534. SegmentedRaftLogWorker should enforce a minimum time interval between flush calls (#611)
---
 .../apache/ratis/server/RaftServerConfigKeys.java  | 11 ++++++++++
 .../raftlog/segmented/SegmentedRaftLogWorker.java  | 25 +++++++++++++++-------
 2 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 520620d..9eae751 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -348,6 +348,17 @@ public interface RaftServerConfigKeys {
       setInt(properties::setInt, FORCE_SYNC_NUM_KEY, forceSyncNum);
     }
 
+
+    String FLUSH_INTERVAL_MIN_KEY = PREFIX + ".flush.interval.min";
+    TimeDuration FLUSH_INTERVAL_MIN_DEFAULT = TimeDuration.ZERO;
+    static TimeDuration flushIntervalMin(RaftProperties properties) {
+      return getTimeDuration(properties.getTimeDuration(FLUSH_INTERVAL_MIN_DEFAULT.getUnit()),
+              FLUSH_INTERVAL_MIN_KEY, FLUSH_INTERVAL_MIN_DEFAULT, getDefaultLog());
+    }
+    static void setFlushIntervalMin(RaftProperties properties, TimeDuration flushTimeInterval) {
+      setTimeDuration(properties::setTimeDuration, FLUSH_INTERVAL_MIN_KEY, flushTimeInterval);
+    }
+
     /** The policy to handle corrupted raft log. */
     enum CorruptionPolicy {
       /** Rethrow the exception. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index fc8f61e..9e5eb7e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -176,6 +176,9 @@ class SegmentedRaftLogWorker {
   private final RaftServer.Division server;
   private int flushBatchSize;
 
+  private Timestamp lastFlush;
+  private final TimeDuration flushIntervalMin;
+
   private final StateMachineDataPolicy stateMachineDataPolicy;
 
   SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
@@ -214,6 +217,8 @@ class SegmentedRaftLogWorker {
 
     final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
+    this.lastFlush = Timestamp.currentTime();
+    this.flushIntervalMin = RaftServerConfigKeys.Log.flushIntervalMin(properties);
   }
 
   void start(long latestIndex, long evictIndex, File openSegmentFile) throws IOException {
@@ -321,6 +326,7 @@ class SegmentedRaftLogWorker {
           }
           task.done();
         }
+        flushIfNecessary();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         if (running) {
@@ -344,13 +350,18 @@ class SegmentedRaftLogWorker {
   }
 
   private boolean shouldFlush() {
-    return pendingFlushNum >= forceSyncNum ||
-        (pendingFlushNum > 0 && queue.isEmpty());
+    if (out == null) {
+      return false;
+    } else if (pendingFlushNum >= forceSyncNum) {
+      return true;
+    }
+    return pendingFlushNum > 0 && queue.isEmpty() && lastFlush.elapsedTime().compareTo(flushIntervalMin) > 0;
   }
 
   @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
-  private void flushWrites() throws IOException {
-    if (out != null) {
+  private void flushIfNecessary() throws IOException {
+    if (shouldFlush()) {
+      raftLogMetrics.onRaftLogFlush();
       LOG.debug("{}: flush {}", name, out);
       final Timer.Context timerContext = logFlushTimer.time();
       try {
@@ -369,6 +380,7 @@ class SegmentedRaftLogWorker {
         }
       } finally {
         timerContext.stop();
+        lastFlush = Timestamp.currentTime();
       }
       updateFlushedIndexIncreasingly();
     }
@@ -513,10 +525,7 @@ class SegmentedRaftLogWorker {
       out.write(entry);
       lastWrittenIndex = entry.getIndex();
       pendingFlushNum++;
-      if (shouldFlush()) {
-        raftLogMetrics.onRaftLogFlush();
-        flushWrites();
-      }
+      flushIfNecessary();
     }
 
     @Override