You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/19 16:35:32 UTC

(camel) 02/03: CAMEL-16044: added documentation

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d8978186f54dbd380a28690ec7df7ced5c5c98c4
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jan 19 16:08:48 2024 +0100

    CAMEL-16044: added documentation
---
 .../camel-kafka/src/main/docs/kafka-component.adoc | 40 ++++++++++++++++++++++
 .../ROOT/pages/camel-4x-upgrade-guide-4_4.adoc     |  4 +++
 2 files changed, 44 insertions(+)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 2f44d00f73a..d8d8879bb4a 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -493,4 +493,44 @@ static {
     KafkaComponent.setKerberosConfigLocation("path/to/config/file");
 }
 ----
+
+=== Batching Consumer
+
+To use a Kafka batching consumer with Camel, an application has to set the configuration `batching` to `true` and use manual commits.
+
+The received records are stored in a list in the exchange used in the pipeline. As such, it is possible to commit individually
+every record or the whole batch at once by committing the last exchange on the list.
+
+When working with batch processing, it's up to the application to commit the records, and handle the outcome of potentially invalid records.
+
+The size of the batch is controlled by the option `maxPollRecords`.
+
+In order to avoid blocking for too long, waiting for the a whole set of records to fill the batch, it is is possible to use the `pollTimeoutMs` option to set a timeout for the polling. In this case, the batch may contain less messages than set in the `maxPollRecords`.
+
+[source,java]
+----
+    from("kafka:topic?batching=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
+    .process(e -> {
+        // The received records are stored as exchanges in a list. This gets the list of those exchanges
+        final List<?> exchanges = e.getMessage().getBody(List.class);
+
+        // Ensure we are actually receiving what we are asking for
+        if (exchanges == null || exchanges.isEmpty()) {
+            return;
+        }
+
+        /*
+        Every exchange in that list should contain a reference to the manual commit object. We use the reference
+        for the last exchange in the list to commit the whole batch
+         */
+        final Object tmp = exchanges.getLast();
+        if (tmp instanceof Exchange exchange) {
+            KafkaManualCommit manual =
+                    exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            LOG.debug("Performing manual commit");
+            manual.commit();
+            LOG.debug("Done performing manual commit");
+        }
+    });
+----
 include::spring-boot:partial$starter.adoc[]
diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
index 2384b73e98b..9bbb3a90cbe 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_4.adoc
@@ -134,6 +134,10 @@ The JdbcAggregationRepository now provides a deserializationFilter parameter. Th
 
 The component was removed without deprecation. The library supporting this component has been unmaintained for a long time. We found no indications that the library itself nor the component are working with modern Facebook, along with the absence of community interest, which lead us to decide to remove this component without deprecation.
 
+=== camel-kafka
+
+The component now has support for batch processing.
+
 == Camel Spring Boot
 
 === Auto Configuration