You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/02/26 06:00:39 UTC
[ratis] branch master updated: RATIS-1534. SegmentedRaftLogWorker should enforce a minimum time interval between flush calls (#611)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ab82638 RATIS-1534. SegmentedRaftLogWorker should enforce a minimum time interval between flush calls (#611)
ab82638 is described below
commit ab82638e785d664792c82efe125c2b81b4424d64
Author: 诚夏 <Si...@users.noreply.github.com>
AuthorDate: Sat Feb 26 14:00:35 2022 +0800
RATIS-1534. SegmentedRaftLogWorker should enforce a minimum time interval between flush calls (#611)
---
.../apache/ratis/server/RaftServerConfigKeys.java | 11 ++++++++++
.../raftlog/segmented/SegmentedRaftLogWorker.java | 25 +++++++++++++++-------
2 files changed, 28 insertions(+), 8 deletions(-)
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 520620d..9eae751 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -348,6 +348,17 @@ public interface RaftServerConfigKeys {
setInt(properties::setInt, FORCE_SYNC_NUM_KEY, forceSyncNum);
}
+
+ String FLUSH_INTERVAL_MIN_KEY = PREFIX + ".flush.interval.min";
+ TimeDuration FLUSH_INTERVAL_MIN_DEFAULT = TimeDuration.ZERO;
+ static TimeDuration flushIntervalMin(RaftProperties properties) {
+ return getTimeDuration(properties.getTimeDuration(FLUSH_INTERVAL_MIN_DEFAULT.getUnit()),
+ FLUSH_INTERVAL_MIN_KEY, FLUSH_INTERVAL_MIN_DEFAULT, getDefaultLog());
+ }
+ static void setFlushIntervalMin(RaftProperties properties, TimeDuration flushTimeInterval) {
+ setTimeDuration(properties::setTimeDuration, FLUSH_INTERVAL_MIN_KEY, flushTimeInterval);
+ }
+
/** The policy to handle corrupted raft log. */
enum CorruptionPolicy {
/** Rethrow the exception. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index fc8f61e..9e5eb7e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -176,6 +176,9 @@ class SegmentedRaftLogWorker {
private final RaftServer.Division server;
private int flushBatchSize;
+ private Timestamp lastFlush;
+ private final TimeDuration flushIntervalMin;
+
private final StateMachineDataPolicy stateMachineDataPolicy;
SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
@@ -214,6 +217,8 @@ class SegmentedRaftLogWorker {
final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
+ this.lastFlush = Timestamp.currentTime();
+ this.flushIntervalMin = RaftServerConfigKeys.Log.flushIntervalMin(properties);
}
void start(long latestIndex, long evictIndex, File openSegmentFile) throws IOException {
@@ -321,6 +326,7 @@ class SegmentedRaftLogWorker {
}
task.done();
}
+ flushIfNecessary();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (running) {
@@ -344,13 +350,18 @@ class SegmentedRaftLogWorker {
}
private boolean shouldFlush() {
- return pendingFlushNum >= forceSyncNum ||
- (pendingFlushNum > 0 && queue.isEmpty());
+ if (out == null) {
+ return false;
+ } else if (pendingFlushNum >= forceSyncNum) {
+ return true;
+ }
+ return pendingFlushNum > 0 && queue.isEmpty() && lastFlush.elapsedTime().compareTo(flushIntervalMin) > 0;
}
@SuppressFBWarnings("NP_NULL_PARAM_DEREF")
- private void flushWrites() throws IOException {
- if (out != null) {
+ private void flushIfNecessary() throws IOException {
+ if (shouldFlush()) {
+ raftLogMetrics.onRaftLogFlush();
LOG.debug("{}: flush {}", name, out);
final Timer.Context timerContext = logFlushTimer.time();
try {
@@ -369,6 +380,7 @@ class SegmentedRaftLogWorker {
}
} finally {
timerContext.stop();
+ lastFlush = Timestamp.currentTime();
}
updateFlushedIndexIncreasingly();
}
@@ -513,10 +525,7 @@ class SegmentedRaftLogWorker {
out.write(entry);
lastWrittenIndex = entry.getIndex();
pendingFlushNum++;
- if (shouldFlush()) {
- raftLogMetrics.onRaftLogFlush();
- flushWrites();
- }
+ flushIfNecessary();
}
@Override