You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/05/21 12:42:52 UTC

[camel] 01/02: (chores) camel-kafka: fix async test not correctly reporting commit failures

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8c5c0d43774eb48eae5db272f9e4f868e76bfeb0
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Sat May 21 09:54:07 2022 +0200

    (chores) camel-kafka: fix async test not correctly reporting commit failures
---
 .../integration/KafkaConsumerAsyncManualCommitIT.java    | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
index dc82ca2104c..222d326692e 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -34,10 +34,14 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerAsyncManualCommitIT.class);
 
     public static final String TOPIC = "testManualCommitTest";
 
@@ -57,6 +61,8 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
 
     private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
 
+    private volatile int failCount;
+
     @BeforeEach
     public void before() {
         Properties props = getDefaultProperties();
@@ -92,7 +98,13 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
                             KafkaManualCommit manual = e.getMessage().getBody(Exchange.class)
                                     .getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                             assertNotNull(manual);
-                            manual.commit();
+
+                            try {
+                                manual.commit();
+                            } catch (Exception commitException) {
+                                LOG.error("Failed to commit: {}", commitException.getMessage(), commitException);
+                                failCount++;
+                            }
                         });
                 from(from).routeId("bar").autoStartup(false).to(toBar);
             }
@@ -139,6 +151,8 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
         to.expectedBodiesReceivedInAnyOrder("message-5", "message-6", "message-7");
 
         to.assertIsSatisfied(3000);
+
+        assertEquals(0, failCount, "There should have been 0 commit failures");
     }
 
 }