Posted to jira@kafka.apache.org by "gharris1727 (via GitHub)" <gi...@apache.org> on 2023/06/14 16:39:50 UTC

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13838: MINOR: Reduce MM2 integration test flakiness due to missing dummy offset commits

gharris1727 commented on code in PR #13838:
URL: https://github.com/apache/kafka/pull/13838#discussion_r1229900453


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -1183,17 +1187,31 @@ private void createTopics() {
         }
     }
 
-    /*
-     * Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
+    /**
+     * Commit offset 0 for all partitions of test-topic-1 for the specified consumer groups on primary and backup clusters.
+     * <p>This is done to force the MirrorCheckpointConnector to start at a task which checkpoints this group.
+     * Must be called before {@link #waitUntilMirrorMakerIsRunning} to prevent that method from timing out.
      */
-    protected void warmUpConsumer(Map<String, Object> consumerProps) {
-        try (Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            dummyConsumer.commitSync();
-        }
-        try (Consumer<byte[], byte[]> dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            dummyConsumer.commitSync();
+    protected void prepareConsumerGroup(Map<String, Object> consumerProps) {
+        prepareConsumerGroup(primary.kafka(), consumerProps, "test-topic-1");
+        prepareConsumerGroup(backup.kafka(), consumerProps, "test-topic-1");
+    }
+
+    private void prepareConsumerGroup(EmbeddedKafkaCluster cluster, Map<String, Object> consumerProps, String topic) {
+        try (Admin client = cluster.createAdminClient()) {
+            Map<String, TopicDescription> topics = client.describeTopics(Collections.singleton(topic))
+                    .allTopicNames()
+                    .get(REQUEST_TIMEOUT_DURATION_MS, TimeUnit.MILLISECONDS);
+            Map<TopicPartition, OffsetAndMetadata> collect = topics.get(topic)
+                    .partitions()
+                    .stream()
+                    .collect(Collectors.toMap(
+                            tpi -> new TopicPartition(topic, tpi.partition()),
+                            ignored -> new OffsetAndMetadata(0L)));
+            AlterConsumerGroupOffsetsResult alterResult = client.alterConsumerGroupOffsets((String) consumerProps.get("group.id"), collect);
+            alterResult.all().get(REQUEST_TIMEOUT_DURATION_MS, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {

Review Comment:
   I was only trying to avoid changing the `throws` signatures, but I see now that only 3 call sites are affected. I'll change this to propagate the exception instead of wrapping it.
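
For readers following the thread, here is a minimal sketch of the trade-off discussed in this comment. It is not the PR's code: the body of the `catch` block is cut off in the quoted hunk, so wrapping in a `RuntimeException` is an assumption. The class and the method names `failingRequest`, `prepareWrapping`, and `preparePropagating` are hypothetical, and a `CompletableFuture` stands in for the Kafka admin result so the example runs without a cluster.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ExceptionPropagationSketch {

    // Hypothetical stand-in for an asynchronous admin request that fails.
    static CompletableFuture<Void> failingRequest() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("simulated broker failure"));
        return future;
    }

    // Variant 1 (assumed shape of the original): catch the checked exceptions
    // and rethrow them wrapped, so callers need no throws clause.
    static void prepareWrapping() {
        try {
            failingRequest().get(1_000, TimeUnit.MILLISECONDS);
        } catch (ExecutionException | InterruptedException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    // Variant 2: declare the checked exceptions and let them propagate.
    // Every caller must then declare or handle them as well.
    static void preparePropagating()
            throws ExecutionException, InterruptedException, TimeoutException {
        failingRequest().get(1_000, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        try {
            prepareWrapping();
        } catch (RuntimeException e) {
            // The real failure sits one level deeper, behind the wrapper.
            System.out.println("wrapped cause: " + e.getCause().getClass().getSimpleName());
        }
        try {
            preparePropagating();
        } catch (ExecutionException e) {
            // The original exception type reaches the caller directly.
            System.out.println("propagated: " + e.getClass().getSimpleName());
        }
    }
}
```

With propagation, a test failure reports the original exception type directly, at the cost of adding a `throws` clause to each caller.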


