Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/13 02:35:47 UTC

[GitHub] [kafka] dengziming opened a new pull request #11209: KAFKA-12465: Logic about inconsistent cluster id

dengziming opened a new pull request #11209:
URL: https://github.com/apache/kafka/pull/11209


   *More detailed description of your change*
   When handling a response, an invalid cluster id is fatal unless a previous response contained a valid cluster id.
   Note that this is not perfect (see https://github.com/apache/kafka/pull/10289#discussion_r595378358),
   but it is the best approach I can see because it lets us catch misconfiguration early.
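
The rule described above can be sketched roughly as follows. This is a minimal illustration and not the code in this pull request: the class `ClusterIdResponseTracker`, the enum `Outcome`, and the method `onResponse` are hypothetical names, and the sketch assumes each response can be reduced to a single flag saying whether it carried `INCONSISTENT_CLUSTER_ID`.

```java
// Hypothetical sketch; none of these names come from KafkaRaftClient.
final class ClusterIdResponseTracker {
    enum Outcome { ACCEPTED, IGNORED, FATAL }

    // Set once any response has arrived without an inconsistent cluster id error.
    private boolean sawValidClusterId = false;

    // inconsistentClusterId: true if the response carried INCONSISTENT_CLUSTER_ID.
    Outcome onResponse(boolean inconsistentClusterId) {
        if (!inconsistentClusterId) {
            sawValidClusterId = true;
            return Outcome.ACCEPTED;
        }
        // An inconsistent id is fatal only before any valid response was seen,
        // which is the case that most likely indicates misconfiguration.
        return sawValidClusterId ? Outcome.IGNORED : Outcome.FATAL;
    }
}
```

Under these assumptions, a fresh tracker returns `FATAL` for an inconsistent first response, while a tracker that has already accepted one valid response returns `IGNORED` for a later inconsistent one.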
   
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on pull request #11209: KAFKA-12465: Logic about inconsistent cluster id

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #11209:
URL: https://github.com/apache/kafka/pull/11209#issuecomment-898134690


   Hi @hachikuji @jsancio, PTAL. I also moved the `UNKNOWN_TOPIC_OR_PARTITION` logic in `handleFetchSnapshot` to `RaftUtil.java`.





[GitHub] [kafka] jsancio commented on a change in pull request #11209: KAFKA-12465: Logic about inconsistent cluster id

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11209:
URL: https://github.com/apache/kafka/pull/11209#discussion_r800268813



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1535,6 +1535,46 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
         context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
     }
 
+    @Test
+    public void testInconsistentClusterIdInFetchSnapshotResponse() throws Exception {
+        int localId = 0;
+        int leaderId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, leaderId);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        // Send a request
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+
+        // Firstly receive a response with a valid cluster id
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, context.metadataTopicId, epoch, leaderId, snapshotId, 200L)
+        );
+
+        // Send fetch snapshot request
+        context.pollUntilRequest();
+        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
+
+        // Secondly receive a response with an inconsistent cluster id
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())
+        );
+
+        // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id
+        assertDoesNotThrow(context.client::poll);
+
+        // It's impossible to receive a be begin quorum response before any other request so we don't test

Review comment:
       I am trying to understand this comment. Can you please explain why this is true? And why do you think that this comment is important in this test?
   
   This comment applies to a few places.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
##########
@@ -159,4 +161,18 @@ static boolean hasValidTopicPartition(DescribeQuorumRequestData data, TopicParti
                    data.topics().get(0).partitions().size() == 1 &&
                    data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
     }
+
+    static boolean hasValidTopicPartition(FetchSnapshotRequestData data, TopicPartition topicPartition) {

Review comment:
       How about changing this to return an `Errors`?
   1. `INVALID_REQUEST` if there is more than one topic partition
   2. `UNKNOWN_TOPIC_OR_PARTITION` if the topic partition doesn't match the log's name and partition
   3. `NONE` otherwise
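
A rough sketch of what such a validator could look like is below. It is an illustration only: `Errors` here is a local stand-in enum rather than `org.apache.kafka.common.protocol.Errors`, `Topic` and `Partition` are hypothetical simplified stand-ins for the generated request data classes, and the method name `validateTopicPartition` is made up. It also assumes that a request with zero topics or zero partitions is treated the same as one with more than one, which the list above does not state.

```java
import java.util.List;

// Hypothetical sketch; the types below are simplified stand-ins, not Kafka classes.
final class TopicPartitionValidation {
    // Stand-in for the three error codes named in the review comment.
    enum Errors { NONE, INVALID_REQUEST, UNKNOWN_TOPIC_OR_PARTITION }

    static final class Partition {
        final int index;
        Partition(int index) { this.index = index; }
    }

    static final class Topic {
        final String name;
        final List<Partition> partitions;
        Topic(String name, List<Partition> partitions) {
            this.name = name;
            this.partitions = partitions;
        }
    }

    static Errors validateTopicPartition(List<Topic> topics, String expectedTopic, int expectedPartition) {
        // Assumption: anything other than exactly one topic partition is invalid.
        if (topics.size() != 1 || topics.get(0).partitions.size() != 1) {
            return Errors.INVALID_REQUEST;
        }
        Topic topic = topics.get(0);
        // The single topic partition must match the log's name and partition.
        if (!topic.name.equals(expectedTopic) || topic.partitions.get(0).index != expectedPartition) {
            return Errors.UNKNOWN_TOPIC_OR_PARTITION;
        }
        return Errors.NONE;
    }
}
```

Returning an error code instead of a boolean lets the caller build the error response directly from the result, rather than deciding separately which code a `false` should map to.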







[GitHub] [kafka] dengziming commented on a change in pull request #11209: KAFKA-12465: Logic about inconsistent cluster id

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #11209:
URL: https://github.com/apache/kafka/pull/11209#discussion_r800646883



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
##########
@@ -159,4 +161,18 @@ static boolean hasValidTopicPartition(DescribeQuorumRequestData data, TopicParti
                    data.topics().get(0).partitions().size() == 1 &&
                    data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
     }
+
+    static boolean hasValidTopicPartition(FetchSnapshotRequestData data, TopicPartition topicPartition) {

Review comment:
       I think this is a worthwhile improvement. There are 11 similar methods here, so I changed them all and added a unit test for them.







[GitHub] [kafka] dengziming commented on a change in pull request #11209: KAFKA-12465: Logic about inconsistent cluster id

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #11209:
URL: https://github.com/apache/kafka/pull/11209#discussion_r800655163



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1535,6 +1535,46 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
         context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
     }
 
+    @Test
+    public void testInconsistentClusterIdInFetchSnapshotResponse() throws Exception {
+        int localId = 0;
+        int leaderId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, leaderId);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        // Send a request
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+
+        // Firstly receive a response with a valid cluster id
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, context.metadataTopicId, epoch, leaderId, snapshotId, 200L)
+        );
+
+        // Send fetch snapshot request
+        context.pollUntilRequest();
+        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
+
+        // Secondly receive a response with an inconsistent cluster id
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())
+        );
+
+        // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id
+        assertDoesNotThrow(context.client::poll);
+
+        // It's impossible to receive a be begin quorum response before any other request so we don't test

Review comment:
       Sorry, my comment here was wrong. I tested two cases:
   
   1. Receive INCONSISTENT_CLUSTER_ID in the first response after starting, which is fatal.
   2. Receive INCONSISTENT_CLUSTER_ID in the second response after starting, which is not fatal.
   
   However, the first response after starting can't be a FetchSnapshotResponse, so I added a comment here to explain why the first case is not tested. The same applies to BeginQuorumResponse and EndQuorumResponse.



