You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/08/17 22:48:57 UTC

incubator-ratis git commit: RATIS-295. RaftLogWorker#flushWrites should also flush state machine data. Contributed by Shashikant Banerjee

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 0b73e892a -> 05812468f


RATIS-295. RaftLogWorker#flushWrites should also flush state machine data.  Contributed by Shashikant Banerjee


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/05812468
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/05812468
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/05812468

Branch: refs/heads/master
Commit: 05812468f7c85b38ff7391e361c27a76929cf48a
Parents: 0b73e89
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Fri Aug 17 15:45:42 2018 -0700
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Fri Aug 17 15:45:42 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/ratis/server/storage/RaftLogWorker.java | 7 +++++++
 .../java/org/apache/ratis/statemachine/StateMachine.java    | 9 +++++++++
 2 files changed, 16 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/05812468/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index 64f5356..68f303d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 
 /**
@@ -225,7 +226,13 @@ class RaftLogWorker implements Runnable {
       LOG.debug("flush data to " + out + ", reset pending_sync_number to 0");
       final Timer.Context timerContext = logFlushTimer.get().time();
       try {
+        final CompletableFuture<Void> f = stateMachine != null ?
+            stateMachine.flushStateMachineData(lastWrittenIndex) :
+            CompletableFuture.completedFuture(null);
         out.flush();
+        f.get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw IOUtils.asIOException(e);
       } finally {
         timerContext.stop();
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/05812468/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 88b5276..c76b2e4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -231,4 +231,13 @@ public interface StateMachine extends Closeable {
   default void notifyExtendedNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
 
   }
+
+  /**
+   * Flush the state machine data till the log index provided.
+   * @param index log Index
+   * @return a future for the flush task, null otherwise
+   */
+  default CompletableFuture<Void> flushStateMachineData(long index) {
+    return CompletableFuture.completedFuture(null);
+  }
 }