Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/10 07:16:26 UTC

[GitHub] [kafka] dengziming opened a new pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

dengziming opened a new pull request #10289:
URL: https://github.com/apache/kafka/pull/10289


   *More detailed description of your change*
   This PR follows up #10129, which added clusterId validation to FetchRequest.
   
   *Summary of testing strategy (including rationale)*
   Unit test.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10289:
URL: https://github.com/apache/kafka/pull/10289


   





[GitHub] [kafka] dengziming commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r593541172



##########
File path: clients/src/main/resources/common/message/VoteRequest.json
##########
@@ -21,7 +21,7 @@
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
-    { "name": "ClusterId", "type": "string", "versions": "0+",
+    { "name": "ClusterId", "type": "string", "versions": "0+",  "ignorable": true,

Review comment:
       This was checked in by mistake during testing; I will revert it.







[GitHub] [kafka] jsancio commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r595378358



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -939,12 +951,27 @@ private FetchResponseData buildEmptyFetchResponse(
         );
     }
 
-    private boolean hasValidClusterId(FetchRequestData request) {
+    private boolean hasValidClusterId(ApiMessage request) {

Review comment:
       Yes. @dengziming, in that example the user has incorrectly configured the cluster: all of the controllers have each other's listener (connection) information, but their cluster ids are different.
   
   The question is: do we want to catch such misconfigurations early by shutting down the brokers/controllers, or do we want to keep running with the user potentially unaware that the controllers/brokers are incorrectly configured?
   
   There have been conversations about having the first controller leader generate the cluster id and replicate it to all of the nodes. The current implementation generates the cluster id in the `StorageTool`, which the user has to run when configuring the controllers.
   
   I am okay leaving it as is and addressing this in a future PR.
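The signature change above generalizes `hasValidClusterId` from `FetchRequestData` to any `ApiMessage`. A minimal sketch of that idea follows, assuming hypothetical `ClusterScopedRequest`, `VoteRequest`, `EndQuorumEpochRequest` and `ClusterIdValidator` types (these are not Kafka's generated message classes), and assuming a request that carries no cluster id is accepted rather than rejected:

```java
import java.util.Objects;

// Hypothetical stand-in for the generated request classes; not Kafka's real API.
interface ClusterScopedRequest {
    String clusterId(); // may be null if the sender did not set it
}

final class VoteRequest implements ClusterScopedRequest {
    private final String clusterId;

    VoteRequest(String clusterId) {
        this.clusterId = clusterId;
    }

    @Override
    public String clusterId() {
        return clusterId;
    }
}

final class EndQuorumEpochRequest implements ClusterScopedRequest {
    private final String clusterId;

    EndQuorumEpochRequest(String clusterId) {
        this.clusterId = clusterId;
    }

    @Override
    public String clusterId() {
        return clusterId;
    }
}

final class ClusterIdValidator {
    private final String localClusterId;

    ClusterIdValidator(String localClusterId) {
        this.localClusterId = Objects.requireNonNull(localClusterId);
    }

    // One check shared by every request type instead of one overload per type.
    boolean hasValidClusterId(ClusterScopedRequest request) {
        String requestClusterId = request.clusterId();
        if (requestClusterId == null) {
            // Assumption: an absent cluster id is not enforced.
            return true;
        }
        return localClusterId.equals(requestClusterId);
    }
}
```

In the misconfiguration described above, a controller configured with cluster id `A` gets `false` for every request from a peer configured with `B`; whether that result should only reject the request or also shut the node down is the open question in this thread.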







[GitHub] [kafka] dengziming commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r598043448



##########
File path: checkstyle/suppressions.xml
##########
@@ -68,7 +68,7 @@
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer).java"/>
+              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|KafkaRaftClient).java"/>

Review comment:
       Thank you, I will take some time to improve this.







[GitHub] [kafka] dengziming commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r597453270



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -814,6 +822,10 @@ private EndQuorumEpochResponseData handleEndQuorumEpochRequest(
     ) throws IOException {
         EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) requestMetadata.data;
 
+        if (!hasValidClusterId(request)) {

Review comment:
       I tried this approach. It seems that voter and leader are partition-level terminology, so `validateVoterOnlyRequest` is used to get a partition-level error, whereas cluster id validation produces a request-level error. We'd better keep these two kinds of errors separate since we are making way for multi-raft. I changed the `getClusterId` method so that the clusterId is passed to it directly, WDYT?







[GitHub] [kafka] jsancio commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r591851864



##########
File path: checkstyle/suppressions.xml
##########
@@ -68,7 +68,7 @@
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer).java"/>
+              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|KafkaRaftClient).java"/>

Review comment:
       It is sad that we have to add `KafkaRaftClient` to this list. Do you know what exactly pushed this over the threshold? This would allow us to look into ways to re-organize the code so that it is not so complex.







[GitHub] [kafka] dengziming commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r594127628



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -939,12 +951,27 @@ private FetchResponseData buildEmptyFetchResponse(
         );
     }
 
-    private boolean hasValidClusterId(FetchRequestData request) {
+    private boolean hasValidClusterId(ApiMessage request) {

Review comment:
       It's a bit difficult to figure out how to add the window; we cannot simply rely on a fixed configuration. I added a ticket to track this problem: https://issues.apache.org/jira/browse/KAFKA-12465.







[GitHub] [kafka] hachikuji commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r592853088



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -939,12 +951,27 @@ private FetchResponseData buildEmptyFetchResponse(
         );
     }
 
-    private boolean hasValidClusterId(FetchRequestData request) {
+    private boolean hasValidClusterId(ApiMessage request) {

Review comment:
       What should we do if we see this error in a response? It looks like it would currently hit `handleUnexpectedError`, which just logs an error. That might be OK for now. I think there is a window during startup when we could consider these errors to be fatal. This would be helpful for detecting configuration problems. We probably do not want them to be fatal in all cases, though, because that might result in a misconfigured node killing a stable cluster.
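One way to read the "startup window" idea is sketched below. It assumes a hypothetical `ResponseError` enum and an explicit `markStartupComplete()` call; deciding when that call should happen is exactly the part the thread leaves open. Outside the window the sketch only logs, which approximates the current `handleUnexpectedError` behavior described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's error codes; only the names matter here.
enum ResponseError { NONE, INCONSISTENT_CLUSTER_ID, UNKNOWN }

final class FatalClusterIdException extends RuntimeException {
    FatalClusterIdException(String message) {
        super(message);
    }
}

final class StartupAwareResponseHandler {
    // Hypothetical flag: how and when the startup window ends is not decided.
    private boolean startupComplete = false;
    final List<String> loggedErrors = new ArrayList<>();

    void markStartupComplete() {
        startupComplete = true;
    }

    void handleResponseError(ResponseError error, int sourceId) {
        switch (error) {
            case NONE:
                return;
            case INCONSISTENT_CLUSTER_ID:
                if (!startupComplete) {
                    // Inside the startup window: fail fast to surface a misconfiguration.
                    throw new FatalClusterIdException(
                        "Node " + sourceId + " reported a different cluster id");
                }
                // After startup: only log, so a misconfigured peer cannot kill a stable node.
                loggedErrors.add("Ignoring INCONSISTENT_CLUSTER_ID from node " + sourceId);
                return;
            default:
                // Anything else is logged and otherwise ignored.
                loggedErrors.add("Unexpected error " + error + " from node " + sourceId);
        }
    }
}
```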

##########
File path: clients/src/main/resources/common/message/VoteRequest.json
##########
@@ -21,7 +21,7 @@
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
-    { "name": "ClusterId", "type": "string", "versions": "0+",
+    { "name": "ClusterId", "type": "string", "versions": "0+",  "ignorable": true,

Review comment:
       Why do we need this to be ignorable?







[GitHub] [kafka] dengziming commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r594874464



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -939,12 +951,27 @@ private FetchResponseData buildEmptyFetchResponse(
         );
     }
 
-    private boolean hasValidClusterId(FetchRequestData request) {
+    private boolean hasValidClusterId(ApiMessage request) {

Review comment:
       @jsancio This is simple but not perfect. Consider a four-node cluster A-0 (clusterId=A), A-1 (clusterId=A), B-0 (clusterId=B), B-1 (clusterId=B). When starting, they all become candidates and send vote requests to the other nodes. If each node's first vote response comes from a node with the same clusterId as itself, they will all stay alive; but if each node's first vote response comes from a node with a different clusterId, they will all be killed. The logic seems similar to leader election, which has to reach a consensus. So we'd better treat these errors as non-fatal for now and have a discussion to reach a consensus about whether we should treat them as fatal.
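The scenario above can be modeled in a few lines. This sketch assumes the rule under discussion, where a cluster id mismatch is fatal unless an earlier response matched, so only each node's first vote response decides its fate. `SplitClusterSimulation` and its maps are illustrative only and not part of Kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SplitClusterSimulation {
    // The four-node setup from the comment: node name -> configured cluster id.
    static final Map<String, String> CLUSTER_IDS = new LinkedHashMap<>();

    static {
        CLUSTER_IDS.put("A-0", "A");
        CLUSTER_IDS.put("A-1", "A");
        CLUSTER_IDS.put("B-0", "B");
        CLUSTER_IDS.put("B-1", "B");
    }

    // A node survives iff the first vote response it sees comes from a node
    // configured with the same cluster id.
    static Map<String, Boolean> survivors(Map<String, String> firstResponder) {
        Map<String, Boolean> alive = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : firstResponder.entrySet()) {
            String self = e.getKey();
            String peer = e.getValue();
            alive.put(self, CLUSTER_IDS.get(self).equals(CLUSTER_IDS.get(peer)));
        }
        return alive;
    }

    public static void main(String[] args) {
        Map<String, String> sameIdFirst = new LinkedHashMap<>();
        sameIdFirst.put("A-0", "A-1");
        sameIdFirst.put("A-1", "A-0");
        sameIdFirst.put("B-0", "B-1");
        sameIdFirst.put("B-1", "B-0");

        Map<String, String> otherIdFirst = new LinkedHashMap<>();
        otherIdFirst.put("A-0", "B-0");
        otherIdFirst.put("A-1", "B-1");
        otherIdFirst.put("B-0", "A-0");
        otherIdFirst.put("B-1", "A-1");

        System.out.println(survivors(sameIdFirst));  // {A-0=true, A-1=true, B-0=true, B-1=true}
        System.out.println(survivors(otherIdFirst)); // {A-0=false, A-1=false, B-0=false, B-1=false}
    }
}
```

The same configuration ends with either every node alive or every node dead depending only on message timing, which is the reason given for keeping these errors non-fatal for now.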







[GitHub] [kafka] hachikuji commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r597274570



##########
File path: checkstyle/suppressions.xml
##########
@@ -68,7 +68,7 @@
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer).java"/>
+              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|KafkaRaftClient).java"/>

Review comment:
       I agree it is unfortunate. There are probably ways we can improve this. For example, this logic smells a little bit:
   ```java
           if (quorum.isLeader()) {
               logger.debug("Rejecting vote request {} with epoch {} since we are already leader on that epoch",
                       request, candidateEpoch);
               voteGranted = false;
           } else if (quorum.isCandidate()) {
               logger.debug("Rejecting vote request {} with epoch {} since we are already candidate on that epoch",
                       request, candidateEpoch);
               voteGranted = false;
           } else if (quorum.isResigned()) {
               logger.debug("Rejecting vote request {} with epoch {} since we have resigned as candidate/leader in this epoch",
                   request, candidateEpoch);
               voteGranted = false;
           } else if (quorum.isFollower()) {
               FollowerState state = quorum.followerStateOrThrow();
               logger.debug("Rejecting vote request {} with epoch {} since we already have a leader {} on that epoch",
                   request, candidateEpoch, state.leaderId());
               voteGranted = false;
   ```
   It might be possible to push this logic into `EpochState`, or at least to make use of the `name()` method in the logging. @dengziming, would you be interested in following up on this separately?
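A minimal sketch of what pushing the vote decision into the epoch state could look like. `EpochStateSketch`, its `canGrantVote` method and the state classes are hypothetical stand-ins rather than Kafka's `EpochState`, and the unattached state is simplified to always grant (a real check would also consider prior votes and log position).

```java
// Hypothetical interface; Kafka's real EpochState is not reproduced here.
interface EpochStateSketch {
    String name();

    // Each state decides for itself whether it may grant a vote in its epoch.
    boolean canGrantVote(int candidateId);
}

final class LeaderStateSketch implements EpochStateSketch {
    public String name() { return "Leader"; }
    public boolean canGrantVote(int candidateId) { return false; }
}

final class CandidateStateSketch implements EpochStateSketch {
    public String name() { return "Candidate"; }
    public boolean canGrantVote(int candidateId) { return false; }
}

final class FollowerStateSketch implements EpochStateSketch {
    private final int leaderId;

    FollowerStateSketch(int leaderId) { this.leaderId = leaderId; }

    public String name() { return "Follower(leaderId=" + leaderId + ")"; }
    public boolean canGrantVote(int candidateId) { return false; }
}

final class UnattachedStateSketch implements EpochStateSketch {
    public String name() { return "Unattached"; }
    // Simplified: ignores prior votes and log up-to-dateness.
    public boolean canGrantVote(int candidateId) { return true; }
}

final class VoteDecider {
    // One branch replaces the isLeader/isCandidate/isResigned/isFollower chain.
    static boolean decide(EpochStateSketch state, int candidateId, int candidateEpoch) {
        boolean granted = state.canGrantVote(candidateId);
        if (!granted) {
            System.out.printf("Rejecting vote request from %d with epoch %d since we are in state %s%n",
                candidateId, candidateEpoch, state.name());
        }
        return granted;
    }
}
```

Each new state then adds a class instead of another `else if` branch in the vote handler, which is where the cyclomatic complexity comes from.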







[GitHub] [kafka] dengziming commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r592000720



##########
File path: checkstyle/suppressions.xml
##########
@@ -68,7 +68,7 @@
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer).java"/>
+              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|KafkaRaftClient).java"/>

Review comment:
       The `handleVoteRequest()` method has too many if conditions.







[GitHub] [kafka] dengziming commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r597453270



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -814,6 +822,10 @@ private EndQuorumEpochResponseData handleEndQuorumEpochRequest(
     ) throws IOException {
         EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) requestMetadata.data;
 
+        if (!hasValidClusterId(request)) {

Review comment:
       I tried this approach. It seems that voter and leader are partition-level terminology, so `validateVoterOnlyRequest` is used to get a partition-level error, whereas cluster id validation produces a request-level error. We'd better keep these two kinds of errors separate since we are making way for multi-raft. I think we can add a `validateRequest` method to get the request-level error, WDYT?
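The request-level versus partition-level split proposed above can be sketched as follows. `EndQuorumEpochHandlerSketch`, `validateRequest`, `validatePartition` and `RaftErrorSketch` are hypothetical names for illustration, and the partition check is reduced to an epoch comparison.

```java
import java.util.Optional;

// Hypothetical error codes; only the request/partition distinction matters here.
enum RaftErrorSketch { NONE, INCONSISTENT_CLUSTER_ID, FENCED_LEADER_EPOCH }

final class EndQuorumEpochHandlerSketch {
    private final String clusterId;
    private final int currentEpoch;

    EndQuorumEpochHandlerSketch(String clusterId, int currentEpoch) {
        this.clusterId = clusterId;
        this.currentEpoch = currentEpoch;
    }

    // Request-level check: applies to the whole request, independent of partitions.
    Optional<RaftErrorSketch> validateRequest(String requestClusterId) {
        if (requestClusterId != null && !clusterId.equals(requestClusterId)) {
            return Optional.of(RaftErrorSketch.INCONSISTENT_CLUSTER_ID);
        }
        return Optional.empty();
    }

    // Partition-level check (stand-in for validateVoterOnlyRequest).
    Optional<RaftErrorSketch> validatePartition(int requestEpoch) {
        if (requestEpoch < currentEpoch) {
            return Optional.of(RaftErrorSketch.FENCED_LEADER_EPOCH);
        }
        return Optional.empty();
    }

    // Reports the top-level error and the partition error separately.
    String handle(String requestClusterId, int requestEpoch) {
        Optional<RaftErrorSketch> requestError = validateRequest(requestClusterId);
        if (requestError.isPresent()) {
            // Short-circuit: per-partition validation is never reached.
            return "topLevel=" + requestError.get() + ", partition=N/A";
        }
        RaftErrorSketch partitionError = validatePartition(requestEpoch).orElse(RaftErrorSketch.NONE);
        return "topLevel=NONE, partition=" + partitionError;
    }
}
```

With several Raft partitions per request, the top-level error would be set once while each partition keeps its own error, which is the motivation given for keeping the two checks apart.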







[GitHub] [kafka] hachikuji commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r597278840



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -814,6 +822,10 @@ private EndQuorumEpochResponseData handleEndQuorumEpochRequest(
     ) throws IOException {
         EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) requestMetadata.data;
 
+        if (!hasValidClusterId(request)) {

Review comment:
       I think a better way to do this is to modify `validateVoterOnlyRequest` and `validateLeaderOnlyRequest` so that we pass the clusterId. Then we can get rid of `getClusterId`.
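For contrast, a sketch of the alternative suggested here, where the request's cluster id is passed straight into the voter-only validation so that no separate `getClusterId` helper is needed. `VoterOnlyValidatorSketch` and `VoterCheckError` are hypothetical, and the real method's checks are only approximated.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in error codes.
enum VoterCheckError { INCONSISTENT_CLUSTER_ID, FENCED_LEADER_EPOCH, INCONSISTENT_VOTER_SET }

final class VoterOnlyValidatorSketch {
    private final String clusterId;
    private final int currentEpoch;
    private final Set<Integer> voters;

    VoterOnlyValidatorSketch(String clusterId, int currentEpoch, Set<Integer> voters) {
        this.clusterId = clusterId;
        this.currentEpoch = currentEpoch;
        this.voters = voters;
    }

    // The cluster id is an extra parameter, checked before the per-voter conditions.
    Optional<VoterCheckError> validateVoterOnlyRequest(String requestClusterId, int remoteNodeId, int requestEpoch) {
        if (requestClusterId != null && !clusterId.equals(requestClusterId)) {
            return Optional.of(VoterCheckError.INCONSISTENT_CLUSTER_ID);
        } else if (requestEpoch < currentEpoch) {
            return Optional.of(VoterCheckError.FENCED_LEADER_EPOCH);
        } else if (!voters.contains(remoteNodeId)) {
            return Optional.of(VoterCheckError.INCONSISTENT_VOTER_SET);
        }
        return Optional.empty();
    }
}
```

This keeps each handler to a single validation call, at the cost of returning a request-level error from a method whose other errors are partition-level.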







[GitHub] [kafka] dengziming commented on pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#issuecomment-795263178


   Hello @dajac, PTAL.





[GitHub] [kafka] jsancio commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r594490953



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -939,12 +951,27 @@ private FetchResponseData buildEmptyFetchResponse(
         );
     }
 
-    private boolean hasValidClusterId(FetchRequestData request) {
+    private boolean hasValidClusterId(ApiMessage request) {

Review comment:
       We can implement it so that, when handling a response, invalid cluster ids are fatal unless a previous response contained a valid cluster id.
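The rule proposed here only needs one bit of state per node. A minimal sketch, assuming a hypothetical `ClusterIdResponseTracker` that the response path consults (not part of Kafka's API):

```java
// Hypothetical tracker illustrating "fatal unless a previous response was valid".
final class ClusterIdResponseTracker {
    private boolean seenValidClusterId = false;

    // Returns true when this response's cluster id mismatch should be treated as fatal.
    boolean onResponse(boolean clusterIdMatched) {
        if (clusterIdMatched) {
            // Once any peer has agreed with our cluster id, later mismatches are non-fatal.
            seenValidClusterId = true;
            return false;
        }
        return !seenValidClusterId;
    }
}
```

Because the outcome depends on whether a matching response arrives before a mismatching one, the result can vary with message ordering, as in the four-node example discussed in this thread.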



