You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ratis.apache.org by GitBox <gi...@apache.org> on 2022/08/02 16:15:35 UTC

[GitHub] [ratis] szetszwo commented on a diff in pull request #699: RATIS-1644. Provide a safe async flush.

szetszwo commented on code in PR #699:
URL: https://github.com/apache/ratis/pull/699#discussion_r935781998


##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java:
##########
@@ -375,22 +377,50 @@ private void flushIfNecessary() throws IOException {
         if (unsafeFlush) {
           // unsafe-flush: call updateFlushedIndexIncreasingly() without waiting the underlying FileChannel.force(..).
           unsafeFlushOutStream();
+          updateFlushedIndexIncreasingly();
+        } else if (asyncFlush) {
+          asyncFlushOutStream();

Review Comment:
   We should combine stateMachine flush future `f`.
   ```
   +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
   @@ -379,7 +379,7 @@ class SegmentedRaftLogWorker {
              unsafeFlushOutStream();
              updateFlushedIndexIncreasingly();
            } else if (asyncFlush) {
   -          asyncFlushOutStream();
   +          asyncFlushOutStream(f);
            } else {
              flushOutStream();
              if (!stateMachineDataPolicy.isSync()) {
   @@ -398,9 +398,11 @@ class SegmentedRaftLogWorker {
        out.asyncFlush(flushExecutor, null).whenComplete((v, e) -> logSyncTimerContext.stop());
      }
    
   -  private void asyncFlushOutStream() throws IOException {
   +  private void asyncFlushOutStream(CompletableFuture<Void> stateMachineFlush) throws IOException {
        final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
   -    out.asyncFlush(flushExecutor, lastWrittenIndex).whenComplete((v, e) -> {
   +    out.asyncFlush(flushExecutor, lastWrittenIndex)
   +        .thenCombine(stateMachineFlush, (async, stateMachine) -> async)
   +        .whenComplete((v, e) -> {
          updateFlushedIndexIncreasingly(v);
          logSyncTimerContext.stop();
        });
   ```



##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java:
##########
@@ -217,7 +218,8 @@ synchronized void updateIndex(long i) {
     final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
     this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);
-    this.flushExecutor = !unsafeFlush? null
+    this.asyncFlush = RaftServerConfigKeys.Log.asyncFlushEnabled(properties);

Review Comment:
   We should throw an exception when both asyncFlush and unsafeFlush are true.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ratis.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org