Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/03 14:15:42 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r516151645



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1017,12 +1028,9 @@ private boolean handleFetchResponse(
                 log.truncateToEndOffset(divergingOffsetAndEpoch).ifPresent(truncationOffset -> {
                     logger.info("Truncated to offset {} from Fetch response from leader {}",
                         truncationOffset, quorum.leaderIdOrNil());
-
-                    // Since the end offset has been updated, we should complete any delayed
-                    // reads at the end offset.
-                    fetchPurgatory.maybeComplete(
-                        new LogOffset(Long.MAX_VALUE, Isolation.UNCOMMITTED),
-                        currentTimeMs);
+                    // After truncation, we complete all pending reads in order to
+                    // ensure that fetches account for the updated log end offset
+                    fetchPurgatory.completeAll(currentTimeMs);

Review comment:
       > With the new Listener, when is this not a no-op? Looking at the code, we only add entries to `fetchPurgatory` when the replica is a leader and it receives a Fetch request.
   
   Yeah, that's fair. I don't think we can truncate unless we are a follower and that implies we already cleared the purgatory in `onBecomeFollower`. So I think you are right that we are safe to remove this, though we'll probably need to add it back once we have follower fetching. 
   
   > I think the part that is missing is that the old leader should call `fetchPurgatory.completeAll` when it loses leadership.
   
   I had considered this previously and decided to leave the fetches in purgatory while the election was in progress to prevent unnecessary retries since that is all the client can do while waiting for the outcome. On the other hand, some of the fetches in purgatory might be from other voters. It might be better to respond more quickly so that there are not any unnecessary election delays. I'd suggest we open a separate issue to consider this.
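
   To make the difference between the two completion styles in this hunk concrete, here is a minimal sketch of a purgatory that supports both a threshold-based `maybeComplete` and an unconditional `completeAll`. The class `SketchPurgatory`, its `await` method, and the "strictly below the end offset" rule are assumptions made for illustration; they are not the actual purgatory implementation in the raft module.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the fetch purgatory discussed above; not Kafka's class.
final class SketchPurgatory {
    // Waiters keyed by the offset they are waiting for the log to pass.
    private final TreeMap<Long, List<CompletableFuture<Long>>> waiting = new TreeMap<>();

    // Register a waiter for the given offset (assumed API, for illustration).
    CompletableFuture<Long> await(long offset) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        waiting.computeIfAbsent(offset, k -> new ArrayList<>()).add(future);
        return future;
    }

    // Complete only the waiters whose offset is strictly below the new end offset.
    void maybeComplete(long endOffset, long currentTimeMs) {
        Iterator<Map.Entry<Long, List<CompletableFuture<Long>>>> it =
            waiting.headMap(endOffset, false).entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().forEach(f -> f.complete(currentTimeMs));
            it.remove();
        }
    }

    // Complete every waiter regardless of offset, e.g. after a truncation.
    void completeAll(long currentTimeMs) {
        waiting.values().forEach(list -> list.forEach(f -> f.complete(currentTimeMs)));
        waiting.clear();
    }

    int numWaiting() {
        return waiting.values().stream().mapToInt(List::size).sum();
    }
}
```

   In this sketch, a truncation cannot be expressed as a single threshold because the end offset moves backwards, which is why an unconditional `completeAll` is the simpler operation in that case.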

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -228,35 +234,80 @@ private void updateLeaderEndOffsetAndTimestamp(
         final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
         if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-            updateHighWatermark(state, currentTimeMs);
+            onUpdateLeaderHighWatermark(state, currentTimeMs);
         }
 
-        LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED);
-        fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+        fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
     }
 
-    private void updateHighWatermark(
-        EpochState state,
+    private void onUpdateLeaderHighWatermark(
+        LeaderState state,
         long currentTimeMs
     ) {
         state.highWatermark().ifPresent(highWatermark -> {
-            logger.debug("High watermark updated to {}", highWatermark);
+            logger.debug("Leader high watermark updated to {}", highWatermark);
             log.updateHighWatermark(highWatermark);
-
-            LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED);
-            appendPurgatory.maybeComplete(offset, currentTimeMs);
-            fetchPurgatory.maybeComplete(offset, currentTimeMs);
+            appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+            maybeFireHandleCommit(highWatermark.offset);

Review comment:
       I will add a comment. I agree it is a subtle point.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1778,4 +1808,98 @@ public void complete() {
         }
     }
 
+    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+        private final RaftClient.Listener<T> listener;
+        private BatchReader<T> lastSent = null;
+        private long lastAckedOffset = 0;
+        private int claimedEpoch = 0;

Review comment:
       Let me add a helper to `ListenerContext` so that we can keep the field encapsulated.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1778,4 +1808,98 @@ public void complete() {
         }
     }
 
+    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+        private final RaftClient.Listener<T> listener;
+        private BatchReader<T> lastSent = null;
+        private long lastAckedOffset = 0;
+        private int claimedEpoch = 0;
+
+        private ListenerContext(Listener<T> listener) {
+            this.listener = listener;
+        }
+
+        /**
+         * Get the last acked offset, which is one greater than the offset of the
+         * last record which was acked by the state machine.
+         */
+        public synchronized long lastAckedOffset() {
+            return lastAckedOffset;
+        }
+
+        /**
+         * Get the next expected offset, which might be larger than the last acked
+         * offset if there are inflight batches which have not been acked yet.
+         * Note that when fetching from disk, we may not know the last offset of
+         * inflight data until it has been processed by the state machine. In this case,
+         * we delay sending additional data until the state machine has read to the
+         * end and the last offset is determined.
+         */
+        public synchronized OptionalLong nextExpectedOffset() {
+            if (lastSent != null) {
+                OptionalLong lastSentOffset = lastSent.lastOffset();
+                if (lastSentOffset.isPresent()) {
+                    return OptionalLong.of(lastSentOffset.getAsLong() + 1);
+                } else {
+                    return OptionalLong.empty();
+                }
+            } else {
+                return OptionalLong.of(lastAckedOffset);
+            }
+        }
+
+        /**
+         * This API is used for committed records that have been received through
+         * replication. In general, followers will write new data to disk before they
+         * know whether it has been committed. Rather than retaining the uncommitted
+         * data in memory, we let the state machine read the records from disk.
+         */
+        public void fireHandleCommit(long baseOffset, Records records) {
+            BufferSupplier bufferSupplier = BufferSupplier.create();
+            RecordsBatchReader<T> reader = new RecordsBatchReader<>(baseOffset, records,
+                serde, bufferSupplier, this);
+            fireHandleCommit(reader);
+        }
+
+        /**
+         * This API is used for committed records originating from {@link #scheduleAppend(int, List)}
+         * on this instance. In this case, we are able to save the original record objects,
+         * which saves the need to read them back from disk. This is a nice optimization
+         * for the leader which is typically doing more work than all of the followers.
+         */
+        public void fireHandleCommit(long baseOffset, int epoch, List<T> records) {
+            BatchReader.Batch<T> batch = new BatchReader.Batch<>(baseOffset, epoch, records);
+            MemoryBatchReader<T> reader = new MemoryBatchReader<>(Collections.singletonList(batch), this);
+            fireHandleCommit(reader);
+        }
+
+        private void fireHandleCommit(BatchReader<T> reader) {
+            synchronized (this) {
+                this.lastSent = reader;
+            }
+            listener.handleCommit(reader);

Review comment:
       Hmm... That's a good question. I guess the issue is that the listener will then be in an unknown state. Should the IO thread keep sending it updates, or should it mark it as failed? This comes back to something I have been wondering about in the KIP-500 world. Do we want the process to stay active if either the controller or broker listeners have failed, or would it be better to shut down? At the moment, I am leaning toward the latter. In any case, I suggest we let the errors propagate for now and file a Jira to reconsider once we are closer to integration. Does that sound fair?

##########
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This interface is used to send committed data from the {@link RaftClient}
+ * down to registered {@link RaftClient.Listener} instances.
+ *
+ * The advantage of hiding the consumption of committed batches behind an interface
+ * is that it allows us to push blocking operations such as reads from disk outside
+ * of the Raft IO thread. This helps to ensure that a slow state machine will not
+ * affect replication.
+ *
+ * @param <T> record type (see {@link org.apache.kafka.raft.RecordSerde})
+ */
+public interface BatchReader<T> extends Iterator<BatchReader.Batch<T>>, Closeable {

Review comment:
       I was thinking about this and having some trouble putting my hesitation into words. I guess there are two main reasons why I preferred the `Iterator`:
   
   1. We need some way to communicate iteration progress back to the IO thread. It is probably still possible to do this with a layer of indirection through `Iterable`, but it seemed more natural if the IO thread had access to the `Iterator` object that was used by the listener.
   2. Iteration is not necessarily cheap since it might involve reading from disk. I thought we may as well enforce a single-read pattern.
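
   A minimal sketch of the two points above, under stated assumptions: a reader that can be consumed only once and that reports how far the listener got when it is closed. `SinglePassReader` and its `onClose` callback are hypothetical names for illustration; they are not the `BatchReader` or `CloseListener` types from this PR, and the sketch ignores thread safety.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.LongConsumer;

// Hypothetical single-pass reader; names are illustrative, not Kafka's classes.
final class SinglePassReader implements Iterator<Long>, AutoCloseable {
    private final Iterator<Long> offsets;
    // Stands in for a callback that hands progress back to the IO thread.
    private final LongConsumer onClose;
    private long lastReturned = -1L;
    private boolean closed = false;

    SinglePassReader(List<Long> offsets, LongConsumer onClose) {
        this.offsets = offsets.iterator();
        this.onClose = onClose;
    }

    @Override
    public boolean hasNext() {
        return !closed && offsets.hasNext();
    }

    @Override
    public Long next() {
        if (closed) {
            throw new NoSuchElementException("reader is closed");
        }
        lastReturned = offsets.next();
        return lastReturned;
    }

    // Report the next expected offset exactly once; later calls are no-ops.
    @Override
    public void close() {
        if (!closed) {
            closed = true;
            onClose.accept(lastReturned + 1);
        }
    }
}
```

   Because the object handed to the listener is the iterator itself, there is exactly one place where progress is recorded, and there is no way to restart the (possibly disk-backed) read.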
   

##########
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This interface is used to send committed data from the {@link RaftClient}
+ * down to registered {@link RaftClient.Listener} instances.
+ *
+ * The advantage of hiding the consumption of committed batches behind an interface
+ * is that it allows us to push blocking operations such as reads from disk outside
+ * of the Raft IO thread. This helps to ensure that a slow state machine will not
+ * affect replication.
+ *
+ * @param <T> record type (see {@link org.apache.kafka.raft.RecordSerde})
+ */
+public interface BatchReader<T> extends Iterator<BatchReader.Batch<T>>, Closeable {

Review comment:
       Yeah. Not so much difficult, but awkward because of the one-to-many association between the `Iterable` and `Iterator`. 
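
   As a rough illustration of that one-to-many association, the sketch below shows an `Iterable` that would have to keep track of every `Iterator` it hands out in order to report progress. `TrackedIterable` is a hypothetical class written for this example only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical wrapper that records every Iterator handed out; illustrative only.
final class TrackedIterable<T> implements Iterable<T> {
    private final List<T> items;
    private final List<Iterator<T>> handedOut = new ArrayList<>();

    TrackedIterable(List<T> items) {
        this.items = items;
    }

    // Each call creates a fresh Iterator, so progress is no longer a single value.
    @Override
    public Iterator<T> iterator() {
        Iterator<T> it = items.iterator();
        handedOut.add(it);
        return it;
    }

    int iteratorsHandedOut() {
        return handedOut.size();
    }
}
```

   Two `for` loops over the same `TrackedIterable` produce two independent iterators, and the owner would then have to decide which one represents the listener's progress.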

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1017,12 +1028,9 @@ private boolean handleFetchResponse(
                 log.truncateToEndOffset(divergingOffsetAndEpoch).ifPresent(truncationOffset -> {
                     logger.info("Truncated to offset {} from Fetch response from leader {}",
                         truncationOffset, quorum.leaderIdOrNil());
-
-                    // Since the end offset has been updated, we should complete any delayed
-                    // reads at the end offset.
-                    fetchPurgatory.maybeComplete(
-                        new LogOffset(Long.MAX_VALUE, Isolation.UNCOMMITTED),
-                        currentTimeMs);
+                    // After truncation, we complete all pending reads in order to
+                    // ensure that fetches account for the updated log end offset
+                    fetchPurgatory.completeAll(currentTimeMs);

Review comment:
       Filed https://issues.apache.org/jira/browse/KAFKA-10677.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1778,4 +1808,98 @@ public void complete() {
         }
     }
 
+    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+        private final RaftClient.Listener<T> listener;
+        private BatchReader<T> lastSent = null;
+        private long lastAckedOffset = 0;
+        private int claimedEpoch = 0;
+
+        private ListenerContext(Listener<T> listener) {
+            this.listener = listener;
+        }
+
+        /**
+         * Get the last acked offset, which is one greater than the offset of the
+         * last record which was acked by the state machine.
+         */
+        public synchronized long lastAckedOffset() {
+            return lastAckedOffset;
+        }
+
+        /**
+         * Get the next expected offset, which might be larger than the last acked
+         * offset if there are inflight batches which have not been acked yet.
+         * Note that when fetching from disk, we may not know the last offset of
+         * inflight data until it has been processed by the state machine. In this case,
+         * we delay sending additional data until the state machine has read to the
+         * end and the last offset is determined.
+         */
+        public synchronized OptionalLong nextExpectedOffset() {
+            if (lastSent != null) {
+                OptionalLong lastSentOffset = lastSent.lastOffset();
+                if (lastSentOffset.isPresent()) {
+                    return OptionalLong.of(lastSentOffset.getAsLong() + 1);
+                } else {
+                    return OptionalLong.empty();
+                }
+            } else {
+                return OptionalLong.of(lastAckedOffset);
+            }
+        }
+
+        /**
+         * This API is used for committed records that have been received through
+         * replication. In general, followers will write new data to disk before they
+         * know whether it has been committed. Rather than retaining the uncommitted
+         * data in memory, we let the state machine read the records from disk.
+         */
+        public void fireHandleCommit(long baseOffset, Records records) {
+            BufferSupplier bufferSupplier = BufferSupplier.create();
+            RecordsBatchReader<T> reader = new RecordsBatchReader<>(baseOffset, records,
+                serde, bufferSupplier, this);
+            fireHandleCommit(reader);
+        }
+
+        /**
+         * This API is used for committed records originating from {@link #scheduleAppend(int, List)}
+         * on this instance. In this case, we are able to save the original record objects,
+         * which saves the need to read them back from disk. This is a nice optimization
+         * for the leader which is typically doing more work than all of the followers.
+         */
+        public void fireHandleCommit(long baseOffset, int epoch, List<T> records) {
+            BatchReader.Batch<T> batch = new BatchReader.Batch<>(baseOffset, epoch, records);
+            MemoryBatchReader<T> reader = new MemoryBatchReader<>(Collections.singletonList(batch), this);
+            fireHandleCommit(reader);
+        }
+
+        private void fireHandleCommit(BatchReader<T> reader) {
+            synchronized (this) {
+                this.lastSent = reader;
+            }
+            listener.handleCommit(reader);

Review comment:
       Filed https://issues.apache.org/jira/browse/KAFKA-10676.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org