Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/03 13:44:07 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

jsancio commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r516172571



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1017,12 +1028,9 @@ private boolean handleFetchResponse(
                 log.truncateToEndOffset(divergingOffsetAndEpoch).ifPresent(truncationOffset -> {
                     logger.info("Truncated to offset {} from Fetch response from leader {}",
                         truncationOffset, quorum.leaderIdOrNil());
-
-                    // Since the end offset has been updated, we should complete any delayed
-                    // reads at the end offset.
-                    fetchPurgatory.maybeComplete(
-                        new LogOffset(Long.MAX_VALUE, Isolation.UNCOMMITTED),
-                        currentTimeMs);
+                    // After truncation, we complete all pending reads in order to
+                    // ensure that fetches account for the updated log end offset
+                    fetchPurgatory.completeAll(currentTimeMs);

Review comment:
       > I had considered this previously and decided to leave the fetches in purgatory while the election was in progress to prevent unnecessary retries since that is all the client can do while waiting for the outcome. On the other hand, some of the fetches in purgatory might be from other voters. It might be better to respond more quickly so that there are not any unnecessary election delays. I'd suggest we open a separate issue to consider this.
   
   Sounds good to create a Jira for this.
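
   For illustration, the purgatory behavior under discussion can be sketched as a map of
   pending fetches keyed by the awaited offset, where `completeAll` wakes every waiter so
   delayed fetches are re-evaluated against the truncated log end offset. This is a rough,
   hypothetical sketch — `FetchPurgatorySketch`, `await`, and the future-based structure
   are stand-ins, not the PR's actual purgatory API:
   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical sketch of a fetch purgatory. completeAll(...) completes every
   // pending future so that delayed fetches are re-checked against the new log
   // end offset instead of waiting on an offset that may never be reached.
   class FetchPurgatorySketch {
       private final Map<Long, CompletableFuture<Long>> waiting = new ConcurrentHashMap<>();

       // Register interest in the log reaching the given offset.
       CompletableFuture<Long> await(long offset) {
           return waiting.computeIfAbsent(offset, k -> new CompletableFuture<>());
       }

       // Wake all waiters, e.g. after truncation changes the log end offset.
       void completeAll(long currentTimeMs) {
           waiting.forEach((offset, future) -> future.complete(currentTimeMs));
           waiting.clear();
       }
   }
   ```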

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1778,4 +1808,98 @@ public void complete() {
         }
     }
 
+    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+        private final RaftClient.Listener<T> listener;
+        private BatchReader<T> lastSent = null;
+        private long lastAckedOffset = 0;
+        private int claimedEpoch = 0;
+
+        private ListenerContext(Listener<T> listener) {
+            this.listener = listener;
+        }
+
+        /**
+         * Get the last acked offset, which is one greater than the offset of the
+         * last record which was acked by the state machine.
+         */
+        public synchronized long lastAckedOffset() {
+            return lastAckedOffset;
+        }
+
+        /**
+         * Get the next expected offset, which might be larger than the last acked
+         * offset if there are inflight batches which have not been acked yet.
+         * Note that when fetching from disk, we may not know the last offset of
+         * inflight data until it has been processed by the state machine. In this case,
+         * we delay sending additional data until the state machine has read to the
+         * end and the last offset is determined.
+         */
+        public synchronized OptionalLong nextExpectedOffset() {
+            if (lastSent != null) {
+                OptionalLong lastSentOffset = lastSent.lastOffset();
+                if (lastSentOffset.isPresent()) {
+                    return OptionalLong.of(lastSentOffset.getAsLong() + 1);
+                } else {
+                    return OptionalLong.empty();
+                }
+            } else {
+                return OptionalLong.of(lastAckedOffset);
+            }
+        }
+
+        /**
+         * This API is used for committed records that have been received through
+         * replication. In general, followers will write new data to disk before they
+         * know whether it has been committed. Rather than retaining the uncommitted
+         * data in memory, we let the state machine read the records from disk.
+         */
+        public void fireHandleCommit(long baseOffset, Records records) {
+            BufferSupplier bufferSupplier = BufferSupplier.create();
+            RecordsBatchReader<T> reader = new RecordsBatchReader<>(baseOffset, records,
+                serde, bufferSupplier, this);
+            fireHandleCommit(reader);
+        }
+
+        /**
+         * This API is used for committed records originating from {@link #scheduleAppend(int, List)}
+         * on this instance. In this case, we are able to save the original record objects,
+         * which avoids the need to read them back from disk. This is a nice optimization
+         * for the leader which is typically doing more work than all of the followers.
+         */
+        public void fireHandleCommit(long baseOffset, int epoch, List<T> records) {
+            BatchReader.Batch<T> batch = new BatchReader.Batch<>(baseOffset, epoch, records);
+            MemoryBatchReader<T> reader = new MemoryBatchReader<>(Collections.singletonList(batch), this);
+            fireHandleCommit(reader);
+        }
+
+        private void fireHandleCommit(BatchReader<T> reader) {
+            synchronized (this) {
+                this.lastSent = reader;
+            }
+            listener.handleCommit(reader);

Review comment:
       >  At the moment, I am leaning toward the latter. In any case, I suggest we let the errors propagate for now and file a jira to reconsider once we are closer to integration. Does that sound fair?
   
   Sounds fair to create a Jira and revisit this later.
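
   For reference, the two error-handling strategies being weighed look roughly like the
   sketch below. This is hedged and hypothetical — `CommitDispatch` and the two method
   names are stand-ins for whatever shape the final code takes; only
   `listener.handleCommit(reader)` comes from the diff above:
   ```java
   // Hypothetical sketch comparing the two strategies discussed in this thread.
   class CommitDispatch<T> {
       private final RaftClient.Listener<T> listener;
       private final org.slf4j.Logger logger;

       CommitDispatch(RaftClient.Listener<T> listener, org.slf4j.Logger logger) {
           this.listener = listener;
           this.logger = logger;
       }

       // Option 1: let listener exceptions propagate into the Raft IO thread.
       void fireHandleCommitPropagating(BatchReader<T> reader) {
           listener.handleCommit(reader);
       }

       // Option 2: isolate listener failures so one bad state machine cannot
       // take down replication; the reader is closed to release its buffers.
       void fireHandleCommitIsolating(BatchReader<T> reader) {
           try {
               listener.handleCommit(reader);
           } catch (Exception e) {
               logger.error("Listener failed to handle committed batch", e);
               try {
                   reader.close();
               } catch (Exception closeError) {
                   logger.warn("Failed to close batch reader", closeError);
               }
           }
       }
   }
   ```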

##########
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This interface is used to send committed data from the {@link RaftClient}
+ * down to registered {@link RaftClient.Listener} instances.
+ *
+ * The advantage of hiding the consumption of committed batches behind an interface
+ * is that it allows us to push blocking operations such as reads from disk outside
+ * of the Raft IO thread. This helps to ensure that a slow state machine will not
+ * affect replication.
+ *
+ * @param <T> record type (see {@link org.apache.kafka.raft.RecordSerde})
+ */
+public interface BatchReader<T> extends Iterator<BatchReader.Batch<T>>, Closeable {

Review comment:
       > We need some way to communicate iteration progress back to the IO thread. It is probably still possible to do this with a layer of indirection through Iterable, but it seemed more natural if the IO thread had access to the Iterator object that was used by the listener.
   
   Okay. Specifically, you are saying that this would be difficult to implement with an `Iterable`:
   ```java
   lastReturnedOffset = res.lastOffset();
   ```
   https://github.com/apache/kafka/pull/9482/files/867650fa2344497ac3f3505bd5058f2eae0cc0c4#diff-37728c07e52382d38a9db6f655c3921d274c4277a8909d7613fc433d6bf69636R140
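
   For what it's worth, the shared-reference property is easy to sketch: because the IO
   thread constructs the Iterator and keeps a reference to the same object it hands to the
   listener, it can observe consumption progress directly. With an `Iterable`, the listener
   would call `iterator()` itself and that shared vantage point would be lost. A minimal,
   hypothetical illustration — `TrackingIterator` and `lastOffsetOf` are illustrative
   names, not the PR's API:
   ```java
   import java.util.Iterator;
   import java.util.function.ToLongFunction;

   // Hedged sketch: the IO thread retains this wrapper and reads
   // lastReturnedOffset() to learn how far the listener has iterated.
   class TrackingIterator<B> implements Iterator<B> {
       private final Iterator<B> delegate;
       private final ToLongFunction<B> lastOffsetOf;
       private volatile long lastReturnedOffset = -1L;

       TrackingIterator(Iterator<B> delegate, ToLongFunction<B> lastOffsetOf) {
           this.delegate = delegate;
           this.lastOffsetOf = lastOffsetOf;
       }

       @Override
       public boolean hasNext() {
           return delegate.hasNext();
       }

       @Override
       public B next() {
           B batch = delegate.next();
           // Record progress; volatile makes it visible to the IO thread.
           lastReturnedOffset = lastOffsetOf.applyAsLong(batch);
           return batch;
       }

       long lastReturnedOffset() {
           return lastReturnedOffset;
       }
   }
   ```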



