You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/04/05 16:29:12 UTC

[kafka] branch trunk updated: KAFKA-12539; Refactor KafkaRaftClient handleVoteRequest to reduce cyclomatic complexity (#10393)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4f47a56  KAFKA-12539; Refactor KafkaRaftClient handleVoteRequest to reduce cyclomatic complexity (#10393)
4f47a56 is described below

commit 4f47a565e29539d1c252c36c59f5f24d105cec4b
Author: dengziming <sw...@163.com>
AuthorDate: Tue Apr 6 00:27:50 2021 +0800

    KAFKA-12539; Refactor KafkaRaftClient handleVoteRequest to reduce cyclomatic complexity (#10393)
    
    1. Add `canGrantVote` to `EpochState`
    2. Move the if-else chain in `KafkaRaftClient.handleVoteRequest` into the `EpochState` implementations
    3. Add unit tests for `canGrantVote`
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../java/org/apache/kafka/raft/CandidateState.java | 16 ++++-
 .../java/org/apache/kafka/raft/EpochState.java     | 11 ++++
 .../java/org/apache/kafka/raft/FollowerState.java  | 15 ++++-
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 40 ++----------
 .../java/org/apache/kafka/raft/LeaderState.java    |  7 ++
 .../java/org/apache/kafka/raft/QuorumState.java    | 37 +++++++----
 .../java/org/apache/kafka/raft/ResignedState.java  | 14 +++-
 .../org/apache/kafka/raft/UnattachedState.java     | 16 ++++-
 .../java/org/apache/kafka/raft/VotedState.java     | 18 ++++-
 .../org/apache/kafka/raft/CandidateStateTest.java  | 76 +++++++++++++++-------
 .../org/apache/kafka/raft/FollowerStateTest.java   | 58 ++++++++++-------
 .../org/apache/kafka/raft/LeaderStateTest.java     | 13 ++++
 .../org/apache/kafka/raft/ResignedStateTest.java   | 45 ++++++++++---
 .../org/apache/kafka/raft/UnattachedStateTest.java | 43 +++++++++---
 .../java/org/apache/kafka/raft/VotedStateTest.java | 44 ++++++++++---
 16 files changed, 330 insertions(+), 125 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0496d2f..5473b03 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -68,7 +68,7 @@
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|KafkaRaftClient).java"/>
+              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer).java"/>
 
     <suppress checks="JavaNCSS"
               files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
