You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/28 07:49:10 UTC
[camel-kafka-connector-examples] 01/01: Added an Exec sink
connector example
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch exec-example
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git
commit 8aeda83b34fdfcdf459722442e1e344f7df8c944
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Oct 28 08:48:36 2020 +0100
Added an Exec sink connector example
---
exec/exec-sink/README.adoc | 81 ++++++++++++++++++++++
.../camel-kafka-exec-simple-producer/.gitignore | 4 ++
.../camel-kafka-exec-simple-producer/README.md | 7 ++
.../camel-kafka-exec-simple-producer/pom.xml | 77 ++++++++++++++++++++
.../camel/kafkaconnector/SimpleProducer.java | 29 ++++++++
.../src/main/resources/log4j.properties | 9 +++
.../config/CamelExecSinkConnector.properties | 27 ++++++++
7 files changed, 234 insertions(+)
diff --git a/exec/exec-sink/README.adoc b/exec/exec-sink/README.adoc
new file mode 100644
index 0000000..f06ef7f
--- /dev/null
+++ b/exec/exec-sink/README.adoc
@@ -0,0 +1,81 @@
+# Camel-Kafka-connector Exec Sink
+
+This is an example for Camel-Kafka-connector Exec Sink
+
+## Standalone
+
+### What is needed
+
+- The touch command
+
+### Running Kafka
+
+```
+$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
+$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
+```
+
+### Setting up the needed bits and running the example
+
+You'll need to setup the plugin.path property in your kafka
+
+Open the `$KAFKA_HOME/config/connect-standalone.properties`
+
+and set the `plugin.path` property to your choosen location
+
+In this example we'll use `/home/oscerd/connectors/`
+
+```
+> cd /home/oscerd/connectors/
+> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-exec-kafka-connector/0.6.0/camel-exec-kafka-connector-0.6.0-package.zip
+> unzip camel-exec-kafka-connector-0.6.0-package.zip
+```
+
+Now it's time to setup the connectors
+
+Open the Exec sink configuration file
+
+```
+name=CamelExecSinkConnector
+connector.class=org.apache.camel.kafkaconnector.exec.CamelExecSinkConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+topics=mytopic
+
+camel.sink.path.executable=touch
+camel.sink.endpoint.args=/tmp/${body}-${headers.detail}.txt
+```
+
+Set the correct options in the file.
+
+Now you can run the example
+
+```
+$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelExecSinkConnector.properties
+```
+
+You'll need to send messages to mytopic. You can run the simple producer included in this project. So cd in the folder:
+
+```
+> cd camel-kafka-exec-simple-producer
+> mvn compile exec:exec -Dkafka.topic.name=mytopic -Dkafka.key=1 -Dcamel.body="FileName" -Dcamel.header.detail="detail1"
+```
+
+Under tmp folder you should see the file created
+
+```
+> cd /tmp/
+> ls | grep FileName
+FileName-detail1.txt
+```
+
+In the mytopic topic you should see the message too
+
+```
+> ./kafkacat -b localhost:9092 -t mytopic -f 'Headers: %h: Message value: %s\n'
+Headers: CamelHeader.detail=detail1: Message value: FileName
+% Reached end of topic mytopic [0] at offset 1
+```
+
diff --git a/exec/exec-sink/camel-kafka-exec-simple-producer/.gitignore b/exec/exec-sink/camel-kafka-exec-simple-producer/.gitignore
new file mode 100644
index 0000000..72bba6e
--- /dev/null
+++ b/exec/exec-sink/camel-kafka-exec-simple-producer/.gitignore
@@ -0,0 +1,4 @@
+.project
+.classpath
+.settings/
+/target
diff --git a/exec/exec-sink/camel-kafka-exec-simple-producer/README.md b/exec/exec-sink/camel-kafka-exec-simple-producer/README.md
new file mode 100644
index 0000000..4a4622d
--- /dev/null
+++ b/exec/exec-sink/camel-kafka-exec-simple-producer/README.md
@@ -0,0 +1,7 @@
+## Infinispan Kafka Producer
+
+Related to https://github.com/oscerd/infinispan-kafka-demo
+
+To run the producer:
+
+mvn clean compile exec:exec
diff --git a/exec/exec-sink/camel-kafka-exec-simple-producer/pom.xml b/exec/exec-sink/camel-kafka-exec-simple-producer/pom.xml
new file mode 100644
index 0000000..9f64f04
--- /dev/null
+++ b/exec/exec-sink/camel-kafka-exec-simple-producer/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0"?>
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>camel-kafka-exec-simple-producer</artifactId>
+ <name>Camel Kafka Exec Simple Producer</name>
+ <version>0.0.1-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.12</artifactId>
+ <version>2.5.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>2.5.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.11.3</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ </dependency>
+ </dependencies>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <executable>java</executable>
+ <arguments>
+ <argument>-classpath</argument>
+ <classpath />
+ <argument>org.apache.camel.kafkaconnector.SimpleProducer</argument>
+ <argument>${kafka.topic.name}</argument>
+ <argument>${kafka.key}</argument>
+ <argument>${camel.body}</argument>
+ <argument>${camel.header.detail}</argument>
+ </arguments>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.2</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+
+
+</project>
diff --git a/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/java/org/apache/camel/kafkaconnector/SimpleProducer.java b/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/java/org/apache/camel/kafkaconnector/SimpleProducer.java
new file mode 100644
index 0000000..dfc156e
--- /dev/null
+++ b/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/java/org/apache/camel/kafkaconnector/SimpleProducer.java
@@ -0,0 +1,29 @@
+package org.apache.camel.kafkaconnector;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+public class SimpleProducer {
+
+ public static void main(String[] args) {
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092");
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+
+ KafkaProducer<String, String> prod = new KafkaProducer<String, String>(props);
+
+
+ ProducerRecord<String, String> rec = new ProducerRecord<String, String>(args[0], args[1], args[2]);
+
+ rec.headers().add("CamelHeader.detail", args[3].getBytes());
+
+ prod.send(rec);
+
+ prod.close();
+ }
+}
diff --git a/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/resources/log4j.properties b/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/resources/log4j.properties
new file mode 100644
index 0000000..36af21f
--- /dev/null
+++ b/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Root logger option
+log4j.rootLogger=INFO, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+
diff --git a/exec/exec-sink/config/CamelExecSinkConnector.properties b/exec/exec-sink/config/CamelExecSinkConnector.properties
new file mode 100644
index 0000000..8537766
--- /dev/null
+++ b/exec/exec-sink/config/CamelExecSinkConnector.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name=CamelExecSinkConnector
+connector.class=org.apache.camel.kafkaconnector.exec.CamelExecSinkConnector
+tasks.max=1
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+topics=mytopic
+
+camel.sink.path.executable=touch
+camel.sink.endpoint.args=/tmp/${body}-${headers.detail}.txt