You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/28 07:51:03 UTC

[camel-kafka-connector-examples] branch master updated: Added an Exec sink connector example

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git


The following commit(s) were added to refs/heads/master by this push:
     new 048c690  Added an Exec sink connector example
048c690 is described below

commit 048c6909361c0d67dff4af79bc64cd349184d56d
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Oct 28 08:48:36 2020 +0100

    Added an Exec sink connector example
---
 exec/exec-sink/README.adoc                         | 81 ++++++++++++++++++++++
 .../camel-kafka-exec-simple-producer/.gitignore    |  4 ++
 .../camel-kafka-exec-simple-producer/README.md     |  7 ++
 .../camel-kafka-exec-simple-producer/pom.xml       | 77 ++++++++++++++++++++
 .../camel/kafkaconnector/SimpleProducer.java       | 29 ++++++++
 .../src/main/resources/log4j.properties            |  9 +++
 .../config/CamelExecSinkConnector.properties       | 27 ++++++++
 7 files changed, 234 insertions(+)

diff --git a/exec/exec-sink/README.adoc b/exec/exec-sink/README.adoc
new file mode 100644
index 0000000..f06ef7f
--- /dev/null
+++ b/exec/exec-sink/README.adoc
@@ -0,0 +1,81 @@
+# Camel-Kafka-connector Exec Sink
+
+This is an example of the Camel-Kafka-connector Exec Sink.
+
+## Standalone
+
+### What is needed
+
+- The touch command
+
+### Running Kafka
+
+```
+$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
+$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
+```
+
+### Setting up the needed bits and running the example
+
+You'll need to set up the `plugin.path` property in your Kafka installation.
+
+Open the `$KAFKA_HOME/config/connect-standalone.properties`
+
+and set the `plugin.path` property to your chosen location.
+
+In this example we'll use `/home/oscerd/connectors/`
+
+```
+> cd /home/oscerd/connectors/
+> wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-exec-kafka-connector/0.6.0/camel-exec-kafka-connector-0.6.0-package.zip
+> unzip camel-exec-kafka-connector-0.6.0-package.zip
+```
+
+Now it's time to set up the connector.
+
+Open the Exec sink configuration file (`config/CamelExecSinkConnector.properties`):
+
+```
+name=CamelExecSinkConnector
+connector.class=org.apache.camel.kafkaconnector.exec.CamelExecSinkConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+topics=mytopic
+
+camel.sink.path.executable=touch
+camel.sink.endpoint.args=/tmp/${body}-${headers.detail}.txt
+```
+
+Set the correct options in the file.
+
+Now you can run the example
+
+```
+$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelExecSinkConnector.properties
+```
+
+You'll need to send messages to `mytopic`. You can use the simple producer included in this project; change into its folder and run it:
+
+```
+> cd camel-kafka-exec-simple-producer
+> mvn compile exec:exec -Dkafka.topic.name=mytopic -Dkafka.key=1 -Dcamel.body="FileName" -Dcamel.header.detail="detail1"
+```
+
+Under the `/tmp` folder you should see the created file:
+
+```
+> cd /tmp/
+> ls | grep FileName
+FileName-detail1.txt
+```
+
+In the `mytopic` topic you should see the message too:
+
+```
+> ./kafkacat -b localhost:9092 -t mytopic -f 'Headers: %h: Message value: %s\n'
+Headers: CamelHeader.detail=detail1: Message value: FileName
+% Reached end of topic mytopic [0] at offset 1
+```
+
diff --git a/exec/exec-sink/camel-kafka-exec-simple-producer/.gitignore b/exec/exec-sink/camel-kafka-exec-simple-producer/.gitignore
new file mode 100644
index 0000000..72bba6e
--- /dev/null
+++ b/exec/exec-sink/camel-kafka-exec-simple-producer/.gitignore
@@ -0,0 +1,4 @@
+.project
+.classpath
+.settings/
+/target
diff --git a/exec/exec-sink/camel-kafka-exec-simple-producer/README.md b/exec/exec-sink/camel-kafka-exec-simple-producer/README.md
new file mode 100644
index 0000000..4a4622d
--- /dev/null
+++ b/exec/exec-sink/camel-kafka-exec-simple-producer/README.md
@@ -0,0 +1,7 @@
+## Camel Kafka Exec Simple Producer
+
+Related to https://github.com/oscerd/infinispan-kafka-demo
+
+To run the producer:
+
+mvn clean compile exec:exec -Dkafka.topic.name=mytopic -Dkafka.key=1 -Dcamel.body="FileName" -Dcamel.header.detail="detail1"
diff --git a/exec/exec-sink/camel-kafka-exec-simple-producer/pom.xml b/exec/exec-sink/camel-kafka-exec-simple-producer/pom.xml
new file mode 100644
index 0000000..9f64f04
--- /dev/null
+++ b/exec/exec-sink/camel-kafka-exec-simple-producer/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.camel.kafkaconnector</groupId>
+  <artifactId>camel-kafka-exec-simple-producer</artifactId>
+  <version>0.0.1-SNAPSHOT</version>
+  <name>Camel Kafka Exec Simple Producer</name>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.12</artifactId>
+      <version>2.5.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>2.5.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.11.3</version>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.17</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- Runs SimpleProducer in a forked JVM. The four trailing arguments come from the
+           kafka.topic.name, kafka.key, camel.body and camel.header.detail properties (set with -D). -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>3.0.0</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <executable>java</executable>
+          <arguments>
+            <argument>-classpath</argument>
+            <classpath />
+            <argument>org.apache.camel.kafkaconnector.SimpleProducer</argument>
+            <argument>${kafka.topic.name}</argument>
+            <argument>${kafka.key}</argument>
+            <argument>${camel.body}</argument>
+            <argument>${camel.header.detail}</argument>
+          </arguments>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.2</version>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/java/org/apache/camel/kafkaconnector/SimpleProducer.java b/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/java/org/apache/camel/kafkaconnector/SimpleProducer.java
new file mode 100644
index 0000000..dfc156e
--- /dev/null
+++ b/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/java/org/apache/camel/kafkaconnector/SimpleProducer.java
@@ -0,0 +1,29 @@
+package org.apache.camel.kafkaconnector;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+public class SimpleProducer {
+
+    /** Sends one record to Kafka at localhost:9092; args = topic, key, body, "CamelHeader.detail" header value. */
+    public static void main(String[] args) {
+        if (args.length < 4) {
+            throw new IllegalArgumentException("Usage: SimpleProducer <topic> <key> <body> <detail-header>");
+        }
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put("key.serializer", StringSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+
+        // try-with-resources: close() runs even if send() throws, and it waits for the pending send to complete.
+        try (KafkaProducer<String, String> prod = new KafkaProducer<>(props)) {
+            ProducerRecord<String, String> rec = new ProducerRecord<>(args[0], args[1], args[2]);
+            // Encode the header as UTF-8 explicitly instead of relying on the platform default charset.
+            rec.headers().add("CamelHeader.detail", args[3].getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            prod.send(rec);
+        }
+    }
+}
diff --git a/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/resources/log4j.properties b/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/resources/log4j.properties
new file mode 100644
index 0000000..36af21f
--- /dev/null
+++ b/exec/exec-sink/camel-kafka-exec-simple-producer/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Root logger option
+log4j.rootLogger=INFO, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+
diff --git a/exec/exec-sink/config/CamelExecSinkConnector.properties b/exec/exec-sink/config/CamelExecSinkConnector.properties
new file mode 100644
index 0000000..8537766
--- /dev/null
+++ b/exec/exec-sink/config/CamelExecSinkConnector.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name=CamelExecSinkConnector
+connector.class=org.apache.camel.kafkaconnector.exec.CamelExecSinkConnector
+tasks.max=1
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+
+topics=mytopic
+
+camel.sink.path.executable=touch
+camel.sink.endpoint.args=/tmp/${body}-${headers.detail}.txt