Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/29 19:49:40 UTC

[GitHub] [kafka] hachikuji opened a new pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

hachikuji opened a new pull request #9531:
URL: https://github.com/apache/kafka/pull/9531


   When initializing the raft state machine after shutting down as a leader, we were previously entering the "unattached" state, which means we have no leader and no voted candidate. This was a bug because it allowed a reinitialized leader to cast a vote for a candidate in the same epoch that it was already the leader of. This patch fixes the problem by introducing a new "resigned" state which allows us to retain the leader state so that we cannot change our vote and we will not accept additional appends.
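
The effect of the new state on voting can be illustrated with a minimal sketch. This is not the `KafkaRaftClient` code: the `Role` enum, the `canGrantVote` helper, and its parameters are hypothetical names chosen for illustration, and the log comparison that a real Raft vote requires is omitted.

```java
import java.util.OptionalInt;

/** Hypothetical, simplified model of the vote decision; not the actual Kafka implementation. */
final class VoteDecisionSketch {
    enum Role { UNATTACHED, VOTED, CANDIDATE, LEADER, FOLLOWER, RESIGNED }

    /**
     * Returns true if a node with the given role and local epoch may grant its
     * vote to candidateId campaigning in candidateEpoch.
     * Assumption: log up-to-date checks are left out to keep the sketch short.
     */
    static boolean canGrantVote(
        Role role,
        int localEpoch,
        OptionalInt votedId,
        int candidateId,
        int candidateEpoch
    ) {
        if (candidateEpoch < localEpoch) {
            return false; // stale request from an older epoch
        }
        if (candidateEpoch > localEpoch) {
            return true; // a newer epoch supersedes whatever we held before
        }
        switch (role) {
            case UNATTACHED:
                return true; // no leader and no vote recorded in this epoch
            case VOTED:
                return votedId.isPresent() && votedId.getAsInt() == candidateId;
            default:
                // CANDIDATE, LEADER, FOLLOWER and RESIGNED are already committed
                // to a leader or a vote in this epoch.
                return false;
        }
    }

    public static void main(String[] args) {
        // A former leader of epoch 5 that restarts as UNATTACHED could vote again in epoch 5.
        System.out.println(canGrantVote(Role.UNATTACHED, 5, OptionalInt.empty(), 2, 5));
        // Restarting as RESIGNED keeps the epoch-5 commitment.
        System.out.println(canGrantVote(Role.RESIGNED, 5, OptionalInt.empty(), 2, 5));
    }
}
```

The two calls in `main` contrast the previous behavior (reinitializing as unattached) with the behavior described in this patch (reinitializing as resigned).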
   
   This patch also revamps the shutdown logic to make use of the new "resigned" state. Previously we had a separate path in `KafkaRaftClient.poll` for the shutdown logic, which resulted in some duplication. We now incorporate shutdown behavior into each state's respective logic instead.
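
As a rough sketch of what "shutdown behavior in each state's logic" could look like, the following maps a role to its reaction when a graceful shutdown is pending. The enums and `onShutdown` are hypothetical; the per-role reactions are taken from the code comments in the diff hunks of this thread, and the voted and unattached states are left out because the thread does not spell out their behavior.

```java
/** Hypothetical summary of per-state shutdown handling; names are illustrative only. */
final class ShutdownPollSketch {
    enum Role { LEADER, RESIGNED, CANDIDATE, FOLLOWER }

    enum Action {
        RESIGN,                  // leader gives up leadership and enters the resigned state
        NOTIFY_VOTERS,           // resigned node keeps sending end-of-epoch requests
        FINISH_CURRENT_ELECTION, // candidate continues only the election in progress
        STOP_IMMEDIATELY         // follower has nothing left to do
    }

    /** Returns the reaction of a node in the given role once shutdown has been requested. */
    static Action onShutdown(Role role) {
        switch (role) {
            case LEADER:
                return Action.RESIGN;
            case RESIGNED:
                return Action.NOTIFY_VOTERS;
            case CANDIDATE:
                return Action.FINISH_CURRENT_ELECTION;
            case FOLLOWER:
                return Action.STOP_IMMEDIATELY;
            default:
                throw new IllegalArgumentException("Unexpected role: " + role);
        }
    }

    public static void main(String[] args) {
        for (Role role : Role.values()) {
            System.out.println(role + " -> " + onShutdown(role));
        }
    }
}
```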
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r516198568



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1560,9 +1558,34 @@ private long pollLeader(long currentTimeMs) {
         return Math.min(timeUntilFlush, timeUntilSend);
     }
 
+    private long maybeSendVoteRequests(
+        CandidateState state,
+        long currentTimeMs
+    ) {
+        // Continue sending Vote requests as long as we still have a chance to win the election
+        if (!state.isVoteRejected()) {
+            return maybeSendRequests(
+                currentTimeMs,
+                state.unrecordedVoters(),
+                this::buildVoteRequest
+            );
+        }
+        return Long.MAX_VALUE;
+    }
+
     private long pollCandidate(long currentTimeMs) throws IOException {
         CandidateState state = quorum.candidateStateOrThrow();
-        if (state.isBackingOff()) {
+        GracefulShutdown shutdown = this.shutdown.get();
+
+        if (shutdown != null) {
+            // If we happen to shutdown while we are a candidate, we will continue
+            // with the current election until one of the following conditions is met:
+            //  1) we are elected as leader (which allows us to resign)
+            //  2) another leader is elected
+            //  3) the shutdown timer expires
+            long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
+            return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);

Review comment:
       The intent is to ignore the election timeout in order to prevent a shutting down broker from becoming a candidate. Does that make sense?
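
A small, self-contained sketch of the timeout choice discussed here. The method `pollTimeoutMs` and its parameters are hypothetical, and the non-shutdown branch is a simplification assumed for contrast (the real `pollCandidate` also handles backoff and election expiry).

```java
/** Hypothetical model of how long a candidate waits before its next poll. */
final class CandidatePollTimeoutSketch {
    /**
     * While shutting down, the election timeout is ignored, so the candidate never
     * starts a new election; it only waits for the shutdown timer or the next
     * Vote request retry, whichever comes first.
     * Assumption: requestBackoffMs is Long.MAX_VALUE when no request is pending.
     */
    static long pollTimeoutMs(
        boolean shuttingDown,
        long shutdownRemainingMs,
        long requestBackoffMs,
        long electionRemainingMs
    ) {
        if (shuttingDown) {
            return Math.min(shutdownRemainingMs, requestBackoffMs);
        }
        // Simplified normal path: wake up for the election timeout or the next retry.
        return Math.min(electionRemainingMs, requestBackoffMs);
    }

    public static void main(String[] args) {
        // Election timeout is 100 ms away, but shutdown is in progress with 5000 ms left.
        System.out.println(pollTimeoutMs(true, 5000L, Long.MAX_VALUE, 100L));
        // Same inputs without shutdown: the election timeout drives the wake-up.
        System.out.println(pollTimeoutMs(false, 5000L, Long.MAX_VALUE, 100L));
    }
}
```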







[GitHub] [kafka] dengziming commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r516385415



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1560,9 +1558,34 @@ private long pollLeader(long currentTimeMs) {
         return Math.min(timeUntilFlush, timeUntilSend);
     }
 
+    private long maybeSendVoteRequests(
+        CandidateState state,
+        long currentTimeMs
+    ) {
+        // Continue sending Vote requests as long as we still have a chance to win the election
+        if (!state.isVoteRejected()) {
+            return maybeSendRequests(
+                currentTimeMs,
+                state.unrecordedVoters(),
+                this::buildVoteRequest
+            );
+        }
+        return Long.MAX_VALUE;
+    }
+
     private long pollCandidate(long currentTimeMs) throws IOException {
         CandidateState state = quorum.candidateStateOrThrow();
-        if (state.isBackingOff()) {
+        GracefulShutdown shutdown = this.shutdown.get();
+
+        if (shutdown != null) {
+            // If we happen to shutdown while we are a candidate, we will continue
+            // with the current election until one of the following conditions is met:
+            //  1) we are elected as leader (which allows us to resign)
+            //  2) another leader is elected
+            //  3) the shutdown timer expires
+            long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
+            return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);

Review comment:
       Understood. When shutting down, the candidate only tries to complete the current election, so it simply ignores the election timeout.







[GitHub] [kafka] hachikuji commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r519965008



##########
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##########
@@ -21,18 +21,21 @@
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * This class is responsible for managing the current state of this node and ensuring only
- * valid state transitions.
+ * This class is responsible for managing the current state of this node and ensuring
+ * only valid state transitions. Below we define the possible state transitions and
+ * how they are triggered:
  *
- * Unattached =>
+ * Unattached|Resigned =>

Review comment:
       Hmm.. The quorum size does not change because a leader resigns. If there are 2N nodes in the cluster, then we always need N + 1 votes, so I don't think this case is possible.
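
The arithmetic can be made concrete with a short sketch. `majoritySize` and `hasWon` are hypothetical helpers; the point they illustrate is that the threshold is derived from the configured voter set, not from the number of voters currently alive.

```java
import java.util.Set;

/** Hypothetical helpers illustrating the majority rule over a fixed voter set. */
final class MajoritySketch {
    /** Votes needed to win with the given number of configured voters. */
    static int majoritySize(int voterCount) {
        return voterCount / 2 + 1;
    }

    /** True if the granted votes, restricted to configured voters, reach a majority. */
    static boolean hasWon(Set<Integer> voters, Set<Integer> granted) {
        long count = granted.stream().filter(voters::contains).count();
        return count >= majoritySize(voters.size());
    }

    public static void main(String[] args) {
        Set<Integer> voters = Set.of(1, 2, 3, 4, 5, 6); // 2N voters with N = 3
        // Two candidates splitting the votes 3/3: neither reaches N + 1 = 4,
        // regardless of whether the former leader is still running.
        System.out.println(hasWon(voters, Set.of(1, 2, 3)));
        System.out.println(hasWon(voters, Set.of(4, 5, 6)));
    }
}
```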







[GitHub] [kafka] guozhangwang commented on pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#issuecomment-724186172


   LGTM.





[GitHub] [kafka] dengziming commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r514945125



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1543,9 +1516,40 @@ private long maybeAppendBatches(
         return timeUnitFlush;
     }
 
+    private long pollResigned(long currentTimeMs) throws IOException {
+        GracefulShutdown shutdown = this.shutdown.get();
+        ResignedState state = quorum.resignedStateOrThrow();
+
+        long endQuorumBackoffMs = maybeSendRequests(

Review comment:
       If the node resigns from the candidate state, will it also send an EndQuorumEpochRequest to all voters?







[GitHub] [kafka] guozhangwang commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r520020029



##########
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##########
@@ -21,18 +21,21 @@
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * This class is responsible for managing the current state of this node and ensuring only
- * valid state transitions.
+ * This class is responsible for managing the current state of this node and ensuring
+ * only valid state transitions. Below we define the possible state transitions and
+ * how they are triggered:
  *
- * Unattached =>
+ * Unattached|Resigned =>

Review comment:
       Okay, now I remember what we discussed before.
   
   What I was wondering is this: with a quorum size of 6, we would need 4 votes to elect a leader. If the current leader shuts down, then until it is restarted the quorum size is 5, so logically we would only need 3 votes. But as long as we still require 4 votes during this transition, even with only 5 live quorum members, we are fine.







[GitHub] [kafka] hachikuji commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r519969208



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1601,7 +1618,12 @@ private long pollFollower(long currentTimeMs) throws IOException {
     }
 
     private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) throws IOException {
-        if (state.hasFetchTimeoutExpired(currentTimeMs)) {
+        GracefulShutdown shutdown = this.shutdown.get();
+        if (shutdown != null) {
+            // If we are a follower, then we can shutdown immediately. We want to
+            // skip the transition to candidate in any case.
+            return 0;

Review comment:
       If we are a follower, then there is no election in progress to help with. 







[GitHub] [kafka] hachikuji merged pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #9531:
URL: https://github.com/apache/kafka/pull/9531


   





[GitHub] [kafka] hachikuji commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r519968145



##########
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##########
@@ -21,18 +21,21 @@
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * This class is responsible for managing the current state of this node and ensuring only
- * valid state transitions.
+ * This class is responsible for managing the current state of this node and ensuring
+ * only valid state transitions. Below we define the possible state transitions and
+ * how they are triggered:
  *
- * Unattached =>
+ * Unattached|Resigned =>

Review comment:
       In the future, when we have reassignment, we will still have to protect every quorum change with a majority of the current nodes. The proposal we had previously only allowed single-node changes, which meant that any majority was a majority before and after the state was applied.
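
One way to read the single-node restriction is as an overlap property: every majority of the old voter set shares at least one member with every majority of the new one. The brute-force check below is only a sketch of that reading; the class and method names are hypothetical, and voter sets are modeled as `{0..size-1}` so that one set always contains the other.

```java
/** Hypothetical brute-force check of majority overlap across a voter set change. */
final class SingleNodeChangeSketch {
    static int majoritySize(int voterCount) {
        return voterCount / 2 + 1;
    }

    /**
     * True if every majority of voters {0..oldSize-1} intersects every majority
     * of voters {0..newSize-1}. Subsets are enumerated as bitmasks, so this is
     * only meant for small sizes.
     */
    static boolean majoritiesAlwaysOverlap(int oldSize, int newSize) {
        int universe = Math.max(oldSize, newSize);
        int oldMask = (1 << oldSize) - 1;
        int newMask = (1 << newSize) - 1;
        for (int a = 0; a < (1 << universe); a++) {
            // Skip sets that leave the old voter set or fall short of its majority.
            if ((a & ~oldMask) != 0 || Integer.bitCount(a) < majoritySize(oldSize)) {
                continue;
            }
            for (int b = 0; b < (1 << universe); b++) {
                if ((b & ~newMask) != 0 || Integer.bitCount(b) < majoritySize(newSize)) {
                    continue;
                }
                if ((a & b) == 0) {
                    return false; // found two disjoint majorities
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(majoritiesAlwaysOverlap(3, 4)); // add one voter
        System.out.println(majoritiesAlwaysOverlap(4, 3)); // remove one voter
        System.out.println(majoritiesAlwaysOverlap(3, 5)); // add two voters at once
    }
}
```

With two voters added at once, `{0, 1}` is a majority of the old three-voter set and `{2, 3, 4}` is a majority of the new five-voter set, and the two do not intersect.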







[GitHub] [kafka] guozhangwang commented on pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#issuecomment-723747753


   > One thing I am strongly considering, however, is changing this state machine so that the resigned state is only for leaders. That would definitely simplify the logic. The optimization mentioned above in response to @dengziming 's question seems unlikely to have much benefit in practice. We could always reconsider it in the future of course.
   
   Sounds good!





[GitHub] [kafka] guozhangwang commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r519551394



##########
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##########
@@ -21,18 +21,21 @@
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * This class is responsible for managing the current state of this node and ensuring only
- * valid state transitions.
+ * This class is responsible for managing the current state of this node and ensuring
+ * only valid state transitions. Below we define the possible state transitions and
+ * how they are triggered:
  *
- * Unattached =>
+ * Unattached|Resigned =>

Review comment:
       This makes me think: suppose we have an even-sized quorum (say 2N) and the leader is resigning. Before the leader shuts down we need N+1 votes, while after the leader shuts down the quorum size shrinks to 2N-1 and we would only need N votes. So if the resigning leader gives its vote to a candidate and then shuts down, while the candidates think they only need N votes, could that result in two candidates claiming victory (somehow this sounds quite close to the real world :P), each with N votes, where one of them has the vote from the resigned leader?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1601,7 +1618,12 @@ private long pollFollower(long currentTimeMs) throws IOException {
     }
 
     private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) throws IOException {
-        if (state.hasFetchTimeoutExpired(currentTimeMs)) {
+        GracefulShutdown shutdown = this.shutdown.get();
+        if (shutdown != null) {
+            // If we are a follower, then we can shutdown immediately. We want to
+            // skip the transition to candidate in any case.
+            return 0;

Review comment:
       Why is the behavior of `pollFollowerAsVoter` and `pollVoted` different when shutting down? Could the former still help in casting and completing a vote as well?



