Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/21 22:30:12 UTC

[GitHub] [kafka] jolshan opened a new pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

jolshan opened a new pull request #11104:
URL: https://github.com/apache/kafka/pull/11104


   The FetchSessionHandler had a small bug in the session build method: it did not handle building a request in which no partitions were added and the session previously did not use topic IDs (i.e., it relied on at least one partition being added to signal whether topic IDs were present).
   
   Due to this, we could send forgotten partitions with the zero UUID. This would always result in an exception and a closed session.
   
   This PR fixes the logic to check that any forgotten partitions have topic IDs. There is also a test added for the empty session situation when topic IDs are used and when topic names are used.
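
As a rough illustration of the check described above, here is a minimal sketch. It is not the actual FetchSessionHandler code: the class name, method name, and parameters are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid`. The idea is that even when no partitions are added, a forgotten partition whose topic has no ID in the session should turn off topic ID usage for the request.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for the decision made while building a fetch request.
final class ForgottenTopicIdCheck {

    // addedPartitionsAllowIds: what the added partitions alone would imply
    // (assumed true when nothing was added).
    // sessionTopicIds: topic name -> topic ID known to the session.
    // forgottenTopics: topics of the partitions being removed from the session.
    static boolean canUseTopicIds(boolean addedPartitionsAllowIds,
                                  Map<String, UUID> sessionTopicIds,
                                  List<String> forgottenTopics) {
        boolean canUse = addedPartitionsAllowIds;
        for (String topic : forgottenTopics) {
            // A forgotten partition without a known topic ID cannot be sent by ID.
            if (canUse && !sessionTopicIds.containsKey(topic)) {
                canUse = false;
            }
        }
        return canUse;
    }

    public static void main(String[] args) {
        List<String> forgotten = Collections.singletonList("foo");
        Map<String, UUID> withId = Collections.singletonMap("foo", UUID.randomUUID());
        Map<String, UUID> withoutId = Collections.emptyMap();

        // Session knows the ID for "foo": topic IDs remain usable.
        System.out.println(canUseTopicIds(true, withId, forgotten));
        // Session used topic names only: fall back to names.
        System.out.println(canUseTopicIds(true, withoutId, forgotten));
    }
}
```

The second call models the empty-session case from the description: nothing is added, one partition is forgotten, and the session never had topic IDs.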
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905684062


   We can merge once we get clean builds.





[GitHub] [kafka] dajac merged pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

Posted by GitBox <gi...@apache.org>.
dajac merged pull request #11104:
URL: https://github.com/apache/kafka/pull/11104


   





[GitHub] [kafka] dajac commented on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905509721


   @jolshan Some tests failed, and the failures seem related to the changes in this PR. Could you check them?





[GitHub] [kafka] jolshan commented on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905656665


   Looks like it was a really subtle bug. Consider the case where we have a partition in the session without a topic ID. We then add a new partition from that topic with an ID, but remove the partition that didn't have an ID.
   
   Here, we want to use IDs. When the receiving broker sees the topic IDs and currently has a session without them open, it will close that session so we can switch over to fully using topic IDs.
   
   Let me know if this makes sense.





[GitHub] [kafka] dajac commented on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-906181988


   Started a new build.





[GitHub] [kafka] jolshan commented on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905758403


   Hmmm. Not sure what to make of this exit code 1:
   ```
   * What went wrong:
   
   Execution failed for task ':storage:unitTest'.
   
   > Process 'Gradle Test Executor 133' finished with non-zero exit value 1
   ```
   
   The other two failures look unrelated.





[GitHub] [kafka] dajac commented on a change in pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#discussion_r694816464



##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -332,6 +329,9 @@ public FetchRequestData build() {
                     iter.remove();
                     // Indicate that we no longer want to listen to this partition.
                     removed.add(topicPartition);
+                    // If we do not have this topic ID in the session, we can not use topic IDs

Review comment:
       nit: `.` at the end of the sentence.

##########
File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -443,17 +443,50 @@ public void testIdUsageRevokedOnIdDowngrade() {
 
             // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID.
             FetchSessionHandler.Builder builder2 = handler.newBuilder();
-            builder2.add(new TopicPartition("foo", 0), Uuid.ZERO_UUID,
+            builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
                     new FetchRequest.PartitionData(10, 110, 210, Optional.empty()));
             FetchSessionHandler.FetchRequestData data2 = builder2.build();
             // Should have the same session ID and next epoch, but can no longer use topic IDs.
             // The receiving broker will close the session if we were previously using topic IDs.
             assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType);
-            assertEquals(1, data2.metadata().epoch(), "Did not close session when " + testType);
+            assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType);
             assertFalse(data2.canUseTopicIds());
         });
     }
 
+    @Test
+    public void testIdUsageWithAllForgottenPartitions() {
+        // We want to test when all topics are removed from the session
+        List<Boolean> useTopicIdsTests = Arrays.asList(true, false);

Review comment:
       It would be great if we could use `@ParameterizedTest` here.

##########
File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -443,17 +443,50 @@ public void testIdUsageRevokedOnIdDowngrade() {
 
             // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID.
             FetchSessionHandler.Builder builder2 = handler.newBuilder();
-            builder2.add(new TopicPartition("foo", 0), Uuid.ZERO_UUID,
+            builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
                     new FetchRequest.PartitionData(10, 110, 210, Optional.empty()));
             FetchSessionHandler.FetchRequestData data2 = builder2.build();
             // Should have the same session ID and next epoch, but can no longer use topic IDs.
             // The receiving broker will close the session if we were previously using topic IDs.
             assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType);
-            assertEquals(1, data2.metadata().epoch(), "Did not close session when " + testType);
+            assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType);
             assertFalse(data2.canUseTopicIds());
         });
     }
 
+    @Test
+    public void testIdUsageWithAllForgottenPartitions() {
+        // We want to test when all topics are removed from the session
+        List<Boolean> useTopicIdsTests = Arrays.asList(true, false);
+        useTopicIdsTests.forEach(useTopicIds -> {
+            Uuid topicId = useTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID;
+            Short respVer = useTopicIds ? ApiKeys.FETCH.latestVersion() : 12;

Review comment:
       nit: `respVer` -> `responseVersion`?

##########
File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##########
@@ -443,17 +443,50 @@ public void testIdUsageRevokedOnIdDowngrade() {
 
             // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID.
             FetchSessionHandler.Builder builder2 = handler.newBuilder();
-            builder2.add(new TopicPartition("foo", 0), Uuid.ZERO_UUID,
+            builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
                     new FetchRequest.PartitionData(10, 110, 210, Optional.empty()));
             FetchSessionHandler.FetchRequestData data2 = builder2.build();
             // Should have the same session ID and next epoch, but can no longer use topic IDs.
             // The receiving broker will close the session if we were previously using topic IDs.
             assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType);
-            assertEquals(1, data2.metadata().epoch(), "Did not close session when " + testType);
+            assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType);
             assertFalse(data2.canUseTopicIds());
         });
     }
 
+    @Test
+    public void testIdUsageWithAllForgottenPartitions() {
+        // We want to test when all topics are removed from the session
+        List<Boolean> useTopicIdsTests = Arrays.asList(true, false);
+        useTopicIdsTests.forEach(useTopicIds -> {
+            Uuid topicId = useTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID;
+            Short respVer = useTopicIds ? ApiKeys.FETCH.latestVersion() : 12;
+            Map<String, Uuid> topicIds = Collections.singletonMap("foo", topicId);
+            FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
+            FetchSessionHandler.Builder builder = handler.newBuilder();

Review comment:
       nit: Could we add an empty line and a small comment before this `builder` similarly to what you did before `builder2`? It would make reading the test case a little easier.

##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -332,6 +329,9 @@ public FetchRequestData build() {
                     iter.remove();
                     // Indicate that we no longer want to listen to this partition.
                     removed.add(topicPartition);
+                    // If we do not have this topic ID in the session, we can not use topic IDs
+                    if (canUseTopicIds && sessionTopicIds.get(topicPartition.topic()) == null)

Review comment:
       Should we use `containsKey` here?
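
For context on this suggestion, here is a small sketch using a plain `java.util.HashMap`. The class and method names are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid`. The two checks agree unless a key is explicitly mapped to `null`; `containsKey` states the intent ("is this topic known?") directly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical comparison of the two ways to test for a missing topic ID.
final class ContainsKeyVersusGet {

    // Mirrors the style of the reviewed line: treat a null lookup as "missing".
    static boolean missingByGet(Map<String, UUID> ids, String topic) {
        return ids.get(topic) == null;
    }

    // The suggested alternative: ask the map whether the key is present.
    static boolean missingByContainsKey(Map<String, UUID> ids, String topic) {
        return !ids.containsKey(topic);
    }

    public static void main(String[] args) {
        Map<String, UUID> ids = new HashMap<>();
        ids.put("foo", UUID.randomUUID());
        ids.put("bar", null); // HashMap permits null values

        // Present with a value: both checks report "not missing".
        System.out.println(missingByGet(ids, "foo") + " " + missingByContainsKey(ids, "foo"));
        // Absent: both checks report "missing".
        System.out.println(missingByGet(ids, "baz") + " " + missingByContainsKey(ids, "baz"));
        // Present but mapped to null: the two checks disagree.
        System.out.println(missingByGet(ids, "bar") + " " + missingByContainsKey(ids, "bar"));
    }
}
```

If the map never stores `null` values, the two forms behave the same, so the choice is mostly about readability.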







[GitHub] [kafka] dajac commented on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-907313567


   The build for `JDK 11 and Scala 2.13` hangs on `kafka.server.ServerShutdownTest.testCleanShutdownWithZkUnavailable`. It seems that the test is stuck. All the other tests have passed successfully. I am investigating it further, but I think that we can safely merge this PR because the hang is not related to it at all.





[GitHub] [kafka] jolshan commented on a change in pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#discussion_r674387908



##########
File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##########
@@ -332,6 +329,9 @@ public FetchRequestData build() {
                     iter.remove();
                     // Indicate that we no longer want to listen to this partition.
                     removed.add(topicPartition);
+                    // If we do not have this topic ID in the session, we can not use topic IDs

Review comment:
       One alternative to this approach is to only do this check when no partitions were added to the builder, but I'm not sure this added complexity is worth it. The method here is also safer.







[GitHub] [kafka] jolshan edited a comment on pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

Posted by GitBox <gi...@apache.org>.
jolshan edited a comment on pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#issuecomment-905656665


   Looks like it was a really subtle bug. Consider the case where we have a partition in the session without a topic ID. We then add a new partition from that topic with an ID, but remove the partition that didn't have an ID.
   
   Here, we want to use IDs. When the receiving broker sees the topic IDs and currently has a session without them open, it will close that session so we can switch over to fully using topic IDs. Otherwise, we may get stuck not using topic IDs a little longer than we want to.
   
   I've added a check to see if the topic ID can be found in the current builder. Let me know if this makes sense.
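
The scenario in this comment can be pictured with the sketch below. It is only an assumption about how such a lookup might be combined, not the code in the PR: the class, method, and parameter names are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid`. The session holds `foo-0` without an ID, while the current builder adds `foo-1` with an ID and drops `foo-0`; consulting the builder as well as the session lets the request keep using topic IDs.

```java
import java.util.Collections;
import java.util.Map;
import java.util.UUID;

// Hypothetical lookup that consults both the session and the current builder.
final class BuilderTopicIdLookup {

    // Returns true if an ID for the forgotten partition's topic is known
    // either from the existing session or from partitions added in this build.
    static boolean forgottenTopicHasId(String topic,
                                       Map<String, UUID> sessionTopicIds,
                                       Map<String, UUID> builderTopicIds) {
        return sessionTopicIds.containsKey(topic) || builderTopicIds.containsKey(topic);
    }

    public static void main(String[] args) {
        // Session was created without topic IDs, so it knows none for "foo".
        Map<String, UUID> sessionTopicIds = Collections.emptyMap();
        // The builder adds another "foo" partition, this time with an ID.
        Map<String, UUID> builderTopicIds = Collections.singletonMap("foo", UUID.randomUUID());

        // Session alone: the forgotten partition would force a fallback to names.
        System.out.println(forgottenTopicHasId("foo", sessionTopicIds, Collections.emptyMap()));
        // Session plus builder: the ID is found, so topic IDs can still be used.
        System.out.println(forgottenTopicHasId("foo", sessionTopicIds, builderTopicIds));
    }
}
```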

