You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/11/16 11:03:49 UTC

(camel-kamelets-examples) 01/04: Kafka-S3 Example with Strimzi and Camel K operator on openshift

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch kafka-s3-openshift-example
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git

commit ae05598eb7e8ceb80b51ae58b3f859086daa050d
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Nov 16 11:55:46 2023 +0100

    Kafka-S3 Example with Strimzi and Camel K operator on openshift
    
    Signed-off-by: Andrea Cosentino <an...@gmail.com>
---
 camel-k/kafka-s3/README.adoc                       | 159 +++++++++++++++++++++
 .../kafka-s3/kafka-not-secured-sink.kamelet.yaml   |  90 ++++++++++++
 .../kafka-s3/kafka-not-secured-source.kamelet.yaml | 102 +++++++++++++
 .../main.tf                                        |   4 +-
 4 files changed, 353 insertions(+), 2 deletions(-)

diff --git a/camel-k/kafka-s3/README.adoc b/camel-k/kafka-s3/README.adoc
new file mode 100644
index 0000000..c81772d
--- /dev/null
+++ b/camel-k/kafka-s3/README.adoc
@@ -0,0 +1,159 @@
+== Kafka to S3 KameletBinding example
+
+In this sample you'll use Strizi Operator and Camel K Operator on Openshift Cloud
+
+=== Setup the Strimzi Operator
+
+For the purpose of this demo we don't know authentication so we can install the Strimzi operator.
+
+Install the operator on a defined namespace name.
+
+Once the operator completes, create a Kafka instance (default) and a Kafka Topic (default, named my-topic)
+
+=== Setup the Camel K Operator
+
+On the same namespace install the Camel K operator from Apache (and choose stable-1.10 as channel)
+
+Once it is completed we are ready to go.
+
+=== Prepare a Bucket on AWS S3
+
+Create a bucket on your AWS account and choose a specific region. You can do that directly, or you can do that through AWS CLI
+
+```bash
+aws s3api create-bucket --bucket strimzi-bucket --region eu-west-1 --create-bucket-configuration LocationConstraint=eu-west-1
+```
+
+=== Install Custom Kamelets
+
+Be sure to be on the same namespace as the installed operator
+
+```bash
+oc project <namespace_name>
+```
+
+For the purpose of this demo, we need to install the two custom Kamelets not included in the catalog for 1.10.4
+
+```bash
+> oc apply -f kafka-not-secured-sink.kamelet.yaml
+> kamelet.camel.apache.org/kafka-not-secured-sink created
+```
+
+Same for the source
+
+```bash
+> oc apply -f kafka-not-secured-source.kamelet.yaml
+> kamelet.camel.apache.org/kafka-not-secured-source created
+```
+
+=== Prepare KameletBinding
+
+Now we need to setup the KameletBinding
+
+First all let's recover the bootstrapServers and the IP needed to connect to Strimzi Cluster
+
+```bash
+oc describe service my-cluster-kafka-bootstrap
+```
+
+Copy the IP and the port will be 9092.
+
+Open kafka-s3.yaml and timer-kafka.yaml
+
+and edit the bootstrapServers field with the content <IP>:9092
+
+the topic will be "my-topic".
+
+For the S3 part in kafka-s3.yaml file just add the correct accessKey, secretKey, region (eu-west-1) and as bucketNameOrArn strimzi-bucket
+
+=== Run
+
+We can now run the KameletBinding
+
+```bash
+> oc apply -f timer-kafka.yaml 
+kameletbinding.camel.apache.org/timer-kafka-binding created
+> oc apply -f kafka-s3.yaml 
+kameletbinding.camel.apache.org/kafka-s3-binding created
+```
+
+You can watch for new pods spinned up
+
+```bash
+[oscerd@ghost kafka-s3]$ oc get pods -w
+NAME                                                      READY   STATUS      RESTARTS   AGE
+camel-k-operator-bd56fcd94-mvsjp                          1/1     Running     0          68m
+my-cluster-entity-operator-79bcdb57f5-6jlbj               3/3     Running     0          56m
+my-cluster-kafka-0                                        1/1     Running     0          56m
+my-cluster-kafka-1                                        1/1     Running     0          56m
+my-cluster-kafka-2                                        1/1     Running     0          56m
+my-cluster-zookeeper-0                                    1/1     Running     0          56m
+my-cluster-zookeeper-1                                    1/1     Running     0          56m
+my-cluster-zookeeper-2                                    1/1     Running     0          56m
+strimzi-cluster-operator-v0.38.0-7b7447c99f-v52tl         1/1     Running     0          68m
+timer-kafka-binding-9485d8cb9-9mr7s                       1/1     Running     0          51s
+```
+
+At some point both the two bindings will be in Running state
+
+Let's look at logs:
+
+```bash
+> oc logs timer-kafka-binding..
+2023-11-16 10:43:03,530 INFO  [org.apa.cam.k.Runtime] (main) Apache Camel K Runtime 1.15.2
+2023-11-16 10:43:03,552 INFO  [org.apa.cam.qua.cor.CamelBootstrapRecorder] (main) Bootstrap runtime: org.apache.camel.quarkus.main.CamelMainRuntime
+2023-11-16 10:43:03,555 INFO  [org.apa.cam.mai.MainSupport] (main) Apache Camel (Main) 3.18.3 is starting
+2023-11-16 10:43:03,604 INFO  [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='camel-k-embedded-flow', language='yaml', type='source', location='file:/etc/camel/sources/camel-k-embedded-flow.yaml', }
+2023-11-16 10:43:03,664 INFO  [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='kafka-not-secured-sink', language='yaml', type='source', location='file:/etc/camel/sources/kafka-not-secured-sink.yaml', }
+2023-11-16 10:43:03,684 INFO  [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='timer-source', language='yaml', type='source', location='file:/etc/camel/sources/timer-source.yaml', }
+2023-11-16 10:43:03,925 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.18.3 (camel-1) is starting
+2023-11-16 10:43:04,131 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Routes startup (started:3)
+2023-11-16 10:43:04,132 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main)     Started binding (kamelet://timer-source/source)
+2023-11-16 10:43:04,132 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main)     Started source (timer://tick)
+2023-11-16 10:43:04,132 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main)     Started sink (kamelet://source)
+2023-11-16 10:43:04,133 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.18.3 (camel-1) started in 334ms (build:0ms init:127ms start:207ms)
+2023-11-16 10:43:04,149 INFO  [io.quarkus] (main) camel-k-integration 1.10.4 on JVM (powered by Quarkus 2.13.4.Final) started in 2.815s. 
+2023-11-16 10:43:04,149 INFO  [io.quarkus] (main) Profile prod activated. 
+2023-11-16 10:43:04,149 INFO  [io.quarkus] (main) Installed features: [camel-bean, camel-core, camel-k-core, camel-k-runtime, camel-kafka, camel-kamelet, camel-kubernetes, camel-timer, camel-yaml-dsl, cdi, kafka-client, kubernetes-client, security]
+```
+
+For Kafka-s3 instead
+
+```bash
+> oc logs kafka-s3-binding..
+2023-11-16 10:45:53,138 INFO  [org.apa.cam.k.Runtime] (main) Apache Camel K Runtime 1.15.2
+2023-11-16 10:45:53,546 INFO  [org.apa.cam.qua.cor.CamelBootstrapRecorder] (main) Bootstrap runtime: org.apache.camel.quarkus.main.CamelMainRuntime
+2023-11-16 10:45:53,552 INFO  [org.apa.cam.mai.MainSupport] (main) Apache Camel (Main) 3.18.3 is starting
+2023-11-16 10:45:53,607 INFO  [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='camel-k-embedded-flow', language='yaml', type='source', location='file:/etc/camel/sources/camel-k-embedded-flow.yaml', }
+2023-11-16 10:45:53,658 INFO  [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='aws-s3-sink', language='yaml', type='source', location='file:/etc/camel/sources/aws-s3-sink.yaml', }
+2023-11-16 10:45:53,664 INFO  [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='kafka-not-secured-source', language='yaml', type='source', location='file:/etc/camel/sources/kafka-not-secured-source.yaml', }
+2023-11-16 10:45:53,955 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.18.3 (camel-1) is starting
+2023-11-16 10:45:55,253 INFO  [org.apa.cam.com.kaf.KafkaConsumer] (main) Starting Kafka consumer on topic: my-topic with breakOnFirstError: false
+2023-11-16 10:45:55,271 INFO  [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[my-topic]) Connecting Kafka consumer thread ID my-topic-Thread 0 with poll timeout of 5000 ms
+2023-11-16 10:45:55,276 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Routes startup (started:3)
+2023-11-16 10:45:55,277 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main)     Started binding (kamelet://kafka-not-secured-source/source)
+2023-11-16 10:45:55,279 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main)     Started source (kafka://my-topic)
+2023-11-16 10:45:55,279 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main)     Started sink (kamelet://source)
+2023-11-16 10:45:55,282 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.18.3 (camel-1) started in 1s467ms (build:0ms init:141ms start:1s326ms)
+2023-11-16 10:45:55,288 INFO  [io.quarkus] (main) camel-k-integration 1.10.4 on JVM (powered by Quarkus 2.13.4.Final) started in 3.968s. 
+2023-11-16 10:45:55,288 INFO  [io.quarkus] (main) Profile prod activated. 
+2023-11-16 10:45:55,288 INFO  [io.quarkus] (main) Installed features: [camel-aws2-s3, camel-bean, camel-core, camel-k-core, camel-k-runtime, camel-kafka, camel-kamelet, camel-kubernetes, camel-yaml-dsl, cdi, kafka-client, kubernetes-client, security]
+2023-11-16 10:45:55,497 INFO  [org.apa.cam.com.kaf.con.sup.ResumeStrategyFactory] (Camel (camel-1) thread #1 - KafkaConsumer[my-topic]) Using NO-OP resume strategy
+2023-11-16 10:45:55,498 INFO  [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel (camel-1) thread #1 - KafkaConsumer[my-topic]) Subscribing my-topic-Thread 0 to topic my-topic
+```
+
+Both of them are running.
+
+Let's now check the content of the s3 bucket.
+
+```bash
+> aws s3 ls s3://strimzi-bucket --recursive --human-readable --summarize
+2023-11-16 11:52:32   10 Bytes 701027CE70E080F-0000000000000000
+2023-11-16 11:53:01   10 Bytes 701027CE70E080F-0000000000000001
+2023-11-16 11:53:31   10 Bytes 701027CE70E080F-0000000000000002
+2023-11-16 11:54:02   10 Bytes 701027CE70E080F-0000000000000003
+2023-11-16 11:54:31   10 Bytes 701027CE70E080F-0000000000000004
+
+Total Objects: 5
+   Total Size: 50 Bytes
+```
diff --git a/camel-k/kafka-s3/kafka-not-secured-sink.kamelet.yaml b/camel-k/kafka-s3/kafka-not-secured-sink.kamelet.yaml
new file mode 100644
index 0000000..d7ba763
--- /dev/null
+++ b/camel-k/kafka-s3/kafka-not-secured-sink.kamelet.yaml
@@ -0,0 +1,90 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: kafka-not-secured-sink
+  annotations:
+    camel.apache.org/kamelet.support.level: "Stable"
+    camel.apache.org/catalog.version: "0.9.4"
+    camel.apache.org/kamelet.icon: " [...]
+    camel.apache.org/provider: "Apache Software Foundation"
+    camel.apache.org/kamelet.group: "Kafka"
+  labels:
+    camel.apache.org/kamelet.type: "sink"
+spec:
+  definition:
+    title: "Kafka Not Secured Sink"
+    description: |-
+      Send data to Kafka topics on an insecure broker.
+
+      The Kamelet is able to understand the following headers to be set:
+
+      - `key` / `ce-key`: as message key
+    
+      - `partition-key` / `ce-partitionkey`: as message partition key
+
+      Both the headers are optional.
+    required:
+      - topic
+      - bootstrapServers
+    type: object
+    properties:
+      topic:
+        title: Topic Names
+        description: Comma separated list of Kafka topic names
+        type: string
+      bootstrapServers:
+        title: Bootstrap Servers
+        description: Comma separated list of Kafka Broker URLs
+        type: string
+  dependencies:
+    - "camel:core"
+    - "camel:kafka"
+    - "camel:kamelet"
+  template:
+    from:
+      uri: "kamelet:source"
+      steps:
+      - choice:
+          when:
+          - simple: "${header[key]}"
+            steps:
+            - set-header:
+                name: kafka.KEY
+                simple: "${header[key]}"
+          - simple: "${header[ce-key]}"
+            steps:
+            - set-header:
+                name: kafka.KEY
+                simple: "${header[ce-key]}"
+      - choice:
+          when:
+          - simple: "${header[partition-key]}"
+            steps:
+            - set-header:
+                name: kafka.PARTITION_KEY
+                simple: "${header[partition-key]}"
+          - simple: "${header[ce-partitionkey]}"
+            steps:
+            - set-header:
+                name: kafka.PARTITION_KEY
+                simple: "${header[ce-partitionkey]}"
+      - to:
+          uri: "kafka:{{topic}}"
+          parameters:
+            brokers: "{{bootstrapServers}}"
diff --git a/camel-k/kafka-s3/kafka-not-secured-source.kamelet.yaml b/camel-k/kafka-s3/kafka-not-secured-source.kamelet.yaml
new file mode 100644
index 0000000..a84538b
--- /dev/null
+++ b/camel-k/kafka-s3/kafka-not-secured-source.kamelet.yaml
@@ -0,0 +1,102 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+apiVersion: camel.apache.org/v1alpha1
+kind: Kamelet
+metadata:
+  name: kafka-not-secured-source
+  annotations:
+    camel.apache.org/kamelet.support.level: "Stable"
+    camel.apache.org/catalog.version: "0.9.4"
+    camel.apache.org/kamelet.icon: " [...]
+    camel.apache.org/provider: "Apache Software Foundation"
+    camel.apache.org/kamelet.group: "Kafka"
+    camel.apache.org/keda.type: "kafka"
+  labels:
+    camel.apache.org/kamelet.type: "source"
+spec:
+  definition:
+    title: "Kafka Not Secured Source"
+    description: |-
+      Receive data from Kafka topics on an insecure broker.
+    required:
+      - topic
+      - bootstrapServers
+    type: object
+    properties:
+      topic:
+        title: Topic Names
+        description: Comma separated list of Kafka topic names
+        type: string
+        x-descriptors:
+        - urn:keda:metadata:topic
+        - urn:keda:required
+      bootstrapServers:
+        title: Bootstrap Servers
+        description: Comma separated list of Kafka Broker URLs
+        type: string
+        x-descriptors:
+        - urn:keda:metadata:bootstrapServers
+        - urn:keda:required
+      autoCommitEnable:
+        title: Auto Commit Enable
+        description: If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer
+        type: boolean
+        default: true
+        x-descriptors:
+        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+      allowManualCommit:
+        title: Allow Manual Commit
+        description: Whether to allow doing manual commits
+        type: boolean
+        default: false
+        x-descriptors:
+        - 'urn:alm:descriptor:com.tectonic.ui:checkbox'
+      pollOnError:
+        title: Poll On Error Behavior
+        description: What to do if kafka threw an exception while polling for new messages. There are 5 enums and the value can be one of DISCARD, ERROR_HANDLER, RECONNECT, RETRY, STOP
+        type: string
+        default: "ERROR_HANDLER"
+      autoOffsetReset:
+        title: Auto Offset Reset
+        description: What to do when there is no initial offset. There are 3 enums and the value can be one of latest, earliest, none
+        type: string
+        default: "latest"
+        x-descriptors:
+        - urn:keda:metadata:offsetResetPolicy
+      consumerGroup:
+        title: Consumer Group
+        description: A string that uniquely identifies the group of consumers to which this source belongs
+        type: string
+        example: "my-group-id"
+        x-descriptors:
+        - urn:keda:metadata:consumerGroup
+        - urn:keda:required
+  dependencies:
+    - "camel:kafka"
+    - "camel:kamelet"
+  template:
+    from:
+      uri: "kafka:{{topic}}"
+      parameters:
+        brokers: "{{bootstrapServers}}"
+        autoCommitEnable: "{{autoCommitEnable}}"
+        allowManualCommit: "{{allowManualCommit}}"
+        pollOnError: "{{pollOnError}}"
+        autoOffsetReset: "{{autoOffsetReset}}"
+        groupId: "{{?consumerGroup}}"
+      steps:
+      - to: "kamelet:sink"
diff --git a/jbang/azure-eventhubs-kafka-azure-schema-registry/main.tf b/jbang/azure-eventhubs-kafka-azure-schema-registry/main.tf
index ba4fe6d..ecc6592 100644
--- a/jbang/azure-eventhubs-kafka-azure-schema-registry/main.tf
+++ b/jbang/azure-eventhubs-kafka-azure-schema-registry/main.tf
@@ -33,13 +33,13 @@ provider "azurerm" {
 
 # Resource group.
 resource "azurerm_resource_group" "example" {
-  name     = "example-rg"
+  name     = "example-test12345-rg"
   location = "West Europe"
 }
 
 # Eventhubs Namepsace.
 resource "azurerm_eventhub_namespace" "example" {
-  name                = "example-namespace"
+  name                = "example-test12345-namespace"
   location            = azurerm_resource_group.example.location
   resource_group_name = azurerm_resource_group.example.name
   sku                 = "Standard"