Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/16 10:01:22 UTC

[GitHub] [kafka] showuon opened a new pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

showuon opened a new pull request #9029:
URL: https://github.com/apache/kafka/pull/9029


   The original test slept for a fixed 5 seconds to let the automated consumer group offset sync complete. The sync is sometimes fast (under 1 second) and sometimes slow (about 20 seconds). I replaced the sleep with a wait on two specific conditions:
   1. `consumer.endOffsets`, to make sure the topic partition metadata is synced
   2. `backupClient.listConsumerGroupOffsets`, to make sure the consumer group offsets are also synced
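
   A minimal sketch of the general idea, replacing a fixed sleep with polling until a condition holds or a timeout expires. It uses only the JDK; `PollingWait` and `waitFor` are hypothetical names for illustration, not the `waitForCondition` helper used in the test, and the simulated condition stands in for the real offset checks.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a test utility; the names are illustrative only.
final class PollingWait {
    private PollingWait() { }

    // Re-checks the condition every pollInterval until it holds or the timeout elapses.
    // Returns true if the condition held before the deadline, false otherwise.
    static boolean waitFor(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up early.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger checks = new AtomicInteger();
        // Simulated sync: the condition becomes true on the third check.
        boolean synced = waitFor(() -> checks.incrementAndGet() >= 3,
                Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("synced=" + synced + ", checks=" + checks.get());
    }
}
```

   With this shape, a fast sync returns after a few checks, while a slow one can still use the full timeout.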
   
   I've run the test many times in my local environment, and the change makes it more stable.
   
   Thanks.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-661675952


   Thanks @ning2008wisc ! @mimaison , do you have any comments for this PR? Thanks.





[GitHub] [kafka] showuon edited a comment on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-663329158


   @mimaison , thanks for your comments. I've updated the PR in this commit: https://github.com/apache/kafka/pull/9029/commits/5345c6835ef42da973b794634d9b8d65f27ee80a. The changes are:
   1. Remove the unused `time` variable and import
   2. Use a simple `for` loop instead of the lambda loop
   3. Wait for the consumer to consume all 100 records before continuing
   
   @ning2008wisc will address the other improvements in KAFKA-10304. Thanks.





[GitHub] [kafka] mimaison commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-666526465


   retest this please





[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-659310965


   @ning2008wisc @mimaison , could you help review this PR to fix the flaky test? Thanks.





[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-666395292


   @mimaison , thanks for your good suggestion! I've updated the PR in this commit: https://github.com/apache/kafka/pull/9029/commits/8bc4a543dda6ddd90d752f7e6a64c63d85a1de3f. Thanks.





[GitHub] [kafka] mimaison commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r462989546



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L
         }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
     }
 
+    private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) throws InterruptedException {
+        final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+        waitForCondition(() -> {
+            ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+            consumer.commitSync();
+            return NUM_RECORDS_PRODUCED == totalConsumedRecords.addAndGet(records.count());
+        }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all the records in time");
+    }
+
     @Test
     public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
 
         // create consumers before starting the connectors so we don't need to wait for discovery
-        Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "test-topic-1");
-        consumer1.poll(Duration.ofMillis(500));
-        consumer1.commitSync();
-        consumer1.close();
+        try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "test-topic-1")) {
+            // we need to wait for consuming all the records for MM2 replicaing the expected offsets

Review comment:
       `replicaing` -> `replicating`

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L
         }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
     }
 
+    private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) throws InterruptedException {
+        final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+        waitForCondition(() -> {
+            ConsumerRecords records = consumer.poll(Duration.ofMillis(500));

Review comment:
       Can we add the types `<byte[], byte[]>` to `ConsumerRecords`?

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -387,11 +393,11 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
         }
 
         // create a consumer at primary cluster to consume the new topic
-        consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "test-topic-2");
-        consumer1.poll(Duration.ofMillis(500));
-        consumer1.commitSync();
-        consumer1.close();
+        try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "test-topic-2")) {
+            // we need to wait for consuming all the records for MM2 replicaing the expected offsets

Review comment:
       `replicaing` -> `replicating`

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L
         }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
     }
 
+    private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) throws InterruptedException {
+        final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+        waitForCondition(() -> {
+            ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+            consumer.commitSync();

Review comment:
       We can move that line after the `waitForCondition()` block to just commit once all records have been consumed.
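
       A rough sketch of that ordering: poll until the expected number of records has been seen, then commit once. `BatchSource` is a hypothetical in-memory stand-in, not the Kafka `Consumer` API, and the poll budget replaces the timeout only to keep the example self-contained.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical in-memory stand-in for a consumer; this is not the Kafka Consumer API.
final class BatchSource {
    private final Deque<Integer> batchSizes = new ArrayDeque<>();
    int commits = 0;

    BatchSource(int... sizes) {
        for (int size : sizes) {
            batchSizes.add(size);
        }
    }

    // Returns the size of the next batch, or 0 when nothing is available.
    int poll() {
        Integer next = batchSizes.poll();
        return next == null ? 0 : next;
    }

    void commit() {
        commits++;
    }
}

final class ConsumeThenCommit {
    private ConsumeThenCommit() { }

    // Polls until `expected` records have been seen or `maxPolls` is used up.
    // Commits exactly once on success; returns false without committing otherwise.
    static boolean consumeAll(BatchSource source, int expected, int maxPolls) {
        int total = 0;
        for (int i = 0; i < maxPolls && total < expected; i++) {
            total += source.poll();
        }
        if (total < expected) {
            return false;
        }
        source.commit();
        return true;
    }
}
```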

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L
         }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
     }
 
+    private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) throws InterruptedException {
+        final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+        waitForCondition(() -> {
+            ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+            consumer.commitSync();
+            return NUM_RECORDS_PRODUCED == totalConsumedRecords.addAndGet(records.count());
+        }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all the records in time");

Review comment:
       nit: The sentence sounds slightly better if you remove `the`







[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-664133647


   Hi @ning2008wisc , thanks for your suggestion, but I don't think it's good to merge all 4 commits into 1. I think the commit history is a record of why the author made each change at the time. Anyone can find out why we made a change by checking the commit history or the PR records. Also, reviewers can tell which commits have been reviewed and which were newly added to address their previous comments (like the comments above that I left for @mimaison ).
   
   I know there's some debate over that, but I think if this project (Kafka) doesn't have such a rule, it should be fine to keep it as is. Thank you.





[GitHub] [kafka] showuon removed a comment on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon removed a comment on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-659865858


   Thanks for your comments and suggestion! I've updated the PR in this commit: https://github.com/apache/kafka/pull/9029/commits/88400bf350b60684db33828d690cfa238d056781. Thanks.





[GitHub] [kafka] ning2008wisc commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
ning2008wisc commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r455902217



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException {
             backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
     }
 
+    private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, List<String> topics)
+            throws InterruptedException {
+        Admin backupClient = backup.kafka().createAdminClient();
+        List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+        IntStream.range(0, NUM_PARTITIONS).forEach(
+            partitionInd -> {

Review comment:
       Could we use a more intuitive variable name than `partitionInd`, e.g. `partitionId` or `partitionIndex`?







[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-660131258


   Hi @ning2008wisc , thanks for your comments and suggestion! I've updated the PR in this commit: https://github.com/apache/kafka/pull/9029/commits/88400bf350b60684db33828d690cfa238d056781. Thanks.





[GitHub] [kafka] showuon commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r459822704



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException {
             backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
     }
 
+    private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, List<String> topics, String consumerGroupId)
+            throws InterruptedException {
+        Admin backupClient = backup.kafka().createAdminClient();
+        List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+        IntStream.range(0, NUM_PARTITIONS).forEach(

Review comment:
       I don't have any preference. I'll change to simple `for` loop. Thanks.







[GitHub] [kafka] showuon commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r456227484



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException {
             backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
     }
 
+    private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, List<String> topics)
+            throws InterruptedException {
+        Admin backupClient = backup.kafka().createAdminClient();
+        List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+        IntStream.range(0, NUM_PARTITIONS).forEach(
+            partitionInd -> {

Review comment:
       I'll rename to `partitionIndex`. Thanks.

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException {
             backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
     }
 
+    private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, List<String> topics)
+            throws InterruptedException {
+        Admin backupClient = backup.kafka().createAdminClient();
+        List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+        IntStream.range(0, NUM_PARTITIONS).forEach(
+            partitionInd -> {
+                for (String topic: topics) {
+                    tps.add(new TopicPartition(topic, partitionInd));
+                }
+            }
+        );
+        long expectedTotalOffsets = NUM_RECORDS_PRODUCED * topics.size();
+
+        waitForCondition(() -> {
+            Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
+                backupClient.listConsumerGroupOffsets("consumer-group-1").partitionsToOffsetAndMetadata().get();

Review comment:
       Good suggestion. Thanks.







[GitHub] [kafka] mimaison commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-666512651


   ok to test





[GitHub] [kafka] mimaison merged pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
mimaison merged pull request #9029:
URL: https://github.com/apache/kafka/pull/9029


   





[GitHub] [kafka] showuon edited a comment on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-664133647


   Hi @ning2008wisc , thanks for your suggestion, but I don't think it's good to merge all 4 commits into 1. I think the commit history is a record of why the author made each change at the time. Anyone can find out why we made a change by checking the commit history or the PR records. Also, reviewers can tell which commits have been reviewed and which were newly added (like the comments above that I left for @mimaison ).
   
   I know there's some debate over that, but I think if this project (Kafka) doesn't have such a rule, I'd prefer to keep it as is. Thank you.





[GitHub] [kafka] showuon commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r459818087



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -62,6 +65,7 @@
     private static final int NUM_PARTITIONS = 10;
     private static final int RECORD_TRANSFER_DURATION_MS = 20_000;
     private static final int CHECKPOINT_DURATION_MS = 20_000;
+    private static final int OFFSET_SYNC_DURATION_MS = 30_000;
 
     private Time time = Time.SYSTEM;

Review comment:
       Nice catch! I also removed the unused import `import org.apache.kafka.common.utils.Time;` Thanks.







[GitHub] [kafka] showuon commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r455672187



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException {
             backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
     }
 
+    private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, List<String> topics)
+            throws InterruptedException {
+        Admin backupClient = backup.kafka().createAdminClient();
+        List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+        IntStream.range(0, NUM_PARTITIONS).forEach(
+            partitionInd -> {
+                for (String topic: topics) {
+                    tps.add(new TopicPartition(topic, partitionInd));
+                }
+            }
+        );
+        long expectedTotalOffsets = NUM_RECORDS_PRODUCED * topics.size();
+
+        waitForCondition(() -> {
+            Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
+                backupClient.listConsumerGroupOffsets("consumer-group-1").partitionsToOffsetAndMetadata().get();
+            long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream().mapToLong(metadata -> metadata.offset()).sum();
+
+            Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, Duration.ofMillis(500));
+            long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
+
+            // make sure the consumer group offsets are synced to expected number
+            return totalOffsets == expectedTotalOffsets && consumerGroupOffsetTotal > 0;
+        }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
+    }
 
     @Test
-    public void testOneWayReplicationWithAutorOffsetSync1() throws InterruptedException {
+    public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {

Review comment:
       Fix the typo in the test name.







[GitHub] [kafka] ning2008wisc commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
ning2008wisc commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r455903116



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException {
             backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
     }
 
+    private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, List<String> topics)
+            throws InterruptedException {
+        Admin backupClient = backup.kafka().createAdminClient();
+        List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+        IntStream.range(0, NUM_PARTITIONS).forEach(
+            partitionInd -> {
+                for (String topic: topics) {
+                    tps.add(new TopicPartition(topic, partitionInd));
+                }
+            }
+        );
+        long expectedTotalOffsets = NUM_RECORDS_PRODUCED * topics.size();
+
+        waitForCondition(() -> {
+            Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
+                backupClient.listConsumerGroupOffsets("consumer-group-1").partitionsToOffsetAndMetadata().get();

Review comment:
       Could we consider passing the consumer group name as an input parameter of `waitForConsumerGroupOffsetSync`, so that `waitForConsumerGroupOffsetSync` is more generic?







[GitHub] [kafka] showuon commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r463028636



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L
         }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
     }
 
+    private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) throws InterruptedException {
+        final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+        waitForCondition(() -> {
+            ConsumerRecords records = consumer.poll(Duration.ofMillis(500));
+            consumer.commitSync();

Review comment:
       Good suggestion! Thanks.

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -345,15 +342,24 @@ private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, L
         }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
     }
 
+    private void waitForConsumingAllRecords(Consumer<byte[], byte[]> consumer) throws InterruptedException {
+        final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+        waitForCondition(() -> {
+            ConsumerRecords records = consumer.poll(Duration.ofMillis(500));

Review comment:
       Done. Thanks.







[GitHub] [kafka] mimaison commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-663094946


   While not part of your changes, I noticed the test's assumptions are pretty loose. For example, we assume https://github.com/apache/kafka/pull/9029/files#diff-a03d58195cfe119d0b1ed2693cd0d691L362 always consumes all 100 messages. The test also assumes there are no duplicates. While this may be fine when running in memory, Connect semantics are at-least-once.
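
   One way to make such a check tolerant of at-least-once delivery is to count distinct records rather than total records. The sketch below is only an illustration under that assumption; `DistinctRecordCounter` and the `partition@offset` id format are hypothetical and are not part of the test.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: tracks distinct record ids so redelivered records are not double counted.
final class DistinctRecordCounter {
    private final Set<String> seen = new HashSet<>();

    // Adds a batch of record ids and returns how many distinct ids have been seen so far.
    int addBatch(List<String> recordIds) {
        seen.addAll(recordIds);
        return seen.size();
    }

    // True once at least `expected` distinct records have arrived.
    boolean hasAtLeast(int expected) {
        return seen.size() >= expected;
    }

    public static void main(String[] args) {
        DistinctRecordCounter counter = new DistinctRecordCounter();
        counter.addBatch(Arrays.asList("p0@0", "p0@1"));
        // "p0@1" is redelivered here and must not be counted twice.
        int distinct = counter.addBatch(Arrays.asList("p0@1", "p0@2"));
        System.out.println("distinct=" + distinct);
    }
}
```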





[GitHub] [kafka] mimaison commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r459460614



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -62,6 +65,7 @@
     private static final int NUM_PARTITIONS = 10;
     private static final int RECORD_TRANSFER_DURATION_MS = 20_000;
     private static final int CHECKPOINT_DURATION_MS = 20_000;
+    private static final int OFFSET_SYNC_DURATION_MS = 30_000;
 
     private Time time = Time.SYSTEM;

Review comment:
       We can remove this field now that it's unused

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException {
             backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
     }
 
+    private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, List<String> topics, String consumerGroupId)
+            throws InterruptedException {
+        Admin backupClient = backup.kafka().createAdminClient();
+        List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+        IntStream.range(0, NUM_PARTITIONS).forEach(

Review comment:
       I'm not sure this is much better than a simple `for` loop. WDYT?
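
       For comparison, a plain `for` loop version might look like the sketch below. The diff above is truncated, so this assumes the lambda adds one topic-partition per topic for each partition index. `Tp` is a hypothetical stand-in for `org.apache.kafka.common.TopicPartition` so the example runs without Kafka on the classpath, and `allPartitions` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

public class TopicPartitionLoop {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    public static final class Tp {
        public final String topic;
        public final int partition;

        public Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Builds one Tp per (partition, topic) pair with nested for loops
    // instead of IntStream.range(...).forEach(...).
    public static List<Tp> allPartitions(List<String> topics, int numPartitions) {
        List<Tp> tps = new ArrayList<>(numPartitions * topics.size());
        for (int partition = 0; partition < numPartitions; partition++) {
            for (String topic : topics) {
                tps.add(new Tp(topic, partition));
            }
        }
        return tps;
    }

    public static void main(String[] args) {
        // Topic names here are illustrative only.
        System.out.println(allPartitions(List.of("test-topic-1", "test-topic-2"), 2));
    }
}
```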







[GitHub] [kafka] ning2008wisc commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
ning2008wisc commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-661219342


   @mimaison could probably do the final review and approve it. Thanks.





[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-659865858


   Thanks for your comments and suggestions! I've addressed them in this commit: https://github.com/apache/kafka/pull/9029/commits/88400bf350b60684db33828d690cfa238d056781. Thanks.





[GitHub] [kafka] ning2008wisc commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
ning2008wisc commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-664045085


   Hi @showuon, thanks for your work. A minor thing: would you mind consolidating/merging the current 4 commits into 1 commit?





[GitHub] [kafka] ning2008wisc commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
ning2008wisc commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-663147296


   I totally agree that it may be better to revisit the tests in MM2. I created a ticket, https://issues.apache.org/jira/browse/KAFKA-10304, and assigned it to myself.





[GitHub] [kafka] ning2008wisc commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
ning2008wisc commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-659516303


   Hi @showuon, thanks for the fix; it looks like a good start. Another minor, non-blocking comment: if it is a small fix, a single commit in the PR would probably look neater.





[GitHub] [kafka] ning2008wisc edited a comment on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
ning2008wisc edited a comment on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-659516303


   Hi @showuon, thanks for the fix; it looks like a good start. Another minor, non-blocking comment: if it is a small fix, a single commit in the PR would probably look neater.





[GitHub] [kafka] showuon edited a comment on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-664133647


   Hi @ning2008wisc, thanks for your suggestion, but I don't think it's good to merge all 4 commits into 1. I think the commit history is a record of why the author made each change at the time. Anyone can find out why we made a change by checking the commit history or the PR records. Also, the reviewer can tell which commits have been reviewed and which ones are newly added (like the comments above that I left for @mimaison).
   
   I know there's some debate over that, but I think if this project (Kafka) doesn't have this rule, it should be fine to keep it as is. Thank you.





[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-663329158


   @mimaison, thanks for your comments. I've addressed them in this commit: https://github.com/apache/kafka/pull/9029/commits/c2e2d70c1a2317794ce49292024a977a02159445. What I did:
   1. Removed the unused `time` variable and import
   2. Used a simple `for` loop instead of the lambda loop
   3. The test now waits for the consumer to consume all 100 records before continuing
   
   For other improvements, @ning2008wisc will address them in KAFKA-10304. Thanks.
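
   As a rough illustration of waiting for all records rather than sleeping for a fixed time, the sketch below polls a condition until it holds or a timeout expires. It is a plain-Java stand-in, not the code in the commit above: `waitUntil`, the timeout values, and the simulated consumer are all assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class WaitUntil {
    // Hypothetical helper: polls the condition every pollMs milliseconds and
    // returns true as soon as it holds, or false once timeoutMs has elapsed.
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long start = System.nanoTime();
        long timeoutNanos = timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - start >= timeoutNanos) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger consumed = new AtomicInteger();
        // Simulated consumer: each check "receives" another 25 records.
        boolean done = waitUntil(() -> consumed.addAndGet(25) >= 100, 5_000, 10);
        System.out.println("all records consumed: " + done);
    }
}
```

   Unlike a fixed sleep, this returns as soon as the condition holds and only spends the full timeout when the condition never becomes true.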

