Posted to commits@camel.apache.org by ac...@apache.org on 2024/02/16 10:34:19 UTC
(camel-kamelets-examples) branch kafka-batch-s3 created (now a5527df)
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a change to branch kafka-batch-s3
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git
at a5527df Moved Kafka Batch Log to a different folder named kafka-batch-s3
This branch includes the following new commits:
new 8dd6f56 Improve Kafka Batch example
new a5527df Moved Kafka Batch Log to a different folder named kafka-batch-s3
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
(camel-kamelets-examples) 01/02: Improve Kafka Batch example
Posted by ac...@apache.org.
acosentino pushed a commit to branch kafka-batch-s3
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git
commit 8dd6f5658258bd77c07e1bbbd0d130c1e4f18c5d
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Feb 16 11:32:51 2024 +0100
Improve Kafka Batch example
Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
jbang/kafka-batch-log/BatchLog.java | 32 ---------
jbang/kafka-batch-log/README.adoc | 104 +++++++++++++++--------------
jbang/kafka-batch-log/kafka-batch-log.yaml | 25 +++++--
3 files changed, 72 insertions(+), 89 deletions(-)
diff --git a/jbang/kafka-batch-log/BatchLog.java b/jbang/kafka-batch-log/BatchLog.java
deleted file mode 100644
index 54d89bd..0000000
--- a/jbang/kafka-batch-log/BatchLog.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package camel.example;
-
-import java.util.List;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.util.StringHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BatchLog implements Processor {
-
- private static final Logger LOG = LoggerFactory.getLogger(BatchLog.class);
-
- @Override
- public void process(Exchange e) throws Exception {
- final List<?> exchanges = e.getMessage().getBody(List.class);
-
- // Ensure we are actually receiving what we are asking for
- if (exchanges == null || exchanges.isEmpty()) {
- return;
- }
-
- // The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list
- for (Object obj : exchanges) {
- if (obj instanceof Exchange) {
- LOG.info("Processing exchange with body {}", ((Exchange)obj).getMessage().getBody(String.class));
- }
- }
- }
-
-}
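The deleted processor relied on the batch consumer delivering a `List` whose elements are the per-record exchanges. The plain-Java sketch below reproduces the same defensive unpacking (a null/empty guard plus an `instanceof` filter) without needing Camel on the classpath. `RecordExchange` is a hypothetical stand-in for `org.apache.camel.Exchange`, not a Camel type. It needs Java 16+ for records and pattern matching.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchUnpackSketch {

    // Hypothetical stand-in for one per-record Camel Exchange: it only carries a String body.
    record RecordExchange(String body) { }

    // Mirrors BatchLog.process(): return early on a missing or empty batch,
    // then keep only the elements that really are per-record exchanges.
    static List<String> unpack(Object batchBody) {
        List<String> bodies = new ArrayList<>();
        if (!(batchBody instanceof List<?> exchanges) || exchanges.isEmpty()) {
            return bodies; // nothing to process
        }
        for (Object obj : exchanges) {
            if (obj instanceof RecordExchange ex) {
                bodies.add(ex.body());
            }
        }
        return bodies;
    }

    public static void main(String[] args) {
        List<Object> batch = List.of(new RecordExchange("hello there"),
                                     new RecordExchange("hello there"),
                                     "not an exchange");
        // Logs the two record bodies; the String element is skipped by the instanceof check.
        unpack(batch).forEach(b -> System.out.println("Processing exchange with body " + b));
    }
}
```

In the new route, the `split` on `${body}` takes over this iteration, so each record becomes its own exchange instead of being handled inside a custom processor.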
diff --git a/jbang/kafka-batch-log/README.adoc b/jbang/kafka-batch-log/README.adoc
index 31715d4..12d1a5b 100644
--- a/jbang/kafka-batch-log/README.adoc
+++ b/jbang/kafka-batch-log/README.adoc
@@ -1,6 +1,6 @@
== Kafka Batch Consumer with Manual commit
-In this sample you'll use the Kafka Batch Source Kamelet in action.
+In this sample you'll see the Kafka Batch Source Kamelet in action, writing each record of a batch to an S3 bucket as a separate object.
=== Install JBang
@@ -51,13 +51,21 @@ ansible-playbook -v deploy.yaml
This should start a Kafka instance for you, on your local machine.
+=== Set up AWS S3
+
+Create a bucket on your personal account.
+
+The Kamelet uses the AWS default credentials provider, so you'll need AWS credentials (for example, a credentials file) available on your host.
+
+Modify the kafka-batch-s3.yaml file to set the correct `region` and `bucketNameOrArn` values.
+
=== How to run
Then you can run this example using:
[source,sh]
----
-$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<path_to_kamelets_repository> BatchLog.java kafka-batch-log.yaml
+$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<path_to_kamelets_repository> kafka-batch-s3.yaml
----
=== Consumer running
@@ -66,32 +74,31 @@ You should see:
[source,sh]
----
-2024-02-05 09:38:24.103 INFO 21666 --- [ main] el.impl.engine.AbstractCamelContext : Routes startup (started:4)
-2024-02-05 09:38:24.103 INFO 21666 --- [ main] el.impl.engine.AbstractCamelContext : Started kafka-to-log (kamelet://kafka-batch-not-secured-source)
-2024-02-05 09:38:24.103 INFO 21666 --- [ main] el.impl.engine.AbstractCamelContext : Started kafka-batch-not-secured-source-1 (kafka://test-topic)
-2024-02-05 09:38:24.103 INFO 21666 --- [ main] el.impl.engine.AbstractCamelContext : Started log-sink-2 (kamelet://source)
-2024-02-05 09:38:24.103 INFO 21666 --- [ main] el.impl.engine.AbstractCamelContext : Started kafka-batch-manual-commit-action-3 (kamelet://source)
-2024-02-05 09:38:24.104 INFO 21666 --- [ main] el.impl.engine.AbstractCamelContext : Apache Camel 4.4.0-SNAPSHOT (kafka-batch-log) started in 354ms (build:0ms init:0ms start:354ms)
-2024-02-05 09:38:24.193 INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
-2024-02-05 09:38:24.193 INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
-2024-02-05 09:38:24.193 INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1707122304192
-2024-02-05 09:38:24.197 INFO 21666 --- [mer[test-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
-2024-02-05 09:38:24.197 INFO 21666 --- [mer[test-topic]] l.component.kafka.KafkaFetchRecords : Subscribing test-topic-Thread 0 to topic test-topic
-2024-02-05 09:38:24.198 INFO 21666 --- [mer[test-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-my-group-1, groupId=my-group] Subscribed to topic(s): test-topic
-2024-02-05 09:38:24.475 WARN 21666 --- [mer[test-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-my-group-1, groupId=my-group] Error while fetching metadata with correlation id 2 : {test-topic=LEADER_NOT_AVAILABLE}
-2024-02-05 09:38:24.477 INFO 21666 --- [mer[test-topic]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-my-group-1, groupId=my-group] Cluster ID: VxYjgKU6RGSnOeHWuObnwA
-2024-02-05 09:38:24.483 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
-2024-02-05 09:38:24.487 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
-2024-02-05 09:38:24.530 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: need to re-join with the given member-id: consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af
-2024-02-05 09:38:24.532 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
-2024-02-05 09:38:24.533 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
-2024-02-05 09:38:24.536 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully joined group with generation Generation{generationId=1, memberId='consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af', protocol='range'}
-2024-02-05 09:38:24.594 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Finished assignment for group at generation 1: {consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af=Assignment(partitions=[test-topic-0])}
-2024-02-05 09:38:24.607 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully synced group in generation Generation{generationId=1, memberId='consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af', protocol='range'}
-2024-02-05 09:38:24.611 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Notifying assignor about the new Assignment(partitions=[test-topic-0])
-2024-02-05 09:38:24.615 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Adding newly assigned partitions: test-topic-0
-2024-02-05 09:38:24.632 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Found no committed offset for partition test-topic-0
-2024-02-05 09:38:24.648 INFO 21666 --- [mer[test-topic]] onsumer.internals.SubscriptionState : [Consumer clientId=consumer-my-group-1, groupId=my-group] Resetting offset for partition test-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}.
+2024-02-16 10:19:47.357 INFO 17500 --- [ main] el.impl.engine.AbstractCamelContext : Routes startup (started:4)
+2024-02-16 10:19:47.357 INFO 17500 --- [ main] el.impl.engine.AbstractCamelContext : Started kafka-to-log (kamelet://kafka-batch-not-secured-source)
+2024-02-16 10:19:47.357 INFO 17500 --- [ main] el.impl.engine.AbstractCamelContext : Started kafka-batch-not-secured-source-1 (kafka://test-topic)
+2024-02-16 10:19:47.358 INFO 17500 --- [ main] el.impl.engine.AbstractCamelContext : Started aws-s3-sink-2 (kamelet://source)
+2024-02-16 10:19:47.358 INFO 17500 --- [ main] el.impl.engine.AbstractCamelContext : Started kafka-batch-manual-commit-action-3 (kamelet://source)
+2024-02-16 10:19:47.358 INFO 17500 --- [ main] el.impl.engine.AbstractCamelContext : Apache Camel 4.4.0-SNAPSHOT (kafka-batch-log) started in 1s244ms (build:0ms init:0ms start:1s244ms)
+2024-02-16 10:19:47.418 INFO 17500 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
+2024-02-16 10:19:47.418 INFO 17500 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
+2024-02-16 10:19:47.419 INFO 17500 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1708075187417
+2024-02-16 10:19:47.424 INFO 17500 --- [mer[test-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
+2024-02-16 10:19:47.424 INFO 17500 --- [mer[test-topic]] l.component.kafka.KafkaFetchRecords : Subscribing test-topic-Thread 0 to topic test-topic
+2024-02-16 10:19:47.425 INFO 17500 --- [mer[test-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-my-group-1, groupId=my-group] Subscribed to topic(s): test-topic
+2024-02-16 10:19:47.693 INFO 17500 --- [mer[test-topic]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-my-group-1, groupId=my-group] Cluster ID: QKy-eUclRryoTxWZq4xsPA
+2024-02-16 10:19:47.694 INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
+2024-02-16 10:19:47.697 INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
+2024-02-16 10:19:47.710 INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: need to re-join with the given member-id: consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733
+2024-02-16 10:19:47.712 INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
+2024-02-16 10:19:47.712 INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
+2024-02-16 10:19:47.718 INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully joined group with generation Generation{generationId=19, memberId='consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733', protocol='range'}
+2024-02-16 10:19:47.725 INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Finished assignment for group at generation 19: {consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733=Assignment(partitions=[test-topic-0])}
+2024-02-16 10:19:47.733 INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully synced group in generation Generation{generationId=19, memberId='consumer-my-group-1-083abfe1-f3d1-4f52-ad0a-c8118c711733', protocol='range'}
+2024-02-16 10:19:47.734 INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Notifying assignor about the new Assignment(partitions=[test-topic-0])
+2024-02-16 10:19:47.737 INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Adding newly assigned partitions: test-topic-0
+2024-02-16 10:19:47.749 INFO 17500 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Setting offset for partition test-topic-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}
+
----
At this point we should start sending messages to the test-topic topic. We could use kcat for this.
@@ -101,14 +108,6 @@ At this point we should start sending messages to the test-topic topic. We could
for i in {1..2}; do echo "hello there" | kcat -b localhost:9092 -P -t test-topic; done
----
-In the consumer log, once the pollTimeout of 40 s completes, you should see an output of
-
-[source,sh]
-----
-2024-02-05 09:42:07.908 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:07.909 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-----
-
If you check the situation for the consumer group 'my-group' you could see that the commit happened manually by using the kafka-batch-manual-commit-action.
[source,sh]
@@ -126,28 +125,31 @@ You could also try to send groups of 10 records to see how the batch consumer be
for i in {1..50}; do echo "hello there" | kcat -b localhost:9092 -P -t test-topic; done
----
-And you should immediately see the output in group of 10 records
+When the process completes, you can check your AWS bucket:
[source,sh]
----
+$ aws s3 ls s3://<bucket_name>
+2024-02-16 10:20:57 10 test-topic-20240216102055784.txt
+2024-02-16 10:20:57 10 test-topic-20240216102056153.txt
+2024-02-16 10:20:57 10 test-topic-20240216102056281.txt
+2024-02-16 10:20:57 10 test-topic-20240216102056409.txt
+2024-02-16 10:20:57 10 test-topic-20240216102056541.txt
+2024-02-16 10:20:57 10 test-topic-20240216102056667.txt
+2024-02-16 10:20:57 10 test-topic-20240216102056803.txt
+2024-02-16 10:20:57 10 test-topic-20240216102056930.txt
.
.
.
.
-2024-02-05 09:42:40.908 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.909 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.913 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.914 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.920 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.928 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.930 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.940 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.950 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-2024-02-05 09:42:40.955 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there
-.
-.
-.
-.
+----
+
+You can also verify the content of an object:
+
+[source,sh]
+----
+$ aws s3 cp s3://<bucket_name>/test-topic-20240216102055784.txt -
+hello there
----
If you check again the offset for the consumers of my-group group you'll notice we are at offset 52 now.
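The offset of 52 follows from simple arithmetic: Kafka's committed offset is the offset of the next record to consume, so on a single partition it equals the number of records consumed and committed so far. The sketch below assumes a single partition (`test-topic-0`, as in the consumer log), a committed offset of 0 at startup (as the last log line shows), and that every record is written and committed. The batch size of 10 is taken from the README's "groups of 10 records" and is assumed to cap the number of records per poll; the helper names are hypothetical.

```java
public class OffsetCheck {

    // Committed offset after consuming and committing every record of each send:
    // each record advances the next-to-consume offset by exactly one.
    static long committedOffsetAfter(long startOffset, long... recordsPerSend) {
        long offset = startOffset;
        for (long n : recordsPerSend) {
            offset += n;
        }
        return offset;
    }

    // Lower bound on the number of batches: ceil(records / batchSize).
    // More, smaller batches can occur if records arrive slower than the poll.
    static long minBatchesFor(long records, int batchSize) {
        return (records + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        // 2 records from the first kcat loop, then 50 from the second.
        System.out.println("committed offset = " + committedOffsetAfter(0, 2, 50)); // 52
        System.out.println("minimum batches for 50 records = " + minBatchesFor(50, 10)); // 5
    }
}
```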
diff --git a/jbang/kafka-batch-log/kafka-batch-log.yaml b/jbang/kafka-batch-log/kafka-batch-log.yaml
index 1a779bd..a1e4e6d 100644
--- a/jbang/kafka-batch-log/kafka-batch-log.yaml
+++ b/jbang/kafka-batch-log/kafka-batch-log.yaml
@@ -17,10 +17,6 @@
# camel-k: dependency=camel:kafka
-- beans:
- - name: batchLog
- type: "#class:camel.example.BatchLog"
-
- route:
id: "kafka-to-log"
from:
@@ -34,8 +30,25 @@
maxPollIntervalMs: 60000
autoCommitEnable: false
allowManualCommit: true
+ deserializeHeaders: true
steps:
- - bean:
- ref: batchLog
+ - split:
+ simple: "${body}"
+ steps:
+ - setHeaders:
+ headers:
+ - name: "kafka.TOPIC"
+ simple: "${body.getMessage().getHeader('kafka.TOPIC')}"
+ - setHeader:
+ name: "file"
+ simple: "${headers[kafka.TOPIC]}-${date:now:yyyyMMddHHmmssSSS}.txt"
+ - setBody:
+ simple: "${body.getMessage().getBody()}"
+ - to:
+ uri: "kamelet:aws-s3-sink"
+ parameters:
+ useDefaultCredentialsProvider: true
+ region: "eu-west-1"
+ bucketNameOrArn: kamelets-demo
- to:
uri: "kamelet:kafka-batch-manual-commit-action"
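Inside the split, each element of the batch body is the per-record exchange, which is why the route reads the topic header and the payload through `body.getMessage()` before handing a plain body to `aws-s3-sink`, which uses the `file` header as the object key. The plain-Java sketch below mimics those three steps (`setHeaders`, `setHeader`, `setBody`) so the key format can be checked against the `aws s3 ls` listing. `KafkaRecord` and `S3Object` are hypothetical stand-ins, and `java.time.DateTimeFormatter` is used in place of Camel's `${date:now:...}` function with the same `yyyyMMddHHmmssSSS` pattern.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

public class S3KeySketch {

    // Same pattern as ${date:now:yyyyMMddHHmmssSSS} in the route (millisecond resolution).
    static final DateTimeFormatter STAMP = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

    // Hypothetical stand-in for one split element: the inner record's headers and body.
    record KafkaRecord(Map<String, String> headers, String body) { }

    // What reaches the sink: the object key (the "file" header) and the payload.
    record S3Object(String key, String body) { }

    static S3Object toS3Object(KafkaRecord rec, LocalDateTime now) {
        // setHeaders: copy kafka.TOPIC from the inner record
        String topic = rec.headers().get("kafka.TOPIC");
        // setHeader "file": <topic>-<timestamp>.txt
        String key = topic + "-" + now.format(STAMP) + ".txt";
        // setBody: unwrap the inner record's payload
        return new S3Object(key, rec.body());
    }

    public static void main(String[] args) {
        KafkaRecord rec = new KafkaRecord(Map.of("kafka.TOPIC", "test-topic"), "hello there");
        LocalDateTime t = LocalDateTime.of(2024, 2, 16, 10, 20, 55, 784_000_000);
        S3Object obj = toS3Object(rec, t);
        // prints: test-topic-20240216102055784.txt -> hello there
        System.out.println(obj.key() + " -> " + obj.body());
    }
}
```

Because the key only has millisecond resolution, two records processed within the same millisecond would map to the same key, and the later upload would replace the earlier object (unless bucket versioning is enabled).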
(camel-kamelets-examples) 02/02: Moved Kafka Batch Log to a different folder named kafka-batch-s3
Posted by ac...@apache.org.
acosentino pushed a commit to branch kafka-batch-s3
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git
commit a5527df4e8b3ba7cd1b36fb68e63ae4735d4aee1
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Feb 16 11:33:29 2024 +0100
Moved Kafka Batch Log to a different folder named kafka-batch-s3
Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
jbang/{kafka-batch-log => kafka-batch-s3}/README.adoc | 0
jbang/{kafka-batch-log => kafka-batch-s3}/kafka-batch-log.yaml | 0
2 files changed, 0 insertions(+), 0 deletions(-)
diff --git a/jbang/kafka-batch-log/README.adoc b/jbang/kafka-batch-s3/README.adoc
similarity index 100%
rename from jbang/kafka-batch-log/README.adoc
rename to jbang/kafka-batch-s3/README.adoc
diff --git a/jbang/kafka-batch-log/kafka-batch-log.yaml b/jbang/kafka-batch-s3/kafka-batch-log.yaml
similarity index 100%
rename from jbang/kafka-batch-log/kafka-batch-log.yaml
rename to jbang/kafka-batch-s3/kafka-batch-log.yaml