You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/02/09 07:46:37 UTC

[camel-k-runtime] 01/02: Added a Java example of kafka source to AWS S3

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 43594a61c9b4375ad7525ddad930affb6f71e1be
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Feb 9 08:33:56 2021 +0100

    Added a Java example of kafka source to AWS S3
---
 examples/kafka-source-s3/README.md                 | 141 +++++++++++++++++++++
 examples/kafka-source-s3/data/MyRoutes.java        |  26 ++++
 .../kafka-source-s3/data/application.properties    |  44 +++++++
 examples/kafka-source-s3/pom.xml                   | 105 +++++++++++++++
 .../src/main/resources/application.properties      |  16 +++
 examples/pom.xml                                   |   1 +
 6 files changed, 333 insertions(+)

diff --git a/examples/kafka-source-s3/README.md b/examples/kafka-source-s3/README.md
new file mode 100644
index 0000000..0f75ac0
--- /dev/null
+++ b/examples/kafka-source-s3/README.md
@@ -0,0 +1,141 @@
+== Camel-K-runtime Java Example for kafka consumer to AWS S3
+
+This example shows the usage of Camel-k-runtime for kafka consumer to AWS S3
+
+The route involves kafka and aws2-s3 component
+
+=== Setup
+
+You'll need to have a kafka instance running on your machine or in docker.
+You'll need AWS Credentials.
+
+=== How to run
+
+You have two ways of doing this.
+
+First approach:
+
+    mvn exec:exec
+
+This approach will pack and run a camel-quarkus runner.
+
+Second approach
+
+    mvn clean package
+    export CAMEL_K_CONF=${project.basedir}/data/application.properties
+    export CAMEL_K_ROUTES=file:${project.basedir}/data/MyRoutes.java
+    java -jar target/camel-k-runtime-example-java-runner.jar
+
+You should get the following output in both cases
+
+2021-02-09 08:27:13,463 INFO  [org.apa.cam.k.Runtime] (main) Apache Camel K Runtime 1.7.0-SNAPSHOT
+2021-02-09 08:27:13,482 INFO  [org.apa.cam.qua.cor.CamelBootstrapRecorder] (main) bootstrap runtime: org.apache.camel.quarkus.main.CamelMainRuntime
+2021-02-09 08:27:13,488 INFO  [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: file:/home/oscerd/workspace/apache-camel/camel-k-runtime/examples/kafka-source-s3/data/MyRoutes.java
+2021-02-09 08:27:14,345 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.7.0 (camel-q) is starting
+2021-02-09 08:27:14,345 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) StreamCaching is enabled on CamelContext: camel-q
+2021-02-09 08:27:15,724 INFO  [org.apa.cam.imp.eng.DefaultStreamCachingStrategy] (main) StreamCaching in use with spool directory: /tmp/camel-q and rules: [Spool > 128K body size]
+2021-02-09 08:27:15,725 INFO  [org.apa.cam.com.kaf.KafkaConsumer] (main) Starting Kafka consumer on topic: testtopic with breakOnFirstError: false
+2021-02-09 08:27:15,747 INFO  [org.apa.kaf.cli.con.ConsumerConfig] (main) ConsumerConfig values: 
+	allow.auto.create.topics = true
+	auto.commit.interval.ms = 5000
+	auto.offset.reset = latest
+	bootstrap.servers = [localhost:9092]
+	check.crcs = true
+	client.dns.lookup = default
+	client.id = 
+	client.rack = 
+	connections.max.idle.ms = 540000
+	default.api.timeout.ms = 60000
+	enable.auto.commit = true
+	exclude.internal.topics = true
+	fetch.max.bytes = 52428800
+	fetch.max.wait.ms = 500
+	fetch.min.bytes = 1
+	group.id = 94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b
+	group.instance.id = null
+	heartbeat.interval.ms = 3000
+	interceptor.classes = []
+	internal.leave.group.on.close = true
+	isolation.level = read_uncommitted
+	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
+	max.partition.fetch.bytes = 1048576
+	max.poll.interval.ms = 300000
+	max.poll.records = 500
+	metadata.max.age.ms = 300000
+	metric.reporters = []
+	metrics.num.samples = 2
+	metrics.recording.level = INFO
+	metrics.sample.window.ms = 30000
+	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
+	receive.buffer.bytes = 65536
+	reconnect.backoff.max.ms = 1000
+	reconnect.backoff.ms = 50
+	request.timeout.ms = 40000
+	retry.backoff.ms = 100
+	sasl.client.callback.handler.class = null
+	sasl.jaas.config = null
+	sasl.kerberos.kinit.cmd = /usr/bin/kinit
+	sasl.kerberos.min.time.before.relogin = 60000
+	sasl.kerberos.service.name = null
+	sasl.kerberos.ticket.renew.jitter = 0.05
+	sasl.kerberos.ticket.renew.window.factor = 0.8
+	sasl.login.callback.handler.class = null
+	sasl.login.class = null
+	sasl.login.refresh.buffer.seconds = 300
+	sasl.login.refresh.min.period.seconds = 60
+	sasl.login.refresh.window.factor = 0.8
+	sasl.login.refresh.window.jitter = 0.05
+	sasl.mechanism = GSSAPI
+	security.protocol = PLAINTEXT
+	security.providers = null
+	send.buffer.bytes = 131072
+	session.timeout.ms = 10000
+	ssl.cipher.suites = null
+	ssl.enabled.protocols = [TLSv1.2]
+	ssl.endpoint.identification.algorithm = https
+	ssl.key.password = null
+	ssl.keymanager.algorithm = SunX509
+	ssl.keystore.location = null
+	ssl.keystore.password = null
+	ssl.keystore.type = JKS
+	ssl.protocol = TLSv1.2
+	ssl.provider = null
+	ssl.secure.random.implementation = null
+	ssl.trustmanager.algorithm = PKIX
+	ssl.truststore.location = null
+	ssl.truststore.password = null
+	ssl.truststore.type = JKS
+	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
+
+2021-02-09 08:27:15,850 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (main) The configuration 'specific.avro.reader' was supplied but isn't a known config.
+2021-02-09 08:27:15,851 INFO  [org.apa.kaf.com.uti.AppInfoParser] (main) Kafka version: 2.5.0
+2021-02-09 08:27:15,851 INFO  [org.apa.kaf.com.uti.AppInfoParser] (main) Kafka commitId: 66563e712b0b9f84
+2021-02-09 08:27:15,851 INFO  [org.apa.kaf.com.uti.AppInfoParser] (main) Kafka startTimeMs: 1612855635850
+2021-02-09 08:27:15,854 INFO  [org.apa.cam.imp.eng.InternalRouteStartupManager] (main) Route: route1 started and consuming from: kafka://testtopic
+2021-02-09 08:27:15,854 INFO  [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) Subscribing testtopic-Thread 0 to topic testtopic
+2021-02-09 08:27:15,855 INFO  [org.apa.kaf.cli.con.KafkaConsumer] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Subscribed to topic(s): testtopic
+2021-02-09 08:27:15,857 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Total 1 routes, of which 1 are started
+2021-02-09 08:27:15,857 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.7.0 (camel-q) started in 1s512ms
+2021-02-09 08:27:15,860 INFO  [io.quarkus] (main) camel-k-runtime-example-java 1.7.0-SNAPSHOT on JVM (powered by Quarkus 1.11.0.Final) started in 2.798s. 
+2021-02-09 08:27:15,860 INFO  [io.quarkus] (main) Profile prod activated. 
+2021-02-09 08:27:15,861 INFO  [io.quarkus] (main) Installed features: [camel-aws2-commons, camel-aws2-s3, camel-bean, camel-core, camel-endpointdsl, camel-k-core, camel-k-loader-java, camel-k-runtime, camel-kafka, camel-main, camel-support-common, camel-support-commons-logging, cdi]
+2021-02-09 08:27:16,036 INFO  [org.apa.kaf.cli.Metadata] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Cluster ID: ujmZ7YiORXCtQJ2h9USuEw
+2021-02-09 08:27:16,037 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
+2021-02-09 08:27:16,045 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] (Re-)joining group
+2021-02-09 08:27:16,052 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
+2021-02-09 08:27:16,052 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] (Re-)joining group
+2021-02-09 08:27:16,056 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Finished assignment for group at generation 1: {consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1-bde70b4e-c3e5-4e67-b832-b258cbaad60d=Assignment(partitions=[testtopic-0])}
+2021-02-09 08:27:16,062 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Successfully joined group with generation 1
+2021-02-09 08:27:16,065 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Adding newly assigned partitions: testtopic-0
+2021-02-09 08:27:16,072 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Found no committed offset for partition testtopic-0
+2021-02-09 08:27:16,083 INFO  [org.apa.kaf.cli.con.int.SubscriptionState] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Resetting offset for partition testtopic-0 to offset 3.
+
+=== Help and contributions
+
+If you hit any problem using Camel or have some feedback, then please
+https://camel.apache.org/support.html[let us know].
+
+We also love contributors, so
+https://camel.apache.org/contributing.html[get involved] :-)
+
+The Camel riders!
diff --git a/examples/kafka-source-s3/data/MyRoutes.java b/examples/kafka-source-s3/data/MyRoutes.java
new file mode 100644
index 0000000..104e16f
--- /dev/null
+++ b/examples/kafka-source-s3/data/MyRoutes.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.camel.builder.RouteBuilder;
+
+public class MyRoutes extends RouteBuilder {
+    @Override
+    public void configure() throws Exception {
+        from("kafka:{{kafkatopic}}")
+            .setHeader("CamelAwsS3Key",simple("${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}"))
+            .to("aws2-s3:camel-kafka-connector");
+    }
+}
diff --git a/examples/kafka-source-s3/data/application.properties b/examples/kafka-source-s3/data/application.properties
new file mode 100644
index 0000000..cea7485
--- /dev/null
+++ b/examples/kafka-source-s3/data/application.properties
@@ -0,0 +1,44 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# quarkus
+#
+quarkus.banner.enabled = false
+quarkus.log.level = INFO
+quarkus.log.category."org.apache.camel".level = INFO
+
+#
+# camel - main
+#
+camel.main.name = camel-q
+camel.main.stream-caching-enabled = true
+camel.main.stream-caching-spool-directory = ${java.io.tmpdir}/camel-q
+
+#
+# camel - aws2-s3
+#
+camel.component.aws2-s3.accessKey=xxxx
+camel.component.aws2-s3.secretKey=yyyy
+camel.component.aws2-s3.region=region
+
+#
+# camel - kafka
+#
+kafkatopic=testtopic
+camel.component.kafka.brokers=localhost:9092
+
diff --git a/examples/kafka-source-s3/pom.xml b/examples/kafka-source-s3/pom.xml
new file mode 100644
index 0000000..57a9e77
--- /dev/null
+++ b/examples/kafka-source-s3/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.camel.k</groupId>
+        <artifactId>camel-k-runtime-examples</artifactId>
+        <version>1.7.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>camel-k-runtime-example-java-kafka-s3</artifactId>
+
+    <properties>
+        <noDeps>true</noDeps>
+        <quarkus.camel.main.routes-discovery.enabled>false</quarkus.camel.main.routes-discovery.enabled>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.k</groupId>
+            <artifactId>camel-k-runtime</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.k</groupId>
+            <artifactId>camel-k-loader-java</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-aws2-s3</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>io.quarkus</groupId>
+                <artifactId>quarkus-bootstrap-maven-plugin</artifactId>
+                <version>${quarkus-version}</version>
+            </plugin>
+            <plugin>
+                <groupId>io.quarkus</groupId>
+                <artifactId>quarkus-maven-plugin</artifactId>
+                <version>${quarkus-version}</version>
+                <executions>
+                    <execution>
+                        <id>build</id>
+                        <goals>
+                            <goal>build</goal>
+                        </goals>
+                        <configuration>
+                            <finalName>${project.artifactId}</finalName>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>${exec-maven-plugin-version}</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>exec</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <executable>java</executable>
+                    <workingDirectory>${project.basedir}</workingDirectory>
+                    <arguments>
+                        <argument>-jar</argument>
+                        <argument>${project.build.directory}/${project.artifactId}-runner.jar</argument>
+                    </arguments>
+                    <environmentVariables>
+                        <CAMEL_K_CONF>${project.basedir}/data/application.properties</CAMEL_K_CONF>
+                        <CAMEL_K_ROUTES>file:${project.basedir}/data/MyRoutes.java</CAMEL_K_ROUTES>
+                    </environmentVariables>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/examples/kafka-source-s3/src/main/resources/application.properties b/examples/kafka-source-s3/src/main/resources/application.properties
new file mode 100644
index 0000000..fa7a54b
--- /dev/null
+++ b/examples/kafka-source-s3/src/main/resources/application.properties
@@ -0,0 +1,16 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 9e000a0..624a698 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -40,6 +40,7 @@
         <module>kotlin</module>
         <module>groovy</module>
         <module>java</module>
+        <module>kafka-source-s3</module>
         <module>xml</module>
         <module>cron</module>
     </modules>