You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/04/28 19:28:03 UTC

[camel] branch main updated: (chores) camel-kafka: fix flaky idempotent test

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new cbe5c7ce615 (chores) camel-kafka: fix flaky idempotent test
cbe5c7ce615 is described below

commit cbe5c7ce6151fed406522c5f0d0391db18da70c0
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Apr 28 19:39:58 2022 +0200

    (chores) camel-kafka: fix flaky idempotent test
---
 .../kafka/KafkaIdempotentRepositoryPersistenceIT.java      | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
index 6ee92db8546..1a495cb7f01 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
@@ -18,6 +18,7 @@ package org.apache.camel.processor.idempotent.kafka;
 
 import java.util.Arrays;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import org.apache.camel.EndpointInject;
@@ -37,6 +38,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -91,7 +93,8 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes
         sendMessages(count);
 
         // all records sent initially
-        assertEquals(count, mockBefore.getReceivedCounter());
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(count, mockBefore.getReceivedCounter()));
 
         // filters second attempt with same value
         assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
@@ -110,7 +113,8 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes
         sendMessages(count);
 
         // all records sent initially
-        assertEquals(count, mockBefore.getReceivedCounter());
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(count, mockBefore.getReceivedCounter()));
 
         // the state from the previous test guarantees that all attempts now are blocked
         assertEquals(count, kafkaIdempotentRepository.getDuplicateCount());
@@ -131,7 +135,8 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes
         }
 
         // all records sent initially
-        assertEquals(count * passes, mockBefore.getReceivedCounter());
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(count * passes, mockBefore.getReceivedCounter()));
 
         // the state from the previous test guarantees that all attempts now are blocked
         assertEquals(count * passes, kafkaIdempotentRepository.getDuplicateCount());
@@ -158,7 +163,8 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes
         }
 
         // all records sent initially
-        assertEquals(count, mockBefore.getReceivedCounter());
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(count, mockBefore.getReceivedCounter()));
 
         // there are no duplicate messages on this pass
         assertEquals(0, kafkaIdempotentRepository.getDuplicateCount());