You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "cmccabe (via GitHub)" <gi...@apache.org> on 2023/04/25 23:24:35 UTC

[GitHub] [kafka] cmccabe opened a new pull request, #13643: MINOR: provide the exact offset to QuorumController.replay

cmccabe opened a new pull request, #13643:
URL: https://github.com/apache/kafka/pull/13643

   Provide the exact record offset to QuorumController.replay() in all cases. There are several situations where this is useful, such as logging, implementing metadata transactions, or handling broker registration records.
   
   In the case where the QC is inactive, and simply replaying records, it is easy to compute the exact record offset from the batch base offset and the record index.
   
   The active QC case is more difficult. Technically, when we submit records to the Raft layer, it can choose a batch end offset later than the one we expect, if someone else is also adding records. While the QC is the only entity submitting data records, control records may be added at any time. In the current implementation, these are really only used for leadership elections. However, this could change with the addition of quorum reconfiguration or similar features.
   
   Therefore, this PR allows the QC to tell the Raft layer that a record append should fail if it would have resulted in a batch end offset other than what was expected. This in turn will trigger a controller failover. In the future, if automatically added control records become more common, we may wish to have a more sophisticated system than this simple optimistic concurrency mechanism. But for now, this will allow us to rely on the offset as correct.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jsancio commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "jsancio (via GitHub)" <gi...@apache.org>.
jsancio commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273531461


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -755,7 +755,9 @@ public Long apply(List<ApiMessageAndVersion> records) {
                                 recordIndex++;
                             }
                             long nextEndOffset = prevEndOffset + recordIndex;
-                            raftClient.scheduleAtomicAppend(controllerEpoch, OptionalLong.of(nextEndOffset), records);
+                            raftClient.scheduleAtomicAppend(controllerEpoch,
+                                    OptionalLong.of(prevEndOffset + 1),
+                                    records);

Review Comment:
   This indentation doesn't look right. We indent 4 spaces in this case.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   This is not correct in all cases. The leader can have records in the base accumulator that have not been sent to the log. I think you want something along these lines:
   ```java
       public long logEndOffset() {
           return quorum.maybeLeaderState()
               .map(leader -> leader.accumulator().nextOffset())
               .orElse(log.endOffset().offset);
       }
   ```
   
   Then we can add this method to BatchAccumulator:
   ```java
       public long nextOffset() {
           appendLock.lock();
           try {
               return nextOffset;
           finally {
               appendLock.unlock();
           }
       }
   ```



