You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/07/06 03:08:42 UTC

[kafka] branch trunk updated: KAFKA-13943; Make `LocalLogManager` implementation consistent with the `RaftClient` contract (#12224)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5e4c8f704c2 KAFKA-13943; Make `LocalLogManager` implementation consistent with the `RaftClient` contract (#12224)
5e4c8f704c2 is described below

commit 5e4c8f704c28d9f60a49a2bf2fb4ea8b7f7e231d
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Wed Jul 6 05:08:28 2022 +0200

    KAFKA-13943; Make `LocalLogManager` implementation consistent with the `RaftClient` contract (#12224)
    
    Fixes two issues in the implementation of `LocalLogManager`:
    
    - As per the interface contract for `RaftClient.scheduleAtomicAppend()`, it should throw a `NotLeaderException` exception when the provided current leader epoch does not match the current epoch. However, the current `LocalLogManager`'s implementation of the API returns a LONG_MAX instead of throwing an exception. This change fixes the behaviour and makes it consistent with the interface contract.
    -  As per the interface contract for `RaftClient.resign(epoch)`if the parameter epoch does not match the current epoch, this call will be ignored. But in the current `LocalLogManager` implementation the leader epoch might change when the thread is waiting to acquire a lock on `shared.tryAppend()` (note that tryAppend() is a synchronized method). In such a case, if a NotALeaderException is thrown (as per code change in above), then resign should be ignored.
    
    Reviewers: José Armando García Sancio <js...@users.noreply.github.com>, Tom Bentley <tb...@redhat.com>, Jason Gustafson <ja...@confluent.io>
---
 .../org/apache/kafka/metalog/LocalLogManager.java  | 53 +++++++++++++++++-----
 .../java/org/apache/kafka/raft/RaftClient.java     |  5 +-
 .../kafka/raft/internals/BatchAccumulator.java     |  9 ++--
 3 files changed, 52 insertions(+), 15 deletions(-)

diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 36fe3ec2c1c..c8e39ae3289 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -31,6 +31,7 @@ import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.raft.errors.NotLeaderException;
 import org.apache.kafka.raft.internals.MemoryBatchReader;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.snapshot.MockRawSnapshotReader;
@@ -226,16 +227,20 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
         }
 
         synchronized long tryAppend(int nodeId, int epoch, LocalBatch batch) {
-            if (epoch != leader.epoch()) {
-                log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not " +
-                    "match the current leader epoch of {}.", nodeId, epoch, leader.epoch());
-                return Long.MAX_VALUE;
-            }
             if (!leader.isLeader(nodeId)) {
-                log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " +
-                    "match the current leader id of {}.", nodeId, epoch, leader.leaderId());
-                return Long.MAX_VALUE;
+                log.debug("tryAppend(nodeId={}, epoch={}): the given node id does not " +
+                        "match the current leader id of {}.", nodeId, epoch, leader.leaderId());
+                throw new NotLeaderException("Append failed because the replication is not the current leader");
+            }
+
+            if (epoch < leader.epoch()) {
+                throw new NotLeaderException("Append failed because the given epoch " + epoch + " is stale. " +
+                        "Current leader epoch = " + leader.epoch());
+            } else if (epoch > leader.epoch()) {
+                throw new IllegalArgumentException("Attempt to append from epoch " + epoch +
+                        " which is larger than the current epoch " + leader.epoch());
             }
+
             log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
             long offset = append(batch);
             electLeaderIfNeeded();
@@ -723,9 +728,35 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
 
     @Override
     public void resign(int epoch) {
-        LeaderAndEpoch curLeader = leader;
-        LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), curLeader.epoch() + 1);
-        shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
+        if (epoch < 0) {
+            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
+        }
+
+        LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+        int currentEpoch = leaderAndEpoch.epoch();
+
+        if (epoch > currentEpoch) {
+            throw new IllegalArgumentException("Attempt to resign from epoch " + epoch +
+                    " which is larger than the current epoch " + currentEpoch);
+        } else if (epoch < currentEpoch) {
+            // If the passed epoch is smaller than the current epoch, then it might mean
+            // that the listener has not been notified about a leader change that already
+            // took place. In this case, we consider the call as already fulfilled and
+            // take no further action.
+            log.debug("Ignoring call to resign from epoch {} since it is smaller than the " +
+                    "current epoch {}", epoch, currentEpoch);
+            return;
+        }
+
+        LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), currentEpoch + 1);
+        try {
+            shared.tryAppend(nodeId, currentEpoch, new LeaderChangeBatch(nextLeader));
+        } catch (NotLeaderException exp) {
+            // the leader epoch has already advanced. resign is a no op.
+            log.debug("Ignoring call to resign from epoch {}. Either we are not the leader or the provided epoch is " +
+                    "smaller than the current epoch {}", epoch, currentEpoch);
+            return;
+        }
     }
 
     @Override
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 952cc60a387..51f859c6c07 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -205,8 +205,11 @@ public interface RaftClient<T> extends AutoCloseable {
      * Notification of successful resignation can be observed through
      * {@link Listener#handleLeaderChange(LeaderAndEpoch)}.
      *
-     * @param epoch the epoch to resign from. If this does not match the current epoch, this
+     * @param epoch the epoch to resign from. If this epoch is smaller than the current epoch, this
      *              call will be ignored.
+     *
+     * @throws IllegalArgumentException - if the passed epoch is invalid (negative or greater than current) or
+     * if the listener is not the leader associated with this epoch.
      */
     void resign(int epoch);
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
index 77efd2f7e15..323f393aebe 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
@@ -104,7 +104,8 @@ public class BatchAccumulator<T> implements Closeable {
      * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
      *         batch size; if this exception is throw some of the elements in records may have
      *         been committed
-     * @throws NotLeaderException if the epoch doesn't match the leader epoch
+     * @throws NotLeaderException if the epoch is less than the leader epoch
+     * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
      * @throws BufferAllocationException if we failed to allocate memory for the records
      */
     public long append(int epoch, List<T> records) {
@@ -123,7 +124,8 @@ public class BatchAccumulator<T> implements Closeable {
      * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
      *         batch size; if this exception is throw none of the elements in records were
      *         committed
-     * @throws NotLeaderException if the epoch doesn't match the leader epoch
+     * @throws NotLeaderException if the epoch is less than the leader epoch
+     * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
      * @throws BufferAllocationException if we failed to allocate memory for the records
      */
     public long appendAtomic(int epoch, List<T> records) {
@@ -132,7 +134,8 @@ public class BatchAccumulator<T> implements Closeable {
 
     private long append(int epoch, List<T> records, boolean isAtomic) {
         if (epoch < this.epoch) {
-            throw new NotLeaderException("Append failed because the epoch doesn't match");
+            throw new NotLeaderException("Append failed because the given epoch " + epoch + " is stale. " +
+                    "Current leader epoch = " + this.epoch());
         } else if (epoch > this.epoch) {
             throw new IllegalArgumentException("Attempt to append from epoch " + epoch +
                 " which is larger than the current epoch " + this.epoch);