You are viewing a plain-text version of this content. The canonical link for it is on the original archive page.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/10/14 15:36:14 UTC
[camel] branch main updated: CAMEL-18608: Fix the tests of type KafkaConsumerIdempotentTestSupport
This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new d1fe71af618 CAMEL-18608: Fix the tests of type KafkaConsumerIdempotentTestSupport
d1fe71af618 is described below
commit d1fe71af618395f4be4686be43c0f934d2fb2258
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Fri Oct 14 17:35:37 2022 +0200
CAMEL-18608: Fix the tests of type KafkaConsumerIdempotentTestSupport
---
.../integration/KafkaConsumerIdempotentIT.java | 4 ++--
.../KafkaConsumerIdempotentTestSupport.java | 24 +++++++---------------
...kaConsumerIdempotentWithCustomSerializerIT.java | 4 ++--
.../KafkaConsumerIdempotentWithProcessorIT.java | 4 ++--
4 files changed, 13 insertions(+), 23 deletions(-)
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
index b27194a1511..682774dc765 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
@@ -33,7 +33,7 @@ import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import static org.apache.camel.component.kafka.serde.KafkaSerdeHelper.numericHeader;
@DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", matches = "false")
-public class KafkaConsumerIdempotentIT extends KafkaConsumerIdempotentTestSupport {
+class KafkaConsumerIdempotentIT extends KafkaConsumerIdempotentTestSupport {
public static final String TOPIC = "idempt";
@@ -82,7 +82,7 @@ public class KafkaConsumerIdempotentIT extends KafkaConsumerIdempotentTestSuppor
@Test
@DisplayName("Numeric headers is consumable when using idempotent (CAMEL-16914)")
- public void kafkaIdempotentMessageIsConsumedByCamel() throws InterruptedException {
+ void kafkaIdempotentMessageIsConsumedByCamel() {
doRun(to, size);
}
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
index f42f4788c57..7e6dc62ffd2 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentTestSupport.java
@@ -18,15 +18,15 @@
package org.apache.camel.component.kafka.integration;
import java.math.BigInteger;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
-import org.apache.camel.Exchange;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -34,10 +34,8 @@ public abstract class KafkaConsumerIdempotentTestSupport extends BaseEmbeddedKaf
protected void doSend(int size, String topic) {
Properties props = getDefaultProperties();
- org.apache.kafka.clients.producer.KafkaProducer<String, String> producer
- = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
-
- try {
+ try (org.apache.kafka.clients.producer.KafkaProducer<String, String> producer
+ = new org.apache.kafka.clients.producer.KafkaProducer<>(props)) {
for (int k = 0; k < size; k++) {
String msg = "message-" + k;
ProducerRecord<String, String> data = new ProducerRecord<>(topic, String.valueOf(k), msg);
@@ -45,21 +43,13 @@ public abstract class KafkaConsumerIdempotentTestSupport extends BaseEmbeddedKaf
data.headers().add(new RecordHeader("id", BigInteger.valueOf(k).toByteArray()));
producer.send(data);
}
- } finally {
- if (producer != null) {
- producer.close();
- }
}
}
- protected void doRun(MockEndpoint mockEndpoint, int size) throws InterruptedException {
- mockEndpoint.expectedMessageCount(size);
-
- List<Exchange> exchangeList = mockEndpoint.getReceivedExchanges();
-
- mockEndpoint.assertIsSatisfied(10000);
+ protected void doRun(MockEndpoint mockEndpoint, int size) {
- assertEquals(size, exchangeList.size());
+ await().atMost(1, TimeUnit.MINUTES).untilAsserted(
+ () -> assertEquals(size, mockEndpoint.getReceivedExchanges().size()));
Map<String, Object> headers = mockEndpoint.getExchanges().get(0).getIn().getHeaders();
assertTrue(headers.containsKey("id"), "0");
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
index a1c864f7daa..f50f90c2ba7 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-public class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumerIdempotentTestSupport {
+class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumerIdempotentTestSupport {
public static final String TOPIC = "idempt2";
@@ -77,7 +77,7 @@ public class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumer
}
@Test
- public void kafkaMessageIsConsumedByCamel() throws InterruptedException {
+ void kafkaMessageIsConsumedByCamel() {
doRun(to, size);
}
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
index 55f0859fae5..2f270b99fab 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
@@ -29,7 +29,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempotentTestSupport {
+class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempotentTestSupport {
public static final String TOPIC = "testidemp3";
@BindToRegistry("kafkaIdempotentRepository")
@@ -85,7 +85,7 @@ public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempot
}
@Test
- public void kafkaMessageIsConsumedByCamel() throws InterruptedException {
+ void kafkaMessageIsConsumedByCamel() {
doRun(to, size);
}
}