Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/07 01:45:54 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #11209: KAFKA-12465: Logic about inconsistent cluster id

jsancio commented on a change in pull request #11209:
URL: https://github.com/apache/kafka/pull/11209#discussion_r800268813



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1535,6 +1535,46 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
         context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
     }
 
+    @Test
+    public void testInconsistentClusterIdInFetchSnapshotResponse() throws Exception {
+        int localId = 0;
+        int leaderId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, leaderId);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        // Send a request
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+
+        // Firstly receive a response with a valid cluster id
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, context.metadataTopicId, epoch, leaderId, snapshotId, 200L)
+        );
+
+        // Send fetch snapshot request
+        context.pollUntilRequest();
+        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
+
+        // Secondly receive a response with an inconsistent cluster id
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())
+        );
+
+        // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id
+        assertDoesNotThrow(context.client::poll);
+
+        // It's impossible to receive a be begin quorum response before any other request so we don't test

Review comment:
   I am trying to understand this comment. Can you please explain why it is true, and why you think it is important in this test?
   
   The same feedback applies to a few other places.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
##########
@@ -159,4 +161,18 @@ static boolean hasValidTopicPartition(DescribeQuorumRequestData data, TopicParti
                    data.topics().get(0).partitions().size() == 1 &&
                    data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
     }
+
+    static boolean hasValidTopicPartition(FetchSnapshotRequestData data, TopicPartition topicPartition) {

Review comment:
   How about changing this to return an `Errors` value instead of a `boolean`?
   1. `INVALID_REQUEST` if there is more than one topic partition
   2. `UNKNOWN_TOPIC_OR_PARTITION` if the topic partition doesn't match the log's name and partition
   3. `NONE` otherwise
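
A minimal sketch of what that suggestion could look like, not the actual change in the PR. To stay self-contained it uses hypothetical stand-ins (`Errors`, `TopicData`, `PartitionData`, and the method name `validateTopicPartition`) rather than the real `FetchSnapshotRequestData`, `TopicPartition`, and `org.apache.kafka.common.protocol.Errors` classes. It also assumes that a request with zero topics or zero partitions is treated like one with too many, i.e. `INVALID_REQUEST`.

```java
import java.util.Collections;
import java.util.List;

public class TopicPartitionValidationSketch {
    // Stand-in for Kafka's Errors enum; only the three values named in the review.
    public enum Errors { NONE, INVALID_REQUEST, UNKNOWN_TOPIC_OR_PARTITION }

    // Hypothetical, simplified shape of a partition entry in the request.
    public static final class PartitionData {
        final int partitionIndex;

        public PartitionData(int partitionIndex) {
            this.partitionIndex = partitionIndex;
        }
    }

    // Hypothetical, simplified shape of a topic entry in the request.
    public static final class TopicData {
        final String name;
        final List<PartitionData> partitions;

        public TopicData(String name, List<PartitionData> partitions) {
            this.name = name;
            this.partitions = partitions;
        }
    }

    // Returns an error code instead of a boolean so the caller can tell
    // a malformed request apart from a request for the wrong partition.
    public static Errors validateTopicPartition(List<TopicData> topics, String logTopic, int logPartition) {
        // Assumption: anything other than exactly one topic with exactly one
        // partition is malformed.
        if (topics.size() != 1 || topics.get(0).partitions.size() != 1) {
            return Errors.INVALID_REQUEST;
        }

        TopicData topic = topics.get(0);
        if (!topic.name.equals(logTopic) || topic.partitions.get(0).partitionIndex != logPartition) {
            return Errors.UNKNOWN_TOPIC_OR_PARTITION;
        }

        return Errors.NONE;
    }

    public static void main(String[] args) {
        List<TopicData> request = Collections.singletonList(
            new TopicData("__cluster_metadata", Collections.singletonList(new PartitionData(0))));

        // Matching topic and partition.
        System.out.println(validateTopicPartition(request, "__cluster_metadata", 0));
        // Same topic, different partition.
        System.out.println(validateTopicPartition(request, "__cluster_metadata", 1));
        // No topics at all.
        System.out.println(validateTopicPartition(Collections.<TopicData>emptyList(), "__cluster_metadata", 0));
    }
}
```

With a return type like this, the request handler could pass the result straight into the response's error code rather than mapping `false` to a single fixed error.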



