You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/08/17 22:48:57 UTC
incubator-ratis git commit: RATIS-295. RaftLogWorker#flushWrites
should also flush state machine data. Contributed by Shashikant Banerjee
Repository: incubator-ratis
Updated Branches:
refs/heads/master 0b73e892a -> 05812468f
RATIS-295. RaftLogWorker#flushWrites should also flush state machine data. Contributed by Shashikant Banerjee
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/05812468
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/05812468
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/05812468
Branch: refs/heads/master
Commit: 05812468f7c85b38ff7391e361c27a76929cf48a
Parents: 0b73e89
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Fri Aug 17 15:45:42 2018 -0700
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Fri Aug 17 15:45:42 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/ratis/server/storage/RaftLogWorker.java | 7 +++++++
.../java/org/apache/ratis/statemachine/StateMachine.java | 9 +++++++++
2 files changed, 16 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/05812468/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index 64f5356..68f303d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
/**
@@ -225,7 +226,13 @@ class RaftLogWorker implements Runnable {
LOG.debug("flush data to " + out + ", reset pending_sync_number to 0");
final Timer.Context timerContext = logFlushTimer.get().time();
try {
+ final CompletableFuture<Void> f = stateMachine != null ?
+ stateMachine.flushStateMachineData(lastWrittenIndex) :
+ CompletableFuture.completedFuture(null);
out.flush();
+ f.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw IOUtils.asIOException(e);
} finally {
timerContext.stop();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/05812468/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 88b5276..c76b2e4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -231,4 +231,13 @@ public interface StateMachine extends Closeable {
default void notifyExtendedNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
}
+
+ /**
+ * Flush the state machine data till the log index provided.
+ * The default implementation ignores the index and returns an
+ * already-completed future, i.e. it performs no flush work.
+ *
+ * @param index log Index
+ * @return a future for the flush task; implementations should not return
+ *         null, since RaftLogWorker calls get() on the returned future
+ *         without a null check
+ */
+ default CompletableFuture<Void> flushStateMachineData(long index) {
+ return CompletableFuture.completedFuture(null);
+ }
}