You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/22 20:09:10 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510426714



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -16,57 +16,75 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.record.Records;
-
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-public interface RaftClient {
+public interface RaftClient<T> {
+
+    interface Listener<T> {
+        /**
+         * Callback which is invoked for all records committed to the log.
+         * It is the responsibility of the caller to invoke {@link BatchReader#close()}
+         * after consuming the reader.
+         *
+         * Note that there is not a one-to-one correspondence between writes through
+         * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation
+         * is free to batch together the records from multiple append calls provided
+         * that batch boundaries are respected. This means that each batch specified
+         * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of
+         * a batch provided by the {@link BatchReader}.
+         *
+         * @param reader reader instance which must be iterated
+         */
+        void handleCommit(BatchReader<T> reader);
+
+        /**
+         * Invoked after this node has become a leader. This is only called after
+         * all commits up to the start of the leader's epoch have been sent to
+         * {@link #handleCommit(BatchReader)}.
+         *
+         * After becoming a leader, the client is eligible to write to the log
+         * using {@link #scheduleAppend(int, List)}.
+         *
+         * @param epoch the claimed leader epoch
+         */
+        default void handleClaim(int epoch) {}
+
+        /**
+         * Invoked after a leader has stepped down. This callback may or may not
+         * fire before the next leader has been elected.
+         */
+        default void handleResign() {}
+    }
 
     /**
-     * Initialize the client. This should only be called once and it must be
-     * called before any of the other APIs can be invoked.
+     * Initialize the client. This should only be called once on startup.
      *
      * @throws IOException For any IO errors during initialization
      */
     void initialize() throws IOException;
 
     /**
-     * Append a new entry to the log. The client must be in the leader state to
-     * accept an append: it is up to the state machine implementation
-     * to ensure this using {@link #currentLeaderAndEpoch()}.
-     *
-     * TODO: One improvement we can make here is to allow the caller to specify
-     * the current leader epoch in the record set. That would ensure that each
-     * leader change must be "observed" by the state machine before new appends
-     * are accepted.
-     *
-     * @param records The records to append to the log
-     * @param timeoutMs Maximum time to wait for the append to complete
-     * @return A future containing the last offset and epoch of the appended records (if successful)
-     */
-    CompletableFuture<OffsetAndEpoch> append(Records records, AckMode ackMode, long timeoutMs);
-
-    /**
-     * Read a set of records from the log. Note that it is the responsibility of the state machine
-     * to filter control records added by the Raft client itself.
-     *
-     * If the fetch offset is no longer valid, then the future will be completed exceptionally
-     * with a {@link LogTruncationException}.
+     * Register a listener to get commit/leader notifications.
      *
-     * @param position The position to fetch from
-     * @param isolation The isolation level to apply to the read
-     * @param maxWaitTimeMs The maximum time to wait for new data to become available before completion
-     * @return The record set, which may be empty if fetching from the end of the log
+     * @param listener the listener
      */
-    CompletableFuture<Records> read(OffsetAndEpoch position, Isolation isolation, long maxWaitTimeMs);
+    void register(Listener<T> listener);
 
     /**
-     * Get the current leader (if known) and the current epoch.
+     * Append a list of records to the log. The write will be scheduled for some time
+     * in the future. There is no guarantee that appended records will be written to
+     * the log and eventually committed. However, it is guaranteed that if any of the
+     * records become committed, then all of them will be.
      *
-     * @return Current leader and epoch information
+     * @param epoch the current leader epoch
+     * @param records the list of records to append
+     * @return the offset within the current epoch that the log entries will be appended,
+     *         or null if the leader was unable to accept the write (e.g. due to memory
+     *         being reached).

Review comment:
       Agreed. I added this here: https://github.com/apache/kafka/pull/9418/files#diff-ac850e29114f9b5a03aaf3ccb8f07feda8b48e5de6912f4c527b8477aa3d6cbcR60.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org