You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/22 17:02:46 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #10913: KAFKA-12631; Implement `resign` API in `KafkaRaftClient`

jsancio commented on a change in pull request #10913:
URL: https://github.com/apache/kafka/pull/10913#discussion_r656397806



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -143,6 +144,7 @@
     public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
     public static final int MAX_FETCH_SIZE_BYTES = MAX_BATCH_SIZE_BYTES;
 
+    private final AtomicInteger resignedEpoch = new AtomicInteger(-1);

Review comment:
       I am wondering if we should move this state to `LeaderState`. Based on the comments and implementation of `resign`, this operation is only valid when the raft client is the leader. I think the implementation in this PR is functionally correct but it looks like `resignedEpoch` is always increasing. The raft client never sets resignedEpoch back to `-1` after resigning.
   
   An additional benefit of moving it to `LeaderState` is that `resigndEpoch` will be reset to `-1` after it becomes leader again.

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -359,6 +360,150 @@ public void testResignWillCompleteFetchPurgatory() throws Exception {
         assertFalse(context.client.isShuttingDown());
     }
 
+    @Test
+    public void testResignInOlderEpochIgnored() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int currentEpoch = context.currentEpoch();
+        context.client.resign(currentEpoch - 1);
+        context.assertElectedLeader(currentEpoch, localId);

Review comment:
       Hmm. The state would never change because `poll` was never called. Maybe we should sleep for more than election timeout and poll to make sure that the resign was ignore?

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -359,6 +360,150 @@ public void testResignWillCompleteFetchPurgatory() throws Exception {
         assertFalse(context.client.isShuttingDown());
     }
 
+    @Test
+    public void testResignInOlderEpochIgnored() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int currentEpoch = context.currentEpoch();
+        context.client.resign(currentEpoch - 1);
+        context.assertElectedLeader(currentEpoch, localId);
+    }
+
+    @Test
+    public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exception {
+        int localId = 0;
+        int remoteId1 = 1;
+        int remoteId2 = 2;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int resignedEpoch = context.currentEpoch();
+
+        context.client.resign(resignedEpoch);
+        context.pollUntil(context.client.quorum()::isResigned);
+
+        context.deliverRequest(context.beginEpochRequest(resignedEpoch + 1, remoteId1));
+        context.pollUntilResponse();
+        context.assertSentBeginQuorumEpochResponse(Errors.NONE);
+        context.assertElectedLeader(resignedEpoch + 1, remoteId1);

Review comment:
       This comment applies to a few other places.
   
   It looks like this checks `quorumStateStore` directly. Should we check that `Listener.handleLeaderChange` is called or do we want to rely on other tests verifying that this function is called when the epoch changes?

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -359,6 +360,150 @@ public void testResignWillCompleteFetchPurgatory() throws Exception {
         assertFalse(context.client.isShuttingDown());
     }
 
+    @Test
+    public void testResignInOlderEpochIgnored() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int currentEpoch = context.currentEpoch();
+        context.client.resign(currentEpoch - 1);
+        context.assertElectedLeader(currentEpoch, localId);
+    }
+
+    @Test
+    public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exception {
+        int localId = 0;
+        int remoteId1 = 1;
+        int remoteId2 = 2;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int resignedEpoch = context.currentEpoch();
+
+        context.client.resign(resignedEpoch);
+        context.pollUntil(context.client.quorum()::isResigned);
+
+        context.deliverRequest(context.beginEpochRequest(resignedEpoch + 1, remoteId1));
+        context.pollUntilResponse();
+        context.assertSentBeginQuorumEpochResponse(Errors.NONE);
+        context.assertElectedLeader(resignedEpoch + 1, remoteId1);
+    }
+
+    @Test
+    public void testElectionTimeoutAfterUserInitiatedResign() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int resignedEpoch = context.currentEpoch();
+
+        context.client.resign(resignedEpoch);
+        context.pollUntil(context.client.quorum()::isResigned);
+
+        context.pollUntilRequest();
+        int correlationId = context.assertSentEndQuorumEpochRequest(resignedEpoch, otherNodeId);
+
+        EndQuorumEpochResponseData response = EndQuorumEpochResponse.singletonResponse(
+            Errors.NONE,
+            context.metadataPartition,
+            Errors.NONE,
+            resignedEpoch,
+            localId
+        );
+
+        context.deliverResponse(correlationId, otherNodeId, response);
+        context.client.poll();
+
+        // We do not resend `EndQuorumRequest` once the other voter has acknowledged it.
+        context.time.sleep(context.retryBackoffMs);
+        context.client.poll();
+        assertFalse(context.channel.hasSentRequests());
+
+        // Any `Fetch` received in the resigned st
+        // ate should result in a NOT_LEADER error.

Review comment:
       Let's move at least "ate" to the line above so that the word "state" is not split over two lines

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -171,9 +171,9 @@ default void beginShutdown() {}
     CompletableFuture<Void> shutdown(int timeoutMs);
 
     /**
-     * Resign the leadership. The leader will give up its leadership in the current epoch,
-     * and a new election will be held. Note that nothing prevents this leader from getting
-     * reelected.
+     * Resign the leadership. The leader will give up its leadership in the passed epoch
+     * (if it matches the current epoch), and a new election will be held. Note that nothing
+     * prevents this node from being reelected as the leader.

Review comment:
       Let's add a comment that the listener can learn about the "success" of this operation through `handleLeaderChange`

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1914,8 +1917,7 @@ private long pollLeader(long currentTimeMs) {
         LeaderState<T> state = quorum.leaderStateOrThrow();
         maybeFireLeaderChange(state);
 
-        GracefulShutdown shutdown = this.shutdown.get();
-        if (shutdown != null) {
+        if (shutdown.get() != null || resignedEpoch.get() == state.epoch()) {

Review comment:
       Adding this comment here but it applies to `pollUnattached` and `pollFollower`. This is a little unrelated but it looks like in the observer case we never compare the shutdown variable.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2250,7 +2252,34 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
 
     @Override
     public void resign(int epoch) {
-        throw new UnsupportedOperationException();
+        if (epoch < 0) {

Review comment:
       `0` is also an invalid epoch, right? Or does the raft client send `handleLeaderChange(LeaderAndEpoch(OptionalInt.empty(), 0))` to the listener?
   
   I still think it is fair to say that the raft client will never send `handleLeaderChange(LeaderAndEpoch(OptionalInt.of(...), 0))`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org