You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/06/29 01:01:49 UTC
[kafka] branch trunk updated: KAFKA-12631;
Implement `resign` API in `KafkaRaftClient` (#10913)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f86cb1d KAFKA-12631; Implement `resign` API in `KafkaRaftClient` (#10913)
f86cb1d is described below
commit f86cb1d1da4f73bf92b0fcfd5ef75b3ce77658cf
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Jun 28 18:00:19 2021 -0700
KAFKA-12631; Implement `resign` API in `KafkaRaftClient` (#10913)
This patch adds an implementation of the `resign()` API which allows the controller to proactively resign leadership in case it encounters an unrecoverable situation. There was not a lot to do here because we already supported a `Resigned` state to facilitate graceful shutdown.
Reviewers: José Armando García Sancio <js...@users.noreply.github.com>, David Arthur <mu...@gmail.com>
---
.../org/apache/kafka/raft/KafkaRaftClient.java | 59 ++++++--
.../java/org/apache/kafka/raft/LeaderState.java | 12 +-
.../java/org/apache/kafka/raft/QuorumState.java | 10 ++
.../java/org/apache/kafka/raft/RaftClient.java | 9 +-
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 150 ++++++++++++++++++++-
.../apache/kafka/raft/RaftClientTestContext.java | 44 +++---
6 files changed, 252 insertions(+), 32 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 31f4f0e..f1915b5 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -473,7 +473,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
private void transitionToResigned(List<Integer> preferredSuccessors) {
- fetchPurgatory.completeAllExceptionally(Errors.BROKER_NOT_AVAILABLE.exception("The broker is shutting down"));
+ fetchPurgatory.completeAllExceptionally(
+ Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request since this node is resigning"));
quorum.transitionToResigned(preferredSuccessors);
maybeFireLeaderChange();
resetConnections();
@@ -1914,8 +1915,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
LeaderState<T> state = quorum.leaderStateOrThrow();
maybeFireLeaderChange(state);
- GracefulShutdown shutdown = this.shutdown.get();
- if (shutdown != null) {
+ if (shutdown.get() != null || state.isResignRequested()) {
transitionToResigned(state.nonLeaderVotersByDescendingFetchOffset());
return 0L;
}
@@ -2213,13 +2213,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
private Long append(int epoch, List<T> records, boolean isAtomic) {
- BatchAccumulator<T> accumulator;
- try {
- accumulator = quorum.<T>leaderStateOrThrow().accumulator();
- } catch (IllegalStateException ise) {
+ Optional<LeaderState<T>> leaderStateOpt = quorum.maybeLeaderState();
+ if (!leaderStateOpt.isPresent()) {
return Long.MAX_VALUE;
}
+ BatchAccumulator<T> accumulator = leaderStateOpt.get().accumulator();
boolean isFirstAppend = accumulator.isEmpty();
final Long offset;
if (isAtomic) {
@@ -2250,7 +2249,51 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
@Override
public void resign(int epoch) {
- throw new UnsupportedOperationException();
+ if (epoch < 0) {
+ throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
+ }
+
+ if (!quorum.isVoter()) {
+ throw new IllegalStateException("Attempt to resign by a non-voter");
+ }
+
+ LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+ int currentEpoch = leaderAndEpoch.epoch();
+
+ if (epoch > currentEpoch) {
+ throw new IllegalArgumentException("Attempt to resign from epoch " + epoch +
+ " which is larger than the current epoch " + currentEpoch);
+ } else if (epoch < currentEpoch) {
+ // If the passed epoch is smaller than the current epoch, then it might mean
+ // that the listener has not been notified about a leader change that already
+ // took place. In this case, we consider the call as already fulfilled and
+ // take no further action.
+ logger.debug("Ignoring call to resign from epoch {} since it is smaller than the " +
+ "current epoch {}", epoch, currentEpoch);
+ return;
+ } else if (!leaderAndEpoch.isLeader(quorum.localIdOrThrow())) {
+ throw new IllegalArgumentException("Cannot resign from epoch " + epoch +
+ " since we are not the leader");
+ } else {
+ // Note that if we transition to another state before we have a chance to
+ // request resignation, then we consider the call fulfilled.
+ Optional<LeaderState<Object>> leaderStateOpt = quorum.maybeLeaderState();
+ if (!leaderStateOpt.isPresent()) {
+ logger.debug("Ignoring call to resign from epoch {} since this node is " +
+ "no longer the leader", epoch);
+ return;
+ }
+
+ LeaderState<Object> leaderState = leaderStateOpt.get();
+ if (leaderState.epoch() != epoch) {
+ logger.debug("Ignoring call to resign from epoch {} since it is smaller than the " +
+ "current epoch {}", epoch, leaderState.epoch());
+ } else {
+ logger.info("Received user request to resign from the current epoch {}", currentEpoch);
+ leaderState.requestResign();
+ wakeup();
+ }
+ }
}
@Override
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 4af1821..ea895de 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -51,9 +51,11 @@ public class LeaderState<T> implements EpochState {
private final Map<Integer, ReplicaState> observerStates = new HashMap<>();
private final Set<Integer> grantingVoters = new HashSet<>();
private final Logger log;
-
private final BatchAccumulator<T> accumulator;
+ // This is volatile because resignation can be requested from an external thread.
+ private volatile boolean resignRequested = false;
+
protected LeaderState(
int localId,
int epoch,
@@ -100,6 +102,14 @@ public class LeaderState<T> implements EpochState {
accumulator.forceDrain();
}
+ public boolean isResignRequested() {
+ return resignRequested;
+ }
+
+ public void requestResign() {
+ this.resignRequested = true;
+ }
+
@Override
public Optional<LogOffsetMetadata> highWatermark() {
return highWatermark;
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index 75c4691..23447a9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -514,6 +514,16 @@ public class QuorumState {
throw new IllegalStateException("Expected to be Leader, but current state is " + state);
}
+ @SuppressWarnings("unchecked")
+ public <T> Optional<LeaderState<T>> maybeLeaderState() {
+ EpochState state = this.state;
+ if (state instanceof LeaderState) {
+ return Optional.of((LeaderState<T>) state);
+ } else {
+ return Optional.empty();
+ }
+ }
+
public ResignedState resignedStateOrThrow() {
if (isResigned())
return (ResignedState) state;
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 3a46b46..2fe8fd3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -171,9 +171,12 @@ public interface RaftClient<T> extends AutoCloseable {
CompletableFuture<Void> shutdown(int timeoutMs);
/**
- * Resign the leadership. The leader will give up its leadership in the current epoch,
- * and a new election will be held. Note that nothing prevents this leader from getting
- * reelected.
+ * Resign the leadership. The leader will give up its leadership in the passed epoch
+ * (if it matches the current epoch), and a new election will be held. Note that nothing
+ * prevents this node from being reelected as the leader.
+ *
+ * Notification of successful resignation can be observed through
+ * {@link Listener#handleLeaderChange(LeaderAndEpoch)}.
*
* @param epoch the epoch to resign from. If this does not match the current epoch, this
* call will be ignored.
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index b000764..a70ad93 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.DescribeQuorumRequest;
+import org.apache.kafka.common.requests.EndQuorumEpochResponse;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
@@ -342,14 +343,14 @@ public class KafkaRaftClientTest {
// append some record, but the fetch in purgatory will still fail
context.log.appendAsLeader(
- context.buildBatch(context.log.endOffset().offset, epoch, Arrays.asList("raft")),
+ context.buildBatch(context.log.endOffset().offset, epoch, singletonList("raft")),
epoch
);
// when transition to resign, all request in fetchPurgatory will fail
context.client.shutdown(1000);
context.client.poll();
- context.assertSentFetchPartitionResponse(Errors.BROKER_NOT_AVAILABLE, epoch, OptionalInt.of(localId));
+ context.assertSentFetchPartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER, epoch, OptionalInt.of(localId));
context.assertResignedLeader(epoch, localId);
// shutting down finished
@@ -360,6 +361,151 @@ public class KafkaRaftClientTest {
}
@Test
+ public void testResignInOlderEpochIgnored() throws Exception {
+ int localId = 0;
+ int otherNodeId = 1;
+ Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+ context.becomeLeader();
+ assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+ int currentEpoch = context.currentEpoch();
+ context.client.resign(currentEpoch - 1);
+ context.client.poll();
+
+ // Ensure we are still leader even after expiration of the election timeout.
+ context.time.sleep(context.electionTimeoutMs() * 2);
+ context.client.poll();
+ context.assertElectedLeader(currentEpoch, localId);
+ }
+
+ @Test
+ public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exception {
+ int localId = 0;
+ int remoteId1 = 1;
+ int remoteId2 = 2;
+ Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+ context.becomeLeader();
+ assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+ int resignedEpoch = context.currentEpoch();
+
+ context.client.resign(resignedEpoch);
+ context.pollUntil(context.client.quorum()::isResigned);
+
+ context.deliverRequest(context.beginEpochRequest(resignedEpoch + 1, remoteId1));
+ context.pollUntilResponse();
+ context.assertSentBeginQuorumEpochResponse(Errors.NONE);
+ context.assertElectedLeader(resignedEpoch + 1, remoteId1);
+ assertEquals(new LeaderAndEpoch(OptionalInt.of(remoteId1), resignedEpoch + 1),
+ context.listener.currentLeaderAndEpoch());
+ }
+
+ @Test
+ public void testElectionTimeoutAfterUserInitiatedResign() throws Exception {
+ int localId = 0;
+ int otherNodeId = 1;
+ Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+ context.becomeLeader();
+ assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+ int resignedEpoch = context.currentEpoch();
+
+ context.client.resign(resignedEpoch);
+ context.pollUntil(context.client.quorum()::isResigned);
+
+ context.pollUntilRequest();
+ int correlationId = context.assertSentEndQuorumEpochRequest(resignedEpoch, otherNodeId);
+
+ EndQuorumEpochResponseData response = EndQuorumEpochResponse.singletonResponse(
+ Errors.NONE,
+ context.metadataPartition,
+ Errors.NONE,
+ resignedEpoch,
+ localId
+ );
+
+ context.deliverResponse(correlationId, otherNodeId, response);
+ context.client.poll();
+
+ // We do not resend `EndQuorumRequest` once the other voter has acknowledged it.
+ context.time.sleep(context.retryBackoffMs);
+ context.client.poll();
+ assertFalse(context.channel.hasSentRequests());
+
+ // Any `Fetch` received in the resigned state should result in a NOT_LEADER error.
+ context.deliverRequest(context.fetchRequest(1, -1, 0, 0, 0));
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER,
+ resignedEpoch, OptionalInt.of(localId));
+
+ // After the election timer, we should become a candidate.
+ context.time.sleep(2 * context.electionTimeoutMs());
+ context.pollUntil(context.client.quorum()::isCandidate);
+ assertEquals(resignedEpoch + 1, context.currentEpoch());
+ assertEquals(new LeaderAndEpoch(OptionalInt.empty(), resignedEpoch + 1),
+ context.listener.currentLeaderAndEpoch());
+ }
+
+ @Test
+ public void testCannotResignWithLargerEpochThanCurrentEpoch() throws Exception {
+ int localId = 0;
+ int otherNodeId = 1;
+ Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+ context.becomeLeader();
+
+ assertThrows(IllegalArgumentException.class,
+ () -> context.client.resign(context.currentEpoch() + 1));
+ }
+
+ @Test
+ public void testCannotResignIfNotLeader() throws Exception {
+ int localId = 0;
+ int otherNodeId = 1;
+ int leaderEpoch = 2;
+ Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .withElectedLeader(leaderEpoch, otherNodeId)
+ .build();
+
+ assertEquals(OptionalInt.of(otherNodeId), context.currentLeader());
+ assertThrows(IllegalArgumentException.class, () -> context.client.resign(leaderEpoch));
+ }
+
+ @Test
+ public void testCannotResignIfObserver() throws Exception {
+ int leaderId = 1;
+ int otherNodeId = 2;
+ int epoch = 5;
+ Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(OptionalInt.empty(), voters).build();
+ context.pollUntilRequest();
+
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+ assertTrue(voters.contains(fetchRequest.destinationId()));
+ context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+
+ context.deliverResponse(fetchRequest.correlationId, fetchRequest.destinationId(),
+ context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
+
+ context.client.poll();
+ context.assertElectedLeader(epoch, leaderId);
+ assertThrows(IllegalStateException.class, () -> context.client.resign(epoch));
+ }
+
+ @Test
public void testInitializeAsCandidateFromStateStore() throws Exception {
int localId = 0;
// Need 3 node to require a 2-node majority
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 755e0db..2f644e4 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -21,15 +21,15 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
-import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
-import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.Metrics;
@@ -670,14 +670,14 @@ public final class RaftClientTestContext {
return response.responses().get(0).partitions().get(0);
}
- void assertSentFetchPartitionResponse(Errors error) {
+ void assertSentFetchPartitionResponse(Errors topLevelError) {
List<RaftResponse.Outbound> sentMessages = drainSentResponses(ApiKeys.FETCH);
assertEquals(
1, sentMessages.size(), "Found unexpected sent messages " + sentMessages);
RaftResponse.Outbound raftMessage = sentMessages.get(0);
assertEquals(ApiKeys.FETCH.id, raftMessage.data.apiKey());
FetchResponseData response = (FetchResponseData) raftMessage.data();
- assertEquals(error, Errors.forCode(response.errorCode()));
+ assertEquals(topLevelError, Errors.forCode(response.errorCode()));
}
@@ -1069,7 +1069,7 @@ public final class RaftClientTestContext {
private final List<Batch<String>> commits = new ArrayList<>();
private final List<BatchReader<String>> savedBatches = new ArrayList<>();
private final Map<Integer, Long> claimedEpochStartOffsets = new HashMap<>();
- private OptionalInt currentClaimedEpoch = OptionalInt.empty();
+ private LeaderAndEpoch currentLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), 0);
private final OptionalInt localId;
private Optional<SnapshotReader<String>> snapshot = Optional.empty();
private boolean readCommit = true;
@@ -1086,6 +1086,10 @@ public final class RaftClientTestContext {
return claimedEpochStartOffsets.get(epoch);
}
+ LeaderAndEpoch currentLeaderAndEpoch() {
+ return currentLeaderAndEpoch;
+ }
+
Batch<String> lastCommit() {
if (commits.isEmpty()) {
return null;
@@ -1103,7 +1107,11 @@ public final class RaftClientTestContext {
}
OptionalInt currentClaimedEpoch() {
- return currentClaimedEpoch;
+ if (localId.isPresent() && currentLeaderAndEpoch.isLeader(localId.getAsInt())) {
+ return OptionalInt.of(currentLeaderAndEpoch.epoch());
+ } else {
+ return OptionalInt.empty();
+ }
}
List<String> commitWithBaseOffset(long baseOffset) {
@@ -1159,18 +1167,19 @@ public final class RaftClientTestContext {
}
@Override
- public void handleLeaderChange(LeaderAndEpoch leader) {
+ public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
// We record the next expected offset as the claimed epoch's start
- // offset. This is useful to verify that the `handleClaim` callback
+ // offset. This is useful to verify that the `handleLeaderChange` callback
// was not received early.
- if (localId.isPresent() && leader.isLeader(localId.getAsInt())) {
- long claimedEpochStartOffset = lastCommitOffset().isPresent() ?
- lastCommitOffset().getAsLong() + 1 : 0L;
- this.currentClaimedEpoch = OptionalInt.of(leader.epoch());
- this.claimedEpochStartOffsets.put(leader.epoch(), claimedEpochStartOffset);
- } else {
- this.currentClaimedEpoch = OptionalInt.empty();
- }
+ this.currentLeaderAndEpoch = leaderAndEpoch;
+
+ currentClaimedEpoch().ifPresent(claimedEpoch -> {
+ if (claimedEpoch == leaderAndEpoch.epoch()) {
+ long claimedEpochStartOffset = lastCommitOffset().isPresent() ?
+ lastCommitOffset().getAsLong() + 1 : 0L;
+ this.claimedEpochStartOffsets.put(leaderAndEpoch.epoch(), claimedEpochStartOffset);
+ }
+ });
}
@Override
@@ -1184,8 +1193,7 @@ public final class RaftClientTestContext {
@Override
public void handleSnapshot(SnapshotReader<String> reader) {
- snapshot.ifPresent(snapshot -> assertDoesNotThrow(() -> snapshot.close()));
-
+ snapshot.ifPresent(snapshot -> assertDoesNotThrow(snapshot::close));
commits.clear();
savedBatches.clear();
snapshot = Optional.of(reader);