diff --git a/raft/src/main/java/org/apache/kafka/raft/CandidateState.java b/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
index 08f6906..001fc19 100644
--- a/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -34,6 +36,7 @@ public class CandidateState implements EpochState {
     private final int electionTimeoutMs;
     private final Timer electionTimer;
     private final Timer backoffTimer;
+    private final Logger log;
 
     /**
      * The life time of a candidate state is the following:
@@ -52,7 +55,8 @@ public class CandidateState implements EpochState {
         Set<Integer> voters,
         Optional<LogOffsetMetadata> highWatermark,
         int retries,
-        int electionTimeoutMs
+        int electionTimeoutMs,
+        LogContext logContext
     ) {
         this.localId = localId;
         this.epoch = epoch;
@@ -62,6 +66,7 @@ public class CandidateState implements EpochState {
         this.electionTimeoutMs = electionTimeoutMs;
         this.electionTimer = time.timer(electionTimeoutMs);
         this.backoffTimer = time.timer(0);
+        this.log = logContext.logger(CandidateState.class);
 
         for (Integer voterId : voters) {
             voteStates.put(voterId, State.UNRECORDED);
@@ -236,6 +241,15 @@ public class CandidateState implements EpochState {
     }
 
     @Override
+    public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+        // Still reject the vote request even if candidateId == localId. Although the candidate
+        // votes for itself, that vote is implicit and not "granted".
+        log.debug("Rejecting vote request from candidate {} since we are already candidate in epoch {}",
+            candidateId, epoch);
+        return false;
+    }
+
+    @Override
     public String toString() {
         return "Candidate(" +
             "localId=" + localId +
diff --git a/raft/src/main/java/org/apache/kafka/raft/EpochState.java b/raft/src/main/java/org/apache/kafka/raft/EpochState.java
index b32a200..89e8f0a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/EpochState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/EpochState.java
@@ -26,6 +26,17 @@ public interface EpochState extends Closeable {
     }
 
     /**
+     * Decide whether to grant a vote to a candidate. It is the responsibility of the caller to invoke
+     * {@link QuorumState#transitionToVoted(int, int)} if the vote is granted.
+     *
+     * @param candidateId The ID of the voter who attempts to become leader
+     * @param isLogUpToDate Whether the candidate’s log is at least as up-to-date as the receiver’s log; it
+     *                      is the responsibility of the caller to compare the logs in advance
+     * @return true if the vote can be granted
+     */
+    boolean canGrantVote(int candidateId, boolean isLogUpToDate);
+
+    /**
      * Get the current election state, which is guaranteed to be immutable.
      */
     ElectionState election();
diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index e1ef7aa..8bfad3a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.snapshot.RawSnapshotWriter;
+import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -39,13 +41,16 @@ public class FollowerState implements EpochState {
      */
     private Optional<RawSnapshotWriter> fetchingSnapshot;
 
+    private final Logger log;
+
     public FollowerState(
         Time time,
         int epoch,
         int leaderId,
         Set<Integer> voters,
         Optional<LogOffsetMetadata> highWatermark,
-        int fetchTimeoutMs
+        int fetchTimeoutMs,
+        LogContext logContext
     ) {
         this.fetchTimeoutMs = fetchTimeoutMs;
         this.epoch = epoch;
@@ -54,6 +59,7 @@ public class FollowerState implements EpochState {
         this.fetchTimer = time.timer(fetchTimeoutMs);
         this.highWatermark = highWatermark;
         this.fetchingSnapshot = Optional.empty();
+        this.log = logContext.logger(FollowerState.class);
     }
 
     @Override
@@ -140,6 +146,13 @@ public class FollowerState implements EpochState {
     }
 
     @Override
+    public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+        log.debug("Rejecting vote request from candidate {} since we already have a leader {} in epoch {}",
+                candidateId, leaderId(), epoch);
+        return false;
+    }
+
+    @Override
     public String toString() {
         return "FollowerState(" +
             "fetchTimeoutMs=" + fetchTimeoutMs +
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 2cde196..a3dbbdd 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -592,44 +592,14 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             transitionToUnattached(candidateEpoch);
         }
 
-        final boolean voteGranted;
-        if (quorum.isLeader()) {
-            logger.debug("Rejecting vote request {} with epoch {} since we are already leader on that epoch",
-                    request, candidateEpoch);
-            voteGranted = false;
-        } else if (quorum.isCandidate()) {
-            logger.debug("Rejecting vote request {} with epoch {} since we are already candidate on that epoch",
-                    request, candidateEpoch);
-            voteGranted = false;
-        } else if (quorum.isResigned()) {
-            logger.debug("Rejecting vote request {} with epoch {} since we have resigned as candidate/leader in this epoch",
-                request, candidateEpoch);
-            voteGranted = false;
-        } else if (quorum.isFollower()) {
-            FollowerState state = quorum.followerStateOrThrow();
-            logger.debug("Rejecting vote request {} with epoch {} since we already have a leader {} on that epoch",
-                request, candidateEpoch, state.leaderId());
-            voteGranted = false;
-        } else if (quorum.isVoted()) {
-            VotedState state = quorum.votedStateOrThrow();
-            voteGranted = state.votedId() == candidateId;
+        OffsetAndEpoch lastEpochEndOffsetAndEpoch = new OffsetAndEpoch(lastEpochEndOffset, lastEpoch);
+        boolean voteGranted = quorum.canGrantVote(candidateId, lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0);
 
-            if (!voteGranted) {
-                logger.debug("Rejecting vote request {} with epoch {} since we already have voted for " +
-                    "another candidate {} on that epoch", request, candidateEpoch, state.votedId());
-            }
-        } else if (quorum.isUnattached()) {
-            OffsetAndEpoch lastEpochEndOffsetAndEpoch = new OffsetAndEpoch(lastEpochEndOffset, lastEpoch);
-            voteGranted = lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0;
-
-            if (voteGranted) {
-                transitionToVoted(candidateId, candidateEpoch);
-            }
-        } else {
-            throw new IllegalStateException("Unexpected quorum state " + quorum);
+        if (voteGranted && quorum.isUnattached()) {
+            transitionToVoted(candidateId, candidateEpoch);
         }
 
-        logger.info("Vote request {} is {}", request, voteGranted ? "granted" : "rejected");
+        logger.info("Vote request {} with epoch {} is {}", request, candidateEpoch, voteGranted ? "granted" : "rejected");
         return buildVoteResponse(Errors.NONE, voteGranted);
     }
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index d939aac..95c5628 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -305,6 +305,13 @@ public class LeaderState implements EpochState {
     }
 
     @Override
+    public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+        log.debug("Rejecting vote request from candidate {} since we are already leader in epoch {}",
+            candidateId, epoch);
+        return false;
+    }
+
+    @Override
     public String toString() {
         return "Leader(" +
             "localId=" + localId +
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index 6216d0f..86f8c18 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -146,7 +146,8 @@ public class QuorumState {
                 logEndOffsetAndEpoch.epoch,
                 voters,
                 Optional.empty(),
-                randomElectionTimeoutMs()
+                randomElectionTimeoutMs(),
+                logContext
             );
         } else if (localId.isPresent() && election.isLeader(localId.getAsInt())) {
             // If we were previously a leader, then we will start out as resigned
@@ -161,7 +162,8 @@ public class QuorumState {
                 election.epoch,
                 voters,
                 randomElectionTimeoutMs(),
-                Collections.emptyList()
+                Collections.emptyList(),
+                logContext
             );
         } else if (localId.isPresent() && election.isVotedCandidate(localId.getAsInt())) {
             initialState = new CandidateState(
@@ -171,7 +173,8 @@ public class QuorumState {
                 voters,
                 Optional.empty(),
                 1,
-                randomElectionTimeoutMs()
+                randomElectionTimeoutMs(),
+                logContext
             );
         } else if (election.hasVoted()) {
             initialState = new VotedState(
@@ -180,7 +183,8 @@ public class QuorumState {
                 election.votedId(),
                 voters,
                 Optional.empty(),
-                randomElectionTimeoutMs()
+                randomElectionTimeoutMs(),
+                logContext
             );
         } else if (election.hasLeader()) {
             initialState = new FollowerState(
@@ -189,7 +193,8 @@ public class QuorumState {
                 election.leaderId(),
                 voters,
                 Optional.empty(),
-                fetchTimeoutMs
+                fetchTimeoutMs,
+                logContext
             );
         } else {
             initialState = new UnattachedState(
@@ -197,7 +202,8 @@ public class QuorumState {
                 election.epoch,
                 voters,
                 Optional.empty(),
-                randomElectionTimeoutMs()
+                randomElectionTimeoutMs(),
+                logContext
             );
         }
 
@@ -271,7 +277,8 @@ public class QuorumState {
             epoch,
             voters,
             randomElectionTimeoutMs(),
-            preferredSuccessors
+            preferredSuccessors,
+            logContext
         );
         log.info("Completed transition to {}", state);
     }
@@ -305,7 +312,8 @@ public class QuorumState {
             epoch,
             voters,
             state.highWatermark(),
-            electionTimeoutMs
+            electionTimeoutMs,
+            logContext
         ));
     }
 
@@ -348,7 +356,8 @@ public class QuorumState {
             candidateId,
             voters,
             state.highWatermark(),
-            randomElectionTimeoutMs()
+            randomElectionTimeoutMs(),
+            logContext
         ));
     }
 
