Posted to commits@kafka.apache.org by jg...@apache.org on 2021/06/29 01:01:49 UTC

[kafka] branch trunk updated: KAFKA-12631; Implement `resign` API in `KafkaRaftClient` (#10913)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f86cb1d  KAFKA-12631; Implement `resign` API in `KafkaRaftClient` (#10913)
f86cb1d is described below

commit f86cb1d1da4f73bf92b0fcfd5ef75b3ce77658cf
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Jun 28 18:00:19 2021 -0700

    KAFKA-12631; Implement `resign` API in `KafkaRaftClient` (#10913)
    
    This patch adds an implementation of the `resign()` API, which allows the controller to proactively resign leadership if it encounters an unrecoverable situation. There was not much to do here because we already support a `Resigned` state to facilitate graceful shutdown.
    
    Reviewers: José Armando García Sancio <js...@users.noreply.github.com>, David Arthur <mu...@gmail.com>
---
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  59 ++++++--
 .../java/org/apache/kafka/raft/LeaderState.java    |  12 +-
 .../java/org/apache/kafka/raft/QuorumState.java    |  10 ++
 .../java/org/apache/kafka/raft/RaftClient.java     |   9 +-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 150 ++++++++++++++++++++-
 .../apache/kafka/raft/RaftClientTestContext.java   |  44 +++---
 6 files changed, 252 insertions(+), 32 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 31f4f0e..f1915b5 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -473,7 +473,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     }
 
     private void transitionToResigned(List<Integer> preferredSuccessors) {
-        fetchPurgatory.completeAllExceptionally(Errors.BROKER_NOT_AVAILABLE.exception("The broker is shutting down"));
+        fetchPurgatory.completeAllExceptionally(
+            Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request since this node is resigning"));
         quorum.transitionToResigned(preferredSuccessors);
         maybeFireLeaderChange();
         resetConnections();
@@ -1914,8 +1915,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         LeaderState<T> state = quorum.leaderStateOrThrow();
         maybeFireLeaderChange(state);
 
-        GracefulShutdown shutdown = this.shutdown.get();
-        if (shutdown != null) {
+        if (shutdown.get() != null || state.isResignRequested()) {
             transitionToResigned(state.nonLeaderVotersByDescendingFetchOffset());
             return 0L;
         }
@@ -2213,13 +2213,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
     }
 
     private Long append(int epoch, List<T> records, boolean isAtomic) {
-        BatchAccumulator<T> accumulator;
-        try {
-            accumulator = quorum.<T>leaderStateOrThrow().accumulator();
-        } catch (IllegalStateException ise) {
+        Optional<LeaderState<T>> leaderStateOpt = quorum.maybeLeaderState();
+        if (!leaderStateOpt.isPresent()) {
             return Long.MAX_VALUE;
         }
 
+        BatchAccumulator<T> accumulator = leaderStateOpt.get().accumulator();
         boolean isFirstAppend = accumulator.isEmpty();
         final Long offset;
         if (isAtomic) {
@@ -2250,7 +2249,51 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
 
     @Override
     public void resign(int epoch) {
-        throw new UnsupportedOperationException();
+        if (epoch < 0) {
+            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
+        }
+
+        if (!quorum.isVoter()) {
+            throw new IllegalStateException("Attempt to resign by a non-voter");
+        }
+
+        LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+        int currentEpoch = leaderAndEpoch.epoch();
+
+        if (epoch > currentEpoch) {
+            throw new IllegalArgumentException("Attempt to resign from epoch " + epoch +
+                " which is larger than the current epoch " + currentEpoch);
+        } else if (epoch < currentEpoch) {
+            // If the passed epoch is smaller than the current epoch, then it might mean
+            // that the listener has not been notified about a leader change that already
+            // took place. In this case, we consider the call as already fulfilled and
+            // take no further action.
+            logger.debug("Ignoring call to resign from epoch {} since it is smaller than the " +
+                "current epoch {}", epoch, currentEpoch);
+            return;
+        } else if (!leaderAndEpoch.isLeader(quorum.localIdOrThrow())) {
+            throw new IllegalArgumentException("Cannot resign from epoch " + epoch +
+                " since we are not the leader");
+        } else {
+            // Note that if we transition to another state before we have a chance to
+            // request resignation, then we consider the call fulfilled.
+            Optional<LeaderState<Object>> leaderStateOpt = quorum.maybeLeaderState();
+            if (!leaderStateOpt.isPresent()) {
+                logger.debug("Ignoring call to resign from epoch {} since this node is " +
+                    "no longer the leader", epoch);
+                return;
+            }
+
+            LeaderState<Object> leaderState = leaderStateOpt.get();
+            if (leaderState.epoch() != epoch) {
+                logger.debug("Ignoring call to resign from epoch {} since it is smaller than the " +
+                    "current epoch {}", epoch, leaderState.epoch());
+            } else {
+                logger.info("Received user request to resign from the current epoch {}", currentEpoch);
+                leaderState.requestResign();
+                wakeup();
+            }
+        }
     }
 
     @Override
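
For illustration, here is a minimal sketch of how a caller might use the new API, assuming the client exposes `leaderAndEpoch()` as `KafkaRaftClient` does above. The `ControllerFaultHandler` class and `onUnrecoverableError` method are hypothetical names, not part of this patch:

    import org.apache.kafka.raft.RaftClient;

    // Hypothetical caller: proactively give up leadership after hitting an
    // unrecoverable error, relying on resign() to ignore stale epochs.
    public final class ControllerFaultHandler {
        private final RaftClient<?> raftClient;

        public ControllerFaultHandler(RaftClient<?> raftClient) {
            this.raftClient = raftClient;
        }

        public void onUnrecoverableError(Throwable cause) {
            // Resign from the epoch we believe we are leading. If leadership
            // has already moved on, resign() logs the stale call and returns.
            int epoch = raftClient.leaderAndEpoch().epoch();
            raftClient.resign(epoch);
        }
    }

Note that `resign(epoch)` only requests resignation; the transition to the `Resigned` state happens asynchronously on the polling thread, as the `pollLeader` change above shows.
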
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 4af1821..ea895de 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -51,9 +51,11 @@ public class LeaderState<T> implements EpochState {
     private final Map<Integer, ReplicaState> observerStates = new HashMap<>();
     private final Set<Integer> grantingVoters = new HashSet<>();
     private final Logger log;
-
     private final BatchAccumulator<T> accumulator;
 
+    // This is volatile because resignation can be requested from an external thread.
+    private volatile boolean resignRequested = false;
+
     protected LeaderState(
         int localId,
         int epoch,
@@ -100,6 +102,14 @@ public class LeaderState<T> implements EpochState {
         accumulator.forceDrain();
     }
 
+    public boolean isResignRequested() {
+        return resignRequested;
+    }
+
+    public void requestResign() {
+        this.resignRequested = true;
+    }
+
     @Override
     public Optional<LogOffsetMetadata> highWatermark() {
         return highWatermark;
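
The new `resignRequested` flag is the one piece of `LeaderState` written from outside the polling thread, which is why it is `volatile` rather than guarded by a lock. A standalone sketch of the same signaling pattern (all names here are illustrative, not from the patch):

    import java.util.concurrent.TimeUnit;

    // A volatile boolean set by an external thread and observed by a poll
    // loop on its next iteration: the pattern behind requestResign().
    public final class PollingWorker {
        private volatile boolean stopRequested = false;

        // Safe to call from any thread, like requestResign().
        public void requestStop() {
            stopRequested = true;
        }

        public void run() throws InterruptedException {
            while (!stopRequested) {
                // ... one unit of work per iteration, like pollLeader() ...
                TimeUnit.MILLISECONDS.sleep(10);
            }
        }
    }
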
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index 75c4691..23447a9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -514,6 +514,16 @@ public class QuorumState {
         throw new IllegalStateException("Expected to be Leader, but current state is " + state);
     }
 
+    @SuppressWarnings("unchecked")
+    public <T> Optional<LeaderState<T>> maybeLeaderState() {
+        EpochState state = this.state;
+        if (state instanceof LeaderState) {
+            return Optional.of((LeaderState<T>) state);
+        } else {
+            return Optional.empty();
+        }
+    }
+
     public ResignedState resignedStateOrThrow() {
         if (isResigned())
             return (ResignedState) state;
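
`maybeLeaderState()` performs an unchecked cast, so callers are trusted to request a type parameter consistent with how the state machine was built (`resign()` above, for instance, only touches non-generic methods on the result). A self-contained sketch of the underlying pattern, reading the field into a local once so the `instanceof` check and the cast see the same object; the types here are illustrative stand-ins for `EpochState`/`LeaderState`:

    import java.util.Optional;

    final class StateHolder {
        interface State {}
        static final class Leader implements State {}
        static final class Follower implements State {}

        private volatile State state = new Follower();

        Optional<Leader> maybeLeader() {
            State s = this.state;  // single read: check and cast agree
            if (s instanceof Leader) {
                return Optional.of((Leader) s);
            } else {
                return Optional.empty();
            }
        }
    }

Callers can then branch on the `Optional` instead of catching `IllegalStateException` from `leaderStateOrThrow()`, as the rewritten `append()` above does.
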
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 3a46b46..2fe8fd3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -171,9 +171,12 @@ public interface RaftClient<T> extends AutoCloseable {
     CompletableFuture<Void> shutdown(int timeoutMs);
 
     /**
-     * Resign the leadership. The leader will give up its leadership in the current epoch,
-     * and a new election will be held. Note that nothing prevents this leader from getting
-     * reelected.
+     * Resign the leadership. The leader will give up its leadership in the passed epoch
+     * (if it matches the current epoch), and a new election will be held. Note that nothing
+     * prevents this node from being reelected as the leader.
+     *
+     * Notification of successful resignation can be observed through
+     * {@link Listener#handleLeaderChange(LeaderAndEpoch)}.
      *
      * @param epoch the epoch to resign from. If this does not match the current epoch, this
      *              call will be ignored.
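
Since resignation completes asynchronously, a listener that needs to know when it has lost leadership watches for the leader/epoch transition. A sketch, assuming the `Listener` callbacks shown here (the real interface may include more methods, some with defaults):

    import org.apache.kafka.raft.BatchReader;
    import org.apache.kafka.raft.LeaderAndEpoch;
    import org.apache.kafka.raft.RaftClient;
    import org.apache.kafka.snapshot.SnapshotReader;

    // Illustrative listener that reacts when this node stops leading,
    // whether because resign() completed or a new leader was elected.
    final class ResignationAwareListener implements RaftClient.Listener<String> {
        private final int localId;

        ResignationAwareListener(int localId) {
            this.localId = localId;
        }

        @Override
        public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
            if (!leaderAndEpoch.isLeader(localId)) {
                // No longer the leader in this epoch; stop leader-only work.
            }
        }

        @Override
        public void handleCommit(BatchReader<String> reader) {
            reader.close();  // consume and release committed batches
        }

        @Override
        public void handleSnapshot(SnapshotReader<String> reader) {
            reader.close();  // consume and release the snapshot
        }
    }
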
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index b000764..a70ad93 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.requests.DescribeQuorumRequest;
+import org.apache.kafka.common.requests.EndQuorumEpochResponse;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
@@ -342,14 +343,14 @@ public class KafkaRaftClientTest {
 
         // append some records, but the fetch in purgatory will still fail
         context.log.appendAsLeader(
-            context.buildBatch(context.log.endOffset().offset, epoch, Arrays.asList("raft")),
+            context.buildBatch(context.log.endOffset().offset, epoch, singletonList("raft")),
             epoch
         );
 
         // when transitioning to resigned, all requests in fetchPurgatory will fail
         context.client.shutdown(1000);
         context.client.poll();
-        context.assertSentFetchPartitionResponse(Errors.BROKER_NOT_AVAILABLE, epoch, OptionalInt.of(localId));
+        context.assertSentFetchPartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER, epoch, OptionalInt.of(localId));
         context.assertResignedLeader(epoch, localId);
 
         // shutting down finished
@@ -360,6 +361,151 @@ public class KafkaRaftClientTest {
     }
 
     @Test
+    public void testResignInOlderEpochIgnored() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int currentEpoch = context.currentEpoch();
+        context.client.resign(currentEpoch - 1);
+        context.client.poll();
+
+        // Ensure we are still leader even after expiration of the election timeout.
+        context.time.sleep(context.electionTimeoutMs() * 2);
+        context.client.poll();
+        context.assertElectedLeader(currentEpoch, localId);
+    }
+
+    @Test
+    public void testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exception {
+        int localId = 0;
+        int remoteId1 = 1;
+        int remoteId2 = 2;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int resignedEpoch = context.currentEpoch();
+
+        context.client.resign(resignedEpoch);
+        context.pollUntil(context.client.quorum()::isResigned);
+
+        context.deliverRequest(context.beginEpochRequest(resignedEpoch + 1, remoteId1));
+        context.pollUntilResponse();
+        context.assertSentBeginQuorumEpochResponse(Errors.NONE);
+        context.assertElectedLeader(resignedEpoch + 1, remoteId1);
+        assertEquals(new LeaderAndEpoch(OptionalInt.of(remoteId1), resignedEpoch + 1),
+            context.listener.currentLeaderAndEpoch());
+    }
+
+    @Test
+    public void testElectionTimeoutAfterUserInitiatedResign() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+
+        context.becomeLeader();
+        assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+        int resignedEpoch = context.currentEpoch();
+
+        context.client.resign(resignedEpoch);
+        context.pollUntil(context.client.quorum()::isResigned);
+
+        context.pollUntilRequest();
+        int correlationId = context.assertSentEndQuorumEpochRequest(resignedEpoch, otherNodeId);
+
+        EndQuorumEpochResponseData response = EndQuorumEpochResponse.singletonResponse(
+            Errors.NONE,
+            context.metadataPartition,
+            Errors.NONE,
+            resignedEpoch,
+            localId
+        );
+
+        context.deliverResponse(correlationId, otherNodeId, response);
+        context.client.poll();
+
+        // We do not resend `EndQuorumEpochRequest` once the other voter has acknowledged it.
+        context.time.sleep(context.retryBackoffMs);
+        context.client.poll();
+        assertFalse(context.channel.hasSentRequests());
+
+        // Any `Fetch` received in the resigned state should result in a NOT_LEADER_OR_FOLLOWER error.
+        context.deliverRequest(context.fetchRequest(1, -1, 0, 0, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER,
+            resignedEpoch, OptionalInt.of(localId));
+
+        // After the election timer, we should become a candidate.
+        context.time.sleep(2 * context.electionTimeoutMs());
+        context.pollUntil(context.client.quorum()::isCandidate);
+        assertEquals(resignedEpoch + 1, context.currentEpoch());
+        assertEquals(new LeaderAndEpoch(OptionalInt.empty(), resignedEpoch + 1),
+            context.listener.currentLeaderAndEpoch());
+    }
+
+    @Test
+    public void testCannotResignWithLargerEpochThanCurrentEpoch() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
+        context.becomeLeader();
+
+        assertThrows(IllegalArgumentException.class,
+            () -> context.client.resign(context.currentEpoch() + 1));
+    }
+
+    @Test
+    public void testCannotResignIfNotLeader() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int leaderEpoch = 2;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(leaderEpoch, otherNodeId)
+            .build();
+
+        assertEquals(OptionalInt.of(otherNodeId), context.currentLeader());
+        assertThrows(IllegalArgumentException.class, () -> context.client.resign(leaderEpoch));
+    }
+
+    @Test
+    public void testCannotResignIfObserver() throws Exception {
+        int leaderId = 1;
+        int otherNodeId = 2;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(OptionalInt.empty(), voters).build();
+        context.pollUntilRequest();
+
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        assertTrue(voters.contains(fetchRequest.destinationId()));
+        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+
+        context.deliverResponse(fetchRequest.correlationId, fetchRequest.destinationId(),
+            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
+
+        context.client.poll();
+        context.assertElectedLeader(epoch, leaderId);
+        assertThrows(IllegalStateException.class, () -> context.client.resign(epoch));
+    }
+
+    @Test
     public void testInitializeAsCandidateFromStateStore() throws Exception {
         int localId = 0;
         // Need 3 nodes to require a 2-node majority
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 755e0db..2f644e4 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -21,15 +21,15 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
 import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
-import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
 import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
 import org.apache.kafka.common.message.EndQuorumEpochRequestData;
 import org.apache.kafka.common.message.EndQuorumEpochResponseData;
 import org.apache.kafka.common.message.FetchRequestData;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.message.FetchSnapshotResponseData;
-import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
 import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
 import org.apache.kafka.common.message.VoteRequestData;
 import org.apache.kafka.common.message.VoteResponseData;
 import org.apache.kafka.common.metrics.Metrics;
@@ -670,14 +670,14 @@ public final class RaftClientTestContext {
         return response.responses().get(0).partitions().get(0);
     }
 
-    void assertSentFetchPartitionResponse(Errors error) {
+    void assertSentFetchPartitionResponse(Errors topLevelError) {
         List<RaftResponse.Outbound> sentMessages = drainSentResponses(ApiKeys.FETCH);
         assertEquals(
             1, sentMessages.size(), "Found unexpected sent messages " + sentMessages);
         RaftResponse.Outbound raftMessage = sentMessages.get(0);
         assertEquals(ApiKeys.FETCH.id, raftMessage.data.apiKey());
         FetchResponseData response = (FetchResponseData) raftMessage.data();
-        assertEquals(error, Errors.forCode(response.errorCode()));
+        assertEquals(topLevelError, Errors.forCode(response.errorCode()));
     }
 
 
@@ -1069,7 +1069,7 @@ public final class RaftClientTestContext {
         private final List<Batch<String>> commits = new ArrayList<>();
         private final List<BatchReader<String>> savedBatches = new ArrayList<>();
         private final Map<Integer, Long> claimedEpochStartOffsets = new HashMap<>();
-        private OptionalInt currentClaimedEpoch = OptionalInt.empty();
+        private LeaderAndEpoch currentLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), 0);
         private final OptionalInt localId;
         private Optional<SnapshotReader<String>> snapshot = Optional.empty();
         private boolean readCommit = true;
@@ -1086,6 +1086,10 @@ public final class RaftClientTestContext {
             return claimedEpochStartOffsets.get(epoch);
         }
 
+        LeaderAndEpoch currentLeaderAndEpoch() {
+            return currentLeaderAndEpoch;
+        }
+
         Batch<String> lastCommit() {
             if (commits.isEmpty()) {
                 return null;
@@ -1103,7 +1107,11 @@ public final class RaftClientTestContext {
         }
 
         OptionalInt currentClaimedEpoch() {
-            return currentClaimedEpoch;
+            if (localId.isPresent() && currentLeaderAndEpoch.isLeader(localId.getAsInt())) {
+                return OptionalInt.of(currentLeaderAndEpoch.epoch());
+            } else {
+                return OptionalInt.empty();
+            }
         }
 
         List<String> commitWithBaseOffset(long baseOffset) {
@@ -1159,18 +1167,19 @@ public final class RaftClientTestContext {
         }
 
         @Override
-        public void handleLeaderChange(LeaderAndEpoch leader) {
+        public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
             // We record the next expected offset as the claimed epoch's start
-            // offset. This is useful to verify that the `handleClaim` callback
+            // offset. This is useful to verify that the `handleLeaderChange` callback
             // was not received early.
-            if (localId.isPresent() && leader.isLeader(localId.getAsInt())) {
-                long claimedEpochStartOffset = lastCommitOffset().isPresent() ?
-                    lastCommitOffset().getAsLong() + 1 : 0L;
-                this.currentClaimedEpoch = OptionalInt.of(leader.epoch());
-                this.claimedEpochStartOffsets.put(leader.epoch(), claimedEpochStartOffset);
-            } else {
-                this.currentClaimedEpoch = OptionalInt.empty();
-            }
+            this.currentLeaderAndEpoch = leaderAndEpoch;
+
+            currentClaimedEpoch().ifPresent(claimedEpoch -> {
+                if (claimedEpoch == leaderAndEpoch.epoch()) {
+                    long claimedEpochStartOffset = lastCommitOffset().isPresent() ?
+                        lastCommitOffset().getAsLong() + 1 : 0L;
+                    this.claimedEpochStartOffsets.put(leaderAndEpoch.epoch(), claimedEpochStartOffset);
+                }
+            });
         }
 
         @Override
@@ -1184,8 +1193,7 @@ public final class RaftClientTestContext {
 
         @Override
         public void handleSnapshot(SnapshotReader<String> reader) {
-            snapshot.ifPresent(snapshot -> assertDoesNotThrow(() -> snapshot.close()));
-
+            snapshot.ifPresent(snapshot -> assertDoesNotThrow(snapshot::close));
             commits.clear();
             savedBatches.clear();
             snapshot = Optional.of(reader);