Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/18 08:55:04 UTC

[GitHub] [kafka] showuon opened a new pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

showuon opened a new pull request #8894:
URL: https://github.com/apache/kafka/pull/8894


   The test MirrorConnectorsIntegrationTest.testReplication has been failing too often recently. It failed at least 6 times in today's (6/18) trunk builds (I didn't check every failed build), and it also failed my PR testing! I think it should be fixed soon to save developers' time.
   
   The test verifies MM2 replication. The recent failures all happened because the records had not yet been replicated to the primary/backup cluster, so the consumer could not retrieve them in time. In this PR, I add retries around these consumer.poll calls, allowing up to 3 retries to poll the records, and otherwise keep the original logic. It should make the test more stable.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon edited a comment on pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#issuecomment-646406717


   @ryannedolan @skaundinya15 , thanks for the good suggestions. Yes, adding retries has the same effect as increasing the timeout value. 
   
   I also found that `waitForCondition` is not a good fit in this case: if the `consume` method in `EmbeddedKafkaCluster.java` can't consume the expected number of records in time, it does not return the partially consumed records so that we could accumulate them across calls. Instead, it throws an exception directly. In other words, the `consume` method is already doing the job of `waitForCondition`. 
   
   So all we need to do is **increase the timeout value** to give the MM2 replication more time. It should make this test more stable. Thank you.
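
The behavior described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual `EmbeddedKafkaCluster` code: the class `DeadlineConsumeSketch`, its `consume` signature, and the `Supplier` standing in for a Kafka consumer are all assumptions made for the example. It shows a helper that polls until a deadline and throws on timeout, so the caller never sees a partial result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a consume-with-deadline helper; not the Kafka test API.
class DeadlineConsumeSketch {

    // Polls `source` until at least `expected` records are collected or
    // `timeoutMs` elapses. On timeout it throws, discarding any partial result.
    static List<String> consume(int expected, long timeoutMs, Supplier<List<String>> source) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        List<String> records = new ArrayList<>();
        while (System.currentTimeMillis() < deadline) {
            records.addAll(source.get());
            if (records.size() >= expected) {
                return records;
            }
            try {
                Thread.sleep(10); // short pause between polls
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while consuming", e);
            }
        }
        throw new IllegalStateException(
                "Could not find enough records: expected " + expected + ", found " + records.size());
    }
}
```

Because such a helper already waits internally, wrapping it in another wait loop adds little; the timeout argument is the natural knob to adjust.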





[GitHub] [kafka] skaundinya15 commented on pull request #8894: KAFKA-9509: increase consume timeout to fix flaky test

Posted by GitBox <gi...@apache.org>.
skaundinya15 commented on pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#issuecomment-648280522


   @showuon no more comments 
   
   > Hi @ryannedolan @skaundinya15 , do you have any other comments? I think we should merge this PR soon since the test failed in today's test run again. Thanks.
   
   @showuon No more comments from my side, looks good.





[GitHub] [kafka] kkonstantine merged pull request #8894: KAFKA-9509: Increase timeout when consuming records to fix flaky test in MM2

Posted by GitBox <gi...@apache.org>.
kkonstantine merged pull request #8894:
URL: https://github.com/apache/kafka/pull/8894


   





[GitHub] [kafka] showuon commented on pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#issuecomment-645886831


   @ryannedolan @skaundinya15 @kkonstantine , could you review this PR, which fixes a flaky test? Thanks.





[GitHub] [kafka] showuon commented on pull request #8894: KAFKA-9509: increase consume timeout to fix flaky test

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#issuecomment-650205555


   Hi @kkonstantine , this PR already has 3 approvals, and the flaky test is affecting many PRs' test runs right now (it was mentioned in 3 PRs). I think we can merge it if you don't have other comments. Thank you.





[GitHub] [kafka] ryannedolan commented on a change in pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

Posted by GitBox <gi...@apache.org>.
ryannedolan commented on a change in pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#discussion_r442314691



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -207,23 +212,45 @@ public void close() {
         backup.stop();
     }
 
+    // throw exception after 3 retries, and print expected error messages
+    private void assertEqualsWithConsumeRetries(final String errorMsg,
+                                                final int numRecordsProduces,
+                                                final int timeout,
+                                                final ClusterType clusterType,
+                                                final String... topics) throws InterruptedException {
+        int retries = 3;
+        while (retries-- > 0) {
+            try {
+                int actualNum = clusterType == ClusterType.PRIMARY ?
+                        primary.kafka().consume(numRecordsProduces, timeout, topics).count() :

Review comment:
       these are really strange side-effects to have in an assert statement. I see what you are trying to do, but this is probably not the way to do it.







[GitHub] [kafka] showuon commented on pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#issuecomment-646406717


   @ryannedolan @skaundinya15 , thanks for the good suggestions. Yes, adding retries has the same effect as increasing the timeout value. After some investigation, I also found that `waitForCondition` is not a good fit in this case: the `consume` method in `EmbeddedKafkaCluster.java` does not return the consumed records if there are fewer of them than expected; it throws an exception directly. In other words, the `consume` method is already doing the job of `waitForCondition`. 
   
   So all we need to do is **increase the timeout value** to give the MM2 replication more time. It should make this test more stable. Thank you.





[GitHub] [kafka] kkonstantine commented on pull request #8894: KAFKA-9509: increase consume timeout to fix flaky test

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#issuecomment-647914019


   ok to test





[GitHub] [kafka] ryannedolan commented on a change in pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

Posted by GitBox <gi...@apache.org>.
ryannedolan commented on a change in pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#discussion_r442313614



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -207,23 +212,45 @@ public void close() {
         backup.stop();
     }
 
+    // throw exception after 3 retries, and print expected error messages
+    private void assertEqualsWithConsumeRetries(final String errorMsg,

Review comment:
       fwiw this doesn't adhere to kafka style guide (looks like Kafka Streams to me)

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -207,23 +212,45 @@ public void close() {
         backup.stop();
     }
 
+    // throw exception after 3 retries, and print expected error messages
+    private void assertEqualsWithConsumeRetries(final String errorMsg,
+                                                final int numRecordsProduces,
+                                                final int timeout,
+                                                final ClusterType clusterType,
+                                                final String... topics) throws InterruptedException {
+        int retries = 3;
+        while (retries-- > 0) {
+            try {
+                int actualNum = clusterType == ClusterType.PRIMARY ?
+                        primary.kafka().consume(numRecordsProduces, timeout, topics).count() :

Review comment:
       these are really strange side-effects to have in an assert statement. I see what you are trying to do, but this is probably not the way to do it.







[GitHub] [kafka] showuon commented on pull request #8894: KAFKA-9509: increase consume timeout to fix flaky test

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#issuecomment-647888991


   Hi @ryannedolan @skaundinya15 , do you have any other comments? I think we should merge this PR soon since the test failed in today's test run again. Thanks.
   





[GitHub] [kafka] skaundinya15 edited a comment on pull request #8894: KAFKA-9509: increase consume timeout to fix flaky test

Posted by GitBox <gi...@apache.org>.
skaundinya15 edited a comment on pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#issuecomment-648280522


   > Hi @ryannedolan @skaundinya15 , do you have any other comments? I think we should merge this PR soon since the test failed in today's test run again. Thanks.
   
   @showuon No more comments from my side, looks good.





[GitHub] [kafka] showuon commented on pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#issuecomment-647219267


   Thanks, @chia7712 !





[GitHub] [kafka] skaundinya15 commented on a change in pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

Posted by GitBox <gi...@apache.org>.
skaundinya15 commented on a change in pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#discussion_r442445728



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -207,23 +212,45 @@ public void close() {
         backup.stop();
     }
 
+    // throw exception after 3 retries, and print expected error messages
+    private void assertEqualsWithConsumeRetries(final String errorMsg,
+                                                final int numRecordsProduces,
+                                                final int timeout,
+                                                final ClusterType clusterType,
+                                                final String... topics) throws InterruptedException {
+        int retries = 3;
+        while (retries-- > 0) {
+            try {
+                int actualNum = clusterType == ClusterType.PRIMARY ?
+                        primary.kafka().consume(numRecordsProduces, timeout, topics).count() :
+                        backup.kafka().consume(numRecordsProduces, timeout, topics).count();
+                if (numRecordsProduces == actualNum)
+                    return;
+            } catch (Throwable e) {
+                log.error("Could not find enough records with {} retries left", retries, e);
+            }
+        }
+        throw new InterruptedException(errorMsg);
+    }
+
     @Test
     public void testReplication() throws InterruptedException {
         MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig("primary"));
         MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig("backup"));
 
-        assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PRODUCED,

Review comment:
       I'd agree with @ryannedolan here. We could use the `waitForCondition` in `TestUtils.java` instead to wait for the necessary condition. More details on that are here: https://github.com/apache/kafka/blob/d8cc6fe8e36329c647736773d9d66de89c447409/clients/src/test/java/org/apache/kafka/test/TestUtils.java#L370-L371
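
For readers unfamiliar with the pattern, a `waitForCondition`-style helper can be sketched as below. This is a self-contained illustration under stated assumptions, not the code in `TestUtils.java`: the class name `WaitForConditionSketch`, the `BooleanSupplier` parameter, the 50 ms poll interval, and the choice to fail with `AssertionError` are all hypothetical.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper; the real TestUtils signature may differ.
class WaitForConditionSketch {

    // Re-checks `condition` until it holds or `maxWaitMs` elapses,
    // then fails with a message that includes `details`.
    static void waitForCondition(BooleanSupplier condition, long maxWaitMs, String details) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return;
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError(
                        "Condition not met within " + maxWaitMs + " ms: " + details);
            }
            try {
                Thread.sleep(Math.min(50L, maxWaitMs)); // assumed poll interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting: " + details, e);
            }
        }
    }
}
```

A test would pass a side-effect-free check as the condition, which keeps the waiting logic out of the assertion itself.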