##########
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##########
@@ -147,6 +126,13 @@ private long append(int epoch, List<T> records, boolean isAtomic) {
 
         appendLock.lock();
         try {
+            long endOffset = nextOffset + records.size() - 1;

Review Comment:
   I think must readers will assume that this "end offset" is exclusive. I think this offset is inclusive. We normally use `lastOffset` for this kind of offset.
   ```java
               long lastOffset = nextOffset + records.size() - 1;
   ```



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   Can we also add tests for this new functionality?



##########
raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java:
##########
@@ -232,7 +233,7 @@ public void testLingerBeginsOnFirstWrite() {
         );
 
         time.sleep(15);
-        assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a")));
+        assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"), OptionalLong.empty(), false));

Review Comment:
   We need tests for the new functionality added to the BatchAccumulator.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -355,7 +355,8 @@ public void testAppendFailedWithRecordBatchTooLargeException() throws Exception
         for (int i = 0; i < size; i++)
             batchToLarge.add("a");
 
-        assertThrows(RecordBatchTooLargeException.class, () -> context.client.scheduleAtomicAppend(epoch, batchToLarge));
+        assertThrows(RecordBatchTooLargeException.class,
+                () -> context.client.scheduleAtomicAppend(epoch, OptionalLong.empty(), batchToLarge));

Review Comment:
   We need tests for the new functionality added to `KafkaRaftClient`. That is both the new method `logEndOffset` and the changes to `scheduleAtomicAppend`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mumrah commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1264405122


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -740,22 +739,27 @@ public Long apply(List<ApiMessageAndVersion> records) {
                             // Start by trying to apply the record to our in-memory state. This should always
                             // succeed; if it does not, that's a fatal error. It is important to do this before
                             // scheduling the record for Raft replication.
-                            int i = 1;
+                            int recordIndex = 0;
                             for (ApiMessageAndVersion message : records) {
+                                long recordOffset = prevEndOffset + 1 + recordIndex;
                                 try {
-                                    replay(message.message(), Optional.empty(), prevEndOffset + records.size());
+                                    replay(message.message(), Optional.empty(), recordOffset);
                                 } catch (Throwable e) {
-                                    String failureMessage = String.format("Unable to apply %s record, which was " +
-                                            "%d of %d record(s) in the batch following last write offset %d.",
-                                            message.message().getClass().getSimpleName(), i, records.size(),
-                                            prevEndOffset);
+                                    String failureMessage = String.format("Unable to apply %s " +
+                                        "record at offset %d on active controller, from the " +
+                                        "batch with baseOffset %d",
+                                        message.message().getClass().getSimpleName(),
+                                        recordOffset, prevEndOffset + 1);
                                     throw fatalFaultHandler.handleFault(failureMessage, e);
                                 }
-                                i++;
+                                recordIndex++;
                             }
-                            prevEndOffset = raftClient.scheduleAtomicAppend(controllerEpoch, records);
-                            snapshotRegistry.getOrCreateSnapshot(prevEndOffset);
-                            return prevEndOffset;
+                            long nextEndOffset = prevEndOffset + recordIndex;
+                            raftClient.scheduleAtomicAppend(controllerEpoch, OptionalLong.of(nextEndOffset), records);
+                            snapshotRegistry.getOrCreateSnapshot(nextEndOffset);
+                            snapshotRegistry.getOrCreateSnapshot(nextEndOffset);

Review Comment:
   is this duplicate line intentional?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273720728


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   > This is not correct in all cases. The leader can have records in the base accumulator that have not been sent to the log.
   
   I feel that a method named "logEndOffset" should just return the log end offset. Returning something else would be misleading.
   
   In any case, I only plan on using this method when the leader becomes active. We will not use it after that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mumrah commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1276255234


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1168,15 +1165,15 @@ private void updateWriteOffset(long offset) {
         }
     }
 
-    private void claim(int epoch) {
+    private void claim(int epoch, long newLastWriteOffset) {
         try {
             if (curClaimEpoch != -1) {
                 throw new RuntimeException("Cannot claim leadership because we are already the " +
                         "active controller.");
             }
             curClaimEpoch = epoch;
             controllerMetrics.setActive(true);
-            updateWriteOffset(lastCommittedOffset);
+            updateWriteOffset(newLastWriteOffset);

Review Comment:
   Understood, I just wanted to make sure I understood the behavior here regarding the new RaftClient API. It sounds like it's fine for us as long as we're single writer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263961378


##########
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+    /**
+     * True if this exception should be treated as a fault.
+     */
+    private final boolean isFault;
+
+    /**
+     * True if this exception should cause a controller failover.
+     * All faults cause failover
+     */
+    private final boolean causesFailover;
+
+    /**
+     * The internal exception.
+     */
+    private final Throwable internalException;
+
+    /**
+     * The exception to present to RPC callers, or Optional.empty if the internal exception should
+     * be presented directly.
+     */
+    private final Optional<Throwable> externalException;
+
+    /**
+     * Create an EventHandlerExceptionInfo object from an internal exception.
+     *
+     * @param internal                  The internal exception.
+     * @param latestControllerSupplier  A function we can call to obtain the latest leader id.
+     *
+     * @return                          The new immutable info object.
+     */
+    public static EventHandlerExceptionInfo fromInternal(
+        Throwable internal,
+        Supplier<OptionalInt> latestControllerSupplier
+    ) {
+        if (internal instanceof ApiException) {
+            // This exception is a standard API error response from the controller, which can pass
+            // through without modification.
+            return new EventHandlerExceptionInfo(false, false, internal);
+        } else if (internal instanceof NotLeaderException) {
+            // The controller has lost leadership.
+            return new EventHandlerExceptionInfo(false, true, internal,
+                ControllerExceptions.newWrongControllerException(latestControllerSupplier.get()));
+        } else if (internal instanceof RejectedExecutionException) {
+            // The controller event queue is shutting down.
+            return new EventHandlerExceptionInfo(false, false, internal,
+                new TimeoutException("The controller is shutting down.", internal));
+        } else if (internal instanceof BoundedListTooLongException) {
+            // The operation could not be performed because it would have created an overly large
+            // batch.
+            return new EventHandlerExceptionInfo(false, false, internal,
+                new PolicyViolationException("Unable to perform excessively large batch " +
+                    "operation."));
+        } else if (internal instanceof UnexpectedEndOffsetException) {
+            // The active controller picked the wrong end offset for its next batch. It must now
+            // fail over. This should be pretty rare.
+            return new EventHandlerExceptionInfo(false, true, internal,
+                new NotControllerException("Unexpected end offset. Controller not known."));

Review Comment:
   Yes, "will resign" is ok.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1275472446


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1168,15 +1165,15 @@ private void updateWriteOffset(long offset) {
         }
     }
 
-    private void claim(int epoch) {
+    private void claim(int epoch, long newLastWriteOffset) {
         try {
             if (curClaimEpoch != -1) {
                 throw new RuntimeException("Cannot claim leadership because we are already the " +
                         "active controller.");
             }
             curClaimEpoch = epoch;
             controllerMetrics.setActive(true);
-            updateWriteOffset(lastCommittedOffset);
+            updateWriteOffset(newLastWriteOffset);

Review Comment:
   Well, there is not really any difference between the previous iterations of this PR and the current one in this regard. If some other component that isn't the controller is adding messages, our supplied `requiredBaseOffset` may be invalid. It is only a snapshot of the offset at a point in time, after all. Which is why we check `requiredBaseOffset` in `atomicAppend`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13643:
URL: https://github.com/apache/kafka/pull/13643#issuecomment-1654251894

   > @cmccabe, I don't follow this comment. When the client calls KafkaRaftClient::schedule{Atomic}Append the KafkaRaftClient compare the provided offset with the nextOffset stored in the BatchAccumulator. If we want this method to succeed in most cases KafkaRaftClient::logEndOffset should return that offset, BatchAccumulator::nextOffset and not the log end offset.
   
   There can't be anything in the accumulator when we become active, because we are not adding things to the accumulator when we are inactive. Therefore all we need to know is the end of the log.
   
   After becoming active, the active controller tracks its own offset and doesn't need to access the offset in BatchAccumulator or the end of the log offset.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mumrah commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263851139


##########
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+    /**
+     * True if this exception should be treated as a fault.

Review Comment:
   Should we mention that this will increment the error metric?



##########
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+    /**
+     * True if this exception should be treated as a fault.
+     */
+    private final boolean isFault;
+
+    /**
+     * True if this exception should cause a controller failover.
+     * All faults cause failover
+     */
+    private final boolean causesFailover;
+
+    /**
+     * The internal exception.
+     */
+    private final Throwable internalException;
+
+    /**
+     * The exception to present to RPC callers, or Optional.empty if the internal exception should
+     * be presented directly.
+     */
+    private final Optional<Throwable> externalException;
+
+    /**
+     * Create an EventHandlerExceptionInfo object from an internal exception.
+     *
+     * @param internal                  The internal exception.
+     * @param latestControllerSupplier  A function we can call to obtain the latest leader id.
+     *
+     * @return                          The new immutable info object.
+     */
+    public static EventHandlerExceptionInfo fromInternal(
+        Throwable internal,
+        Supplier<OptionalInt> latestControllerSupplier
+    ) {
+        if (internal instanceof ApiException) {
+            // This exception is a standard API error response from the controller, which can pass
+            // through without modification.
+            return new EventHandlerExceptionInfo(false, false, internal);
+        } else if (internal instanceof NotLeaderException) {
+            // The controller has lost leadership.
+            return new EventHandlerExceptionInfo(false, true, internal,
+                ControllerExceptions.newWrongControllerException(latestControllerSupplier.get()));
+        } else if (internal instanceof RejectedExecutionException) {
+            // The controller event queue is shutting down.
+            return new EventHandlerExceptionInfo(false, false, internal,
+                new TimeoutException("The controller is shutting down.", internal));
+        } else if (internal instanceof BoundedListTooLongException) {
+            // The operation could not be performed because it would have created an overly large
+            // batch.
+            return new EventHandlerExceptionInfo(false, false, internal,
+                new PolicyViolationException("Unable to perform excessively large batch " +
+                    "operation."));
+        } else if (internal instanceof UnexpectedEndOffsetException) {
+            // The active controller picked the wrong end offset for its next batch. It must now
+            // fail over. This should be pretty rare.
+            return new EventHandlerExceptionInfo(false, true, internal,
+                new NotControllerException("Unexpected end offset. Controller not known."));

Review Comment:
   nit: "Controller not known"? maybe  "Controller will resign" or something?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -740,22 +735,24 @@ public Long apply(List<ApiMessageAndVersion> records) {
                             // Start by trying to apply the record to our in-memory state. This should always
                             // succeed; if it does not, that's a fatal error. It is important to do this before
                             // scheduling the record for Raft replication.
-                            int i = 1;
+                            int i = 0;

Review Comment:
   I realize it already existed, but can we rename `i` to something more descriptive



##########
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##########
@@ -172,15 +176,17 @@ default void beginShutdown() {}
      * uncommitted entries after observing an epoch change.
      *
      * @param epoch the current leader epoch
+     * @param requiredEndOffset if this is set, it is the offset we must use.

Review Comment:
   nit "offset which must be exactly the end offset of the atomic append" or something more precise than "use"



##########
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+    /**
+     * True if this exception should be treated as a fault.
+     */
+    private final boolean isFault;
+
+    /**
+     * True if this exception should cause a controller failover.
+     * All faults cause failover
+     */
+    private final boolean causesFailover;
+
+    /**
+     * The internal exception.
+     */
+    private final Throwable internalException;
+
+    /**
+     * The exception to present to RPC callers, or Optional.empty if the internal exception should
+     * be presented directly.
+     */
+    private final Optional<Throwable> externalException;
+
+    /**
+     * Create an EventHandlerExceptionInfo object from an internal exception.
+     *
+     * @param internal                  The internal exception.
+     * @param latestControllerSupplier  A function we can call to obtain the latest leader id.
+     *
+     * @return                          The new immutable info object.
+     */
+    public static EventHandlerExceptionInfo fromInternal(
+        Throwable internal,
+        Supplier<OptionalInt> latestControllerSupplier
+    ) {
+        if (internal instanceof ApiException) {
+            // This exception is a standard API error response from the controller, which can pass
+            // through without modification.
+            return new EventHandlerExceptionInfo(false, false, internal);
+        } else if (internal instanceof NotLeaderException) {
+            // The controller has lost leadership.
+            return new EventHandlerExceptionInfo(false, true, internal,
+                ControllerExceptions.newWrongControllerException(latestControllerSupplier.get()));
+        } else if (internal instanceof RejectedExecutionException) {
+            // The controller event queue is shutting down.
+            return new EventHandlerExceptionInfo(false, false, internal,
+                new TimeoutException("The controller is shutting down.", internal));
+        } else if (internal instanceof BoundedListTooLongException) {
+            // The operation could not be performed because it would have created an overly large
+            // batch.
+            return new EventHandlerExceptionInfo(false, false, internal,
+                new PolicyViolationException("Unable to perform excessively large batch " +
+                    "operation."));
+        } else if (internal instanceof UnexpectedEndOffsetException) {
+            // The active controller picked the wrong end offset for its next batch. It must now
+            // fail over. This should be pretty rare.
+            return new EventHandlerExceptionInfo(false, true, internal,
+                new NotControllerException("Unexpected end offset. Controller not known."));
+        } else if (internal instanceof InterruptedException) {
+            // The controller event queue has been interrupted. This normally only happens during
+            // a JUnit test that has hung. The test framework sometimes sends an InterruptException
+            // to all threads to try to get them to shut down. This isn't the correct way to shut
+            // the test, but it may happen if something hung.
+            return new EventHandlerExceptionInfo(true, true, internal,
+                new UnknownServerException("The controller was interrupted."));
+        } else {
+            // This is the catch-all case for things that aren't supposed to happen. Null pointer
+            // exceptions, illegal argument exceptions, etc. They get translated into an
+            // UnknownServerException and a controller failover.
+            return new EventHandlerExceptionInfo(true, true, internal,
+                new UnknownServerException(internal));
+        }
+    }
+
+    /**
+     * Returns true if the class and message fields match for two exceptions. Handles nulls.
+     */
+    static boolean exceptionClassesAndMessagesMatch(Throwable a, Throwable b) {
+        if (a == null) return b == null;
+        if (b == null) return false;
+        if (!a.getClass().equals(b.getClass())) return false;
+        if (!Objects.equals(a.getMessage(), b.getMessage())) return false;
+        return true;
+    }
+
+    EventHandlerExceptionInfo(
+        boolean isFault,
+        boolean causesFailover,
+        Throwable internalException
+    ) {
+        this.isFault = isFault;
+        this.causesFailover = causesFailover;
+        this.internalException = internalException;
+        this.externalException = Optional.empty();
+    }
+
+    EventHandlerExceptionInfo(
+        boolean isFault,
+        boolean causesFailover,
+        Throwable internalException,
+        Throwable externalException
+    ) {
+        this.isFault = isFault;
+        this.causesFailover = causesFailover;
+        this.internalException = internalException;
+        this.externalException = Optional.of(externalException);
+    }
+
+    public boolean isFault() {
+        return isFault;
+    }
+
+    public boolean causesFailover() {
+        return causesFailover;
+    }
+
+    public Throwable effectiveExternalException() {
+        return externalException.orElse(internalException);
+    }
+
+    public String failureMessage(
+        int epoch,
+        OptionalLong deltaUs,
+        boolean isActiveController,
+        long lastCommittedOffset
+    ) {
+        StringBuilder bld = new StringBuilder();
+        if (deltaUs.isPresent()) {
+            bld.append("failed with ");
+        } else {
+            bld.append("unable to start processing because of ");
+        }
+        bld.append(internalException.getClass().getSimpleName());
+        if (externalException.isPresent()) {
+            bld.append(" (treated as ").
+                append(externalException.get().getClass().getSimpleName()).append(")");
+        }
+        if (causesFailover()) {
+            bld.append(" at epoch ").append(epoch);

Review Comment:
   Can we just always log the epoch? 



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -973,14 +970,14 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                                 log.debug("Replaying commits from the active node up to " +
                                     "offset {} and epoch {}.", offset, epoch);
                             }
-                            int i = 1;
+                            int i = 0;

Review Comment:
   Same `i` comment as above



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -458,38 +459,32 @@ private void handleEventEnd(String name, long startProcessingTimeNs) {
         controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
     }
 
-    private Throwable handleEventException(String name,
-                                           OptionalLong startProcessingTimeNs,
-                                           Throwable exception) {
-        Throwable externalException =
-                ControllerExceptions.toExternalException(exception, () -> latestController());
-        if (!startProcessingTimeNs.isPresent()) {
-            log.error("{}: unable to start processing because of {}. Reason: {}", name,
-                exception.getClass().getSimpleName(), exception.getMessage());
-            return externalException;
-        }
-        long endProcessingTime = time.nanoseconds();
-        long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
-        long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
-        if (ControllerExceptions.isExpected(exception)) {
-            log.info("{}: failed with {} in {} us. Reason: {}", name,
-                exception.getClass().getSimpleName(), deltaUs, exception.getMessage());
-            return externalException;
+    private Throwable handleEventException(
+        String name,
+        OptionalLong startProcessingTimeNs,
+        Throwable exception
+    ) {
+        OptionalLong deltaUs;
+        if (startProcessingTimeNs.isPresent()) {
+            long endProcessingTime = time.nanoseconds();
+            long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
+            deltaUs = OptionalLong.of(MICROSECONDS.convert(deltaNs, NANOSECONDS));
+        } else {
+            deltaUs = OptionalLong.empty();
+        }
+        EventHandlerExceptionInfo info = EventHandlerExceptionInfo.
+                fromInternal(exception, () -> latestController());
+        String failureMessage = info.failureMessage(lastCommittedEpoch, deltaUs,
+                isActiveController(), lastCommittedOffset);
+        if (info.isFault()) {
+            nonFatalFaultHandler.handleFault(name + ": " + failureMessage, exception);
+        } else {
+            log.info("{}: {}", name, failureMessage);

Review Comment:
   why not ERROR here?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1076,10 +1073,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
                         renounce();
                     }
                 } else if (newLeader.isLeader(nodeId)) {
-                    log.info("Becoming the active controller at epoch {}, committed offset {}, " +
-                        "committed epoch {}", newLeader.epoch(), lastCommittedOffset,
-                        lastCommittedEpoch);
-                    claim(newLeader.epoch());
+                    long newLastWriteOffset = endOffset - 1;

Review Comment:
   After an election, is the endOffset report by raft both the last written and last committed offset?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263966038


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -458,38 +459,32 @@ private void handleEventEnd(String name, long startProcessingTimeNs) {
         controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
     }
 
-    private Throwable handleEventException(String name,
-                                           OptionalLong startProcessingTimeNs,
-                                           Throwable exception) {
-        Throwable externalException =
-                ControllerExceptions.toExternalException(exception, () -> latestController());
-        if (!startProcessingTimeNs.isPresent()) {
-            log.error("{}: unable to start processing because of {}. Reason: {}", name,
-                exception.getClass().getSimpleName(), exception.getMessage());
-            return externalException;
-        }
-        long endProcessingTime = time.nanoseconds();
-        long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
-        long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
-        if (ControllerExceptions.isExpected(exception)) {
-            log.info("{}: failed with {} in {} us. Reason: {}", name,
-                exception.getClass().getSimpleName(), deltaUs, exception.getMessage());
-            return externalException;
+    private Throwable handleEventException(
+        String name,
+        OptionalLong startProcessingTimeNs,
+        Throwable exception
+    ) {
+        OptionalLong deltaUs;
+        if (startProcessingTimeNs.isPresent()) {
+            long endProcessingTime = time.nanoseconds();
+            long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
+            deltaUs = OptionalLong.of(MICROSECONDS.convert(deltaNs, NANOSECONDS));
+        } else {
+            deltaUs = OptionalLong.empty();
+        }
+        EventHandlerExceptionInfo info = EventHandlerExceptionInfo.
+                fromInternal(exception, () -> latestController());
+        String failureMessage = info.failureMessage(lastCommittedEpoch, deltaUs,
+                isActiveController(), lastCommittedOffset);
+        if (info.isFault()) {
+            nonFatalFaultHandler.handleFault(name + ": " + failureMessage, exception);
+        } else {
+            log.info("{}: {}", name, failureMessage);

Review Comment:
   In general if something isn't a fault, it shouldn't be logged at ERROR level.
   
   For example, if someone tries to create a topic but one already exists with that name, that should not cause ERROR messages in the controller.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13643:
URL: https://github.com/apache/kafka/pull/13643#issuecomment-1654254344

   > I couldn't find a test for the new KafkaRaftClient::scheduleAtomicAppend.
   
   I added `KafkaRaftClientTest.testAppendWithRequiredBaseOffset`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273720728


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   > This is not correct in all cases. The leader can have records in the base accumulator that have not been sent to the log.
   
   Hmm... the method name is "logEndOffset." So it should just return the log end offset, right? Returning something else would be misleading.
   
   In any case, we only need this method when the leader becomes active. We will not use it after that.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   > Can we also add tests for this new functionality?
   
   Yes, good point. I will add a test for the `requiredEndOffset` parameter and the `logEndOffset` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1264717661


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -740,22 +739,27 @@ public Long apply(List<ApiMessageAndVersion> records) {
                             // Start by trying to apply the record to our in-memory state. This should always
                             // succeed; if it does not, that's a fatal error. It is important to do this before
                             // scheduling the record for Raft replication.
-                            int i = 1;
+                            int recordIndex = 0;
                             for (ApiMessageAndVersion message : records) {
+                                long recordOffset = prevEndOffset + 1 + recordIndex;
                                 try {
-                                    replay(message.message(), Optional.empty(), prevEndOffset + records.size());
+                                    replay(message.message(), Optional.empty(), recordOffset);
                                 } catch (Throwable e) {
-                                    String failureMessage = String.format("Unable to apply %s record, which was " +
-                                            "%d of %d record(s) in the batch following last write offset %d.",
-                                            message.message().getClass().getSimpleName(), i, records.size(),
-                                            prevEndOffset);
+                                    String failureMessage = String.format("Unable to apply %s " +
+                                        "record at offset %d on active controller, from the " +
+                                        "batch with baseOffset %d",
+                                        message.message().getClass().getSimpleName(),
+                                        recordOffset, prevEndOffset + 1);
                                     throw fatalFaultHandler.handleFault(failureMessage, e);
                                 }
-                                i++;
+                                recordIndex++;
                             }
-                            prevEndOffset = raftClient.scheduleAtomicAppend(controllerEpoch, records);
-                            snapshotRegistry.getOrCreateSnapshot(prevEndOffset);
-                            return prevEndOffset;
+                            long nextEndOffset = prevEndOffset + recordIndex;
+                            raftClient.scheduleAtomicAppend(controllerEpoch, OptionalLong.of(nextEndOffset), records);
+                            snapshotRegistry.getOrCreateSnapshot(nextEndOffset);
+                            snapshotRegistry.getOrCreateSnapshot(nextEndOffset);

Review Comment:
   Good catch. That was not intentional. Although given the way that function works, would have been benign.
   
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe merged pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe merged PR #13643:
URL: https://github.com/apache/kafka/pull/13643


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on pull request #13643: MINOR: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13643:
URL: https://github.com/apache/kafka/pull/13643#issuecomment-1636107554

   > Regarding the failure case where the end offset of a batch is not equal to what the controller expected, will this only happen if a raft election occurred that was not initiated by a resignation? What are the circumstances when this can happen? Raft timeouts?
   >
   > IIUC, when an election happens in the middle of an atomic batch, the batch will be lost anyways. The node will finish writing the batch to the local log at epoch N, then process the new leader at N+1, and then it will truncate its own log once it fetches from the new leader at N+1 and sees the start offset for the epoch is less than its own end offset. Is that about right?
   
   There's been some discussion of adding more Raft internal control records. One example is if we wanted to implement a dynamic change-of-quorum mechanism. There would probably be internal Raft records associated with that. It's not clear whether change-of-quorum events would also always involve a leader change -- I think in some cases they would not.
   
   Like I said earlier, if we end up adding more background raft messages, we might introduce some mechanism for the active controller to get an "offset lock" so it can get an offset, replay the records, and then try to commit them under that lock. That would minimize failovers caused by these background messages. But since they don't exist today, we can avoid that for today.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jsancio commented on pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "jsancio (via GitHub)" <gi...@apache.org>.
jsancio commented on PR #13643:
URL: https://github.com/apache/kafka/pull/13643#issuecomment-1640778580

   > > @cmccabe Why do you need this change now? Since KRaft is a single writer (The active controller) and the active controller is guarantee to have see all of the records before becoming leader, don't you always know what is going to be the next offset?
   > 
   > It is possible for Raft itself to insert control records. So it is not enough to rely on guessing the next offset. We have to know for sure.
   
   I it is not possible in the current implementation. The active controller will lose leadership if KRaft inserts a control record. Have you seen this in practice?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1267137597


##########
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##########
@@ -172,15 +176,17 @@ default void beginShutdown() {}
      * uncommitted entries after observing an epoch change.
      *
      * @param epoch the current leader epoch
+     * @param requiredEndOffset if this is set, it is the offset we must use as the end offset (inclusive).

Review Comment:
   ok. we can use `requiredBaseOffset`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1264232306


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1076,10 +1073,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
                         renounce();
                     }
                 } else if (newLeader.isLeader(nodeId)) {
-                    log.info("Becoming the active controller at epoch {}, committed offset {}, " +
-                        "committed epoch {}", newLeader.epoch(), lastCommittedOffset,
-                        lastCommittedEpoch);
-                    claim(newLeader.epoch());
+                    long newLastWriteOffset = endOffset - 1;

Review Comment:
   The endOffset comes directly from the log and is as described... the end offset (exclusive).
   
   Thinking about it more, I don't think I need to assume that it's committed, so I won't.
   
   But it is used to calculate the next offset that the active controller should try to write to.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jsancio commented on pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "jsancio (via GitHub)" <gi...@apache.org>.
jsancio commented on PR #13643:
URL: https://github.com/apache/kafka/pull/13643#issuecomment-1644166696

   @cmccabe @mumrah and I discuss this PR offline. We agreed to add `long RaftClient::logEndOffset()` method that would return
   1. The `BatchAccumulator`'s baseOffset when the replica is a leader or
   2. The log end offset when the replica is not a leader.
   
   This is instead of the sending the LEO in the `RaftClient.Listener::handleLeaderChange` callback.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13643:
URL: https://github.com/apache/kafka/pull/13643#issuecomment-1648709714

   Thanks, @jsancio . I updated the PR with the results of our discussion. One small change I made is that I think `RaftClient#logEndOffset` can just unconditionally return the log end offset, as the name implies. We only need to call it when becoming active anyway.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on pull request #13643: MINOR: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13643:
URL: https://github.com/apache/kafka/pull/13643#issuecomment-1640698569

   > @cmccabe This doesn't look like a MINOR change since you are changing the interface of a few raft types. Do you mind creating a Jira that explains the problem you are trying to solve?
   
   OK. @jsancio , I filed https://issues.apache.org/jira/browse/KAFKA-15213 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273720728


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   > This is not correct in all cases. The leader can have records in the base accumulator that have not been sent to the log.
   
   I don't care about that, though, since I only plan on using this method when the leader becomes active. We will not use it after that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273742201


##########
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##########
@@ -147,6 +126,13 @@ private long append(int epoch, List<T> records, boolean isAtomic) {
 
         appendLock.lock();
         try {
+            long endOffset = nextOffset + records.size() - 1;

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13643:
URL: https://github.com/apache/kafka/pull/13643#issuecomment-1640706676

   > @cmccabe Why do you need this change now? Since KRaft is a single writer (The active controller) and the active controller is guarantee to have see all of the records before becoming leader, don't you always know what is going to be the next offset?
   
   It is possible for Raft itself to insert control records. So it is not enough to rely on guessing the next offset. We have to know for sure.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13643:
URL: https://github.com/apache/kafka/pull/13643#issuecomment-1640907827

   > It is not possible in the current implementation. The active controller will lose leadership if KRaft inserts a control record. Have you seen this in practice?
   
   The implementation could change. If we're going to rely on the offsets for correctness, we need guarantees.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mumrah commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "mumrah (via GitHub)" <gi...@apache.org>.
mumrah commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1275381984


##########
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##########
@@ -172,15 +173,17 @@ default void beginShutdown() {}
      * uncommitted entries after observing an epoch change.

Review Comment:
   It's probably worth adding a sentence or two about the new optimistic concurrency.



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1168,15 +1165,15 @@ private void updateWriteOffset(long offset) {
         }
     }
 
-    private void claim(int epoch) {
+    private void claim(int epoch, long newLastWriteOffset) {
         try {
             if (curClaimEpoch != -1) {
                 throw new RuntimeException("Cannot claim leadership because we are already the " +
                         "active controller.");
             }
             curClaimEpoch = epoch;
             controllerMetrics.setActive(true);
-            updateWriteOffset(lastCommittedOffset);
+            updateWriteOffset(newLastWriteOffset);

Review Comment:
   Previously, we would update `lastCommittedOffset` as we got the `handleCommit` callback from our RaftClient. Since we process Raft events sequentially (and they are delivered sequentially from a single thread), we always process any commit callbacks before a leader change. Which means this offset is valid with respect to the end offset when the leadership changed.
   
   Now, while we're processing a leader change, we _ask_ RaftClient for its end offset. Is there any possibility that commits could be made that would make this end offset greater than we expect? Basically, can we be sure that the end offset doesn't change between the time Raft becomes the leader and this event is processed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263963588


##########
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+    /**
+     * True if this exception should be treated as a fault.
+     */
+    private final boolean isFault;
+
+    /**
+     * True if this exception should cause a controller failover.
+     * All faults cause failover
+     */
+    private final boolean causesFailover;
+
+    /**
+     * The internal exception.
+     */
+    private final Throwable internalException;
+
+    /**
+     * The exception to present to RPC callers, or Optional.empty if the internal exception should
+     * be presented directly.
+     */
+    private final Optional<Throwable> externalException;
+
+    /**
+     * Create an EventHandlerExceptionInfo object from an internal exception.
+     *
+     * @param internal                  The internal exception.
+     * @param latestControllerSupplier  A function we can call to obtain the latest leader id.
+     *
+     * @return                          The new immutable info object.
+     */
+    public static EventHandlerExceptionInfo fromInternal(
+        Throwable internal,
+        Supplier<OptionalInt> latestControllerSupplier
+    ) {
+        if (internal instanceof ApiException) {
+            // This exception is a standard API error response from the controller, which can pass
+            // through without modification.
+            return new EventHandlerExceptionInfo(false, false, internal);
+        } else if (internal instanceof NotLeaderException) {
+            // The controller has lost leadership.
+            return new EventHandlerExceptionInfo(false, true, internal,
+                ControllerExceptions.newWrongControllerException(latestControllerSupplier.get()));
+        } else if (internal instanceof RejectedExecutionException) {
+            // The controller event queue is shutting down.
+            return new EventHandlerExceptionInfo(false, false, internal,
+                new TimeoutException("The controller is shutting down.", internal));
+        } else if (internal instanceof BoundedListTooLongException) {
+            // The operation could not be performed because it would have created an overly large
+            // batch.
+            return new EventHandlerExceptionInfo(false, false, internal,
+                new PolicyViolationException("Unable to perform excessively large batch " +
+                    "operation."));
+        } else if (internal instanceof UnexpectedEndOffsetException) {
+            // The active controller picked the wrong end offset for its next batch. It must now
+            // fail over. This should be pretty rare.
+            return new EventHandlerExceptionInfo(false, true, internal,
+                new NotControllerException("Unexpected end offset. Controller not known."));
+        } else if (internal instanceof InterruptedException) {
+            // The controller event queue has been interrupted. This normally only happens during
+            // a JUnit test that has hung. The test framework sometimes sends an InterruptException
+            // to all threads to try to get them to shut down. This isn't the correct way to shut
+            // the test, but it may happen if something hung.
+            return new EventHandlerExceptionInfo(true, true, internal,
+                new UnknownServerException("The controller was interrupted."));
+        } else {
+            // This is the catch-all case for things that aren't supposed to happen. Null pointer
+            // exceptions, illegal argument exceptions, etc. They get translated into an
+            // UnknownServerException and a controller failover.
+            return new EventHandlerExceptionInfo(true, true, internal,
+                new UnknownServerException(internal));
+        }
+    }
+
+    /**
+     * Returns true if the class and message fields match for two exceptions. Handles nulls.
+     */
+    static boolean exceptionClassesAndMessagesMatch(Throwable a, Throwable b) {
+        if (a == null) return b == null;
+        if (b == null) return false;
+        if (!a.getClass().equals(b.getClass())) return false;
+        if (!Objects.equals(a.getMessage(), b.getMessage())) return false;
+        return true;
+    }
+
+    EventHandlerExceptionInfo(
+        boolean isFault,
+        boolean causesFailover,
+        Throwable internalException
+    ) {
+        this.isFault = isFault;
+        this.causesFailover = causesFailover;
+        this.internalException = internalException;
+        this.externalException = Optional.empty();
+    }
+
+    EventHandlerExceptionInfo(
+        boolean isFault,
+        boolean causesFailover,
+        Throwable internalException,
+        Throwable externalException
+    ) {
+        this.isFault = isFault;
+        this.causesFailover = causesFailover;
+        this.internalException = internalException;
+        this.externalException = Optional.of(externalException);
+    }
+
+    public boolean isFault() {
+        return isFault;
+    }
+
+    public boolean causesFailover() {
+        return causesFailover;
+    }
+
+    public Throwable effectiveExternalException() {
+        return externalException.orElse(internalException);
+    }
+
+    public String failureMessage(
+        int epoch,
+        OptionalLong deltaUs,
+        boolean isActiveController,
+        long lastCommittedOffset
+    ) {
+        StringBuilder bld = new StringBuilder();
+        if (deltaUs.isPresent()) {
+            bld.append("failed with ");
+        } else {
+            bld.append("unable to start processing because of ");
+        }
+        bld.append(internalException.getClass().getSimpleName());
+        if (externalException.isPresent()) {
+            bld.append(" (treated as ").
+                append(externalException.get().getClass().getSimpleName()).append(")");
+        }
+        if (causesFailover()) {
+            bld.append(" at epoch ").append(epoch);

Review Comment:
   It's not really useful to log the epoch if we're not failing over.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1267142346


##########
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##########
@@ -89,52 +91,12 @@ public BatchAccumulator(
         this.appendLock = new ReentrantLock();
     }
 
-    /**
-     * Append a list of records into as many batches as necessary.
-     *
-     * The order of the elements in the records argument will match the order in the batches.
-     * This method will use as many batches as necessary to serialize all of the records. Since
-     * this method can split the records into multiple batches it is possible that some of the
-     * records will get committed while other will not when the leader fails.
-     *
-     * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException}
-     *              will be thrown
-     * @param records the list of records to include in the batches
-     * @return the expected offset of the last record
-     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
-     *         batch size; if this exception is throw some of the elements in records may have
-     *         been committed
-     * @throws NotLeaderException if the epoch is less than the leader epoch
-     * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
-     * @throws BufferAllocationException if we failed to allocate memory for the records
-     * @throws IllegalStateException if we tried to append new records after the batch has been built
-     */
-    public long append(int epoch, List<T> records) {
-        return append(epoch, records, false);
-    }
-
-    /**
-     * Append a list of records into an atomic batch. We guarantee all records are included in the
-     * same underlying record batch so that either all of the records become committed or none of
-     * them do.
-     *
-     * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException}
-     *              will be thrown
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record
-     * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
-     *         batch size; if this exception is throw none of the elements in records were
-     *         committed
-     * @throws NotLeaderException if the epoch is less than the leader epoch
-     * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
-     * @throws BufferAllocationException if we failed to allocate memory for the records
-     * @throws IllegalStateException if we tried to append new records after the batch has been built
-     */
-    public long appendAtomic(int epoch, List<T> records) {
-        return append(epoch, records, true);
-    }
-
-    private long append(int epoch, List<T> records, boolean isAtomic) {
+    public long append(
+        int epoch,
+        List<T> records,
+        OptionalLong requiredEndOffset,
+        boolean isAtomic
+    ) {

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1267130598


##########
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##########
@@ -80,8 +81,11 @@ interface Listener<T> {
          * epoch.
          *
          * @param leader the current leader and epoch
+         * @param endOffset the current log end offset (exclusive). This is useful for nodes that
+         *                  are claiming leadership, because it lets them know what log offset they
+         *                  should attempt to write to next.

Review Comment:
   > What problem are you trying so solve?
   
   We have to know the offset of records that we apply. But we apply records before we submit them to Raft.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273742201


##########
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##########
@@ -147,6 +126,13 @@ private long append(int epoch, List<T> records, boolean isAtomic) {
 
         appendLock.lock();
         try {
+            long endOffset = nextOffset + records.size() - 1;

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273720728


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   > This is not correct in all cases. The leader can have records in the base accumulator that have not been sent to the log.
   
   I don't care about that, though, since I only plan on using this method when the leader becomes active. We will not use it after that. I also feel that a method named "logEndOffset" should just return the log end offset. Returning something else would be misleading.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1273831307


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   > Can we also add tests for this new functionality?
   
   I will add a test for the `requiredEndOffset` parameter and the `logEndOffset` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1276690892


##########
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##########
@@ -171,16 +172,21 @@ default void beginShutdown() {}
      * to resign its leadership. The state machine is expected to discard all
      * uncommitted entries after observing an epoch change.
      *
+     * If the current base offset does not match the supplied required base offset,
+     * then this method will throw {@link UnexpectedBaseOffsetException}.
+     *
      * @param epoch the current leader epoch
+     * @param requiredBaseOffset if this is set, it is the offset we must use as the base offset.
      * @param records the list of records to append
      * @return the expected offset of the last record if append succeed
      * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum
      *         batch size; if this exception is throw none of the elements in records were
      *         committed
      * @throws NotLeaderException if we are not the current leader or the epoch doesn't match the leader epoch
      * @throws BufferAllocationException we failed to allocate memory for the records
+     * @throws UnexpectedBaseOffsetException the requested base offset could not be obtained.
      */
-    long scheduleAtomicAppend(int epoch, List<T> records);
+    long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List<T> records);

Review Comment:
   The controller doesn't use `scheduleAppend`. We only use `scheduleAtomicAppend`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jsancio commented on a diff in pull request #13643: KAFKA-15213: provide the exact offset to QuorumController.replay

Posted by "jsancio (via GitHub)" <gi...@apache.org>.
jsancio commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1276417011


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2394,6 +2394,11 @@ public Optional<OffsetAndEpoch> latestSnapshotId() {
         return log.latestSnapshotId();
     }
 
+    @Override
+    public long logEndOffset() {
+        return log.endOffset().offset;
+    }

Review Comment:
   > Hmm... the method name is "logEndOffset." So it should just return the log end offset, right? Returning something else would be misleading.
   
   @cmccabe, I don't follow this comment. When the client calls `KafkaRaftClient::schedule{Atomic}Append` the `KafkaRaftClient` compare the provided offset with the `nextOffset` stored in the `BatchAccumulator`. If we want this method to succeed in most cases `KafkaRaftClient::logEndOffset` should return that offset, `BatchAccumulator::nextOffset` and not the log end offset.
   
   Maybe `logEndOffset` is not a great name. I am okay renaming this to `KafkaRaftClient::endOffset()` but I am open to suggestions.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -355,7 +355,8 @@ public void testAppendFailedWithRecordBatchTooLargeException() throws Exception
         for (int i = 0; i < size; i++)
             batchToLarge.add("a");
 
-        assertThrows(RecordBatchTooLargeException.class, () -> context.client.scheduleAtomicAppend(epoch, batchToLarge));
+        assertThrows(RecordBatchTooLargeException.class,
+                () -> context.client.scheduleAtomicAppend(epoch, OptionalLong.empty(), batchToLarge));

Review Comment:
   I couldn't find a test for the new `KafkaRaftClient::scheduleAtomicAppend`.



##########
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##########
@@ -171,16 +172,21 @@ default void beginShutdown() {}
      * to resign its leadership. The state machine is expected to discard all
      * uncommitted entries after observing an epoch change.
      *
+     * If the current base offset does not match the supplied required base offset,
+     * then this method will throw {@link UnexpectedBaseOffsetException}.
+     *
      * @param epoch the current leader epoch
+     * @param requiredBaseOffset if this is set, it is the offset we must use as the base offset.
      * @param records the list of records to append
      * @return the expected offset of the last record if append succeed
      * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum
      *         batch size; if this exception is throw none of the elements in records were
      *         committed
      * @throws NotLeaderException if we are not the current leader or the epoch doesn't match the leader epoch
      * @throws BufferAllocationException we failed to allocate memory for the records
+     * @throws UnexpectedBaseOffsetException the requested base offset could not be obtained.
      */
-    long scheduleAtomicAppend(int epoch, List<T> records);
+    long scheduleAtomicAppend(int epoch, OptionalLong requiredBaseOffset, List<T> records);

Review Comment:
   What's the argument/reason for adding this functionality to `scheduleAtomicAppend` and not `scheduleAppend`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jsancio commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

Posted by "jsancio (via GitHub)" <gi...@apache.org>.
jsancio commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1265927585


##########
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##########
@@ -172,15 +176,17 @@ default void beginShutdown() {}
      * uncommitted entries after observing an epoch change.
      *
      * @param epoch the current leader epoch
+     * @param requiredEndOffset if this is set, it is the offset we must use as the end offset (inclusive).

Review Comment:
   This is the same base or starting offset. The `RaftClient` does expose LEO but it does expose a base offset. I think we should do something like this:
   ```java
        * @param expectedBaseOffset if this is set, it matches the base offset that KRaft will use for the {@code records}.
   ```



##########
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##########
@@ -80,8 +81,11 @@ interface Listener<T> {
          * epoch.
          *
          * @param leader the current leader and epoch
+         * @param endOffset the current log end offset (exclusive). This is useful for nodes that
+         *                  are claiming leadership, because it lets them know what log offset they
+         *                  should attempt to write to next.

Review Comment:
   This is only possible in the leader. This is not well defined on any replica outside of the leader. What problem are you trying so solve?
   
   KRaft guarantees that the committed offset is the LEO when notifying the leader that is now the leader of the partition. For all other replicas leadership change and lost of leadership is done immediately. 



##########
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##########
@@ -147,6 +109,13 @@ private long append(int epoch, List<T> records, boolean isAtomic) {
 
         appendLock.lock();
         try {
+            long endOffset = nextOffset + records.size() - 1;
+            requiredEndOffset.ifPresent(r -> {

Review Comment:
   If the user of `RaftClient` provides the "expected base offset" this becomes `expectedBaseOffset.ifPresent(baseOffset -> if (baseOffset == nextOffset) { ... } );`



##########
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##########
@@ -89,52 +91,12 @@ public BatchAccumulator(
         this.appendLock = new ReentrantLock();
     }
 
-    /**
-     * Append a list of records into as many batches as necessary.
-     *
-     * The order of the elements in the records argument will match the order in the batches.
-     * This method will use as many batches as necessary to serialize all of the records. Since
-     * this method can split the records into multiple batches it is possible that some of the
-     * records will get committed while other will not when the leader fails.
-     *
-     * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException}
-     *              will be thrown
-     * @param records the list of records to include in the batches
-     * @return the expected offset of the last record
-     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
-     *         batch size; if this exception is throw some of the elements in records may have
-     *         been committed
-     * @throws NotLeaderException if the epoch is less than the leader epoch
-     * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
-     * @throws BufferAllocationException if we failed to allocate memory for the records
-     * @throws IllegalStateException if we tried to append new records after the batch has been built
-     */
-    public long append(int epoch, List<T> records) {
-        return append(epoch, records, false);
-    }
-
-    /**
-     * Append a list of records into an atomic batch. We guarantee all records are included in the
-     * same underlying record batch so that either all of the records become committed or none of
-     * them do.
-     *
-     * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException}
-     *              will be thrown
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record
-     * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
-     *         batch size; if this exception is throw none of the elements in records were
-     *         committed
-     * @throws NotLeaderException if the epoch is less than the leader epoch
-     * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
-     * @throws BufferAllocationException if we failed to allocate memory for the records
-     * @throws IllegalStateException if we tried to append new records after the batch has been built
-     */
-    public long appendAtomic(int epoch, List<T> records) {
-        return append(epoch, records, true);
-    }
-
-    private long append(int epoch, List<T> records, boolean isAtomic) {
+    public long append(
+        int epoch,
+        List<T> records,
+        OptionalLong requiredEndOffset,
+        boolean isAtomic
+    ) {

Review Comment:
   Please document this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263962510


##########
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+    /**
+     * True if this exception should be treated as a fault.

Review Comment:
   Sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263971924


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -740,22 +735,24 @@ public Long apply(List<ApiMessageAndVersion> records) {
                             // Start by trying to apply the record to our in-memory state. This should always
                             // succeed; if it does not, that's a fatal error. It is important to do this before
                             // scheduling the record for Raft replication.
-                            int i = 1;
+                            int i = 0;

Review Comment:
   it's just an index. I can call it `recordIndex`.
   
   I will revise the error mesage a bit too. The clunky wording from before reflected the fact that we didn't really know the offset.



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -973,14 +970,14 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                                 log.debug("Replaying commits from the active node up to " +
                                     "offset {} and epoch {}.", offset, epoch);
                             }
-                            int i = 1;
+                            int i = 0;

Review Comment:
   ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org