You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/04/28 19:28:03 UTC
[camel] branch main updated: (chores) camel-kafka: fix flaky idempotent test
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new cbe5c7ce615 (chores) camel-kafka: fix flaky idempotent test
cbe5c7ce615 is described below
commit cbe5c7ce6151fed406522c5f0d0391db18da70c0
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Apr 28 19:39:58 2022 +0200
(chores) camel-kafka: fix flaky idempotent test
---
.../kafka/KafkaIdempotentRepositoryPersistenceIT.java | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
index 6ee92db8546..1a495cb7f01 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
@@ -18,6 +18,7 @@ package org.apache.camel.processor.idempotent.kafka;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.camel.EndpointInject;
@@ -37,6 +38,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -91,7 +93,8 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes
sendMessages(count);
// all records sent initially
- assertEquals(count, mockBefore.getReceivedCounter());
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(count, mockBefore.getReceivedCounter()));
// filters second attempt with same value
assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
@@ -110,7 +113,8 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes
sendMessages(count);
// all records sent initially
- assertEquals(count, mockBefore.getReceivedCounter());
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(count, mockBefore.getReceivedCounter()));
// the state from the previous test guarantees that all attempts now are blocked
assertEquals(count, kafkaIdempotentRepository.getDuplicateCount());
@@ -131,7 +135,8 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes
}
// all records sent initially
- assertEquals(count * passes, mockBefore.getReceivedCounter());
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(count * passes, mockBefore.getReceivedCounter()));
// the state from the previous test guarantees that all attempts now are blocked
assertEquals(count * passes, kafkaIdempotentRepository.getDuplicateCount());
@@ -158,7 +163,8 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes
}
// all records sent initially
- assertEquals(count, mockBefore.getReceivedCounter());
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(count, mockBefore.getReceivedCounter()));
// there are no duplicate messages on this pass
assertEquals(0, kafkaIdempotentRepository.getDuplicateCount());