You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/24 12:57:22 UTC

[GitHub] [kafka] dajac commented on a change in pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

dajac commented on a change in pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#discussion_r694816464



##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -332,6 +329,9 @@ public FetchRequestData build() {
                     iter.remove();
                     // Indicate that we no longer want to listen to this partition.
                     removed.add(topicPartition);
+                    // If we do not have this topic ID in the session, we can not use topic IDs

Review comment:
       nit: `.` at the end of the sentence.

##########
File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -443,17 +443,50 @@ public void testIdUsageRevokedOnIdDowngrade() {
 
             // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID.
             FetchSessionHandler.Builder builder2 = handler.newBuilder();
-            builder2.add(new TopicPartition("foo", 0), Uuid.ZERO_UUID,
+            builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
                     new FetchRequest.PartitionData(10, 110, 210, Optional.empty()));
             FetchSessionHandler.FetchRequestData data2 = builder2.build();
             // Should have the same session ID and next epoch, but can no longer use topic IDs.
             // The receiving broker will close the session if we were previously using topic IDs.
             assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType);
-            assertEquals(1, data2.metadata().epoch(), "Did not close session when " + testType);
+            assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType);
             assertFalse(data2.canUseTopicIds());
         });
     }
 
+    @Test
+    public void testIdUsageWithAllForgottenPartitions() {
+        // We want to test when all topics are removed from the session
+        List<Boolean> useTopicIdsTests = Arrays.asList(true, false);

Review comment:
       It would be great if we could use `@ParameterizedTest` here/

##########
File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -443,17 +443,50 @@ public void testIdUsageRevokedOnIdDowngrade() {
 
             // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID.
             FetchSessionHandler.Builder builder2 = handler.newBuilder();
-            builder2.add(new TopicPartition("foo", 0), Uuid.ZERO_UUID,
+            builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
                     new FetchRequest.PartitionData(10, 110, 210, Optional.empty()));
             FetchSessionHandler.FetchRequestData data2 = builder2.build();
             // Should have the same session ID and next epoch, but can no longer use topic IDs.
             // The receiving broker will close the session if we were previously using topic IDs.
             assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType);
-            assertEquals(1, data2.metadata().epoch(), "Did not close session when " + testType);
+            assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType);
             assertFalse(data2.canUseTopicIds());
         });
     }
 
+    @Test
+    public void testIdUsageWithAllForgottenPartitions() {
+        // We want to test when all topics are removed from the session
+        List<Boolean> useTopicIdsTests = Arrays.asList(true, false);
+        useTopicIdsTests.forEach(useTopicIds -> {
+            Uuid topicId = useTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID;
+            Short respVer = useTopicIds ? ApiKeys.FETCH.latestVersion() : 12;

Review comment:
       nit: `respVer` -> `responseVersion`?

##########
File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -443,17 +443,50 @@ public void testIdUsageRevokedOnIdDowngrade() {
 
             // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID.
             FetchSessionHandler.Builder builder2 = handler.newBuilder();
-            builder2.add(new TopicPartition("foo", 0), Uuid.ZERO_UUID,
+            builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
                     new FetchRequest.PartitionData(10, 110, 210, Optional.empty()));
             FetchSessionHandler.FetchRequestData data2 = builder2.build();
             // Should have the same session ID and next epoch, but can no longer use topic IDs.
             // The receiving broker will close the session if we were previously using topic IDs.
             assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType);
-            assertEquals(1, data2.metadata().epoch(), "Did not close session when " + testType);
+            assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType);
             assertFalse(data2.canUseTopicIds());
         });
     }
 
+    @Test
+    public void testIdUsageWithAllForgottenPartitions() {
+        // We want to test when all topics are removed from the session
+        List<Boolean> useTopicIdsTests = Arrays.asList(true, false);
+        useTopicIdsTests.forEach(useTopicIds -> {
+            Uuid topicId = useTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID;
+            Short respVer = useTopicIds ? ApiKeys.FETCH.latestVersion() : 12;
+            Map<String, Uuid> topicIds = Collections.singletonMap("foo", topicId);
+            FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+            FetchSessionHandler.Builder builder = handler.newBuilder();

Review comment:
       nit: Could we add an empty line and a small comment before this `builder` similarly to what you did before `builder2`? It would make reading the test case a little easier.

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -332,6 +329,9 @@ public FetchRequestData build() {
                     iter.remove();
                     // Indicate that we no longer want to listen to this partition.
                     removed.add(topicPartition);
+                    // If we do not have this topic ID in the session, we can not use topic IDs
+                    if (canUseTopicIds && sessionTopicIds.get(topicPartition.topic()) == null)

Review comment:
       Should we use `containsKey` here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org