You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2024/03/07 06:07:56 UTC

(camel-kamelets-examples) 01/01: Added an Example of Kafka to AWS Bedrock Text Sink

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch kafka-aws-bedrock
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git

commit 3b15d8bc19e8fc8693d3ea66591fef8df0663de1
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Mar 7 07:07:21 2024 +0100

    Added an Example of Kafka to AWS Bedrock Text Sink
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 jbang/kafka-aws-bedrock/README.adoc                | 184 +++++++++++++++++++++
 .../kafka-aws-bedrock/kafka-aws-bedrock.camel.yaml |  35 ++++
 jbang/kafka-aws-bedrock/prompt1.txt                |   1 +
 jbang/kafka-aws-bedrock/prompt2.txt                |   1 +
 4 files changed, 221 insertions(+)

diff --git a/jbang/kafka-aws-bedrock/README.adoc b/jbang/kafka-aws-bedrock/README.adoc
new file mode 100644
index 0000000..670647c
--- /dev/null
+++ b/jbang/kafka-aws-bedrock/README.adoc
@@ -0,0 +1,184 @@
+== Kafka to AWS Bedrock Text Sink Kamelet
+
+In this sample you'll use the Kafka Source Kamelet and the AWS Bedrock Text Sink Kamelet
+
+=== Install JBang
+
+First install JBang according to https://www.jbang.dev
+
+When JBang is installed then you should be able to run from a shell:
+
+[source,sh]
+----
+$ jbang --version
+----
+
+This will output the version of JBang.
+
+To run this example you can either install Camel on JBang via:
+
+[source,sh]
+----
+$ jbang app install camel@apache/camel
+----
+
+Which allows to run CamelJBang with `camel` as shown below.
+
+=== Setup Kafka instance
+
+You'll need to run a Kafka cluster to point to. In this case you could use an ansible role like https://github.com/oscerd/kafka-ansible-role
+
+And set up a file deploy.yaml with the following content:
+
+```yaml
+- name: role kafka
+  hosts: localhost
+  remote_user: user
+  
+  roles:
+    - role: kafka-ansible-role
+      kafka_version: 3.6.1
+      path_dir: /home/user/
+      unarchive_dest_dir: /home/user/kafka/demo/
+      start_kafka: true
+```
+
+and then run
+
+```shell script
+ansible-playbook -v deploy.yaml
+```
+
+This should start a Kafka instance for you, on your local machine.
+
+=== Set up AWS Bedrock
+
+The Kamelet expects the AWS credentials to be placed in `/home/<user>/.aws/credentials`
+
+That's reason why you see the useDefaultCredentialsProvider option set to true in the AWS Bedrock Text Sink configuration.
+
+=== How to run
+
+Then you can run this example using:
+
+[source,sh]
+----
+$ jbang -Dcamel.jbang.version=4.5.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<local_path_to_kamelets> kafka-aws-bedrock.camel.yaml
+----
+
+
+=== Sending two messages
+
+In the log you should see the consumer:
+
+[source,sh]
+----
+2024-03-07 06:50:53.517  INFO 11498 --- [           main] el.impl.engine.AbstractCamelContext : Routes startup (total:1 started:1 kamelets:2)
+2024-03-07 06:50:53.517  INFO 11498 --- [           main] el.impl.engine.AbstractCamelContext :     Started route1 (kamelet://kafka-not-secured-source)
+2024-03-07 06:50:53.517  INFO 11498 --- [           main] el.impl.engine.AbstractCamelContext : Apache Camel 4.5.0-SNAPSHOT (kafka-aws-bedrock) started in 817ms (build:0ms init:0ms start:817ms)
+2024-03-07 06:50:53.621  INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
+2024-03-07 06:50:53.622  INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
+2024-03-07 06:50:53.622  INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1709790653615
+2024-03-07 06:50:53.634  INFO 11498 --- [[bedrock-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
+2024-03-07 06:50:53.634  INFO 11498 --- [[bedrock-topic]] l.component.kafka.KafkaFetchRecords : Subscribing bedrock-topic-Thread 0 to topic bedrock-topic
+2024-03-07 06:50:53.636  INFO 11498 --- [[bedrock-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Subscribed to topic(s): bedrock-topic
+2024-03-07 06:50:54.023  WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 2 : {bedrock-topic=LEADER_NOT_AVAILABLE}
+2024-03-07 06:50:54.025  INFO 11498 --- [[bedrock-topic]] org.apache.kafka.clients.Metadata   : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Cluster ID: Wq0puR18So6u4dXsKJnMNg
+2024-03-07 06:50:54.140  WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 4 : {bedrock-topic=LEADER_NOT_AVAILABLE}
+2024-03-07 06:50:54.263  WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 6 : {bedrock-topic=LEADER_NOT_AVAILABLE}
+2024-03-07 06:50:54.371  WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 8 : {bedrock-topic=LEADER_NOT_AVAILABLE}
+2024-03-07 06:50:54.784  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
+2024-03-07 06:50:54.789  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] (Re-)joining group
+2024-03-07 06:50:54.821  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Request joining group due to: need to re-join with the given member-id: consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276
+2024-03-07 06:50:54.823  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
+2024-03-07 06:50:54.823  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] (Re-)joining group
+2024-03-07 06:50:54.846  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Successfully joined group with generation Generation{generationId=1, memberId='consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276', protocol='range'}
+2024-03-07 06:50:54.857  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Finished assignment for group at generation 1: {consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276=Assignment(partitions=[bedrock-topic-0])}
+2024-03-07 06:50:54.934  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Successfully synced group in generation Generation{generationId=1, memberId='consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276', protocol='range'}
+2024-03-07 06:50:54.935  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Notifying assignor about the new Assignment(partitions=[bedrock-topic-0])
+2024-03-07 06:50:54.937  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Adding newly assigned partitions: bedrock-topic-0
+2024-03-07 06:50:54.962  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Found no committed offset for partition bedrock-topic-0
+2024-03-07 06:50:54.989  INFO 11498 --- [[bedrock-topic]] onsumer.internals.SubscriptionState : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Resetting offset for partition bedrock-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}.
+----
+
+In the example folder you have two different messages, prompt1.txt and prompt2.txt, the first one will request the generation of a Json record and the second one an XML record. Both of them need to have an id field, a product name and a price.
+
+We use kcat to send the messages to kafka and consume them.
+
+[source,sh]
+----
+$ kcat -P -b localhost:9092 -t bedrock-topic prompt1.txt
+$ kcat -P -b localhost:9092 -t bedrock-topic prompt2.txt
+----
+
+If you look in the example folder you should see a results folder now:
+
+[source,sh]
+----
+$ cd results/
+$ cat *
+----
+
+You have two different files and if you cat the content you should see
+
+[source,sh]
+----
+```tabular-data-json
+{
+    "rows": [
+        {
+            "id": 1,
+            "product_name": "Phone 12",
+            "price": "$999"
+        },
+        {
+            "id": 2,
+            "product_name": "Phone 11",
+            "price": "$899"
+        },
+        {
+            "id": 3,
+            "product_name": "Phone X",
+            "price": "$799"
+        },
+        {
+            "id": 4,
+            "product_name": "Phone XS",
+            "price": "$699"
+        },
+        {
+            "id": 5,
+            "product_name": "Phone XR",
+            "price": "$599"
+        }
+    ]
+}
+```
+```tabular-data-xml
+<record id="1">
+  <product name="Apple iPhone 12 Pro Max">$1,099</product>
+</record>
+<record id="2">
+  <product name="Samsung Galaxy S21 Ultra 5G">$1,199</product>
+</record>
+<record id="3">
+  <product name="Google Pixel 6 Pro">$899</product>
+</record>
+<record id="4">
+  <product name="Apple iPhone 11 Pro">$999</product>
+</record>
+<record id="5">
+  <product name="Samsung Galaxy Note 20 Ultra 5G">$1,299</product>
+</record>
+```
+----
+
+=== Help and contributions
+
+If you hit any problem using Camel or have some feedback, then please
+https://camel.apache.org/community/support/[let us know].
+
+We also love contributors, so
+https://camel.apache.org/community/contributing/[get involved] :-)
+
+The Camel riders!
diff --git a/jbang/kafka-aws-bedrock/kafka-aws-bedrock.camel.yaml b/jbang/kafka-aws-bedrock/kafka-aws-bedrock.camel.yaml
new file mode 100644
index 0000000..7b0680b
--- /dev/null
+++ b/jbang/kafka-aws-bedrock/kafka-aws-bedrock.camel.yaml
@@ -0,0 +1,35 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+- route:
+    from:
+      uri: "kamelet:kafka-not-secured-source"
+      parameters:
+        topic: "bedrock-topic"
+        bootstrapServers: "localhost:9092"
+        groupId: 'my-consumer-group'
+      steps:
+      - to: 
+          uri: "kamelet:aws-bedrock-text-sink"
+          parameters:
+            region: "us-east-1"
+            useDefaultCredentialsProvider: "true"
+            modelId: "amazon.titan-text-express-v1"
+      - transform:
+          simple:
+            jq: " .results[0].outputText"
+      - to: "file:./results/"
diff --git a/jbang/kafka-aws-bedrock/prompt1.txt b/jbang/kafka-aws-bedrock/prompt1.txt
new file mode 100644
index 0000000..d8def8d
--- /dev/null
+++ b/jbang/kafka-aws-bedrock/prompt1.txt
@@ -0,0 +1 @@
+{"inputText":"User: Can you please generate five Json record, with id field, product name and price? The output should be in JSON format.","textGenerationConfig":{"maxTokenCount":1024,"stopSequences":["User:"],"temperature":0,"topP":1}}
diff --git a/jbang/kafka-aws-bedrock/prompt2.txt b/jbang/kafka-aws-bedrock/prompt2.txt
new file mode 100644
index 0000000..0503ff7
--- /dev/null
+++ b/jbang/kafka-aws-bedrock/prompt2.txt
@@ -0,0 +1 @@
+{"inputText":"User: Can you please generate five XML record, with id field, product name and price? The output should be in XML format.","textGenerationConfig":{"maxTokenCount":1024,"stopSequences":["User:"],"temperature":0,"topP":1}}