@@ -383,7 +392,8 @@ public class QuorumState {
             leaderId,
             voters,
             state.highWatermark(),
-            fetchTimeoutMs
+            fetchTimeoutMs,
+            logContext
         ));
     }
 
@@ -407,7 +417,8 @@ public class QuorumState {
             voters,
             state.highWatermark(),
             retries,
-            electionTimeoutMs
+            electionTimeoutMs,
+            logContext
         ));
     }
 
@@ -460,6 +471,10 @@ public class QuorumState {
         return electionTimeoutMs + random.nextInt(electionTimeoutMs);
     }
 
+    public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+        return state.canGrantVote(candidateId, isLogUpToDate);
+    }
+
     public FollowerState followerStateOrThrow() {
         if (isFollower())
             return (FollowerState) state;
diff --git a/raft/src/main/java/org/apache/kafka/raft/ResignedState.java b/raft/src/main/java/org/apache/kafka/raft/ResignedState.java
index c1608aa..899823a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ResignedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ResignedState.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
 
 import java.util.HashSet;
 import java.util.List;
@@ -45,6 +47,7 @@ public class ResignedState implements EpochState {
     private final Set<Integer> unackedVoters;
     private final Timer electionTimer;
     private final List<Integer> preferredSuccessors;
+    private final Logger log;
 
     public ResignedState(
         Time time,
@@ -52,7 +55,8 @@ public class ResignedState implements EpochState {
         int epoch,
         Set<Integer> voters,
         long electionTimeoutMs,
-        List<Integer> preferredSuccessors
+        List<Integer> preferredSuccessors,
+        LogContext logContext
     ) {
         this.localId = localId;
         this.epoch = epoch;
@@ -62,6 +66,7 @@ public class ResignedState implements EpochState {
         this.electionTimeoutMs = electionTimeoutMs;
         this.electionTimer = time.timer(electionTimeoutMs);
         this.preferredSuccessors = preferredSuccessors;
+        this.log = logContext.logger(ResignedState.class);
     }
 
     @Override
@@ -126,6 +131,13 @@ public class ResignedState implements EpochState {
     }
 
     @Override
+    public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+        log.debug("Rejecting vote request from candidate {} since we have resigned as candidate/leader in epoch {}",
+            candidateId, epoch);
+        return false;
+    }
+
+    @Override
     public String name() {
         return "Resigned";
     }
diff --git a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
index 62b82e2..4dc5fc7 100644
--- a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 import java.util.OptionalInt;
@@ -34,19 +36,22 @@ public class UnattachedState implements EpochState {
     private final long electionTimeoutMs;
     private final Timer electionTimer;
     private final Optional<LogOffsetMetadata> highWatermark;
+    private final Logger log;
 
     public UnattachedState(
         Time time,
         int epoch,
         Set<Integer> voters,
         Optional<LogOffsetMetadata> highWatermark,
-        long electionTimeoutMs
+        long electionTimeoutMs,
+        LogContext logContext
     ) {
         this.epoch = epoch;
         this.voters = voters;
         this.highWatermark = highWatermark;
         this.electionTimeoutMs = electionTimeoutMs;
         this.electionTimer = time.timer(electionTimeoutMs);
+        this.log = logContext.logger(UnattachedState.class);
     }
 
     @Override
@@ -89,6 +94,15 @@ public class UnattachedState implements EpochState {
     }
 
     @Override
+    public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+        if (!isLogUpToDate) {
+            log.debug("Rejecting vote request from candidate {} since candidate epoch/offset is not up to date with us",
+                candidateId);
+        }
+        return isLogUpToDate;
+    }
+
+    @Override
     public String toString() {
         return "Unattached(" +
             "epoch=" + epoch +
diff --git a/raft/src/main/java/org/apache/kafka/raft/VotedState.java b/raft/src/main/java/org/apache/kafka/raft/VotedState.java
index 4138176..2ae5026 100644
--- a/raft/src/main/java/org/apache/kafka/raft/VotedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/VotedState.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 import java.util.OptionalInt;
@@ -36,6 +38,7 @@ public class VotedState implements EpochState {
     private final int electionTimeoutMs;
     private final Timer electionTimer;
     private final Optional<LogOffsetMetadata> highWatermark;
+    private final Logger log;
 
     public VotedState(
         Time time,
@@ -43,7 +46,8 @@ public class VotedState implements EpochState {
         int votedId,
         Set<Integer> voters,
         Optional<LogOffsetMetadata> highWatermark,
-        int electionTimeoutMs
+        int electionTimeoutMs,
+        LogContext logContext
     ) {
         this.epoch = epoch;
         this.votedId = votedId;
@@ -51,6 +55,7 @@ public class VotedState implements EpochState {
         this.highWatermark = highWatermark;
         this.electionTimeoutMs = electionTimeoutMs;
         this.electionTimer = time.timer(electionTimeoutMs);
+        this.log = logContext.logger(VotedState.class);
     }
 
     @Override
@@ -93,6 +98,17 @@ public class VotedState implements EpochState {
     }
 
     @Override
+    public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+        if (votedId() == candidateId) {
+            return true;
+        }
+
+        log.debug("Rejecting vote request from candidate {} since we already have voted for " +
+            "another candidate {} in epoch {}", candidateId, votedId(), epoch);
+        return false;
+    }
+
+    @Override
     public Optional<LogOffsetMetadata> highWatermark() {
         return highWatermark;
     }
diff --git a/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java b/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java
index 8c4633b..71a2375 100644
--- a/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java
@@ -16,12 +16,16 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Collections;
 import java.util.Optional;
+import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -33,11 +37,27 @@ public class CandidateStateTest {
     private final int epoch = 5;
     private final MockTime time = new MockTime();
     private final int electionTimeoutMs = 5000;
+    private final LogContext logContext = new LogContext();
+
+    private CandidateState newCandidateState(
+            Set<Integer> voters,
+            Optional<LogOffsetMetadata> highWatermark
+    ) {
+        return new CandidateState(
+                time,
+                localId,
+                epoch,
+                voters,
+                highWatermark,
+                0,
+                electionTimeoutMs,
+                logContext
+        );
+    }
 
     @Test
     public void testSingleNodeQuorum() {
-        CandidateState state = new CandidateState(time, localId, epoch,
-            Collections.singleton(localId), Optional.empty(), 0, electionTimeoutMs);
+        CandidateState state = newCandidateState(Collections.singleton(localId), Optional.empty());
         assertTrue(state.isVoteGranted());
         assertFalse(state.isVoteRejected());
         assertEquals(Collections.emptySet(), state.unrecordedVoters());
@@ -46,8 +66,7 @@ public class CandidateStateTest {
     @Test
     public void testTwoNodeQuorumVoteRejected() {
         int otherNodeId = 1;
-        CandidateState state = new CandidateState(time, localId, epoch,
-            Utils.mkSet(localId, otherNodeId), Optional.empty(), 0, electionTimeoutMs);
+        CandidateState state = newCandidateState(Utils.mkSet(localId, otherNodeId), Optional.empty());
         assertFalse(state.isVoteGranted());
         assertFalse(state.isVoteRejected());
         assertEquals(Collections.singleton(otherNodeId), state.unrecordedVoters());
@@ -59,8 +78,8 @@ public class CandidateStateTest {
     @Test
     public void testTwoNodeQuorumVoteGranted() {
         int otherNodeId = 1;
-        CandidateState state = new CandidateState(time, localId, epoch,
-            Utils.mkSet(localId, otherNodeId), Optional.empty(), 0, electionTimeoutMs);
+        CandidateState state = newCandidateState(
+            Utils.mkSet(localId, otherNodeId), Optional.empty());
         assertFalse(state.isVoteGranted());
         assertFalse(state.isVoteRejected());
         assertEquals(Collections.singleton(otherNodeId), state.unrecordedVoters());
@@ -74,8 +93,8 @@ public class CandidateStateTest {
     public void testThreeNodeQuorumVoteGranted() {
         int node1 = 1;
         int node2 = 2;
-        CandidateState state = new CandidateState(time, localId, epoch,
-            Utils.mkSet(localId, node1, node2), Optional.empty(), 0, electionTimeoutMs);
+        CandidateState state = newCandidateState(
+            Utils.mkSet(localId, node1, node2), Optional.empty());
         assertFalse(state.isVoteGranted());
         assertFalse(state.isVoteRejected());
         assertEquals(Utils.mkSet(node1, node2), state.unrecordedVoters());
@@ -93,8 +112,8 @@ public class CandidateStateTest {
     public void testThreeNodeQuorumVoteRejected() {
         int node1 = 1;
         int node2 = 2;
-        CandidateState state = new CandidateState(time, localId, epoch,
-            Utils.mkSet(localId, node1, node2), Optional.empty(), 0, electionTimeoutMs);
+        CandidateState state = newCandidateState(
+            Utils.mkSet(localId, node1, node2), Optional.empty());
         assertFalse(state.isVoteGranted());
         assertFalse(state.isVoteRejected());
         assertEquals(Utils.mkSet(node1, node2), state.unrecordedVoters());
@@ -111,16 +130,16 @@ public class CandidateStateTest {
     @Test
     public void testCannotRejectVoteFromLocalId() {
         int otherNodeId = 1;
-        CandidateState state = new CandidateState(time, localId, epoch,
-            Utils.mkSet(localId, otherNodeId), Optional.empty(), 0, electionTimeoutMs);
+        CandidateState state = newCandidateState(
+            Utils.mkSet(localId, otherNodeId), Optional.empty());
         assertThrows(IllegalArgumentException.class, () -> state.recordRejectedVote(localId));
     }
 
     @Test
     public void testCannotChangeVoteGrantedToRejected() {
         int otherNodeId = 1;
-        CandidateState state = new CandidateState(time, localId, epoch,
-            Utils.mkSet(localId, otherNodeId), Optional.empty(), 0, electionTimeoutMs);
+        CandidateState state = newCandidateState(
+            Utils.mkSet(localId, otherNodeId), Optional.empty());
         assertTrue(state.recordGrantedVote(otherNodeId));
         assertThrows(IllegalArgumentException.class, () -> state.recordRejectedVote(otherNodeId));
         assertTrue(state.isVoteGranted());
@@ -129,8 +148,8 @@ public class CandidateStateTest {
     @Test
     public void testCannotChangeVoteRejectedToGranted() {
         int otherNodeId = 1;
-        CandidateState state = new CandidateState(time, localId, epoch,
-            Utils.mkSet(localId, otherNodeId), Optional.empty(), 0, electionTimeoutMs);
+        CandidateState state = newCandidateState(
+            Utils.mkSet(localId, otherNodeId), Optional.empty());
         assertTrue(state.recordRejectedVote(otherNodeId));
         assertThrows(IllegalArgumentException.class, () -> state.recordGrantedVote(otherNodeId));
         assertTrue(state.isVoteRejected());
@@ -139,8 +158,8 @@ public class CandidateStateTest {
     @Test
     public void testCannotGrantOrRejectNonVoters() {
         int nonVoterId = 1;
-        CandidateState state = new CandidateState(time, localId, epoch,
-            Collections.singleton(localId), Optional.empty(), 0, electionTimeoutMs);
+        CandidateState state = newCandidateState(
+            Collections.singleton(localId), Optional.empty());
         assertThrows(IllegalArgumentException.class, () -> state.recordGrantedVote(nonVoterId));
         assertThrows(IllegalArgumentException.class, () -> state.recordRejectedVote(nonVoterId));
     }
@@ -148,8 +167,8 @@ public class CandidateStateTest {
     @Test
     public void testIdempotentGrant() {
         int otherNodeId = 1;
-        CandidateState state = new CandidateState(time, localId, epoch,
-            Utils.mkSet(localId, otherNodeId), Optional.empty(), 0, electionTimeoutMs);
+        CandidateState state = newCandidateState(
+            Utils.mkSet(localId, otherNodeId), Optional.empty());
         assertTrue(state.recordGrantedVote(otherNodeId));
         assertFalse(state.recordGrantedVote(otherNodeId));
     }
@@ -157,10 +176,23 @@ public class CandidateStateTest {
     @Test
     public void testIdempotentReject() {
         int otherNodeId = 1;
-        CandidateState state = new CandidateState(time, localId, epoch,
-            Utils.mkSet(localId, otherNodeId), Optional.empty(), 0, electionTimeoutMs);
+        CandidateState state = newCandidateState(
+            Utils.mkSet(localId, otherNodeId), Optional.empty());
         assertTrue(state.recordRejectedVote(otherNodeId));
         assertFalse(state.recordRejectedVote(otherNodeId));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testGrantVote(boolean isLogUpToDate) {
+        CandidateState state = newCandidateState(
+            Utils.mkSet(1, 2, 3),
+            Optional.empty()
+        );
+
+        assertFalse(state.canGrantVote(1, isLogUpToDate));
+        assertFalse(state.canGrantVote(2, isLogUpToDate));
+        assertFalse(state.canGrantVote(3, isLogUpToDate));
+    }
+
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java b/raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java
index 2965f05..42c6bc9 100644
--- a/raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java
@@ -16,12 +16,16 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -30,21 +34,29 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class FollowerStateTest {
     private final MockTime time = new MockTime();
+    private final LogContext logContext = new LogContext();
+    private final int epoch = 5;
+    private final int fetchTimeoutMs = 15000;
+    private final int leaderId = 3; // passed to every FollowerState built by newFollowerState
 
-    @Test
-    public void testFetchTimeoutExpiration() {
-        int epoch = 5;
-        int leaderId = 3;
-        int fetchTimeoutMs = 15000;
-
-        FollowerState state = new FollowerState(
+    private FollowerState newFollowerState(
+        Set<Integer> voters,
+        Optional<LogOffsetMetadata> highWatermark
+    ) {
+        return new FollowerState(
             time,
             epoch,
             leaderId,
-            Utils.mkSet(1, 2, 3),
-            Optional.empty(),
-            fetchTimeoutMs
+            voters,
+            highWatermark,
+            fetchTimeoutMs,
+            logContext
         );
+    }
+
+    @Test
+    public void testFetchTimeoutExpiration() {
+        FollowerState state = newFollowerState(Utils.mkSet(1, 2, 3), Optional.empty());
 
         assertFalse(state.hasFetchTimeoutExpired(time.milliseconds()));
         assertEquals(fetchTimeoutMs, state.remainingFetchTimeMs(time.milliseconds()));
@@ -60,18 +72,7 @@ public class FollowerStateTest {
 
     @Test
     public void testMonotonicHighWatermark() {
-        int epoch = 5;
-        int leaderId = 3;
-        int fetchTimeoutMs = 15000;
-
-        FollowerState state = new FollowerState(
-            time,
-            epoch,
-            leaderId,
-            Utils.mkSet(1, 2, 3),
-            Optional.empty(),
-            fetchTimeoutMs
-        );
+        FollowerState state = newFollowerState(Utils.mkSet(1, 2, 3), Optional.empty());
 
         OptionalLong highWatermark = OptionalLong.of(15L);
         state.updateHighWatermark(highWatermark);
@@ -81,4 +82,17 @@ public class FollowerStateTest {
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
     }
 
+    /**
+     * Checks that {@code canGrantVote} on a FollowerState returns false for
+     * candidate ids 1, 2 and 3, for either value of {@code isLogUpToDate}.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testGrantVote(boolean isLogUpToDate) {
+        FollowerState state = newFollowerState(Utils.mkSet(1, 2, 3), Optional.empty());
+        for (int candidateId : new int[] {1, 2, 3}) {
+            assertFalse(state.canGrantVote(candidateId, isLogUpToDate));
+        }
+    }
+
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index cdadf6e..c7490b5 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -18,7 +18,10 @@ package org.apache.kafka.raft;
 
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -254,6 +257,16 @@ public class LeaderStateTest {
         assertEquals(emptySet(), state.getObserverStates(time.milliseconds()).keySet());
     }
 
+    /** Checks that a LeaderState's {@code canGrantVote} is false for ids 1, 2 and 3. */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testGrantVote(boolean isLogUpToDate) {
+        LeaderState state = newLeaderState(Utils.mkSet(1, 2, 3), 1);
+        for (int candidateId : new int[] {1, 2, 3}) {
+            assertFalse(state.canGrantVote(candidateId, isLogUpToDate));
+        }
+    }
+
     private static class MockOffsetMetadata implements OffsetMetadata {
         private final String value;
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java b/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java
index 20e2e84..770297b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java
@@ -16,11 +16,15 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -30,23 +34,32 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 class ResignedStateTest {
 
     private final MockTime time = new MockTime();
+    private final LogContext logContext = new LogContext();
+    private final int electionTimeoutMs = 5000; // timeout argument used by newResignedState
+    private final int localId = 0; // local id argument used by newResignedState
+    private final int epoch = 5; // epoch argument used by newResignedState
 
-    @Test
-    public void testResignedState() {
-        int electionTimeoutMs = 5000;
-        int localId = 0;
-        int remoteId = 1;
-        int epoch = 5;
-        Set<Integer> voters = Utils.mkSet(localId, remoteId);
-
-        ResignedState state = new ResignedState(
+    private ResignedState newResignedState(
+        Set<Integer> voters,
+        List<Integer> preferredSuccessors
+    ) {
+        return new ResignedState(
             time,
             localId,
             epoch,
             voters,
             electionTimeoutMs,
-            Collections.emptyList()
+            preferredSuccessors,
+            logContext
         );
+    }
+
+    @Test
+    public void testResignedState() {
+        int remoteId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId);
+
+        ResignedState state = newResignedState(voters, Collections.emptyList());
 
         assertEquals(ElectionState.withElectedLeader(epoch, localId, voters), state.election());
         assertEquals(epoch, state.epoch());
@@ -65,4 +78,16 @@ class ResignedStateTest {
         assertTrue(state.hasElectionTimeoutExpired(time.milliseconds()));
     }
 
+    /**
+     * Checks that {@code canGrantVote} on a ResignedState returns false for
+     * candidate ids 1, 2 and 3, for either value of {@code isLogUpToDate}.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testGrantVote(boolean isLogUpToDate) {
+        ResignedState state = newResignedState(Utils.mkSet(1, 2, 3), Collections.emptyList());
+        for (int candidateId : new int[] {1, 2, 3}) {
+            assertFalse(state.canGrantVote(candidateId, isLogUpToDate));
+        }
+    }
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
index bd00baa..96f2a52 100644
--- a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
@@ -16,9 +16,12 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Optional;
 import java.util.Set;
@@ -30,19 +33,28 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class UnattachedStateTest {
 
     private final MockTime time = new MockTime();
+    private final LogContext logContext = new LogContext();
+    private final int epoch = 5; // epoch handed to every state built by newUnattachedState
+    private final int electionTimeoutMs = 10000; // timeout argument used by newUnattachedState
+
+    /**
+     * Creates an UnattachedState from the given voters and high watermark; the
+     * remaining constructor arguments come from this fixture's fields.
+     */
+    private UnattachedState newUnattachedState(Set<Integer> voters, Optional<LogOffsetMetadata> highWatermark) {
+        return new UnattachedState(
+            time, epoch, voters,
+            highWatermark, electionTimeoutMs, logContext
+        );
+    }
 
     @Test
     public void testElectionTimeout() {
         Set<Integer> voters = Utils.mkSet(1, 2, 3);
-        int epoch = 1;
-        int electionTimeoutMs = 10000;
-
-        UnattachedState state = new UnattachedState(
-                time,
-                epoch,
-                voters,
-                Optional.empty(),
-                electionTimeoutMs
+
+        UnattachedState state = newUnattachedState(
+            voters,
+            Optional.empty()
         );
 
         assertEquals(epoch, state.epoch());
@@ -59,4 +71,17 @@ public class UnattachedStateTest {
         assertEquals(0, state.remainingElectionTimeMs(time.milliseconds()));
         assertTrue(state.hasElectionTimeoutExpired(time.milliseconds()));
     }
+
+    /**
+     * Checks that {@code canGrantVote} on an UnattachedState returns exactly
+     * {@code isLogUpToDate} for candidate ids 1, 2 and 3.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testGrantVote(boolean isLogUpToDate) {
+        UnattachedState state = newUnattachedState(Utils.mkSet(1, 2, 3), Optional.empty());
+        for (int candidateId : new int[] {1, 2, 3}) {
+            assertEquals(isLogUpToDate, state.canGrantVote(candidateId, isLogUpToDate));
+        }
+    }
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java b/raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java
index 843855b..317b80f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java
@@ -16,9 +16,12 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Optional;
 import java.util.Set;
@@ -30,22 +33,31 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 class VotedStateTest {
 
     private final MockTime time = new MockTime();
+    private final LogContext logContext = new LogContext();
+    private final int epoch = 5;
+    private final int votedId = 1; // the only id testGrantVote expects canGrantVote to accept
+    private final int electionTimeoutMs = 10000;
 
-    @Test
-    public void testElectionTimeout() {
-        Set<Integer> voters = Utils.mkSet(1, 2, 3);
-        int epoch = 5;
-        int votedId = 1;
-        int electionTimeoutMs = 10000;
-
-        VotedState state = new VotedState(
+    private VotedState newVotedState(
+        Set<Integer> voters,
+        Optional<LogOffsetMetadata> highWatermark
+    ) {
+        return new VotedState(
             time,
             epoch,
             votedId,
             voters,
-            Optional.empty(),
-            electionTimeoutMs
+            highWatermark,
+            electionTimeoutMs,
+            logContext
         );
+    }
+
+    @Test
+    public void testElectionTimeout() {
+        Set<Integer> voters = Utils.mkSet(1, 2, 3);
+
+        VotedState state = newVotedState(voters, Optional.empty());
 
         assertEquals(epoch, state.epoch());
         assertEquals(votedId, state.votedId());
@@ -62,4 +74,16 @@ class VotedStateTest {
         assertTrue(state.hasElectionTimeoutExpired(time.milliseconds()));
     }
 
+    /**
+     * Checks that {@code canGrantVote} on a VotedState returns true only for
+     * {@code votedId}, for either value of {@code isLogUpToDate}.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testGrantVote(boolean isLogUpToDate) {
+        VotedState state = newVotedState(Utils.mkSet(1, 2, 3), Optional.empty());
+        for (int candidateId : new int[] {1, 2, 3}) {
+            assertEquals(candidateId == votedId, state.canGrantVote(candidateId, isLogUpToDate));
+        }
+    }
 }