Posted to commits@camel.apache.org by ac...@apache.org on 2024/02/05 08:57:40 UTC

(camel-kamelets-examples) 01/01: Added a little example of kafka Batch consumer in action with manual commit

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch kafka-batch-log
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git

commit 65194924fa01fcd57e50bfc27b518d26b37a8ce1
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Feb 5 09:56:46 2024 +0100

    Added a little example of kafka Batch consumer in action with manual commit
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 jbang/kafka-batch-log/README.adoc          | 189 +++++++++++++++++++++++++++++
 jbang/kafka-batch-log/kafka-batch-log.yaml |  41 +++++++
 2 files changed, 230 insertions(+)

diff --git a/jbang/kafka-batch-log/README.adoc b/jbang/kafka-batch-log/README.adoc
new file mode 100644
index 0000000..4f62e8a
--- /dev/null
+++ b/jbang/kafka-batch-log/README.adoc
@@ -0,0 +1,189 @@
+== Kafka Batch Consumer with Manual Commit
+
+In this sample you'll see the Kafka Batch Source Kamelet in action, combined with manual offset commit.
+
+=== Install JBang
+
+First, install JBang following the instructions at https://www.jbang.dev
+
+Once JBang is installed, you should be able to run the following from a shell:
+
+[source,sh]
+----
+$ jbang --version
+----
+
+This will output the version of JBang.
+
+To run this example, you can install Camel as a JBang app via:
+
+[source,sh]
+----
+$ jbang app install camel@apache/camel
+----
+
+This allows you to run Camel JBang with the `camel` command as shown below.
+
+=== Setup Kafka instance
+
+You'll need a running Kafka cluster to point to. For example, you could use an Ansible role such as https://github.com/oscerd/kafka-ansible-role
+
+Set up a file `deploy.yaml` with the following content:
+
+[source,yaml]
+----
+- name: role kafka
+  hosts: localhost
+  remote_user: user
+
+  roles:
+    - role: kafka-ansible-role
+      kafka_version: 3.4.1
+      path_dir: /home/user/
+      unarchive_dest_dir: /home/user/kafka/demo/
+      start_kafka: true
+----
+
+and then run:
+
+[source,sh]
+----
+ansible-playbook -v deploy.yaml
+----
+
+This should start a Kafka instance on your local machine.
+
+=== How to run
+
+Then you can run this example using:
+
+[source,sh]
+----
+$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<path_to_kamelets_repository> kafka-batch-log.yaml
+----
+
+=== Consumer running
+
+Once the integration starts, you should see log output like:
+
+[source,sh]
+----
+2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext : Routes startup (started:4)
+2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext :     Started kafka-to-log (kamelet://kafka-batch-not-secured-source)
+2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext :     Started kafka-batch-not-secured-source-1 (kafka://test-topic)
+2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext :     Started log-sink-2 (kamelet://source)
+2024-02-05 09:38:24.103  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext :     Started kafka-batch-manual-commit-action-3 (kamelet://source)
+2024-02-05 09:38:24.104  INFO 21666 --- [           main] el.impl.engine.AbstractCamelContext : Apache Camel 4.4.0-SNAPSHOT (kafka-batch-log) started in 354ms (build:0ms init:0ms start:354ms)
+2024-02-05 09:38:24.193  INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
+2024-02-05 09:38:24.193  INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
+2024-02-05 09:38:24.193  INFO 21666 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1707122304192
+2024-02-05 09:38:24.197  INFO 21666 --- [mer[test-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
+2024-02-05 09:38:24.197  INFO 21666 --- [mer[test-topic]] l.component.kafka.KafkaFetchRecords : Subscribing test-topic-Thread 0 to topic test-topic
+2024-02-05 09:38:24.198  INFO 21666 --- [mer[test-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-my-group-1, groupId=my-group] Subscribed to topic(s): test-topic
+2024-02-05 09:38:24.475  WARN 21666 --- [mer[test-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-my-group-1, groupId=my-group] Error while fetching metadata with correlation id 2 : {test-topic=LEADER_NOT_AVAILABLE}
+2024-02-05 09:38:24.477  INFO 21666 --- [mer[test-topic]] org.apache.kafka.clients.Metadata   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Cluster ID: VxYjgKU6RGSnOeHWuObnwA
+2024-02-05 09:38:24.483  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
+2024-02-05 09:38:24.487  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
+2024-02-05 09:38:24.530  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: need to re-join with the given member-id: consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af
+2024-02-05 09:38:24.532  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
+2024-02-05 09:38:24.533  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
+2024-02-05 09:38:24.536  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully joined group with generation Generation{generationId=1, memberId='consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af', protocol='range'}
+2024-02-05 09:38:24.594  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Finished assignment for group at generation 1: {consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af=Assignment(partitions=[test-topic-0])}
+2024-02-05 09:38:24.607  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully synced group in generation Generation{generationId=1, memberId='consumer-my-group-1-0a444d13-3462-4037-99cc-2f088b28d8af', protocol='range'}
+2024-02-05 09:38:24.611  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Notifying assignor about the new Assignment(partitions=[test-topic-0])
+2024-02-05 09:38:24.615  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Adding newly assigned partitions: test-topic-0
+2024-02-05 09:38:24.632  INFO 21666 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Found no committed offset for partition test-topic-0
+2024-02-05 09:38:24.648  INFO 21666 --- [mer[test-topic]] onsumer.internals.SubscriptionState : [Consumer clientId=consumer-my-group-1, groupId=my-group] Resetting offset for partition test-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}.
+----
+
+At this point you can start sending messages to the `test-topic` topic. You could use kcat for this:
+
+[source,sh]
+----
+for i in {1..2}; do echo "hello there" | kcat -b localhost:9092 -P -t test-topic; done
+----
+
+In the consumer log, once the pollTimeout of 40 seconds expires, you should see output like:
+
+[source,sh]
+----
+2024-02-05 09:42:07.908  INFO 21666 --- [mer[test-topic]] log-sink                            : Exchange[
+  ExchangePattern: InOnly
+  Headers: {}
+  BodyType: java.util.ArrayList
+  Body: [Exchange[], Exchange[]]
+]
+----
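The batch source hands the route a single `java.util.ArrayList` of Exchanges. As an illustration of the batching semantics (a sketch under stated assumptions, not Camel's actual implementation): a batch is emitted when either batchSize records have been collected or pollTimeout expires, whichever comes first. Here only 2 records arrived, so the timeout flushed a partial batch:

```python
import time

def collect_batch(poll, batch_size=10, poll_timeout_s=40.0):
    """Collect up to batch_size records, or whatever arrived before
    poll_timeout_s elapsed (hypothetical helper, not Camel's API)."""
    batch = []
    deadline = time.monotonic() + poll_timeout_s
    while len(batch) < batch_size and time.monotonic() < deadline:
        record = poll()  # returns the next record, or None if nothing is ready
        if record is not None:
            batch.append(record)
    return batch

# Two records arrive, then nothing more: the timeout flushes a partial batch.
pending = ["hello there", "hello there"]
batch = collect_batch(lambda: pending.pop(0) if pending else None,
                      batch_size=10, poll_timeout_s=0.05)
print(len(batch))  # 2
```

A short timeout is used here so the sketch finishes quickly; the route itself waits the full 40 seconds.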
+
+If you check the status of the consumer group 'my-group', you'll see that the commit was performed manually through the kafka-batch-manual-commit-action:
+
+[source,sh]
+----
+./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
+
+GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
+my-group        test-topic      0          2               2               0               -               -               -
+----
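The LAG column is simply the log end offset minus the group's last committed offset, so a lag of 0 confirms the manual commit caught up with everything produced so far. As a trivial illustration (hypothetical helper name):

```python
def consumer_lag(current_offset, log_end_offset):
    """LAG as reported by kafka-consumer-groups.sh:
    records produced but not yet committed by the group."""
    return log_end_offset - current_offset

print(consumer_lag(2, 2))  # 0 -> the batch of 2 records was committed
```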
+
+You could also try sending a larger number of records to see how the batch consumer groups them:
+
+[source,sh]
+----
+for i in {1..50}; do echo "hello there" | kcat -b localhost:9092 -P -t test-topic; done
+----
+
+You should then immediately see the output arrive in groups of 10 records:
+
+[source,sh]
+----
+2024-02-05 09:50:33.947  INFO 24182 --- [mer[test-topic]] log-sink                            : Exchange[
+  ExchangePattern: InOnly
+  Headers: {}
+  BodyType: java.util.ArrayList
+  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
+]
+2024-02-05 09:50:44.137  INFO 24182 --- [mer[test-topic]] log-sink                            : Exchange[
+  ExchangePattern: InOnly
+  Headers: {}
+  BodyType: java.util.ArrayList
+  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
+]
+2024-02-05 09:50:54.324  INFO 24182 --- [mer[test-topic]] log-sink                            : Exchange[
+  ExchangePattern: InOnly
+  Headers: {}
+  BodyType: java.util.ArrayList
+  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
+]
+2024-02-05 09:51:04.535  INFO 24182 --- [mer[test-topic]] log-sink                            : Exchange[
+  ExchangePattern: InOnly
+  Headers: {}
+  BodyType: java.util.ArrayList
+  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
+]
+2024-02-05 09:51:14.747  INFO 24182 --- [mer[test-topic]] log-sink                            : Exchange[
+  ExchangePattern: InOnly
+  Headers: {}
+  BodyType: java.util.ArrayList
+  Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]]
+]
+----
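Since 50 is a multiple of the batchSize of 10, every poll fills a complete batch, which is emitted immediately without waiting for the pollTimeout. Conceptually (a sketch, not the Camel code):

```python
def chunk(records, batch_size=10):
    """Split records into consecutive batches of at most batch_size."""
    return [records[i:i + batch_size] for i in range(0, len(records), batch_size)]

# 50 records with batchSize=10 -> 5 full batches, no partial batch at the end.
batches = chunk(["hello there"] * 50)
print(len(batches))     # 5
print(len(batches[0]))  # 10
```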
+
+For the purpose of this example, the payload of the records is not important.
+
+If you check the offsets for the 'my-group' consumer group again, you'll notice the current offset is now 52.
+
+[source,sh]
+----
+./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
+
+GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                              HOST            CLIENT-ID
+my-group        test-topic      0          52              52              0               -                                                        -               -
+----
+
+=== Help and contributions
+
+If you hit any problem using Camel or have some feedback, then please
+https://camel.apache.org/community/support/[let us know].
+
+We also love contributors, so
+https://camel.apache.org/community/contributing/[get involved] :-)
+
+The Camel riders!
diff --git a/jbang/kafka-batch-log/kafka-batch-log.yaml b/jbang/kafka-batch-log/kafka-batch-log.yaml
new file mode 100644
index 0000000..11c25df
--- /dev/null
+++ b/jbang/kafka-batch-log/kafka-batch-log.yaml
@@ -0,0 +1,41 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# camel-k: dependency=camel:kafka
+
+- route:
+    id: "kafka-to-log"
+    from:
+      uri: "kamelet:kafka-batch-not-secured-source"
+      parameters:
+        bootstrapServers: "localhost:9092"
+        topic: "test-topic"
+        consumerGroup: 'my-group'
+        batchSize: 10
+        pollTimeout: 40000
+        maxPollIntervalMs: 60000
+        autoCommitEnable: false
+        allowManualCommit: true
+      steps:
+        - to:
+            uri: "kamelet:log-sink"
+            parameters:
+              showStreams: true
+              showHeaders: true
+              multiline: true
+        - to:
+            uri: "kamelet:kafka-batch-manual-commit-action"
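With autoCommitEnable set to false and allowManualCommit set to true, offsets are committed only when the kafka-batch-manual-commit-action step runs, i.e. after the log-sink has processed the whole batch. The ordering guarantee this buys (process first, commit after) can be sketched in plain Python (hypothetical helper, not the kamelet's code):

```python
def process_batch_with_manual_commit(batch, handle, commit):
    """Process every record in the batch, then commit offsets once.

    If the process crashes before commit() runs, the whole batch is
    redelivered on restart: at-least-once delivery, no silent loss.
    """
    for record in batch:
        handle(record)  # e.g. the log-sink step
    commit()            # e.g. the kafka-batch-manual-commit-action step

events = []
process_batch_with_manual_commit(
    ["r1", "r2"],
    handle=lambda r: events.append(("handled", r)),
    commit=lambda: events.append(("committed",)),
)
print(events[-1])  # ('committed',)
```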