You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2020/06/12 09:19:17 UTC

[incubator-ratis] branch master updated: RATIS-960. Add APIs to support streaming state machine data. (#121)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9375d21  RATIS-960. Add APIs to support streaming state machine data. (#121)
9375d21 is described below

commit 9375d21daf6bb5e10199685257e1bb571ca32d4d
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Jun 12 02:19:07 2020 -0700

    RATIS-960. Add APIs to support streaming state machine data. (#121)
---
 .../apache/ratis/statemachine/StateMachine.java    | 52 ++++++++++++++++++++++
 1 file changed, 52 insertions(+)

diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 511af47..e4cf833 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
@@ -53,6 +54,7 @@ public interface StateMachine extends Closeable {
   }
 
   interface DataApi {
+    /** A no-op implementation of {@link DataApi}: it overrides nothing, so every method keeps its default behavior. */
     DataApi DEFAULT = new DataApi() {};
 
     /**
@@ -74,6 +76,25 @@ public interface StateMachine extends Closeable {
     }
 
     /**
+     * Asynchronously create a {@link DataStream} for streaming state machine data.
+     * The state machine may use the first message (i.e. request.getMessage()) as the header to create the stream.
+     * @param request the client request; its message may serve as the header described above.
+     * @return a future of the stream; this default implementation completes the future with null.
+     */
+    default CompletableFuture<DataStream> stream(RaftClientRequest request) {
+      return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Asynchronously link the given stream with the given log entry.
+     *
+     * @return a future for the link task; this default implementation is a no-op completing with null.
+     */
+    default CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+      return CompletableFuture.completedFuture(null);
+    }
+
+    /**
      * Flush the state machine data till the given log index.
      *
      * @param logIndex The log index to flush.
@@ -95,6 +116,37 @@ public interface StateMachine extends Closeable {
     }
   }
 
+  /**
+   * A stream for writing state machine data through a {@link WritableByteChannel}.
+   */
+  interface DataStream {
+    /** @return a channel for streaming state machine data. */
+    WritableByteChannel getWritableByteChannel();
+
+    /**
+     * Asynchronously clean up this stream.
+     *
+     * This method is invoked when there is an error, in order to clean up the associated resources.
+     * If this stream is not yet linked (see {@link DataApi#link(DataStream, LogEntryProto)}),
+     * the state machine may choose either to remove the data or to keep it internally for future recovery.
+     * If this stream is already linked, the data must not be removed.
+     *
+     * @return a future for the cleanup task.
+     */
+    CompletableFuture<?> cleanUp();
+  }
+
+  /**
+   * Get the optional {@link DataApi} object.
+   *
+   * If this {@link StateMachine} chooses to support {@link DataApi},
+   * it may either implement {@link DataApi} directly or override this method to return a {@link DataApi} object.
+   *
+   * Otherwise, this {@link StateMachine} does not support {@link DataApi},
+   * and this method returns the default no-op object {@link DataApi#DEFAULT}.
+   *
+   * @return by default, this object if it implements {@link DataApi}; otherwise, {@link DataApi#DEFAULT}.
+   */
   default DataApi data() {
     return this instanceof DataApi? (DataApi)this : DataApi.DEFAULT;
   }