You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Hangleton (via GitHub)" <gi...@apache.org> on 2023/05/05 07:29:34 UTC

[GitHub] [kafka] Hangleton commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to the OffsetCommit API version 9

Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1185779424


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_MEMBER_ID);
         assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p,
                 new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If there is no available
+        // response to consume, its internal poll loop never completes. Hence, the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there are enough responses
+        // queued in the MockClient to satisfy all invocations of the ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the public API of the consumer,
+        // we cannot throw an UnknownTopicId to the user. By design, a false boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, time.timer(timeoutMs)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testRetryCommitUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.UNKNOWN_TOPIC_ID)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, Errors.NONE)));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertTrue(coordinator.commitOffsetsSync(offsets, time.timer(Long.MAX_VALUE)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) -> {
+                // Unlike the commit offset sync API, the async API does not retry.

Review Comment:
   That is right. Christo, maybe you can create two separate tests for these cases and factor in common code in a method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org