You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by js...@apache.org on 2024/02/23 20:56:31 UTC

(kafka) branch trunk updated: KAFKA-16286; Notify listener of latest leader and epoch (#15397)

This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 474f8c1ad62 KAFKA-16286; Notify listener of latest leader and epoch (#15397)
474f8c1ad62 is described below

commit 474f8c1ad620d1f982ac15cd418ae8462dec4d9e
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Fri Feb 23 12:56:25 2024 -0800

    KAFKA-16286; Notify listener of latest leader and epoch (#15397)
    
    KRaft was only notifying listeners of the latest leader and epoch when the replica transition to a new state. This can result in the listener never getting notified if the registration happened after it had become a follower.
    
    This problem doesn't exists for the active leader because the KRaft implementation attempts to notified the listener of the latest leader and epoch when the replica is the active leader.
    
    This issue is fixed by notifying the listeners of the latest leader and epoch after processing the listener registration request.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  8 +++
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 60 ++++++++++++++++++++++
 .../apache/kafka/raft/RaftClientTestContext.java   |  6 +++
 3 files changed, 74 insertions(+)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index fd98774355c..0f4c7921321 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -2202,6 +2202,14 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         quorum.highWatermark().ifPresent(highWatermarkMetadata -> {
             updateListenersProgress(highWatermarkMetadata.offset);
         });
+
+        // Notify the new listeners of the latest leader and epoch
+        Optional<LeaderState<T>> leaderState = quorum.maybeLeaderState();
+        if (leaderState.isPresent()) {
+            maybeFireLeaderChange(leaderState.get());
+        } else {
+            maybeFireLeaderChange();
+        }
     }
 
     private void processRegistration(Registration<T> registration) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 51a5938220f..f18a0ff2e88 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -2958,6 +2958,66 @@ public class KafkaRaftClientTest {
         assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch());
     }
 
+    @Test
+    public void testHandleLeaderChangeFiresAfterUnattachedRegistration() throws Exception {
+        // When registering a listener while the replica is unattached, it should get notified
+        // with the current epoch
+        // When transitioning to follower, expect another notification with the leader and epoch
+
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 7;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .build();
+
+        // Register another listener and verify that it is notified of latest epoch
+        RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(
+            OptionalInt.of(localId)
+        );
+        context.client.register(secondListener);
+        context.client.poll();
+
+        // Expected leader change notification
+        LeaderAndEpoch expectedLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), epoch);
+        assertEquals(expectedLeaderAndEpoch, secondListener.currentLeaderAndEpoch());
+
+        // Transition to follower and the expect a leader changed notification
+        context.deliverRequest(context.beginEpochRequest(epoch, otherNodeId));
+        context.pollUntilResponse();
+
+        // Expected leader change notification
+        expectedLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.of(otherNodeId), epoch);
+        assertEquals(expectedLeaderAndEpoch, secondListener.currentLeaderAndEpoch());
+    }
+
+    @Test
+    public void testHandleLeaderChangeFiresAfterFollowerRegistration() throws Exception {
+        // When registering a listener while the replica is a follower, it should get notified with
+        // the current leader and epoch
+
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 7;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, otherNodeId)
+            .build();
+
+        // Register another listener and verify that it is notified of latest leader and epoch
+        RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(
+            OptionalInt.of(localId)
+        );
+        context.client.register(secondListener);
+        context.client.poll();
+
+        LeaderAndEpoch expectedLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.of(otherNodeId), epoch);
+        assertEquals(expectedLeaderAndEpoch, secondListener.currentLeaderAndEpoch());
+    }
+
     @Test
     public void testObserverFetchWithNoLocalId() throws Exception {
         // When no `localId` is defined, the client will behave as an observer.
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 68241ae70b1..ea4078b7c56 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -83,6 +83,7 @@ import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public final class RaftClientTestContext {
@@ -1195,6 +1196,11 @@ public final class RaftClientTestContext {
             // We record the current committed offset as the claimed epoch's start
             // offset. This is useful to verify that the `handleLeaderChange` callback
             // was not received early on the leader.
+            assertTrue(
+                leaderAndEpoch.epoch() >= currentLeaderAndEpoch.epoch(),
+                String.format("new epoch (%d) not >= than old epoch (%d)", leaderAndEpoch.epoch(), currentLeaderAndEpoch.epoch())
+            );
+            assertNotEquals(currentLeaderAndEpoch, leaderAndEpoch);
             this.currentLeaderAndEpoch = leaderAndEpoch;
 
             currentClaimedEpoch().ifPresent(claimedEpoch -> {