You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/08/02 08:42:48 UTC

[camel-k-examples] 01/02: Kafka to AWS S3 Streaming Upload Example

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch kafka-to-s3-streaming
in repository https://gitbox.apache.org/repos/asf/camel-k-examples.git

commit 44f1234f6d5ffa384226c456a9b5bf44e3721e20
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Aug 2 10:40:53 2021 +0200

    Kafka to AWS S3 Streaming Upload Example
---
 kamelets/kafka-to-s3-streaming-upload/README.md    | 73 ++++++++++++++++++++++
 .../kafka-to-s3-streaming-upload/flow-binding.yaml | 26 ++++++++
 2 files changed, 99 insertions(+)

diff --git a/kamelets/kafka-to-s3-streaming-upload/README.md b/kamelets/kafka-to-s3-streaming-upload/README.md
new file mode 100644
index 0000000..2dd82e4
--- /dev/null
+++ b/kamelets/kafka-to-s3-streaming-upload/README.md
@@ -0,0 +1,73 @@
+# Kafka to LOG 
+
+- Use the quickstart for https://strimzi.io/quickstarts/ and follow the minikube guide.
+
+- Install camel-k on the kafka namespace
+
+- Add the correct credentials in the flow binding file for AWS S3 service. Don't forget to create the kamelets-demo bucket in the region you select.
+
+- Run the following commands
+
+kubectl apply -f flow-binding.yaml -n kafka
+
+- Check logs
+
+kamel logs kafka-to-s3-streaming-upload -n kafka
+
+    [1] 2021-07-30 05:45:28,277 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Routes startup summary (total:3 started:3)
+    [1] 2021-07-30 05:45:28,277 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main)     Started route1 (kamelet://kafka-not-secured-source/source)
+    [1] 2021-07-30 05:45:28,277 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main)     Started source (kafka://test-topic)
+    [1] 2021-07-30 05:45:28,277 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main)     Started sink (kamelet://source)
+    [1] 2021-07-30 05:45:28,278 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.11.0 (camel-1) started in 1s751ms (build:0ms init:75ms start:1s676ms)
+    [1] 2021-07-30 05:45:28,281 INFO  [io.quarkus] (main) camel-k-integration 1.5.0 on JVM (powered by Quarkus 2.0.0.Final) started in 7.710s. 
+    [1] 2021-07-30 05:45:28,281 INFO  [io.quarkus] (main) Profile prod activated. 
+    [1] 2021-07-30 05:45:28,282 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'sasl.kerberos.ticket.renew.window.factor' was supplied but isn't a known config.
+    [1] 2021-07-30 05:45:28,284 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'sasl.kerberos.kinit.cmd' was supplied but isn't a known config.
+    [1] 2021-07-30 05:45:28,285 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'specific.avro.reader' was supplied but isn't a known config.
+    [1] 2021-07-30 05:45:28,284 INFO  [io.quarkus] (main) Installed features: [camel-aws2-commons, camel-aws2-s3, camel-bean, camel-core, camel-k-core, camel-k-runtime, camel-kafka, camel-kamelet, camel-support-common, camel-support-commons-logging, camel-support-httpclient, camel-yaml-dsl, cdi, kafka-client]
+    [1] 2021-07-30 05:45:28,287 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'sasl.kerberos.ticket.renew.jitter' was supplied but isn't a known config.
+    [1] 2021-07-30 05:45:28,288 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'ssl.trustmanager.algorithm' was supplied but isn't a known config.
+    [1] 2021-07-30 05:45:28,288 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'ssl.keystore.type' was supplied but isn't a known config.
+    [1] 2021-07-30 05:45:28,289 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'sasl.kerberos.min.time.before.relogin' was supplied but isn't a known config.
+    [1] 2021-07-30 05:45:28,289 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'ssl.endpoint.identification.algorithm' was supplied but isn't a known config.
+    [1] 2021-07-30 05:45:28,289 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'ssl.protocol' was supplied but isn't a known config.
+    [1] 2021-07-30 05:45:28,290 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'ssl.enabled.protocols' was supplied but isn't a known config.
+    [1] 2021-07-30 05:45:28,291 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'ssl.truststore.type' was supplied but isn't a known config.
+    [1] 2021-07-30 05:45:28,291 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) The configuration 'ssl.keymanager.algorithm' was supplied but isn't a known config.
+    [1] 2021-07-30 05:45:28,292 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) Kafka version: 2.8.0
+    [1] 2021-07-30 05:45:28,292 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) Kafka commitId: ebb1d6e21cc92130
+    [1] 2021-07-30 05:45:28,292 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) Kafka startTimeMs: 1627623928292
+    [1] 2021-07-30 05:45:28,292 INFO  [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) Subscribing test-topic-Thread 0 to topic test-topic
+    [1] 2021-07-30 05:45:28,293 INFO  [org.apa.kaf.cli.con.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Subscribed to topic(s): test-topic
+    [1] 2021-07-30 05:45:28,582 WARN  [org.apa.kaf.cli.NetworkClient] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Error while fetching metadata with correlation id 2 : {test-topic=LEADER_NOT_AVAILABLE}
+    [1] 2021-07-30 05:45:28,583 INFO  [org.apa.kaf.cli.Metadata] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Cluster ID: r6q2BGnHT7awUAK0diO1hA
+    [1] 2021-07-30 05:45:28,585 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Discovered group coordinator my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9092 (id: 2147483647 rack: null)
+    [1] 2021-07-30 05:45:28,635 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] (Re-)joining group
+    [1] 2021-07-30 05:45:28,755 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] (Re-)joining group
+    [1] 2021-07-30 05:45:28,759 WARN  [org.apa.kaf.cli.NetworkClient] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Error while fetching metadata with correlation id 6 : {test-topic=LEADER_NOT_AVAILABLE}
+    [1] 2021-07-30 05:45:28,868 WARN  [org.apa.kaf.cli.NetworkClient] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Error while fetching metadata with correlation id 8 : {test-topic=LEADER_NOT_AVAILABLE}
+    [1] 2021-07-30 05:45:31,766 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Successfully joined group with generation Generation{generationId=1, memberId='consumer-camel-k-integration-2-a03ed9eb-5a45-4170-b519-97f9a32e019d', protocol='range'}
+    [1] 2021-07-30 05:45:31,772 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Finished assignment for group at generation 1: {consumer-camel-k-integration-2-a03ed9eb-5a45-4170-b519-97f9a32e019d=Assignment(partitions=[test-topic-0])}
+    [1] 2021-07-30 05:45:31,790 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Successfully synced group in generation Generation{generationId=1, memberId='consumer-camel-k-integration-2-a03ed9eb-5a45-4170-b519-97f9a32e019d', protocol='range'}
+    [1] 2021-07-30 05:45:31,791 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Notifying assignor about the new Assignment(partitions=[test-topic-0])
+    [1] 2021-07-30 05:45:31,797 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Adding newly assigned partitions: test-topic-0
+    [1] 2021-07-30 05:45:31,808 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Found no committed offset for partition test-topic-0
+    [1] 2021-07-30 05:45:31,818 INFO  [org.apa.kaf.cli.con.int.SubscriptionState] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) [Consumer clientId=consumer-camel-k-integration-2, groupId=camel-k-integration] Resetting offset for partition test-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9092 (id: 0 rack: null)], epoch=0}}.
+
+- Send some data to Kafka topic
+
+Run the following command
+
+kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.24.0-kafka-2.8.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic test-topic
+If you don't see a command prompt, try pressing enter.
+>l1 
+>l2 
+>l3 
+>l4 
+>l5 
+
+- In the integration logs you should now read 
+
+    [1] 2021-07-30 05:46:12,156 INFO  [org.apa.cam.com.aws.s3.str.AWS2S3StreamUploadProducer] (Camel (camel-1) thread #0 - KafkaConsumer[test-topic]) Completed upload for the part 1 with etag "03204a1ad503637fc7b6cef6ac290fd4-1" at index 5
+
+- If you go into the kamelets-demo bucket of your account, you should see only one file KafkaTestFile.txt, containing the file messages concatenated.
diff --git a/kamelets/kafka-to-s3-streaming-upload/flow-binding.yaml b/kamelets/kafka-to-s3-streaming-upload/flow-binding.yaml
new file mode 100644
index 0000000..2681656
--- /dev/null
+++ b/kamelets/kafka-to-s3-streaming-upload/flow-binding.yaml
@@ -0,0 +1,26 @@
+apiVersion: camel.apache.org/v1alpha1
+kind: KameletBinding
+metadata:
+  name: kafka-to-s3-streaming-upload
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: kafka-not-secured-source
+    properties:
+      brokers: 'my-cluster-kafka-bootstrap:9092'
+      topic: 'test-topic'
+  sink:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1alpha1
+      name: aws-s3-streaming-upload-sink
+    properties:
+      bucketNameOrArn: 'kamelets-demo'
+      accessKey: 'access'
+      secretKey: 'secret'
+      region: 'region'
+      batchSize: '1000000'
+      batchMessageNumber: 5
+      keyName: KafkaTestFile.txt