You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/04/05 16:29:12 UTC
[kafka] branch trunk updated: KAFKA-12539;
Refactor KafkaRaftCllient handleVoteRequest to reduce cyclomatic
complexity (#10393)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4f47a56 KAFKA-12539; Refactor KafkaRaftCllient handleVoteRequest to reduce cyclomatic complexity (#10393)
4f47a56 is described below
commit 4f47a565e29539d1c252c36c59f5f24d105cec4b
Author: dengziming <sw...@163.com>
AuthorDate: Tue Apr 6 00:27:50 2021 +0800
KAFKA-12539; Refactor KafkaRaftCllient handleVoteRequest to reduce cyclomatic complexity (#10393)
1. Add `canGrantVote` to `EpochState`
2. Move the if-else in `KafkaRaftCllient.handleVoteRequest` to `EpochState`
3. Add unit tests for `canGrantVote`
Reviewers: Jason Gustafson <ja...@confluent.io>
---
checkstyle/suppressions.xml | 2 +-
.../java/org/apache/kafka/raft/CandidateState.java | 16 ++++-
.../java/org/apache/kafka/raft/EpochState.java | 11 ++++
.../java/org/apache/kafka/raft/FollowerState.java | 15 ++++-
.../org/apache/kafka/raft/KafkaRaftClient.java | 40 ++----------
.../java/org/apache/kafka/raft/LeaderState.java | 7 ++
.../java/org/apache/kafka/raft/QuorumState.java | 37 +++++++----
.../java/org/apache/kafka/raft/ResignedState.java | 14 +++-
.../org/apache/kafka/raft/UnattachedState.java | 16 ++++-
.../java/org/apache/kafka/raft/VotedState.java | 18 ++++-
.../org/apache/kafka/raft/CandidateStateTest.java | 76 +++++++++++++++-------
.../org/apache/kafka/raft/FollowerStateTest.java | 58 ++++++++++-------
.../org/apache/kafka/raft/LeaderStateTest.java | 13 ++++
.../org/apache/kafka/raft/ResignedStateTest.java | 45 ++++++++++---
.../org/apache/kafka/raft/UnattachedStateTest.java | 43 +++++++++---
.../java/org/apache/kafka/raft/VotedStateTest.java | 44 ++++++++++---
16 files changed, 330 insertions(+), 125 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0496d2f..5473b03 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -68,7 +68,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
<suppress checks="CyclomaticComplexity"
- files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|KafkaRaftClient).java"/>
+ files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer).java"/>
<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
diff --git a/raft/src/main/java/org/apache/kafka/raft/CandidateState.java b/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
index 08f6906..001fc19 100644
--- a/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
import java.util.HashMap;
import java.util.Map;
@@ -34,6 +36,7 @@ public class CandidateState implements EpochState {
private final int electionTimeoutMs;
private final Timer electionTimer;
private final Timer backoffTimer;
+ private final Logger log;
/**
* The life time of a candidate state is the following:
@@ -52,7 +55,8 @@ public class CandidateState implements EpochState {
Set<Integer> voters,
Optional<LogOffsetMetadata> highWatermark,
int retries,
- int electionTimeoutMs
+ int electionTimeoutMs,
+ LogContext logContext
) {
this.localId = localId;
this.epoch = epoch;
@@ -62,6 +66,7 @@ public class CandidateState implements EpochState {
this.electionTimeoutMs = electionTimeoutMs;
this.electionTimer = time.timer(electionTimeoutMs);
this.backoffTimer = time.timer(0);
+ this.log = logContext.logger(CandidateState.class);
for (Integer voterId : voters) {
voteStates.put(voterId, State.UNRECORDED);
@@ -236,6 +241,15 @@ public class CandidateState implements EpochState {
}
@Override
+ public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+ // Still reject vote request even candidateId = localId, Although the candidate votes for
+ // itself, this vote is implicit and not "granted".
+ log.debug("Rejecting vote request from candidate {} since we are already candidate in epoch {}",
+ candidateId, epoch);
+ return false;
+ }
+
+ @Override
public String toString() {
return "Candidate(" +
"localId=" + localId +
diff --git a/raft/src/main/java/org/apache/kafka/raft/EpochState.java b/raft/src/main/java/org/apache/kafka/raft/EpochState.java
index b32a200..89e8f0a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/EpochState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/EpochState.java
@@ -26,6 +26,17 @@ public interface EpochState extends Closeable {
}
/**
+ * Decide whether to grant a vote to a candidate, it is the responsibility of the caller to invoke
+ * {@link QuorumState##transitionToVoted(int, int)} if vote is granted.
+ *
+ * @param candidateId The ID of the voter who attempt to become leader
+ * @param isLogUpToDate Whether the candidate’s log is at least as up-to-date as receiver’s log, it
+ * is the responsibility of the caller to compare the log in advance
+ * @return true If grant vote.
+ */
+ boolean canGrantVote(int candidateId, boolean isLogUpToDate);
+
+ /**
* Get the current election state, which is guaranteed to be immutable.
*/
ElectionState election();
diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index e1ef7aa..8bfad3a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.snapshot.RawSnapshotWriter;
+import org.slf4j.Logger;
import java.io.IOException;
import java.util.Optional;
@@ -39,13 +41,16 @@ public class FollowerState implements EpochState {
*/
private Optional<RawSnapshotWriter> fetchingSnapshot;
+ private final Logger log;
+
public FollowerState(
Time time,
int epoch,
int leaderId,
Set<Integer> voters,
Optional<LogOffsetMetadata> highWatermark,
- int fetchTimeoutMs
+ int fetchTimeoutMs,
+ LogContext logContext
) {
this.fetchTimeoutMs = fetchTimeoutMs;
this.epoch = epoch;
@@ -54,6 +59,7 @@ public class FollowerState implements EpochState {
this.fetchTimer = time.timer(fetchTimeoutMs);
this.highWatermark = highWatermark;
this.fetchingSnapshot = Optional.empty();
+ this.log = logContext.logger(FollowerState.class);
}
@Override
@@ -140,6 +146,13 @@ public class FollowerState implements EpochState {
}
@Override
+ public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+ log.debug("Rejecting vote request from candidate {} since we already have a leader {} in epoch {}",
+ candidateId, leaderId(), epoch);
+ return false;
+ }
+
+ @Override
public String toString() {
return "FollowerState(" +
"fetchTimeoutMs=" + fetchTimeoutMs +
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 2cde196..a3dbbdd 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -592,44 +592,14 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
transitionToUnattached(candidateEpoch);
}
- final boolean voteGranted;
- if (quorum.isLeader()) {
- logger.debug("Rejecting vote request {} with epoch {} since we are already leader on that epoch",
- request, candidateEpoch);
- voteGranted = false;
- } else if (quorum.isCandidate()) {
- logger.debug("Rejecting vote request {} with epoch {} since we are already candidate on that epoch",
- request, candidateEpoch);
- voteGranted = false;
- } else if (quorum.isResigned()) {
- logger.debug("Rejecting vote request {} with epoch {} since we have resigned as candidate/leader in this epoch",
- request, candidateEpoch);
- voteGranted = false;
- } else if (quorum.isFollower()) {
- FollowerState state = quorum.followerStateOrThrow();
- logger.debug("Rejecting vote request {} with epoch {} since we already have a leader {} on that epoch",
- request, candidateEpoch, state.leaderId());
- voteGranted = false;
- } else if (quorum.isVoted()) {
- VotedState state = quorum.votedStateOrThrow();
- voteGranted = state.votedId() == candidateId;
+ OffsetAndEpoch lastEpochEndOffsetAndEpoch = new OffsetAndEpoch(lastEpochEndOffset, lastEpoch);
+ boolean voteGranted = quorum.canGrantVote(candidateId, lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0);
- if (!voteGranted) {
- logger.debug("Rejecting vote request {} with epoch {} since we already have voted for " +
- "another candidate {} on that epoch", request, candidateEpoch, state.votedId());
- }
- } else if (quorum.isUnattached()) {
- OffsetAndEpoch lastEpochEndOffsetAndEpoch = new OffsetAndEpoch(lastEpochEndOffset, lastEpoch);
- voteGranted = lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0;
-
- if (voteGranted) {
- transitionToVoted(candidateId, candidateEpoch);
- }
- } else {
- throw new IllegalStateException("Unexpected quorum state " + quorum);
+ if (voteGranted && quorum.isUnattached()) {
+ transitionToVoted(candidateId, candidateEpoch);
}
- logger.info("Vote request {} is {}", request, voteGranted ? "granted" : "rejected");
+ logger.info("Vote request {} with epoch {} is {}", request, candidateEpoch, voteGranted ? "granted" : "rejected");
return buildVoteResponse(Errors.NONE, voteGranted);
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index d939aac..95c5628 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -305,6 +305,13 @@ public class LeaderState implements EpochState {
}
@Override
+ public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+ log.debug("Rejecting vote request from candidate {} since we are already leader in epoch {}",
+ candidateId, epoch);
+ return false;
+ }
+
+ @Override
public String toString() {
return "Leader(" +
"localId=" + localId +
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index 6216d0f..86f8c18 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -146,7 +146,8 @@ public class QuorumState {
logEndOffsetAndEpoch.epoch,
voters,
Optional.empty(),
- randomElectionTimeoutMs()
+ randomElectionTimeoutMs(),
+ logContext
);
} else if (localId.isPresent() && election.isLeader(localId.getAsInt())) {
// If we were previously a leader, then we will start out as resigned
@@ -161,7 +162,8 @@ public class QuorumState {
election.epoch,
voters,
randomElectionTimeoutMs(),
- Collections.emptyList()
+ Collections.emptyList(),
+ logContext
);
} else if (localId.isPresent() && election.isVotedCandidate(localId.getAsInt())) {
initialState = new CandidateState(
@@ -171,7 +173,8 @@ public class QuorumState {
voters,
Optional.empty(),
1,
- randomElectionTimeoutMs()
+ randomElectionTimeoutMs(),
+ logContext
);
} else if (election.hasVoted()) {
initialState = new VotedState(
@@ -180,7 +183,8 @@ public class QuorumState {
election.votedId(),
voters,
Optional.empty(),
- randomElectionTimeoutMs()
+ randomElectionTimeoutMs(),
+ logContext
);
} else if (election.hasLeader()) {
initialState = new FollowerState(
@@ -189,7 +193,8 @@ public class QuorumState {
election.leaderId(),
voters,
Optional.empty(),
- fetchTimeoutMs
+ fetchTimeoutMs,
+ logContext
);
} else {
initialState = new UnattachedState(
@@ -197,7 +202,8 @@ public class QuorumState {
election.epoch,
voters,
Optional.empty(),
- randomElectionTimeoutMs()
+ randomElectionTimeoutMs(),
+ logContext
);
}
@@ -271,7 +277,8 @@ public class QuorumState {
epoch,
voters,
randomElectionTimeoutMs(),
- preferredSuccessors
+ preferredSuccessors,
+ logContext
);
log.info("Completed transition to {}", state);
}
@@ -305,7 +312,8 @@ public class QuorumState {
epoch,
voters,
state.highWatermark(),
- electionTimeoutMs
+ electionTimeoutMs,
+ logContext
));
}
@@ -348,7 +356,8 @@ public class QuorumState {
candidateId,
voters,
state.highWatermark(),
- randomElectionTimeoutMs()
+ randomElectionTimeoutMs(),
+ logContext
));
}
@@ -383,7 +392,8 @@ public class QuorumState {
leaderId,
voters,
state.highWatermark(),
- fetchTimeoutMs
+ fetchTimeoutMs,
+ logContext
));
}
@@ -407,7 +417,8 @@ public class QuorumState {
voters,
state.highWatermark(),
retries,
- electionTimeoutMs
+ electionTimeoutMs,
+ logContext
));
}
@@ -460,6 +471,10 @@ public class QuorumState {
return electionTimeoutMs + random.nextInt(electionTimeoutMs);
}
+ public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+ return state.canGrantVote(candidateId, isLogUpToDate);
+ }
+
public FollowerState followerStateOrThrow() {
if (isFollower())
return (FollowerState) state;
diff --git a/raft/src/main/java/org/apache/kafka/raft/ResignedState.java b/raft/src/main/java/org/apache/kafka/raft/ResignedState.java
index c1608aa..899823a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ResignedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ResignedState.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
import java.util.HashSet;
import java.util.List;
@@ -45,6 +47,7 @@ public class ResignedState implements EpochState {
private final Set<Integer> unackedVoters;
private final Timer electionTimer;
private final List<Integer> preferredSuccessors;
+ private final Logger log;
public ResignedState(
Time time,
@@ -52,7 +55,8 @@ public class ResignedState implements EpochState {
int epoch,
Set<Integer> voters,
long electionTimeoutMs,
- List<Integer> preferredSuccessors
+ List<Integer> preferredSuccessors,
+ LogContext logContext
) {
this.localId = localId;
this.epoch = epoch;
@@ -62,6 +66,7 @@ public class ResignedState implements EpochState {
this.electionTimeoutMs = electionTimeoutMs;
this.electionTimer = time.timer(electionTimeoutMs);
this.preferredSuccessors = preferredSuccessors;
+ this.log = logContext.logger(ResignedState.class);
}
@Override
@@ -126,6 +131,13 @@ public class ResignedState implements EpochState {
}
@Override
+ public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+ log.debug("Rejecting vote request from candidate {} since we have resigned as candidate/leader in epoch {}",
+ candidateId, epoch);
+ return false;
+ }
+
+ @Override
public String name() {
return "Resigned";
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
index 62b82e2..4dc5fc7 100644
--- a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
import java.util.Optional;
import java.util.OptionalInt;
@@ -34,19 +36,22 @@ public class UnattachedState implements EpochState {
private final long electionTimeoutMs;
private final Timer electionTimer;
private final Optional<LogOffsetMetadata> highWatermark;
+ private final Logger log;
public UnattachedState(
Time time,
int epoch,
Set<Integer> voters,
Optional<LogOffsetMetadata> highWatermark,
- long electionTimeoutMs
+ long electionTimeoutMs,
+ LogContext logContext
) {
this.epoch = epoch;
this.voters = voters;
this.highWatermark = highWatermark;
this.electionTimeoutMs = electionTimeoutMs;
this.electionTimer = time.timer(electionTimeoutMs);
+ this.log = logContext.logger(UnattachedState.class);
}
@Override
@@ -89,6 +94,15 @@ public class UnattachedState implements EpochState {
}
@Override
+ public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+ if (!isLogUpToDate) {
+ log.debug("Rejecting vote request from candidate {} since candidate epoch/offset is not up to date with us",
+ candidateId);
+ }
+ return isLogUpToDate;
+ }
+
+ @Override
public String toString() {
return "Unattached(" +
"epoch=" + epoch +
diff --git a/raft/src/main/java/org/apache/kafka/raft/VotedState.java b/raft/src/main/java/org/apache/kafka/raft/VotedState.java
index 4138176..2ae5026 100644
--- a/raft/src/main/java/org/apache/kafka/raft/VotedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/VotedState.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
import java.util.Optional;
import java.util.OptionalInt;
@@ -36,6 +38,7 @@ public class VotedState implements EpochState {
private final int electionTimeoutMs;
private final Timer electionTimer;
private final Optional<LogOffsetMetadata> highWatermark;
+ private final Logger log;
public VotedState(
Time time,
@@ -43,7 +46,8 @@ public class VotedState implements EpochState {
int votedId,
Set<Integer> voters,
Optional<LogOffsetMetadata> highWatermark,
- int electionTimeoutMs
+ int electionTimeoutMs,
+ LogContext logContext
) {
this.epoch = epoch;
this.votedId = votedId;
@@ -51,6 +55,7 @@ public class VotedState implements EpochState {
this.highWatermark = highWatermark;
this.electionTimeoutMs = electionTimeoutMs;
this.electionTimer = time.timer(electionTimeoutMs);
+ this.log = logContext.logger(VotedState.class);
}
@Override
@@ -93,6 +98,17 @@ public class VotedState implements EpochState {
}
@Override
+ public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
+ if (votedId() == candidateId) {
+ return true;
+ }
+
+ log.debug("Rejecting vote request from candidate {} since we already have voted for " +
+ "another candidate {} in epoch {}", candidateId, votedId(), epoch);
+ return false;
+ }
+
+ @Override
public Optional<LogOffsetMetadata> highWatermark() {
return highWatermark;
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java b/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java
index 8c4633b..71a2375 100644
--- a/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java
@@ -16,12 +16,16 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Collections;
import java.util.Optional;
+import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -33,11 +37,27 @@ public class CandidateStateTest {
private final int epoch = 5;
private final MockTime time = new MockTime();
private final int electionTimeoutMs = 5000;
+ private final LogContext logContext = new LogContext();
+
+ private CandidateState newCandidateState(
+ Set<Integer> voters,
+ Optional<LogOffsetMetadata> highWatermark
+ ) {
+ return new CandidateState(
+ time,
+ localId,
+ epoch,
+ voters,
+ highWatermark,
+ 0,
+ electionTimeoutMs,
+ logContext
+ );
+ }
@Test
public void testSingleNodeQuorum() {
- CandidateState state = new CandidateState(time, localId, epoch,
- Collections.singleton(localId), Optional.empty(), 0, electionTimeoutMs);
+ CandidateState state = newCandidateState(Collections.singleton(localId), Optional.empty());
assertTrue(state.isVoteGranted());
assertFalse(state.isVoteRejected());
assertEquals(Collections.emptySet(), state.unrecordedVoters());
@@ -46,8 +66,7 @@ public class CandidateStateTest {
@Test
public void testTwoNodeQuorumVoteRejected() {
int otherNodeId = 1;
- CandidateState state = new CandidateState(time, localId, epoch,
- Utils.mkSet(localId, otherNodeId), Optional.empty(), 0, electionTimeoutMs);
+ CandidateState state = newCandidateState(Utils.mkSet(localId, otherNodeId), Optional.empty());
assertFalse(state.isVoteGranted());
assertFalse(state.isVoteRejected());
assertEquals(Collections.singleton(otherNodeId), state.unrecordedVoters());
@@ -59,8 +78,8 @@ public class CandidateStateTest {
@Test
public void testTwoNodeQuorumVoteGranted() {
int otherNodeId = 1;
- CandidateState state = new CandidateState(time, localId, epoch,
- Utils.mkSet(localId, otherNodeId), Optional.empty(), 0, electionTimeoutMs);
+ CandidateState state = newCandidateState(
+ Utils.mkSet(localId, otherNodeId), Optional.empty());
assertFalse(state.isVoteGranted());
assertFalse(state.isVoteRejected());
assertEquals(Collections.singleton(otherNodeId), state.unrecordedVoters());
@@ -74,8 +93,8 @@ public class CandidateStateTest {
public void testThreeNodeQuorumVoteGranted() {
int node1 = 1;
int node2 = 2;
- CandidateState state = new CandidateState(time, localId, epoch,
- Utils.mkSet(localId, node1, node2), Optional.empty(), 0, electionTimeoutMs);
+ CandidateState state = newCandidateState(
+ Utils.mkSet(localId, node1, node2), Optional.empty());
assertFalse(state.isVoteGranted());
assertFalse(state.isVoteRejected());
assertEquals(Utils.mkSet(node1, node2), state.unrecordedVoters());
@@ -93,8 +112,8 @@ public class CandidateStateTest {
public void testThreeNodeQuorumVoteRejected() {
int node1 = 1;
int node2 = 2;
- CandidateState state = new CandidateState(time, localId, epoch,
- Utils.mkSet(localId, node1, node2), Optional.empty(), 0, electionTimeoutMs);
+ CandidateState state = newCandidateState(
+ Utils.mkSet(localId, node1, node2), Optional.empty());
assertFalse(state.isVoteGranted());
assertFalse(state.isVoteRejected());
assertEquals(Utils.mkSet(node1, node2), state.unrecordedVoters());
@@ -111,16 +130,16 @@ public class CandidateStateTest {
@Test
public void testCannotRejectVoteFromLocalId() {
int otherNodeId = 1;
- CandidateState state = new CandidateState(time, localId, epoch,
- Utils.mkSet(localId, otherNodeId), Optional.empty(), 0, electionTimeoutMs);
+ CandidateState state = newCandidateState(
+ Utils.mkSet(localId, otherNodeId), Optional.empty());
assertThrows(IllegalArgumentException.class, () -> state.recordRejectedVote(localId));
}
@Test
public void testCannotChangeVoteGrantedToRejected() {
int otherNodeId = 1;
- CandidateState state = new CandidateState(time, localId, epoch,
- Utils.mkSet(localId, otherNodeId), Optional.empty(), 0, electionTimeoutMs);
+ CandidateState state = newCandidateState(
+ Utils.mkSet(localId, otherNodeId), Optional.empty());
assertTrue(state.recordGrantedVote(otherNodeId));
assertThrows(IllegalArgumentException.class, () -> state.recordRejectedVote(otherNodeId));
assertTrue(state.isVoteGranted());
@@ -129,8 +148,8 @@ public class CandidateStateTest {
@Test
public void testCannotChangeVoteRejectedToGranted() {
int otherNodeId = 1;
- CandidateState state = new CandidateState(time, localId, epoch,
- Utils.mkSet(localId, otherNodeId), Optional.empty(), 0, electionTimeoutMs);
+ CandidateState state = newCandidateState(
+ Utils.mkSet(localId, otherNodeId), Optional.empty());
assertTrue(state.recordRejectedVote(otherNodeId));
assertThrows(IllegalArgumentException.class, () -> state.recordGrantedVote(otherNodeId));
assertTrue(state.isVoteRejected());
@@ -139,8 +158,8 @@ public class CandidateStateTest {
@Test
public void testCannotGrantOrRejectNonVoters() {
int nonVoterId = 1;
- CandidateState state = new CandidateState(time, localId, epoch,
- Collections.singleton(localId), Optional.empty(), 0, electionTimeoutMs);
+ CandidateState state = newCandidateState(
+ Collections.singleton(localId), Optional.empty());
assertThrows(IllegalArgumentException.class, () -> state.recordGrantedVote(nonVoterId));
assertThrows(IllegalArgumentException.class, () -> state.recordRejectedVote(nonVoterId));
}
@@ -148,8 +167,8 @@ public class CandidateStateTest {
@Test
public void testIdempotentGrant() {
int otherNodeId = 1;
- CandidateState state = new CandidateState(time, localId, epoch,
- Utils.mkSet(localId, otherNodeId), Optional.empty(), 0, electionTimeoutMs);
+ CandidateState state = newCandidateState(
+ Utils.mkSet(localId, otherNodeId), Optional.empty());
assertTrue(state.recordGrantedVote(otherNodeId));
assertFalse(state.recordGrantedVote(otherNodeId));
}
@@ -157,10 +176,23 @@ public class CandidateStateTest {
@Test
public void testIdempotentReject() {
int otherNodeId = 1;
- CandidateState state = new CandidateState(time, localId, epoch,
- Utils.mkSet(localId, otherNodeId), Optional.empty(), 0, electionTimeoutMs);
+ CandidateState state = newCandidateState(
+ Utils.mkSet(localId, otherNodeId), Optional.empty());
assertTrue(state.recordRejectedVote(otherNodeId));
assertFalse(state.recordRejectedVote(otherNodeId));
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testGrantVote(boolean isLogUpToDate) {
+ CandidateState state = newCandidateState(
+ Utils.mkSet(1, 2, 3),
+ Optional.empty()
+ );
+
+ assertFalse(state.canGrantVote(1, isLogUpToDate));
+ assertFalse(state.canGrantVote(2, isLogUpToDate));
+ assertFalse(state.canGrantVote(3, isLogUpToDate));
+ }
+
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java b/raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java
index 2965f05..42c6bc9 100644
--- a/raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java
@@ -16,12 +16,16 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -30,21 +34,29 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class FollowerStateTest {
private final MockTime time = new MockTime();
+ private final LogContext logContext = new LogContext();
+ private final int epoch = 5;
+ private final int fetchTimeoutMs = 15000;
+ int leaderId = 3;
- @Test
- public void testFetchTimeoutExpiration() {
- int epoch = 5;
- int leaderId = 3;
- int fetchTimeoutMs = 15000;
-
- FollowerState state = new FollowerState(
+ private FollowerState newFollowerState(
+ Set<Integer> voters,
+ Optional<LogOffsetMetadata> highWatermark
+ ) {
+ return new FollowerState(
time,
epoch,
leaderId,
- Utils.mkSet(1, 2, 3),
- Optional.empty(),
- fetchTimeoutMs
+ voters,
+ highWatermark,
+ fetchTimeoutMs,
+ logContext
);
+ }
+
+ @Test
+ public void testFetchTimeoutExpiration() {
+ FollowerState state = newFollowerState(Utils.mkSet(1, 2, 3), Optional.empty());
assertFalse(state.hasFetchTimeoutExpired(time.milliseconds()));
assertEquals(fetchTimeoutMs, state.remainingFetchTimeMs(time.milliseconds()));
@@ -60,18 +72,7 @@ public class FollowerStateTest {
@Test
public void testMonotonicHighWatermark() {
- int epoch = 5;
- int leaderId = 3;
- int fetchTimeoutMs = 15000;
-
- FollowerState state = new FollowerState(
- time,
- epoch,
- leaderId,
- Utils.mkSet(1, 2, 3),
- Optional.empty(),
- fetchTimeoutMs
- );
+ FollowerState state = newFollowerState(Utils.mkSet(1, 2, 3), Optional.empty());
OptionalLong highWatermark = OptionalLong.of(15L);
state.updateHighWatermark(highWatermark);
@@ -81,4 +82,17 @@ public class FollowerStateTest {
assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testGrantVote(boolean isLogUpToDate) {
+ FollowerState state = newFollowerState(
+ Utils.mkSet(1, 2, 3),
+ Optional.empty()
+ );
+
+ assertFalse(state.canGrantVote(1, isLogUpToDate));
+ assertFalse(state.canGrantVote(2, isLogUpToDate));
+ assertFalse(state.canGrantVote(3, isLogUpToDate));
+ }
+
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index cdadf6e..c7490b5 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -18,7 +18,10 @@ package org.apache.kafka.raft;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
@@ -254,6 +257,16 @@ public class LeaderStateTest {
assertEquals(emptySet(), state.getObserverStates(time.milliseconds()).keySet());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testGrantVote(boolean isLogUpToDate) {
+ LeaderState state = newLeaderState(Utils.mkSet(1, 2, 3), 1);
+
+ assertFalse(state.canGrantVote(1, isLogUpToDate));
+ assertFalse(state.canGrantVote(2, isLogUpToDate));
+ assertFalse(state.canGrantVote(3, isLogUpToDate));
+ }
+
private static class MockOffsetMetadata implements OffsetMetadata {
private final String value;
diff --git a/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java b/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java
index 20e2e84..770297b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java
@@ -16,11 +16,15 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -30,23 +34,32 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class ResignedStateTest {
private final MockTime time = new MockTime();
+ private final LogContext logContext = new LogContext();
+ int electionTimeoutMs = 5000;
+ int localId = 0;
+ int epoch = 5;
- @Test
- public void testResignedState() {
- int electionTimeoutMs = 5000;
- int localId = 0;
- int remoteId = 1;
- int epoch = 5;
- Set<Integer> voters = Utils.mkSet(localId, remoteId);
-
- ResignedState state = new ResignedState(
+ private ResignedState newResignedState(
+ Set<Integer> voters,
+ List<Integer> preferredSuccessors
+ ) {
+ return new ResignedState(
time,
localId,
epoch,
voters,
electionTimeoutMs,
- Collections.emptyList()
+ preferredSuccessors,
+ logContext
);
+ }
+
+ @Test
+ public void testResignedState() {
+ int remoteId = 1;
+ Set<Integer> voters = Utils.mkSet(localId, remoteId);
+
+ ResignedState state = newResignedState(voters, Collections.emptyList());
assertEquals(ElectionState.withElectedLeader(epoch, localId, voters), state.election());
assertEquals(epoch, state.epoch());
@@ -65,4 +78,16 @@ class ResignedStateTest {
assertTrue(state.hasElectionTimeoutExpired(time.milliseconds()));
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testGrantVote(boolean isLogUpToDate) {
+ ResignedState state = newResignedState(
+ Utils.mkSet(1, 2, 3),
+ Collections.emptyList()
+ );
+
+ assertFalse(state.canGrantVote(1, isLogUpToDate));
+ assertFalse(state.canGrantVote(2, isLogUpToDate));
+ assertFalse(state.canGrantVote(3, isLogUpToDate));
+ }
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
index bd00baa..96f2a52 100644
--- a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
@@ -16,9 +16,12 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Optional;
import java.util.Set;
@@ -30,19 +33,28 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class UnattachedStateTest {
private final MockTime time = new MockTime();
+ private final LogContext logContext = new LogContext();
+ private final int epoch = 5;
+ private final int electionTimeoutMs = 10000;
+
+ private UnattachedState newUnattachedState(Set<Integer> voters, Optional<LogOffsetMetadata> highWatermark) {
+ return new UnattachedState(
+ time,
+ epoch,
+ voters,
+ highWatermark,
+ electionTimeoutMs,
+ logContext
+ );
+ }
@Test
public void testElectionTimeout() {
Set<Integer> voters = Utils.mkSet(1, 2, 3);
- int epoch = 1;
- int electionTimeoutMs = 10000;
-
- UnattachedState state = new UnattachedState(
- time,
- epoch,
- voters,
- Optional.empty(),
- electionTimeoutMs
+
+ UnattachedState state = newUnattachedState(
+ voters,
+ Optional.empty()
);
assertEquals(epoch, state.epoch());
@@ -59,4 +71,17 @@ public class UnattachedStateTest {
assertEquals(0, state.remainingElectionTimeMs(time.milliseconds()));
assertTrue(state.hasElectionTimeoutExpired(time.milliseconds()));
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testGrantVote(boolean isLogUpToDate) {
+ UnattachedState state = newUnattachedState(
+ Utils.mkSet(1, 2, 3),
+ Optional.empty()
+ );
+
+ assertEquals(isLogUpToDate, state.canGrantVote(1, isLogUpToDate));
+ assertEquals(isLogUpToDate, state.canGrantVote(2, isLogUpToDate));
+ assertEquals(isLogUpToDate, state.canGrantVote(3, isLogUpToDate));
+ }
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java b/raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java
index 843855b..317b80f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java
@@ -16,9 +16,12 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Optional;
import java.util.Set;
@@ -30,22 +33,31 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class VotedStateTest {
private final MockTime time = new MockTime();
+ private final LogContext logContext = new LogContext();
+ private final int epoch = 5;
+ private final int votedId = 1;
+ private final int electionTimeoutMs = 10000;
- @Test
- public void testElectionTimeout() {
- Set<Integer> voters = Utils.mkSet(1, 2, 3);
- int epoch = 5;
- int votedId = 1;
- int electionTimeoutMs = 10000;
-
- VotedState state = new VotedState(
+ private VotedState newVotedState(
+ Set<Integer> voters,
+ Optional<LogOffsetMetadata> highWatermark
+ ) {
+ return new VotedState(
time,
epoch,
votedId,
voters,
- Optional.empty(),
- electionTimeoutMs
+ highWatermark,
+ electionTimeoutMs,
+ logContext
);
+ }
+
+ @Test
+ public void testElectionTimeout() {
+ Set<Integer> voters = Utils.mkSet(1, 2, 3);
+
+ VotedState state = newVotedState(voters, Optional.empty());
assertEquals(epoch, state.epoch());
assertEquals(votedId, state.votedId());
@@ -62,4 +74,16 @@ class VotedStateTest {
assertTrue(state.hasElectionTimeoutExpired(time.milliseconds()));
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testGrantVote(boolean isLogUpToDate) {
+ VotedState state = newVotedState(
+ Utils.mkSet(1, 2, 3),
+ Optional.empty()
+ );
+
+ assertTrue(state.canGrantVote(1, isLogUpToDate));
+ assertFalse(state.canGrantVote(2, isLogUpToDate));
+ assertFalse(state.canGrantVote(3, isLogUpToDate));
+ }
}