You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2020/06/12 09:19:17 UTC
[incubator-ratis] branch master updated: RATIS-960. Add APIs to
support streaming state machine data. (#121)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9375d21 RATIS-960. Add APIs to support streaming state machine data. (#121)
9375d21 is described below
commit 9375d21daf6bb5e10199685257e1bb571ca32d4d
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Jun 12 02:19:07 2020 -0700
RATIS-960. Add APIs to support streaming state machine data. (#121)
---
.../apache/ratis/statemachine/StateMachine.java | 52 ++++++++++++++++++++++
1 file changed, 52 insertions(+)
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 511af47..e4cf833 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@@ -53,6 +54,7 @@ public interface StateMachine extends Closeable {
}
interface DataApi {
+ /** A noop implementation of {@link DataApi}. */
DataApi DEFAULT = new DataApi() {};
/**
@@ -74,6 +76,25 @@ public interface StateMachine extends Closeable {
}
/**
+ * Asynchronously create a {@link DataStream} to stream state machine data.
+ * The state machine may use the first message (i.e. {@code request.getMessage()}) as the header to create the stream.
+ *
+ * @return a future of the stream; this default implementation returns a future already completed with null.
+ */
+ default CompletableFuture<DataStream> stream(RaftClientRequest request) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /**
+ * Asynchronously link the given stream with the given log entry.
+ *
+ * @return a future for the link task; this default implementation returns a future already completed with null.
+ */
+ default CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /**
* Flush the state machine data till the given log index.
*
* @param logIndex The log index to flush.
@@ -95,6 +116,37 @@ public interface StateMachine extends Closeable {
}
}
+ /**
+ * A stream for writing state machine data; see {@link DataApi#stream(RaftClientRequest)}.
+ */
+ interface DataStream {
+ /** @return a channel for streaming state machine data. */
+ WritableByteChannel getWritableByteChannel();
+
+ /**
+ * Asynchronously clean up this stream.
+ *
+ * When there is an error, this method is invoked to clean up the associated resources.
+ * If this stream is not yet linked (see {@link DataApi#link(DataStream, LogEntryProto)}),
+ * the state machine may choose to remove the data or to keep the data internally for future recovery.
+ * If this stream is already linked, the data must not be removed.
+ *
+ * @return a future for the cleanup task.
+ */
+ CompletableFuture<?> cleanUp();
+ }
+
+ /**
+ * Get the optional {@link DataApi} object.
+ *
+ * If this {@link StateMachine} chooses to support {@link DataApi},
+ * it may either implement {@link DataApi} directly or override this method to return a {@link DataApi} object.
+ *
+ * Otherwise, this {@link StateMachine} does not support {@link DataApi},
+ * and this method returns the default noop {@link DataApi} object, i.e. {@link DataApi#DEFAULT}.
+ *
+ * @return The optional {@link DataApi} object.
+ */
default DataApi data() {
return this instanceof DataApi? (DataApi)this : DataApi.DEFAULT;
}