You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/19 16:35:30 UTC

(camel) branch main updated (0b36287df94 -> 372ec8c5372)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from 0b36287df94 CAMEL-20297 camel-facebook: update documentation
     new 9771c705a89 CAMEL-16044: fixed integration tests
     new d8978186f54 CAMEL-16044: added documentation
     new 372ec8c5372 CAMEL-16044: do not enable auto-commit when using batching mode

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camel-kafka/src/main/docs/kafka-component.adoc | 40 +++++++++++++
 .../camel/component/kafka/KafkaConfiguration.java  |  6 +-
 .../batching/BatchingProcessingITSupport.java      | 62 +++++++++++++++++++-
 .../batching/KafkaBatchingProcessingIT.java        | 67 +++++++++++-----------
 .../ROOT/pages/camel-4x-upgrade-guide-4_4.adoc     |  4 ++
 5 files changed, 140 insertions(+), 39 deletions(-)


(camel) 01/03: CAMEL-16044: fixed integration tests

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9771c705a89ff3e016fe51183310dd66fb863453
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jan 19 15:55:29 2024 +0100

    CAMEL-16044: fixed integration tests
---
 .../batching/BatchingProcessingITSupport.java      | 62 +++++++++++++++++++-
 .../batching/KafkaBatchingProcessingIT.java        | 67 +++++++++++-----------
 2 files changed, 91 insertions(+), 38 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java
index ccadf3bc114..363efb0f1ea 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/BatchingProcessingITSupport.java
@@ -18,15 +18,26 @@
 package org.apache.camel.component.kafka.integration.batching;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.kafka.integration.BaseEmbeddedKafkaTestSupport;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 abstract class BatchingProcessingITSupport extends BaseEmbeddedKafkaTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchingProcessingITSupport.class);
 
     @EndpointInject("mock:result")
     protected MockEndpoint to;
@@ -53,17 +64,25 @@ abstract class BatchingProcessingITSupport extends BaseEmbeddedKafkaTestSupport
     public void kafkaManualCommitTest(String topic) throws Exception {
         setupPreExecutionExpectations();
 
+        LOG.debug("Starting the first step");
         sendRecords(0, 5, topic);
 
         to.assertIsSatisfied(3000);
+        to.expectedMessageCount(1);
+
+        final List<Exchange> firstExchangeBatch = to.getExchanges();
+
+        validateReceivedExchanges(5, firstExchangeBatch);
 
         to.reset();
 
+        LOG.debug("Starting the second step");
         // Second step: We shut down our route, we expect nothing will be recovered by our route
-        contextExtension.getContext().getRouteController().stopRoute("foo");
-        to.expectedMessageCount(0);
+        contextExtension.getContext().getRouteController().stopRoute("batching");
 
         // Third step: While our route is stopped, we send 3 records more to a Kafka test topic
+        LOG.debug("Starting the third step");
+        to.expectedMessageCount(1);
         sendRecords(5, 8, topic);
 
         to.assertIsSatisfied(3000);
@@ -72,10 +91,47 @@ abstract class BatchingProcessingITSupport extends BaseEmbeddedKafkaTestSupport
 
         // Fourth step: We start again our route, since we have been committing the offsets from the first step,
         // we will expect to consume from the latest committed offset (e.g.: from offset 5()
-        contextExtension.getContext().getRouteController().startRoute("foo");
+        contextExtension.getContext().getRouteController().startRoute("batching");
         setupPostExecutionExpectations();
 
         to.assertIsSatisfied(3000);
+
+        final List<Exchange> secondExchangeBatch = to.getExchanges();
+        validateReceivedExchanges(3, secondExchangeBatch);
+    }
+
+    private static void validateReceivedExchanges(int expectedCount, List<Exchange> exchanges) {
+        assertNotNull(exchanges, "The exchange should not be null");
+
+        final Exchange parentExchange = exchanges.get(0);
+        final Message message = parentExchange.getMessage();
+
+        assertNotNull(message, "The message body should not be null");
+
+        final Object body = message.getBody();
+        final List<?> list = assertInstanceOf(List.class, body, "The body should be a list");
+
+        //        assertEquals(expectedCount, list.size(), "The should be 5 messages on the list");
+
+        for (var object : list) {
+            final Exchange exchange = assertInstanceOf(Exchange.class, object, "The list content should be an exchange");
+
+            final Message messageInList = exchange.getMessage();
+            LOG.info("Received message {}", messageInList);
+
+            final Object bodyInMessage = messageInList.getBody();
+            assertNotNull(bodyInMessage, "The body in message should not be null");
+
+            final String messageBodyStr = assertInstanceOf(String.class, bodyInMessage, "The body should be a string");
+            LOG.info("Received message body {}", messageBodyStr);
+
+            assertTrue(messageBodyStr.contains("message-"), "The message body should start with message-");
+            assertTrue(messageInList.hasHeaders(), "The message in list should have headers");
+            assertNotNull(messageInList.getHeader(KafkaConstants.PARTITION, Integer.class),
+                    "The message in list should have the partition information");
+            assertNotNull(messageInList.getHeader(KafkaConstants.TOPIC, String.class),
+                    "The message in list should have the correct topic information");
+        }
     }
 
     protected void setupPostExecutionExpectations() {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingIT.java
index 24bf18ea395..17a1b15fa18 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingProcessingIT.java
@@ -19,22 +19,22 @@ package org.apache.camel.component.kafka.integration.batching;
 import java.util.List;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class KafkaBatchingProcessingIT extends BatchingProcessingITSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchingProcessingIT.class);
 
     public static final String TOPIC = "testManualCommitSyncTest";
