Posted to commits@camel.apache.org by ac...@apache.org on 2024/02/16 10:34:19 UTC

(camel-kamelets-examples) branch kafka-batch-s3 created (now a5527df)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch kafka-batch-s3
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git


      at a5527df  Moved Kafka Batch Log to a different folder named kafka-batch-s3

This branch includes the following new commits:

     new 8dd6f56  Improve Kafka Batch example
     new a5527df  Moved Kafka Batch Log to a different folder named kafka-batch-s3

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



(camel-kamelets-examples) 01/02: Improve Kafka Batch example

Posted by ac...@apache.org.

acosentino pushed a commit to branch kafka-batch-s3
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git

commit 8dd6f5658258bd77c07e1bbbd0d130c1e4f18c5d
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Feb 16 11:32:51 2024 +0100

    Improve Kafka Batch example
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 jbang/kafka-batch-log/BatchLog.java        |  32 ---------
 jbang/kafka-batch-log/README.adoc          | 104 +++++++++++++++--------------
 jbang/kafka-batch-log/kafka-batch-log.yaml |  25 +++++--
 3 files changed, 72 insertions(+), 89 deletions(-)

diff --git a/jbang/kafka-batch-log/BatchLog.java b/jbang/kafka-batch-log/BatchLog.java
deleted file mode 100644
index 54d89bd..0000000
--- a/jbang/kafka-batch-log/BatchLog.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package camel.example;
-
-import java.util.List;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.util.StringHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BatchLog implements Processor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(BatchLog.class);
-
-    @Override
-    public void process(Exchange e) throws Exception {
-        final List<?> exchanges = e.getMessage().getBody(List.class);
-
-        // Ensure we are actually receiving what we are asking for
-        if (exchanges == null || exchanges.isEmpty()) {
-            return;
-        }
-
-        // The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list
-        for (Object obj : exchanges) {
-            if (obj instanceof Exchange) {
-                LOG.info("Processing exchange with body {}", ((Exchange)obj).getMessage().getBody(String.class));
-            }
-        }
-    }
-
-}
diff --git a/jbang/kafka-batch-log/README.adoc b/jbang/kafka-batch-log/README.adoc
index 31715d4..12d1a5b 100644
--- a/jbang/kafka-batch-log/README.adoc
+++ b/jbang/kafka-batch-log/README.adoc
@@ -1,6 +1,6 @@
 == Kafka Batch Consumer with Manual commit
 
-In this sample you'll use the Kafka Batch Source Kamelet in action.
+In this sample you'll see the Kafka Batch Source Kamelet in action and write each record of the batch to an S3 bucket.
 
 === Install JBang
 
@@ -51,13 +51,21 @@ ansible-playbook -v deploy.yaml
 
 This should start a Kafka instance for you, on your local machine.
 
+=== Set up AWS S3
+
+Create a bucket on your personal account.
+
+The Kamelet uses the default credentials provider (useDefaultCredentialsProvider), so you'll need an AWS credentials file on your host.
+
+Modify the kafka-batch-s3.yaml file to add the correct region and the correct bucket name.
+
 === How to run
 
 Then you can run this example using:
 
 [source,sh]
 ----
-$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<path_to_kamelets_repository> BatchLog.java kafka-batch-log.yaml
+$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<path_to_kamelets_repository> kafka-batch-s3.yaml
 ----
 
 === Consumer running
@@ -66,32 +74,31 @@ You should see:
 
 [source,sh]
 ----
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext : Routes startup (started:4)
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext :     Started kafka-to-log (kamelet://kafka-batch-not-secured-source)
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext :     Started kafka-batch-not-secured-source-1 (kafka://test-topic)
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext :     Started log-sink-2 (kamelet://source)
-2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext :     Started kafka-batch-manual-commit-action-3 (kamelet://source)
-2024-02-05 09:38:24.104  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext : Apache Camel 4.4.0-SNAPSHOT (kafka-batch-log) started in 354ms (build:0ms init:0ms start:354ms)
-2024-02-05 09:38:24.193  INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
-2024-02-05 09:38:24.193  INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
-2024-02-05 09:38:24.193  INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1707122304192
-2024-02-05 09:38:24.197  INFO 21666 --- [mer[test-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
-2024-02-05 09:38:24.197  INFO 21666 --- [mer[test-topic]] l.component.kafka.KafkaFetchRecords : Subscribing test-topic-Thread 0 to topic test-topic
-2024-02-05 09:38:24.198  INFO 21666 --- [mer[test-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-my-group-1, groupId=my-group] Subscribed to topic(s): test-topic
-2024-02-05 09:38:24.475  WARN 21666 --- [mer[test-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-my-group-1, groupId=my-group] Error while fetching metadata with correlation id 2 : {test-topic=LEADER_NOT_AVAILABLE}
-2024-02-05 09:38:24.477  INFO 21666 --- [mer[test-topic]] org.apache.kafka.clients.Metadata   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Cluster ID: VxYjgKU6RGSnOeHWuObnwA
-2024-02-05 09:38:24.483  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
-2024-02-05 09:38:24.487  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
-2024-02-05 09:38:24.530  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: need to re-join with the given member-id: consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af
-2024-02-05 09:38:24.532  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
-2024-02-05 09:38:24.533  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
-2024-02-05 09:38:24.536  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully joined group with generation Generation{generationId=1, memberId='consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af', protocol='range'}
-2024-02-05 09:38:24.594  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Finished assignment for group at generation 1: {consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af=Assignment(partitions=[test-topic-0])}
-2024-02-05 09:38:24.607  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully synced group in generation Generation{generationId=1, memberId='consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af', protocol='range'}
-2024-02-05 09:38:24.611  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Notifying assignor about the new Assignment(partitions=[test-topic-0])
-2024-02-05 09:38:24.615  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Adding newly assigned partitions: test-topic-0
-2024-02-05 09:38:24.632  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Found no committed offset for partition test-topic-0
-2024-02-05 09:38:24.648  INFO 21666 --- [mer[test-topic]] onsumer.internals.SubscriptionState : [Consumer clientId=consumer-my-group-1, groupId=my-group] Resetting offset for partition test-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}.
+2024-02-16 10:19:47.357  INFO 17500 --- [           main] el.impl.engine.AbstractCamelContext : Routes startup (started:4)
+2024-02-16 10:19:47.357  INFO 17500 --- [           main] el.impl.engine.AbstractCamelContext :     Started kafka-to-log (kamelet://kafka-batch-not-secured-source)
+2024-02-16 10:19:47.357  INFO 17500 --- [           main] el.impl.engine.AbstractCamelContext :     Started kafka-batch-not-secured-source-1 (kafka://test-topic)
+2024-02-16 10:19:47.358  INFO 17500 --- [           main] el.impl.engine.AbstractCamelContext :     Started aws-s3-sink-2 (kamelet://source)
+2024-02-16 10:19:47.358  INFO 17500 --- [           main] el.impl.engine.AbstractCamelContext :     Started kafka-batch-manual-commit-action-3 (kamelet://source)
+2024-02-16 10:19:47.358  INFO 17500 --- [           main] el.impl.engine.AbstractCamelContext : Apache Camel 4.4.0-SNAPSHOT (kafka-batch-log) started in 1s244ms (build:0ms init:0ms start:1s244ms)
+2024-02-16 10:19:47.418  INFO 17500 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
+2024-02-16 10:19:47.418  INFO 17500 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
+2024-02-16 10:19:47.419  INFO 17500 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1708075187417
+2024-02-16 10:19:47.424  INFO 17500 --- [mer[test-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
+2024-02-16 10:19:47.424  INFO 17500 --- [mer[test-topic]] l.component.kafka.KafkaFetchRecords : Subscribing test-topic-Thread 0 to topic test-topic
+2024-02-16 10:19:47.425  INFO 17500 --- [mer[test-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-my-group-1, groupId=my-group] Subscribed to topic(s): test-topic
+2024-02-16 10:19:47.693  INFO 17500 --- [mer[test-topic]] org.apache.kafka.clients.Metadata   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Cluster ID: QKy-eUclRryoTxWZq4xsPA
+2024-02-16 10:19:47.694  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
+2024-02-16 10:19:47.697  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
+2024-02-16 10:19:47.710  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: need to re-join with the given member-id: consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733
+2024-02-16 10:19:47.712  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
+2024-02-16 10:19:47.712  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
+2024-02-16 10:19:47.718  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully joined group with generation Generation{generationId=19, memberId='consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733', protocol='range'}
+2024-02-16 10:19:47.725  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Finished assignment for group at generation 19: {consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733=Assignment(partitions=[test-topic-0])}
+2024-02-16 10:19:47.733  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully synced group in generation Generation{generationId=19, memberId='consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733', protocol='range'}
+2024-02-16 10:19:47.734  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Notifying assignor about the new Assignment(partitions=[test-topic-0])
+2024-02-16 10:19:47.737  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Adding newly assigned partitions: test-topic-0
+2024-02-16 10:19:47.749  INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Setting offset for partition test-topic-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}
+
 ----
 
 At this point we should start sending messages to the test-topic topic. We could use kcat for this.
@@ -101,14 +108,6 @@ At this point we should start sending messages to the test-topic topic. We could
 for i in {1..2}; do echo "hello there" | kcat -b localhost:9092 -P -t test-topic; done
 ----
 
-In the consumer log, once the pollTimeout of 40 s completes, you should see an output of
-
-[source,sh]
-----
-2024-02-05 09:42:07.908  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:07.909  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
-----
-
 If you check the state of the consumer group 'my-group', you can see that the offsets were committed manually by the kafka-batch-manual-commit-action.
 
 [source,sh]
@@ -126,28 +125,31 @@ You could also try to send groups of 10 records to see how the batch consumer be
 for i in {1..50}; do echo "hello there" | kcat -b localhost:9092 -P -t test-topic; done
 ----
 
-And you should immediately see the output in group of 10 records
+When the process completes, you can check your AWS bucket:
 
 [source,sh]
 ----
+$ aws s3 ls s3://<bucket_name>
+2024-02-16 10:20:57         10 test-topic-20240216102055784.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056153.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056281.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056409.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056541.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056667.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056803.txt
+2024-02-16 10:20:57         10 test-topic-20240216102056930.txt
 .
 .
 .
 .
-2024-02-05 09:42:40.908  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.909  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.913  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.914  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.920  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.928  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.930  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.940  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.950  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
-2024-02-05 09:42:40.955  INFO 21666 --- [mer[test-topic]] camel.example.BatchLog              : Processing exchange with body hello there
-.
-.
-.
-.
+----
+
+You can also verify the content of one of the objects:
+
+[source,sh]
+----
+$ aws s3 cp s3://<bucket_name>/test-topic-20240216102055784.txt -
+hello there
 ----
 
 If you check the offset for the my-group consumer group again, you'll notice we are now at offset 52.
diff --git a/jbang/kafka-batch-log/kafka-batch-log.yaml b/jbang/kafka-batch-log/kafka-batch-log.yaml
index 1a779bd..a1e4e6d 100644
--- a/jbang/kafka-batch-log/kafka-batch-log.yaml
+++ b/jbang/kafka-batch-log/kafka-batch-log.yaml
@@ -17,10 +17,6 @@
 
 # camel-k: dependency=camel:kafka
 
-- beans:
-  - name: batchLog
-    type: "#class:camel.example.BatchLog"
-
 - route:
     id: "kafka-to-log"
     from:
@@ -34,8 +30,25 @@
         maxPollIntervalMs: 60000
         autoCommitEnable: false
         allowManualCommit: true
+        deserializeHeaders: true
       steps:
-        - bean:
-            ref: batchLog
+        - split:
+            simple: "${body}"
+            steps:
+              - setHeaders:
+                  headers:
+                    - name: "kafka.TOPIC"
+                      simple: "${body.getMessage().getHeader('kafka.TOPIC')}"
+              - setHeader:
+                  name: "file"
+                  simple: "${headers[kafka.TOPIC]}-${date:now:yyyyMMddHHmmssSSS}.txt"
+              - setBody:
+                  simple: "${body.getMessage().getBody()}"
+              - to:
+                  uri: "kamelet:aws-s3-sink"
+                  parameters:
+                    useDefaultCredentialsProvider: true
+                    region: "eu-west-1"
+                    bucketNameOrArn: kamelets-demo
         - to:
             uri: "kamelet:kafka-batch-manual-commit-action"
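
Editorial note: the route above names each S3 object with the expression "${headers[kafka.TOPIC]}-${date:now:yyyyMMddHHmmssSSS}.txt". As a rough illustration of what that naming scheme produces, here is a minimal plain-Java sketch. It is not part of the commit and does not use Camel: the class name BatchKeySketch and the method objectKey are hypothetical, and it assumes the pattern letters behave the same way in java.time as they do in the Simple language's date function.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the example repository: mimics the
// "<topic>-<yyyyMMddHHmmssSSS>.txt" object name built by the route.
public class BatchKeySketch {

    // Same pattern string as in the route's "file" header expression.
    private static final DateTimeFormatter STAMP =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

    // The timestamp is passed in explicitly so the result is reproducible;
    // the route itself uses the current time for every record.
    static String objectKey(String topic, LocalDateTime when) {
        return topic + "-" + STAMP.format(when) + ".txt";
    }

    public static void main(String[] args) {
        LocalDateTime when = LocalDateTime.of(2024, 2, 16, 10, 20, 55, 784_000_000);
        // prints "test-topic-20240216102055784.txt"
        System.out.println(objectKey("test-topic", when));
    }
}
```

Because the name only has millisecond resolution, two records of the same topic handled within the same millisecond would map to the same object name under this scheme.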


(camel-kamelets-examples) 02/02: Moved Kafka Batch Log to a different folder named kafka-batch-s3

Posted by ac...@apache.org.

acosentino pushed a commit to branch kafka-batch-s3
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git

commit a5527df4e8b3ba7cd1b36fb68e63ae4735d4aee1
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Feb 16 11:33:29 2024 +0100

    Moved Kafka Batch Log to a different folder named kafka-batch-s3
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 jbang/{kafka-batch-log => kafka-batch-s3}/README.adoc          | 0
 jbang/{kafka-batch-log => kafka-batch-s3}/kafka-batch-log.yaml | 0
 2 files changed, 0 insertions(+), 0 deletions(-)

diff --git a/jbang/kafka-batch-log/README.adoc b/jbang/kafka-batch-s3/README.adoc
similarity index 100%
rename from jbang/kafka-batch-log/README.adoc
rename to jbang/kafka-batch-s3/README.adoc
diff --git a/jbang/kafka-batch-log/kafka-batch-log.yaml b/jbang/kafka-batch-s3/kafka-batch-log.yaml
similarity index 100%
rename from jbang/kafka-batch-log/kafka-batch-log.yaml
rename to jbang/kafka-batch-s3/kafka-batch-log.yaml