You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/05/21 12:42:51 UTC

[camel] branch main updated (c2f6d79bc65 -> 398888d47cd)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from c2f6d79bc65 Upgrade to roaster 2.24.0
     new 8c5c0d43774 (chores) camel-kafka: fix async test not correctly reporting commit failures
     new 398888d47cd CAMEL-18135: fix incorrect sync commit of async manual commit

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kafka/consumer/DefaultKafkaManualAsyncCommit.java  |  2 +-
 .../integration/KafkaConsumerAsyncManualCommitIT.java  | 18 ++++++++++++++++--
 2 files changed, 17 insertions(+), 3 deletions(-)


[camel] 02/02: CAMEL-18135: fix incorrect sync commit of async manual commit

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 398888d47cd0db2179c7e55991d2181671650cf7
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Sat May 21 09:58:10 2022 +0200

    CAMEL-18135: fix incorrect sync commit of async manual commit
---
 .../camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java | 2 +-
 .../component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
index fbc4d0610a6..e38761c4ef7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/DefaultKafkaManualAsyncCommit.java
@@ -29,6 +29,6 @@ public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit impl
 
     @Override
     public void commit() {
-        commitManager.forceCommit(getPartition(), getRecordOffset());
+        commitManager.recordOffset(getPartition(), getRecordOffset());
     }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
index 222d326692e..f1e007482d5 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -41,10 +41,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSupport {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerAsyncManualCommitIT.class);
-
     public static final String TOPIC = "testManualCommitTest";
 
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerAsyncManualCommitIT.class);
+
     @EndpointInject("kafka:" + TOPIC
                     + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
                     + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#testFactory")


[camel] 01/02: (chores) camel-kafka: fix async test not correctly reporting commit failures

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8c5c0d43774eb48eae5db272f9e4f868e76bfeb0
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Sat May 21 09:54:07 2022 +0200

    (chores) camel-kafka: fix async test not correctly reporting commit failures
---
 .../integration/KafkaConsumerAsyncManualCommitIT.java    | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
index dc82ca2104c..222d326692e 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -34,10 +34,14 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerAsyncManualCommitIT.class);
 
     public static final String TOPIC = "testManualCommitTest";
 
@@ -57,6 +61,8 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
 
     private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
 
+    private volatile int failCount;
+
     @BeforeEach
     public void before() {
         Properties props = getDefaultProperties();
@@ -92,7 +98,13 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
                             KafkaManualCommit manual = e.getMessage().getBody(Exchange.class)
                                     .getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                             assertNotNull(manual);
-                            manual.commit();
+
+                            try {
+                                manual.commit();
+                            } catch (Exception commitException) {
+                                LOG.error("Failed to commit: {}", commitException.getMessage(), commitException);
+                                failCount++;
+                            }
                         });
                 from(from).routeId("bar").autoStartup(false).to(toBar);
             }
@@ -139,6 +151,8 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
         to.expectedBodiesReceivedInAnyOrder("message-5", "message-6", "message-7");
 
         to.assertIsSatisfied(3000);
+
+        assertEquals(0, failCount, "There should have been 0 commit failures");
     }
 
 }