+    private volatile boolean invalidExchange = false;
+    private volatile boolean invalidExchangeFormat = false;
 
     @AfterEach
     public void after() {
@@ -43,6 +43,7 @@ public class KafkaBatchingProcessingIT extends BatchingProcessingITSupport {
 
     @Override
     protected RouteBuilder createRouteBuilder() {
+        // allowManualCommit=true&autoOffsetReset=earliest
         String from = "kafka:" + TOPIC
                       + "?groupId=KafkaBatchingProcessingIT&pollTimeoutMs=1000&batching=true"
                       + "&maxPollRecords=10&autoOffsetReset=earliest&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory";
@@ -51,38 +52,32 @@ public class KafkaBatchingProcessingIT extends BatchingProcessingITSupport {
 
             @Override
             public void configure() {
-                from(from).routeId("foo").to(KafkaTestUtil.MOCK_RESULT).process(e -> {
-                    KafkaManualCommit manual = e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
-                    final Message message = e.getMessage();
-
-                    assertNotNull(message, "The message body should not be null");
-
-                    final Object body = message.getBody();
-                    final List<?> list = assertInstanceOf(List.class, body, "The body should be a list");
-
-                    assertEquals(1, list.size(), "The should be just one message on the list");
-
-                    for (var object : list) {
-                        final Exchange exchange =
-                                assertInstanceOf(Exchange.class, object, "The list content should be an exchange");
-
-                        final Message messageInList = exchange.getMessage();
-
-                        final Object bodyInMessage = messageInList.getBody();
-                        assertNotNull(bodyInMessage, "The body in message should not be null");
-                        final String s = assertInstanceOf(String.class, bodyInMessage, "The body should be a string");
-                        assertTrue(s.contains("message-"), "The message body should start with message-");
-                        assertTrue(messageInList.hasHeaders(), "The message in list should have headers");
-                        assertNotNull(messageInList.getHeader(KafkaConstants.PARTITION, Integer.class),
-                                "The message in list should have the partition information");
-                        assertEquals(TOPIC, messageInList.getHeader(KafkaConstants.PARTITION, String.class),
-                                "The message in list should have the correct topic information");
+                from(from).routeId("batching").process(e -> {
+                    // The received records are stored as exchanges in a list. This gets the list of those exchanges
+                    final List<?> exchanges = e.getMessage().getBody(List.class);
+
+                    // Ensure we are actually receiving what we are asking for
+                    if (exchanges == null || exchanges.isEmpty()) {
+                        invalidExchange = true;
+                        return;
                     }
 
+                    /*
+                    Every exchange in that list should contain a reference to the manual commit object. We use the reference
+                    for the last exchange in the list to commit the whole batch
+                     */
+                    final Object tmp = exchanges.get(exchanges.size() - 1);
+                    if (tmp instanceof Exchange exchange) {
+                        KafkaManualCommit manual
+                                = exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+                        LOG.debug("Performing manual commit");
+                        manual.commit();
+                        LOG.debug("Done performing manual commit");
+                    } else {
+                        invalidExchangeFormat = true;
+                    }
 
-                    manual.commit();
-                });
-                from(from).routeId("bar").autoStartup(false).to(KafkaTestUtil.MOCK_RESULT_BAR);
+                }).to(KafkaTestUtil.MOCK_RESULT);
             }
         };
     }
@@ -90,6 +85,8 @@ public class KafkaBatchingProcessingIT extends BatchingProcessingITSupport {
     @Test
     public void kafkaManualCommit() throws Exception {
         kafkaManualCommitTest(TOPIC);
+
+        Assertions.assertFalse(invalidExchangeFormat, "The exchange list should be composed of exchanges");
     }
 
 }


(camel) 02/03: CAMEL-16044: added documentation

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d8978186f54dbd380a28690ec7df7ced5c5c98c4
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jan 19 16:08:48 2024 +0100

    CAMEL-16044: added documentation
---
 .../camel-kafka/src/main/docs/kafka-component.adoc | 40 ++++++++++++++++++++++
 .../ROOT/pages/camel-4x-upgrade-guide-4_4.adoc     |  4 +++
 2 files changed, 44 insertions(+)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 2f44d00f73a..d8d8879bb4a 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -493,4 +493,44 @@ static {
     KafkaComponent.setKerberosConfigLocation("path/to/config/file");
 }
 ----
+
+=== Batching Consumer
+
+To use a Kafka batching consumer with Camel, an application has to set the configuration `batching` to `true` and use manual commits.
+
+The received records are stored in a list in the exchange used in the pipeline. As such, it is possible to commit individually
+every record or the whole batch at once by committing the last exchange on the list.
+
+When working with batch processing, it's up to the application to commit the records, and handle the outcome of potentially invalid records.
+
+The size of the batch is controlled by the option `maxPollRecords`.
+
+In order to avoid blocking for too long, waiting for the a whole set of records to fill the batch, it is is possible to use the `pollTimeoutMs` option to set a timeout for the polling. In this case, the batch may contain less messages than set in the `maxPollRecords`.
+
+[source,java]
+----
+    from("kafka:topic?batching=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
+    .process(e -> {
+        // The received records are stored as exchanges in a list. This gets the list of those exchanges
+        final List<?> exchanges = e.getMessage().getBody(List.class);
+
+        // Ensure we are actually receiving what we are asking for
+        if (exchanges == null || exchanges.isEmpty()) {
+            return;
+        }
+
+        /*
+        Every exchange in that list should contain a reference to the manual commit object. We use the reference
+        for the last exchange in the list to commit the whole batch
+         */
+        final Object tmp = exchanges.getLast();
+        if (tmp instanceof Exchange exchange) {
+            KafkaManualCommit manual =
+                    exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            LOG.debug("Performing manual commit");
+            manual.commit();
+            LOG.debug("Done performing manual commit");
+        }
+    });
+----
 include::spring-boot:partial$starter.adoc[]
diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
index 2384b73e98b..9bbb3a90cbe 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
@@ -134,6 +134,10 @@ The JdbcAggregationRepository now provides a deserializationFilter parameter. Th
 
 The component was removed without deprecation. The library supporting this component has been unmaintained for a long time. We found no indications that the library itself nor the component are working with modern Facebook, along with the absence of community interest, which lead us to decide to remove this component without deprecation.
 
+=== camel-kafka
+
+The component now has support for batch processing.
+
 == Camel Spring Boot
 
 === Auto Configuration


(camel) 03/03: CAMEL-16044: do not enable auto-commit when using batching mode

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 372ec8c5372c8fc6637d3f7e2bcb21fff11d821e
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jan 19 17:02:03 2024 +0100

    CAMEL-16044: do not enable auto-commit when using batching mode
---
 .../java/org/apache/camel/component/kafka/KafkaConfiguration.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 29e39132146..3cd48f37d4f 100755
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -740,7 +740,11 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware
     }
 
     public boolean getAutoCommitEnable() {
-        return autoCommitEnable;
+        if (!batching) {
+            return autoCommitEnable;
+        }
+
+        return false;
     }
 
     /**