You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/04/10 02:17:09 UTC

[kafka] branch trunk updated: KAFKA-12607; Test case for resigned state vote granting (#10510)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new db688b1  KAFKA-12607; Test case for resigned state vote granting (#10510)
db688b1 is described below

commit db688b1a5e5904eed5849fbab4940585e5a3d646
Author: dengziming <sw...@163.com>
AuthorDate: Sat Apr 10 10:15:11 2021 +0800

    KAFKA-12607; Test case for resigned state vote granting (#10510)
    
    This patch adds unit tests to verify vote behavior when in the "resigned" state.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../java/org/apache/kafka/raft/CandidateState.java |   2 +-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 112 +++++++++++++++++++++
 2 files changed, 113 insertions(+), 1 deletion(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/CandidateState.java b/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
index 001fc19..e9e1e0e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
@@ -251,7 +251,7 @@ public class CandidateState implements EpochState {
 
     @Override
     public String toString() {
-        return "Candidate(" +
+        return "CandidateState(" +
             "localId=" + localId +
             ", epoch=" + epoch +
             ", retries=" + retries +
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 54c38dd..55d4e16 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -138,6 +138,118 @@ public class KafkaRaftClientTest {
     }
 
     @Test
+    public void testGrantVotesFromHigherEpochAfterResigningLeadership() throws Exception {
+        int localId = 0;
+        int remoteId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId);
+        int epoch = 2;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+                .updateRandom(random -> {
+                    Mockito.doReturn(0).when(random).nextInt(DEFAULT_ELECTION_TIMEOUT_MS);
+                })
+                .withElectedLeader(epoch, localId)
+                .build();
+
+        // The builder records localId as the elected leader of this epoch, so the client is expected to start in the resigned state
+        assertTrue(context.client.quorum().isResigned());
+        assertEquals(0L, context.log.endOffset().offset);
+        context.assertElectedLeader(epoch, localId);
+
+        // Deliver a vote request from the remote voter for the next epoch (epoch + 1)
+        context.deliverRequest(context.voteRequest(epoch + 1, remoteId,
+                context.log.lastFetchedEpoch(), context.log.endOffset().offset));
+        context.client.poll();
+
+        // The client is expected to first transition to unattached, then grant the vote, and end up in the voted state
+        assertTrue(context.client.quorum().isVoted());
+        context.assertVotedCandidate(epoch + 1, remoteId);
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true);
+    }
+
+    @Test
+    public void testGrantVotesFromHigherEpochAfterResigningCandidacy() throws Exception {
+        int localId = 0;
+        int remoteId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId);
+        int epoch = 2;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+                .updateRandom(random -> {
+                    Mockito.doReturn(0).when(random).nextInt(DEFAULT_ELECTION_TIMEOUT_MS);
+                })
+                .withVotedCandidate(epoch, localId)
+                .build();
+
+        // The builder records a vote for localId in this epoch, so the client is expected to start as a candidate
+        assertTrue(context.client.quorum().isCandidate());
+        assertEquals(0L, context.log.endOffset().offset);
+        context.assertVotedCandidate(epoch, localId);
+
+        // Deliver a vote request from the remote voter for the next epoch (epoch + 1)
+        context.deliverRequest(context.voteRequest(epoch + 1, remoteId,
+                context.log.lastFetchedEpoch(), context.log.endOffset().offset));
+        context.client.poll();
+
+        // The client is expected to first transition to unattached, then grant the vote, and end up in the voted state
+        assertTrue(context.client.quorum().isVoted());
+        context.assertVotedCandidate(epoch + 1, remoteId);
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true);
+    }
+
+    @Test
+    public void testGrantVotesWhenShuttingDown() throws Exception {
+        int localId = 0;
+        int remoteId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId);
+        int epoch = 2;
+
+        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+        // Begin shutting down (1000 is presumably a timeout in ms -- confirm) and check that the client reports it
+        context.client.shutdown(1000);
+        assertTrue(context.client.isShuttingDown());
+
+        // Deliver a vote request from the remote voter for the next epoch (epoch + 1) while shutdown is in progress
+        context.deliverRequest(context.voteRequest(epoch + 1, remoteId,
+                context.log.lastFetchedEpoch(), context.log.endOffset().offset));
+        context.client.poll();
+
+        // The client is expected to first transition to unattached, then grant the vote, and end up in the voted state
+        assertTrue(context.client.quorum().isVoted());
+        context.assertVotedCandidate(epoch + 1, remoteId);
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true);
+    }
+
+    @Test
+    public void testInitializeAsResignedAndBecomeCandidate() throws Exception {
+        int localId = 0;
+        int remoteId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId);
+        int epoch = 2;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+                .updateRandom(random -> {
+                    Mockito.doReturn(0).when(random).nextInt(DEFAULT_ELECTION_TIMEOUT_MS);
+                })
+                .withElectedLeader(epoch, localId)
+                .build();
+
+        // The builder records localId as the elected leader of this epoch, so the client is expected to start in the resigned state
+        assertTrue(context.client.quorum().isResigned());
+        assertEquals(0L, context.log.endOffset().offset);
+        context.assertElectedLeader(epoch, localId);
+
+        // Let one election timeout elapse on the test context's clock, then poll so the client can react to it
+        context.time.sleep(context.electionTimeoutMs());
+        context.client.poll();
+
+        // The client should now be a candidate in the next epoch (epoch + 1), having voted for itself
+        assertTrue(context.client.quorum().isCandidate());
+        context.assertVotedCandidate(epoch + 1, localId);
+    }
+
+    @Test
     public void testInitializeAsResignedLeaderFromStateStore() throws Exception {
         int localId = 0;
         Set<Integer> voters = Utils.mkSet(localId, 1);