You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/05/21 12:42:52 UTC
[camel] 01/02: (chores) camel-kafka: fix async test not correctly reporting commit failures
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8c5c0d43774eb48eae5db272f9e4f868e76bfeb0
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Sat May 21 09:54:07 2022 +0200
(chores) camel-kafka: fix async test not correctly reporting commit failures
---
.../integration/KafkaConsumerAsyncManualCommitIT.java | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
index dc82ca2104c..222d326692e 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -34,10 +34,14 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerAsyncManualCommitIT.class);
public static final String TOPIC = "testManualCommitTest";
@@ -57,6 +61,8 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+ private volatile int failCount;
+
@BeforeEach
public void before() {
Properties props = getDefaultProperties();
@@ -92,7 +98,13 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
KafkaManualCommit manual = e.getMessage().getBody(Exchange.class)
.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
assertNotNull(manual);
- manual.commit();
+
+ try {
+ manual.commit();
+ } catch (Exception commitException) {
+ LOG.error("Failed to commit: {}", commitException.getMessage(), commitException);
+ failCount++;
+ }
});
from(from).routeId("bar").autoStartup(false).to(toBar);
}
@@ -139,6 +151,8 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
to.expectedBodiesReceivedInAnyOrder("message-5", "message-6", "message-7");
to.assertIsSatisfied(3000);
+
+ assertEquals(0, failCount, "There should have been 0 commit failures");
}
}