You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/10/14 15:36:14 UTC

[camel] branch main updated: CAMEL-18608: Fix the tests of type KafkaConsumerIdempotentTestSupport

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new d1fe71af618 CAMEL-18608: Fix the tests of type KafkaConsumerIdempotentTestSupport
d1fe71af618 is described below

commit d1fe71af618395f4be4686be43c0f934d2fb2258
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Fri Oct 14 17:35:37 2022 +0200

    CAMEL-18608: Fix the tests of type KafkaConsumerIdempotentTestSupport
---
 .../integration/KafkaConsumerIdempotentIT.java     |  4 ++--
 .../KafkaConsumerIdempotentTestSupport.java        | 24 +++++++---------------
 ...kaConsumerIdempotentWithCustomSerializerIT.java |  4 ++--
 .../KafkaConsumerIdempotentWithProcessorIT.java    |  4 ++--
 4 files changed, 13 insertions(+), 23 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
index b27194a1511..682774dc765 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
@@ -33,7 +33,7 @@ import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import static org.apache.camel.component.kafka.serde.KafkaSerdeHelper.numericHeader;
 
 @DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", matches = "false")
-public class KafkaConsumerIdempotentIT extends KafkaConsumerIdempotentTestSupport {
+class KafkaConsumerIdempotentIT extends KafkaConsumerIdempotentTestSupport {
 
     public static final String TOPIC = "idempt";
 
@@ -82,7 +82,7 @@ public class KafkaConsumerIdempotentIT extends KafkaConsumerIdempotentTestSuppor
 
     @Test
     @DisplayName("Numeric headers is consumable when using idempotent (CAMEL-16914)")
-    public void kafkaIdempotentMessageIsConsumedByCamel() throws InterruptedException {
+    void kafkaIdempotentMessageIsConsumedByCamel() {
         doRun(to, size);
     }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
index f42f4788c57..7e6dc62ffd2 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
@@ -18,15 +18,15 @@
 package org.apache.camel.component.kafka.integration;
 
 import java.math.BigInteger;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.Exchange;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.internals.RecordHeader;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -34,10 +34,8 @@ public abstract class KafkaConsumerIdempotentTestSupport extends BaseEmbeddedKaf
 
     protected void doSend(int size, String topic) {
         Properties props = getDefaultProperties();
-        org.apache.kafka.clients.producer.KafkaProducer<String, String> producer
-                = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
-
-        try {
+        try (org.apache.kafka.clients.producer.KafkaProducer<String, String> producer
+                = new org.apache.kafka.clients.producer.KafkaProducer<>(props)) {
             for (int k = 0; k < size; k++) {
                 String msg = "message-" + k;
                 ProducerRecord<String, String> data = new ProducerRecord<>(topic, String.valueOf(k), msg);
@@ -45,21 +43,13 @@ public abstract class KafkaConsumerIdempotentTestSupport extends BaseEmbeddedKaf
                 data.headers().add(new RecordHeader("id", BigInteger.valueOf(k).toByteArray()));
                 producer.send(data);
             }
-        } finally {
-            if (producer != null) {
-                producer.close();
-            }
         }
     }
 
-    protected void doRun(MockEndpoint mockEndpoint, int size) throws InterruptedException {
-        mockEndpoint.expectedMessageCount(size);
-
-        List<Exchange> exchangeList = mockEndpoint.getReceivedExchanges();
-
-        mockEndpoint.assertIsSatisfied(10000);
+    protected void doRun(MockEndpoint mockEndpoint, int size) {
 
-        assertEquals(size, exchangeList.size());
+        await().atMost(1, TimeUnit.MINUTES).untilAsserted(
+                () -> assertEquals(size, mockEndpoint.getReceivedExchanges().size()));
 
         Map<String, Object> headers = mockEndpoint.getExchanges().get(0).getIn().getHeaders();
         assertTrue(headers.containsKey("id"), "0");
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
index a1c864f7daa..f50f90c2ba7 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-public class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumerIdempotentTestSupport {
+class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumerIdempotentTestSupport {
 
     public static final String TOPIC = "idempt2";
 
@@ -77,7 +77,7 @@ public class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumer
     }
 
     @Test
-    public void kafkaMessageIsConsumedByCamel() throws InterruptedException {
+    void kafkaMessageIsConsumedByCamel() {
         doRun(to, size);
     }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
index 55f0859fae5..2f270b99fab 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
@@ -29,7 +29,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempotentTestSupport {
+class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempotentTestSupport {
     public static final String TOPIC = "testidemp3";
 
     @BindToRegistry("kafkaIdempotentRepository")
@@ -85,7 +85,7 @@ public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempot
     }
 
     @Test
-    public void kafkaMessageIsConsumedByCamel() throws InterruptedException {
+    void kafkaMessageIsConsumedByCamel() {
         doRun(to, size);
     }
 }