Posted to commits@camel.apache.org by ac...@apache.org on 2024/02/05 08:57:40 UTC
(camel-kamelets-examples) 01/01: Added a little example of kafka Batch consumer in action with manual commit
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch kafka-batch-log
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git
commit 65194924fa01fcd57e50bfc27b518d26b37a8ce1
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Feb 5 09:56:46 2024 +0100
Added a little example of kafka Batch consumer in action with manual commit
Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
jbang/kafka-batch-log/README.adoc | 189 +++++++++++++++++++++++++++++
jbang/kafka-batch-log/kafka-batch-log.yaml | 41 +++++++
2 files changed, 230 insertions(+)
diff --git a/jbang/kafka-batch-log/README.adoc b/jbang/kafka-batch-log/README.adoc
new file mode 100644
index 0000000..4f62e8a
--- /dev/null
+++ b/jbang/kafka-batch-log/README.adoc
@@ -0,0 +1,189 @@
+== Kafka Batch Consumer with Manual Commit
+
+In this sample you'll see the Kafka Batch Source Kamelet in action.
+
+=== Install JBang
+
+First install JBang according to https://www.jbang.dev
+
+Once JBang is installed, you should be able to run the following from a shell:
+
+[source,sh]
+----
+$ jbang --version
+----
+
+This will output the version of JBang.
+
+To run this example, install Camel on JBang via:
+
+[source,sh]
+----
+$ jbang app install camel@apache/camel
+----
+
+This allows you to run Camel JBang with the `camel` command, as shown below.
+
+=== Setup Kafka instance
+
+You'll need a running Kafka cluster to point to. In this case you could use an Ansible role such as https://github.com/oscerd/kafka-ansible-role
+
+Set up a file named `deploy.yaml` with the following content:
+
+[source,yaml]
+----
+- name: role kafka
+  hosts: localhost
+  remote_user: user
+
+  roles:
+    - role: kafka-ansible-role
+      kafka_version: 3.4.1
+      path_dir: /home/user/
+      unarchive_dest_dir: /home/user/kafka/demo/
+      start_kafka: true
+----
+
+and then run
+
+[source,sh]
+----
+ansible-playbook -v deploy.yaml
+----
+
+This should start a Kafka instance for you on your local machine.
+
+=== How to run
+
+Then you can run this example using:
+
+[source,sh]
+----
+$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<path_to_kamelets_repository> kafka-batch-log.yaml
+----
+
+=== Consumer running
+
+For the kafka-to-log integration you should see:
+
+[source,sh]
+----
+2024-02-05 09:38:24.103 INFO 21666 --- [ main] el.impl.engine.AbstractCamelContext : Routes startup (started:4)
+2024-02-05 09:38:24.103 INFO 21666 --- [ main] el.impl.engine.AbstractCamelContext : Started kafka-to-log (kamelet://kafka-batch-not-secured-source)
+2024-02-05 09:38:24.103 INFO 21666 --- [ main] el.impl.engine.AbstractCamelContext : Started kafka-batch-not-secured-source-1 (kafka://test-topic)
+2024-02-05 09:38:24.103 INFO 21666 --- [ main] el.impl.engine.AbstractCamelContext : Started log-sink-2 (kamelet://source)
+2024-02-05 09:38:24.103 INFO 21666 --- [ main] el.impl.engine.AbstractCamelContext : Started kafka-batch-manual-commit-action-3 (kamelet://source)
+2024-02-05 09:38:24.104 INFO 21666 --- [ main] el.impl.engine.AbstractCamelContext : Apache Camel 4.4.0-SNAPSHOT (kafka-batch-log) started in 354ms (build:0ms init:0ms start:354ms)
+2024-02-05 09:38:24.193 INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
+2024-02-05 09:38:24.193 INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
+2024-02-05 09:38:24.193 INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1707122304192
+2024-02-05 09:38:24.197 INFO 21666 --- [mer[test-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
+2024-02-05 09:38:24.197 INFO 21666 --- [mer[test-topic]] l.component.kafka.KafkaFetchRecords : Subscribing test-topic-Thread 0 to topic test-topic
+2024-02-05 09:38:24.198 INFO 21666 --- [mer[test-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-my-group-1, groupId=my-group] Subscribed to topic(s): test-topic
+2024-02-05 09:38:24.475 WARN 21666 --- [mer[test-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-my-group-1, groupId=my-group] Error while fetching metadata with correlation id 2 : {test-topic=LEADER_NOT_AVAILABLE}
+2024-02-05 09:38:24.477 INFO 21666 --- [mer[test-topic]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-my-group-1, groupId=my-group] Cluster ID: VxYjgKU6RGSnOeHWuObnwA
+2024-02-05 09:38:24.483 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
+2024-02-05 09:38:24.487 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
+2024-02-05 09:38:24.530 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: need to re-join with the given member-id: consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af
+2024-02-05 09:38:24.532 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
+2024-02-05 09:38:24.533 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
+2024-02-05 09:38:24.536 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully joined group with generation Generation{generationId=1, memberId='consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af', protocol='range'}
+2024-02-05 09:38:24.594 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Finished assignment for group at generation 1: {consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af=Assignment(partitions=[test-topic-0])}
+2024-02-05 09:38:24.607 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully synced group in generation Generation{generationId=1, memberId='consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af', protocol='range'}
+2024-02-05 09:38:24.611 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Notifying assignor about the new Assignment(partitions=[test-topic-0])
+2024-02-05 09:38:24.615 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Adding newly assigned partitions: test-topic-0
+2024-02-05 09:38:24.632 INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Found no committed offset for partition test-topic-0
+2024-02-05 09:38:24.648 INFO 21666 --- [mer[test-topic]] onsumer.internals.SubscriptionState : [Consumer clientId=consumer-my-group-1, groupId=my-group] Resetting offset for partition test-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}.
+----
+
+At this point we can start sending messages to the `test-topic` topic. We could use kcat for this.
+
+[source,sh]
+----
+for i in {1..2}; do echo "hello there" | kcat -b localhost:9092 -P -t test-topic; done
+----
+
+In the consumer log, once the pollTimeout of 40 seconds elapses, you should see output like:
+
+[source,sh]
+----
+2024-02-05 09:42:07.908 INFO 21666 --- [mer[test-topic]] log-sink : Exchange[
+ ExchangePattern: InOnly
+ Headers: {}
+ BodyType: java.util.ArrayList
+ Body: [Exchange[], Exchange[]]
+]
+----
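The batch source delivers a list of exchanges to the route: a batch goes out either when batchSize records have accumulated or when pollTimeout elapses with a partial batch, which is why the 2 records above arrived together after the timeout. A minimal Python sketch of that accumulation rule (a hypothetical model, not Camel's implementation; `deliver_batches` is an illustrative helper):

```python
# Hypothetical model of the batch source's delivery rule (not Camel's code):
# records accumulate until batch_size is reached; a pollTimeout expiry
# flushes whatever partial batch has accumulated so far.

def deliver_batches(records, batch_size):
    """Split a stream of polled records into the batches the route would see."""
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []
    # pollTimeout elapsing flushes a partial batch, like the 2-record one above
    if batch:
        yield batch

# Two records with batchSize=10: one partial batch of 2, flushed by the timeout
print([len(b) for b in deliver_batches(range(2), 10)])   # → [2]
# Fifty records with batchSize=10: five full batches
print([len(b) for b in deliver_batches(range(50), 10)])  # → [10, 10, 10, 10, 10]
```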
+
+If you check the state of the consumer group 'my-group', you can see that the offsets were committed manually by the kafka-batch-manual-commit-action:
+
+[source,sh]
+----
+./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
+
+GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
+my-group test-topic 0 2 2 0 - - -
+----
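The LAG column in the describe output is simply LOG-END-OFFSET minus CURRENT-OFFSET, so a lag of 0 means the manual commit caught the group up to the end of the log. A quick sketch of reading those columns programmatically, using the sample line above (field positions assumed from this output format):

```python
# Parse one data line of `kafka-consumer-groups.sh --describe` output and
# check that LAG matches the difference of the two offset columns.
line = "my-group  test-topic  0  2  2  0  -  -  -"

group, topic, partition, current, log_end, lag = line.split()[:6]
computed_lag = int(log_end) - int(current)
assert computed_lag == int(lag)  # lag = log-end-offset - current-offset
print(f"{group}/{topic}[{partition}]: lag={computed_lag}")  # → my-group/test-topic[0]: lag=0
```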
+
+You could also try sending 50 records to see how the batch consumer behaves:
+
+[source,sh]
+----
+for i in {1..50}; do echo "hello there" | kcat -b localhost:9092 -P -t test-topic; done
+----
+
+This time you should immediately see the output arrive in groups of 10 records:
+
+[source,sh]
+----
+2024-02-05 09:50:33.947 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[
+ ExchangePattern: InOnly
+ Headers: {}
+ BodyType: java.util.ArrayList
+ Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
+]
+2024-02-05 09:50:44.137 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[
+ ExchangePattern: InOnly
+ Headers: {}
+ BodyType: java.util.ArrayList
+ Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
+]
+2024-02-05 09:50:54.324 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[
+ ExchangePattern: InOnly
+ Headers: {}
+ BodyType: java.util.ArrayList
+ Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
+]
+2024-02-05 09:51:04.535 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[
+ ExchangePattern: InOnly
+ Headers: {}
+ BodyType: java.util.ArrayList
+ Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
+]
+2024-02-05 09:51:14.747 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[
+ ExchangePattern: InOnly
+ Headers: {}
+ BodyType: java.util.ArrayList
+ Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
+]
+----
+
+For the purposes of this example the payload of the records is not important.
+
+If you check the offsets for the 'my-group' consumer group again, you'll notice we are now at offset 52 (the 2 records from before plus the 50 just sent).
+
+[source,sh]
+----
+./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
+
+GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
+my-group test-topic 0 52 52 0 - - -
+----
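The arithmetic behind that final offset: the group was at offset 2 after the first run, and the 50 new records were consumed in five batches of 10, each followed by a manual commit. Sketched out:

```python
# Offsets observed in this example: 2 records committed in the first run,
# then 50 more consumed in 5 batches of 10, each batch committed manually.
start_offset = 2
batch_size = 10
batches = 5

final_offset = start_offset + batch_size * batches
print(final_offset)  # → 52
```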
+
+=== Help and contributions
+
+If you hit any problem using Camel or have some feedback, then please
+https://camel.apache.org/community/support/[let us know].
+
+We also love contributors, so
+https://camel.apache.org/community/contributing/[get involved] :-)
+
+The Camel riders!
diff --git a/jbang/kafka-batch-log/kafka-batch-log.yaml b/jbang/kafka-batch-log/kafka-batch-log.yaml
new file mode 100644
index 0000000..11c25df
--- /dev/null
+++ b/jbang/kafka-batch-log/kafka-batch-log.yaml
@@ -0,0 +1,41 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# camel-k: dependency=camel:kafka
+
+- route:
+    id: "kafka-to-log"
+    from:
+      uri: "kamelet:kafka-batch-not-secured-source"
+      parameters:
+        bootstrapServers: "localhost:9092"
+        topic: "test-topic"
+        consumerGroup: 'my-group'
+        batchSize: 10
+        pollTimeout: 40000
+        maxPollIntervalMs: 60000
+        autoCommitEnable: false
+        allowManualCommit: true
+      steps:
+        - to:
+            uri: "kamelet:log-sink"
+            parameters:
+              showStreams: true
+              showHeaders: true
+              multiline: true
+        - to:
+            uri: "kamelet:kafka-batch-manual-commit-action"