Posted to commits@camel.apache.org by ac...@apache.org on 2021/02/09 07:34:31 UTC

[camel-k-runtime] branch kafka-source created (now ce4727b)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch kafka-source
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git.


      at ce4727b  Added a Java example of kafka source to AWS S3

This branch includes the following new commits:

     new ce4727b  Added a Java example of kafka source to AWS S3

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-k-runtime] 01/01: Added a Java example of kafka source to AWS S3

Posted by ac...@apache.org.

acosentino pushed a commit to branch kafka-source
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit ce4727bc6a7433b2712e951c53b46b8af322d592
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Feb 9 08:33:56 2021 +0100

    Added a Java example of kafka source to AWS S3
---
 examples/kafka-source-s3/README.md                 | 141 +++++++++++++++++++++
 examples/kafka-source-s3/data/MyRoutes.java        |  26 ++++
 .../kafka-source-s3/data/application.properties    |  44 +++++++
 examples/kafka-source-s3/pom.xml                   | 105 +++++++++++++++
 .../src/main/resources/application.properties      |  16 +++
 examples/pom.xml                                   |   1 +
 6 files changed, 333 insertions(+)

diff --git a/examples/kafka-source-s3/README.md b/examples/kafka-source-s3/README.md
new file mode 100644
index 0000000..0f75ac0
--- /dev/null
+++ b/examples/kafka-source-s3/README.md
@@ -0,0 +1,141 @@
+== Camel K Runtime Java Example: Kafka Consumer to AWS S3
+
+This example shows how to use the Camel K runtime to consume messages from a Kafka topic and store them in AWS S3.
+
+The route uses the kafka and aws2-s3 components.
+
+=== Setup
+
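+You'll need a Kafka broker reachable at localhost:9092 (the value of camel.component.kafka.brokers in data/application.properties), running on your machine or in Docker.
+You'll need AWS credentials: replace the accessKey, secretKey and region placeholders in data/application.properties.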
+
+=== How to run
+
+There are two ways to run the example.
+
+First approach:
+
+    mvn exec:exec
+
+This approach launches the camel-quarkus runner through the exec-maven-plugin; run `mvn clean package` first if the runner JAR has not been built yet.
+
+Second approach, run from the examples/kafka-source-s3 directory:
+
+    mvn clean package
+    export CAMEL_K_CONF=$(pwd)/data/application.properties
+    export CAMEL_K_ROUTES=file:$(pwd)/data/MyRoutes.java
+    java -jar target/camel-k-runtime-example-java-kafka-s3-runner.jar
+
+You should see output similar to the following in both cases:
+
+2021-02-09 08:27:13,463 INFO  [org.apa.cam.k.Runtime] (main) Apache Camel K Runtime 1.7.0-SNAPSHOT
+2021-02-09 08:27:13,482 INFO  [org.apa.cam.qua.cor.CamelBootstrapRecorder] (main) bootstrap runtime: org.apache.camel.quarkus.main.CamelMainRuntime
+2021-02-09 08:27:13,488 INFO  [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: file:/home/oscerd/workspace/apache-camel/camel-k-runtime/examples/kafka-source-s3/data/MyRoutes.java
+2021-02-09 08:27:14,345 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.7.0 (camel-q) is starting
+2021-02-09 08:27:14,345 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) StreamCaching is enabled on CamelContext: camel-q
+2021-02-09 08:27:15,724 INFO  [org.apa.cam.imp.eng.DefaultStreamCachingStrategy] (main) StreamCaching in use with spool directory: /tmp/camel-q and rules: [Spool > 128K body size]
+2021-02-09 08:27:15,725 INFO  [org.apa.cam.com.kaf.KafkaConsumer] (main) Starting Kafka consumer on topic: testtopic with breakOnFirstError: false
+2021-02-09 08:27:15,747 INFO  [org.apa.kaf.cli.con.ConsumerConfig] (main) ConsumerConfig values: 
+	allow.auto.create.topics = true
+	auto.commit.interval.ms = 5000
+	auto.offset.reset = latest
+	bootstrap.servers = [localhost:9092]
+	check.crcs = true
+	client.dns.lookup = default
+	client.id = 
+	client.rack = 
+	connections.max.idle.ms = 540000
+	default.api.timeout.ms = 60000
+	enable.auto.commit = true
+	exclude.internal.topics = true
+	fetch.max.bytes = 52428800
+	fetch.max.wait.ms = 500
+	fetch.min.bytes = 1
+	group.id = 94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b
+	group.instance.id = null
+	heartbeat.interval.ms = 3000
+	interceptor.classes = []
+	internal.leave.group.on.close = true
+	isolation.level = read_uncommitted
+	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
+	max.partition.fetch.bytes = 1048576
+	max.poll.interval.ms = 300000
+	max.poll.records = 500
+	metadata.max.age.ms = 300000
+	metric.reporters = []
+	metrics.num.samples = 2
+	metrics.recording.level = INFO
+	metrics.sample.window.ms = 30000
+	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
+	receive.buffer.bytes = 65536
+	reconnect.backoff.max.ms = 1000
+	reconnect.backoff.ms = 50
+	request.timeout.ms = 40000
+	retry.backoff.ms = 100
+	sasl.client.callback.handler.class = null
+	sasl.jaas.config = null
+	sasl.kerberos.kinit.cmd = /usr/bin/kinit
+	sasl.kerberos.min.time.before.relogin = 60000
+	sasl.kerberos.service.name = null
+	sasl.kerberos.ticket.renew.jitter = 0.05
+	sasl.kerberos.ticket.renew.window.factor = 0.8
+	sasl.login.callback.handler.class = null
+	sasl.login.class = null
+	sasl.login.refresh.buffer.seconds = 300
+	sasl.login.refresh.min.period.seconds = 60
+	sasl.login.refresh.window.factor = 0.8
+	sasl.login.refresh.window.jitter = 0.05
+	sasl.mechanism = GSSAPI
+	security.protocol = PLAINTEXT
+	security.providers = null
+	send.buffer.bytes = 131072
+	session.timeout.ms = 10000
+	ssl.cipher.suites = null
+	ssl.enabled.protocols = [TLSv1.2]
+	ssl.endpoint.identification.algorithm = https
+	ssl.key.password = null
+	ssl.keymanager.algorithm = SunX509
+	ssl.keystore.location = null
+	ssl.keystore.password = null
+	ssl.keystore.type = JKS
+	ssl.protocol = TLSv1.2
+	ssl.provider = null
+	ssl.secure.random.implementation = null
+	ssl.trustmanager.algorithm = PKIX
+	ssl.truststore.location = null
+	ssl.truststore.password = null
+	ssl.truststore.type = JKS
+	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
+
+2021-02-09 08:27:15,850 WARN  [org.apa.kaf.cli.con.ConsumerConfig] (main) The configuration 'specific.avro.reader' was supplied but isn't a known config.
+2021-02-09 08:27:15,851 INFO  [org.apa.kaf.com.uti.AppInfoParser] (main) Kafka version: 2.5.0
+2021-02-09 08:27:15,851 INFO  [org.apa.kaf.com.uti.AppInfoParser] (main) Kafka commitId: 66563e712b0b9f84
+2021-02-09 08:27:15,851 INFO  [org.apa.kaf.com.uti.AppInfoParser] (main) Kafka startTimeMs: 1612855635850
+2021-02-09 08:27:15,854 INFO  [org.apa.cam.imp.eng.InternalRouteStartupManager] (main) Route: route1 started and consuming from: kafka://testtopic
+2021-02-09 08:27:15,854 INFO  [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) Subscribing testtopic-Thread 0 to topic testtopic
+2021-02-09 08:27:15,855 INFO  [org.apa.kaf.cli.con.KafkaConsumer] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Subscribed to topic(s): testtopic
+2021-02-09 08:27:15,857 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Total 1 routes, of which 1 are started
+2021-02-09 08:27:15,857 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (main) Apache Camel 3.7.0 (camel-q) started in 1s512ms
+2021-02-09 08:27:15,860 INFO  [io.quarkus] (main) camel-k-runtime-example-java 1.7.0-SNAPSHOT on JVM (powered by Quarkus 1.11.0.Final) started in 2.798s. 
+2021-02-09 08:27:15,860 INFO  [io.quarkus] (main) Profile prod activated. 
+2021-02-09 08:27:15,861 INFO  [io.quarkus] (main) Installed features: [camel-aws2-commons, camel-aws2-s3, camel-bean, camel-core, camel-endpointdsl, camel-k-core, camel-k-loader-java, camel-k-runtime, camel-kafka, camel-main, camel-support-common, camel-support-commons-logging, cdi]
+2021-02-09 08:27:16,036 INFO  [org.apa.kaf.cli.Metadata] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Cluster ID: ujmZ7YiORXCtQJ2h9USuEw
+2021-02-09 08:27:16,037 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
+2021-02-09 08:27:16,045 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] (Re-)joining group
+2021-02-09 08:27:16,052 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
+2021-02-09 08:27:16,052 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] (Re-)joining group
+2021-02-09 08:27:16,056 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Finished assignment for group at generation 1: {consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1-bde70b4e-c3e5-4e67-b832-b258cbaad60d=Assignment(partitions=[testtopic-0])}
+2021-02-09 08:27:16,062 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Successfully joined group with generation 1
+2021-02-09 08:27:16,065 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Adding newly assigned partitions: testtopic-0
+2021-02-09 08:27:16,072 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Found no committed offset for partition testtopic-0
+2021-02-09 08:27:16,083 INFO  [org.apa.kaf.cli.con.int.SubscriptionState] (Camel (camel-q) thread #0 - KafkaConsumer[testtopic]) [Consumer clientId=consumer-94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b-1, groupId=94ee637f-5058-4b8a-98b3-6a8e6e3fcc5b] Resetting offset for partition testtopic-0 to offset 3.
+
+=== Help and contributions
+
+If you hit any problem using Camel or have some feedback, then please
+https://camel.apache.org/support.html[let us know].
+
+We also love contributors, so
+https://camel.apache.org/contributing.html[get involved] :-)
+
+The Camel riders!
diff --git a/examples/kafka-source-s3/data/MyRoutes.java b/examples/kafka-source-s3/data/MyRoutes.java
new file mode 100644
index 0000000..104e16f
--- /dev/null
+++ b/examples/kafka-source-s3/data/MyRoutes.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.camel.builder.RouteBuilder;
+
+public class MyRoutes extends RouteBuilder {
+    @Override
+    public void configure() throws Exception {
+        from("kafka:{{kafkatopic}}")
+            .setHeader("CamelAwsS3Key", simple("${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}"))
+            .to("aws2-s3:camel-kafka-connector");
+    }
+}
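
For readers who want to see what object keys the route in MyRoutes.java above would produce, here is a minimal standalone sketch. It does not use Camel: it only approximates the `${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}` expression with java.time, whereas Camel evaluates the expression itself at runtime. The class name S3KeySketch, the helper objectKey, and the exchange id value are hypothetical and exist only for this illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class S3KeySketch {
    // Same pattern string the route passes to ${date:now:...}.
    private static final DateTimeFormatter STAMP =
            DateTimeFormatter.ofPattern("yyyyMMdd-HHmmssSSS");

    // Hypothetical helper: joins a timestamp and an exchange id with "-",
    // mirroring the shape of the route's simple expression.
    public static String objectKey(LocalDateTime now, String exchangeId) {
        return STAMP.format(now) + "-" + exchangeId;
    }

    public static void main(String[] args) {
        // Fixed timestamp so the printed key is reproducible.
        LocalDateTime fixed = LocalDateTime.of(2021, 2, 9, 8, 27, 16, 83_000_000);
        // The exchange id here is a made-up placeholder, not a real Camel id.
        System.out.println(objectKey(fixed, "example-exchange-id"));
    }
}
```

With the fixed timestamp in main, the sketch prints 20210209-082716083-example-exchange-id. Because the key combines a millisecond timestamp with the exchange id, each Kafka record should end up as its own S3 object rather than overwriting an earlier one.
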
diff --git a/examples/kafka-source-s3/data/application.properties b/examples/kafka-source-s3/data/application.properties
new file mode 100644
index 0000000..cea7485
--- /dev/null
+++ b/examples/kafka-source-s3/data/application.properties
@@ -0,0 +1,44 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# quarkus
+#
+quarkus.banner.enabled = false
+quarkus.log.level = INFO
+quarkus.log.category."org.apache.camel".level = INFO
+
+#
+# camel - main
+#
+camel.main.name = camel-q
+camel.main.stream-caching-enabled = true
+camel.main.stream-caching-spool-directory = ${java.io.tmpdir}/camel-q
+
+#
+# camel - aws2-s3
+#
+camel.component.aws2-s3.accessKey=xxxx
+camel.component.aws2-s3.secretKey=yyyy
+camel.component.aws2-s3.region=region
+
+#
+# camel - kafka
+#
+kafkatopic=testtopic
+camel.component.kafka.brokers=localhost:9092
+
diff --git a/examples/kafka-source-s3/pom.xml b/examples/kafka-source-s3/pom.xml
new file mode 100644
index 0000000..57a9e77
--- /dev/null
+++ b/examples/kafka-source-s3/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.camel.k</groupId>
+        <artifactId>camel-k-runtime-examples</artifactId>
+        <version>1.7.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>camel-k-runtime-example-java-kafka-s3</artifactId>
+
+    <properties>
+        <noDeps>true</noDeps>
+        <quarkus.camel.main.routes-discovery.enabled>false</quarkus.camel.main.routes-discovery.enabled>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.k</groupId>
+            <artifactId>camel-k-runtime</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.k</groupId>
+            <artifactId>camel-k-loader-java</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-aws2-s3</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>io.quarkus</groupId>
+                <artifactId>quarkus-bootstrap-maven-plugin</artifactId>
+                <version>${quarkus-version}</version>
+            </plugin>
+            <plugin>
+                <groupId>io.quarkus</groupId>
+                <artifactId>quarkus-maven-plugin</artifactId>
+                <version>${quarkus-version}</version>
+                <executions>
+                    <execution>
+                        <id>build</id>
+                        <goals>
+                            <goal>build</goal>
+                        </goals>
+                        <configuration>
+                            <finalName>${project.artifactId}</finalName>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>${exec-maven-plugin-version}</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>exec</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <executable>java</executable>
+                    <workingDirectory>${project.basedir}</workingDirectory>
+                    <arguments>
+                        <argument>-jar</argument>
+                        <argument>${project.build.directory}/${project.artifactId}-runner.jar</argument>
+                    </arguments>
+                    <environmentVariables>
+                        <CAMEL_K_CONF>${project.basedir}/data/application.properties</CAMEL_K_CONF>
+                        <CAMEL_K_ROUTES>file:${project.basedir}/data/MyRoutes.java</CAMEL_K_ROUTES>
+                    </environmentVariables>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/examples/kafka-source-s3/src/main/resources/application.properties b/examples/kafka-source-s3/src/main/resources/application.properties
new file mode 100644
index 0000000..fa7a54b
--- /dev/null
+++ b/examples/kafka-source-s3/src/main/resources/application.properties
@@ -0,0 +1,16 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 9e000a0..624a698 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -40,6 +40,7 @@
         <module>kotlin</module>
         <module>groovy</module>
         <module>java</module>
+        <module>kafka-source-s3</module>
         <module>xml</module>
         <module>cron</module>
     </modules>