You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/08/15 14:47:47 UTC

[incubator-plc4x] branch feature/apache-kafka created (now 6f8287f)

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a change to branch feature/apache-kafka
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git.


      at 6f8287f  Stub of kafka-connect

This branch includes the following new commits:

     new 6f8287f  Stub of kafka-connect

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-plc4x] 01/01: Stub of kafka-connect

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/apache-kafka
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 6f8287fece19c80630c4392cc467f7ee94ae8e90
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Wed Aug 15 16:47:43 2018 +0200

    Stub of kafka-connect
---
 integrations/apache-kafka/README.md                |  52 +++++++++
 integrations/apache-kafka/bin/debug.sh             |  29 +++++
 .../config/Plc4xSinkConnector.properties           |  23 ++++
 .../config/Plc4xSourceConnector.properties         |  21 ++++
 .../config/connect-avro-docker.properties          |  44 +++++++
 integrations/apache-kafka/docker-compose.yml       |  45 +++++++
 integrations/apache-kafka/pom.xml                  | 103 ++++++++++++++++
 .../apache-kafka/src/main/assembly/package.xml     |  59 ++++++++++
 .../org/apache/plc4x/kafka/Plc4xSinkConnector.java |  74 ++++++++++++
 .../apache/plc4x/kafka/Plc4xSourceConnector.java   |  74 ++++++++++++
 .../org/apache/plc4x/kafka/common/Plc4xConfig.java |  68 +++++++++++
 .../apache/plc4x/kafka/sink/Plc4xSinkConfig.java   |  49 ++++++++
 .../org/apache/plc4x/kafka/sink/Plc4xSinkTask.java | 129 +++++++++++++++++++++
 .../plc4x/kafka/source/Plc4xSourceConfig.java      |  49 ++++++++
 .../apache/plc4x/kafka/source/Plc4xSourceTask.java | 120 +++++++++++++++++++
 .../org/apache/plc4x/kafka/util/VersionUtil.java   |  32 +++++
 .../apache/plc4x/kafka/Plc4XSinkConfigTest.java    |  31 +++++
 .../apache/plc4x/kafka/Plc4XSourceConfigTest.java  |  31 +++++
 .../apache/plc4x/kafka/Plc4xSinkConnectorTest.java |  30 +++++
 .../org/apache/plc4x/kafka/Plc4xSinkTaskTest.java  |  30 +++++
 .../plc4x/kafka/Plc4xSourceConnectorTest.java      |  30 +++++
 .../apache/plc4x/kafka/Plc4xSourceTaskTest.java    |  30 +++++
 .../src/test/resources/logback.xml}                |  35 ++----
 integrations/pom.xml                               |   1 +
 .../apache/plc4x/java/api/messages/PlcRequest.java |   6 +-
 src/site/asciidoc/index.adoc                       |   8 +-
 26 files changed, 1171 insertions(+), 32 deletions(-)

diff --git a/integrations/apache-kafka/README.md b/integrations/apache-kafka/README.md
new file mode 100644
index 0000000..5b9e3be
--- /dev/null
+++ b/integrations/apache-kafka/README.md
@@ -0,0 +1,52 @@
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+Welcome to your new Kafka Connect connector!
+
+# Running in development
+
+
+The [docker-compose.yml](docker-compose.yml) that is included in this repository is based on the Confluent Platform Docker
+images. Take a look at the [quickstart](http://docs.confluent.io/3.0.1/cp-docker-images/docs/quickstart.html#getting-started-with-docker-client)
+for the Docker images. 
+
+The hostname `confluent` must be resolvable by your host. You will need to determine the ip address of your docker-machine using `docker-machine ip confluent` 
+and add this to your `/etc/hosts` file. For example if `docker-machine ip confluent` returns `192.168.99.100` add this:
+
+```
+192.168.99.100  confluent
+```
+
+
+```
+docker-compose up -d
+```
+
+
+Start the connector with debugging enabled.
+ 
+```
+./bin/debug.sh
+```
+
+Start the connector with debugging enabled. This will wait for a debugger to attach.
+
+```
+export SUSPEND='y'
+./bin/debug.sh
+```
\ No newline at end of file
diff --git a/integrations/apache-kafka/bin/debug.sh b/integrations/apache-kafka/bin/debug.sh
new file mode 100644
index 0000000..7be498d
--- /dev/null
+++ b/integrations/apache-kafka/bin/debug.sh
@@ -0,0 +1,29 @@
+#!/usr/bin/env bash
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+: ${SUSPEND:='n'}
+
+set -e
+
+mvn clean package
+export KAFKA_JMX_OPTS="-Xdebug -agentlib:jdwp=transport=dt_socket,server=y,suspend=${SUSPEND},address=5005"
+export CLASSPATH="$(find target/kafka-connect-target/usr/share/java -type f -name '*.jar' | tr '\n' ':')"
+
+connect-standalone config/connect-avro-docker.properties config/Plc4xSinkConnector.properties config/Plc4xSourceConnector.properties
diff --git a/integrations/apache-kafka/config/Plc4xSinkConnector.properties b/integrations/apache-kafka/config/Plc4xSinkConnector.properties
new file mode 100644
index 0000000..29ff1a8
--- /dev/null
+++ b/integrations/apache-kafka/config/Plc4xSinkConnector.properties
@@ -0,0 +1,23 @@
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+name=Plc4xSinkConnector
+topics=plc4x-topic
+tasks.max=1
+connector.class=org.apache.plc4x.kafka.Plc4xSinkConnector
diff --git a/integrations/apache-kafka/config/Plc4xSourceConnector.properties b/integrations/apache-kafka/config/Plc4xSourceConnector.properties
new file mode 100644
index 0000000..3c54029
--- /dev/null
+++ b/integrations/apache-kafka/config/Plc4xSourceConnector.properties
@@ -0,0 +1,21 @@
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+name=Plc4xSourceConnector
+tasks.max=1
+connector.class=org.apache.plc4x.kafka.Plc4xSourceConnector
\ No newline at end of file
diff --git a/integrations/apache-kafka/config/connect-avro-docker.properties b/integrations/apache-kafka/config/connect-avro-docker.properties
new file mode 100644
index 0000000..5f8a265
--- /dev/null
+++ b/integrations/apache-kafka/config/connect-avro-docker.properties
@@ -0,0 +1,44 @@
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
+# integrates the the SchemaConfig Registry. This sample configuration assumes a local installation of
+# Confluent Platform with all services running on their default ports.
+# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
+bootstrap.servers=confluent:9092
+# The converters specify the format of data in Kafka and how to translate it into Connect data.
+# Every Connect user will need to configure these based on the format they want their data in
+# when loaded from or stored into Kafka
+key.converter=io.confluent.connect.avro.AvroConverter
+key.converter.schema.registry.url=http://confluent:8081
+value.converter=io.confluent.connect.avro.AvroConverter
+value.converter.schema.registry.url=http://confluent:8081
+# The internal converter used for offsets and config data is configurable and must be specified,
+# but most users will always want to use the built-in default. Offset and config data is never
+# visible outside of Connect in this format.
+internal.key.converter=org.apache.kafka.connect.json.JsonConverter
+internal.value.converter=org.apache.kafka.connect.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+# Local storage file for offset data
+offset.storage.file.filename=/tmp/connect.offsets
+# Confuent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
+# that will report audit data that can be displayed and analyzed in Confluent Control Center
+# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
+# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
diff --git a/integrations/apache-kafka/docker-compose.yml b/integrations/apache-kafka/docker-compose.yml
new file mode 100644
index 0000000..215c28a
--- /dev/null
+++ b/integrations/apache-kafka/docker-compose.yml
@@ -0,0 +1,45 @@
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+version: "2"
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:3.2.2-1
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      zk_id: "1"
+  kafka:
+    hostname: kafka
+    image: confluentinc/cp-kafka:3.2.2-1
+    links:
+    - zookeeper
+    ports:
+    - "9092:9092"
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
+      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://:9092"
+  schema-registry:
+    image: confluentinc/cp-schema-registry:3.2.2-1
+    links:
+    - kafka
+    - zookeeper
+    ports:
+    - "8081:8081"
+    environment:
+      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
+      SCHEMA_REGISTRY_HOST_NAME: schema-registry
\ No newline at end of file
diff --git a/integrations/apache-kafka/pom.xml b/integrations/apache-kafka/pom.xml
new file mode 100644
index 0000000..e09c85b
--- /dev/null
+++ b/integrations/apache-kafka/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.plc4x</groupId>
+    <artifactId>integrations</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>apache-kafka</artifactId>
+  <name>Integrations: Apache Kafka</name>
+  <description>Integration module for integrating PLC4X into Apache Kafka (Kafka-Connect-Plugin).</description>
+
+  <properties>
+    <kafka.version>2.0.0</kafka.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-api</artifactId>
+      <version>0.0.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-core</artifactId>
+      <version>0.0.1-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-api</artifactId>
+      <version>${kafka.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.5.3</version>
+        <configuration>
+          <descriptors>
+            <descriptor>src/main/assembly/package.xml</descriptor>
+          </descriptors>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/main/assembly/package.xml b/integrations/apache-kafka/src/main/assembly/package.xml
new file mode 100644
index 0000000..48dea31
--- /dev/null
+++ b/integrations/apache-kafka/src/main/assembly/package.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
+          http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <!-- Assembles a packaged version targeting OS installation. -->
+  <id>package</id>
+  <formats>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${project.basedir}</directory>
+      <outputDirectory>share/doc/${project.name}/</outputDirectory>
+      <includes>
+        <include>README*</include>
+        <include>LICENSE*</include>
+        <include>NOTICE*</include>
+        <include>licenses/</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/config</directory>
+      <outputDirectory>etc/${project.name}</outputDirectory>
+      <includes>
+        <include>*</include>
+      </includes>
+    </fileSet>
+  </fileSets>
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory>share/java/${project.name}</outputDirectory>
+      <useProjectArtifact>true</useProjectArtifact>
+      <useTransitiveFiltering>true</useTransitiveFiltering>
+      <excludes>
+        <exclude>org.apache.kafka:connect-api</exclude>
+      </excludes>
+    </dependencySet>
+  </dependencySets>
+</assembly>
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
new file mode 100644
index 0000000..021b023
--- /dev/null
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -0,0 +1,74 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.plc4x.kafka.sink.Plc4xSinkConfig;
+import org.apache.plc4x.kafka.sink.Plc4xSinkTask;
+import org.apache.plc4x.kafka.util.VersionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class Plc4xSinkConnector extends SinkConnector {
+    private static Logger log = LoggerFactory.getLogger(Plc4xSinkConnector.class);
+
+    private Map<String, String> configProperties;
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return Plc4xSinkTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        log.info("Setting task configurations for {} workers.", maxTasks);
+        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
+        for (int i = 0; i < maxTasks; ++i) {
+            configs.add(configProperties);
+        }
+        return configs;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        configProperties = props;
+    }
+
+    @Override
+    public void stop() {
+        // Nothing to do here ...
+    }
+
+    @Override
+    public ConfigDef config() {
+        return Plc4xSinkConfig.CONFIG_DEF;
+    }
+
+    @Override
+    public String version() {
+        return VersionUtil.getVersion();
+    }
+
+}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
new file mode 100644
index 0000000..f20a8a5
--- /dev/null
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -0,0 +1,74 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.plc4x.kafka.source.Plc4xSourceConfig;
+import org.apache.plc4x.kafka.source.Plc4xSourceTask;
+import org.apache.plc4x.kafka.util.VersionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class Plc4xSourceConnector extends SourceConnector {
+    private static Logger log = LoggerFactory.getLogger(Plc4xSourceConnector.class);
+
+    private Map<String, String> configProperties;
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return Plc4xSourceTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        log.info("Setting task configurations for {} workers.", maxTasks);
+        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
+        for (int i = 0; i < maxTasks; ++i) {
+            configs.add(configProperties);
+        }
+        return configs;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        configProperties = props;
+    }
+
+    @Override
+    public void stop() {
+        // Nothing to do here ...
+    }
+
+    @Override
+    public ConfigDef config() {
+        return Plc4xSourceConfig.CONFIG_DEF;
+    }
+
+    @Override
+    public String version() {
+        return VersionUtil.getVersion();
+    }
+
+}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
new file mode 100644
index 0000000..bceedfc
--- /dev/null
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
@@ -0,0 +1,68 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.common;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.Map;
+
+
+public class Plc4xConfig extends AbstractConfig {
+
+    public static final String PLC_CONNECTION_STRING_CONFIG = "my.setting";
+    public static final String PLC_CONNECTION_STRING_DISPLAY = "PLC Connection String";
+    public static final String PLC_CONNECTION_STRING_DOC = "Connection string used by PLC4X to connect to the PLC.";
+
+    public static ConfigDef baseConfigDef() {
+        ConfigDef config = new ConfigDef();
+        addPlcOptions(config);
+        return config;
+    }
+
+    private static final void addPlcOptions(ConfigDef config) {
+        config.define(
+            PLC_CONNECTION_STRING_CONFIG,
+            Type.STRING,
+            Importance.HIGH,
+            PLC_CONNECTION_STRING_DOC);
+    }
+
+    public static final ConfigDef CONFIG_DEF = baseConfigDef();
+
+    public Plc4xConfig(ConfigDef config, Map<String, String> parsedConfig) {
+        super(config, parsedConfig);
+        String plcConnectionString = getString(PLC_CONNECTION_STRING_CONFIG);
+        if (plcConnectionString == null) {
+            throw new ConfigException("'PLC Connection String' must be specified");
+        }
+    }
+
+    public Plc4xConfig(Map<String, String> parsedConfig) {
+        this(CONFIG_DEF, parsedConfig);
+    }
+
+    public String getPlcConnectionString() {
+        return this.getString(PLC_CONNECTION_STRING_CONFIG);
+    }
+
+}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkConfig.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkConfig.java
new file mode 100644
index 0000000..b85cb0a
--- /dev/null
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkConfig.java
@@ -0,0 +1,49 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.sink;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.plc4x.kafka.common.Plc4xConfig;
+
+import java.util.Map;
+
+
+public class Plc4xSinkConfig extends Plc4xConfig {
+
+    public static ConfigDef baseConfigDef() {
+        ConfigDef config = Plc4xConfig.baseConfigDef();
+        addPlcOptions(config);
+        return config;
+    }
+
+    private static final void addPlcOptions(ConfigDef config) {
+        // TODO: Add things needed here.
+    }
+
+    public static final ConfigDef CONFIG_DEF = baseConfigDef();
+
+    public Plc4xSinkConfig(ConfigDef config, Map<String, String> parsedConfig) {
+        super(config, parsedConfig);
+    }
+
+    public Plc4xSinkConfig(Map<String, String> parsedConfig) {
+        this(CONFIG_DEF, parsedConfig);
+    }
+
+}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkTask.java
new file mode 100644
index 0000000..840d938
--- /dev/null
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkTask.java
@@ -0,0 +1,129 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.sink;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.connection.PlcWriter;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcException;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.messages.items.WriteRequestItem;
+import org.apache.plc4x.java.api.messages.items.WriteResponseItem;
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.java.api.types.ResponseCode;
+import org.apache.plc4x.kafka.util.VersionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class Plc4xSinkTask extends SinkTask {
+
+    private static Logger log = LoggerFactory.getLogger(Plc4xSinkTask.class);
+
+    private Plc4xSinkConfig config;
+    private PlcConnection plcConnection;
+    private PlcWriter writer;
+    private AtomicBoolean running = new AtomicBoolean(false);
+
+    @Override
+    public String version() {
+        return VersionUtil.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> properties) {
+        try {
+            config = new Plc4xSinkConfig(properties);
+        } catch (ConfigException e) {
+            throw new ConnectException("Couldn't start JdbcSourceTask due to configuration error", e);
+        }
+        final String url = config.getString(Plc4xSinkConfig.PLC_CONNECTION_STRING_CONFIG);
+
+        try {
+            plcConnection = new PlcDriverManager().getConnection(url);
+            Optional<PlcWriter> writerOptional = plcConnection.getWriter();
+            if(!writerOptional.isPresent()) {
+                throw new ConnectException("PlcWriter not available for this type of connection");
+            }
+            writer = writerOptional.get();
+            running.set(true);
+        } catch (PlcConnectionException e) {
+            throw new ConnectException("Caught exception while connecting to PLC", e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        if(plcConnection != null) {
+            running.set(false);
+            try {
+                plcConnection.close();
+            } catch (Exception e) {
+                throw new RuntimeException("Caught exception while closing connection to PLC", e);
+            }
+        }
+    }
+
+    @Override
+    public void put(Collection<SinkRecord> records) {
+        if((plcConnection != null) && plcConnection.isConnected() && (writer != null)) {
+            // Prepare the write request.
+            List<WriteRequestItem<?>> writeRequestItems = new ArrayList<>(records.size());
+            for (SinkRecord record : records) {
+                // TODO: Somehow get the payload from the kafka SinkRecord and create a writeRequestItem from that ...
+                // TODO: Replace this dummy with something real ...
+                Map<String, Object> value = new HashMap<>(); //(Map<String, String>) record.value()
+                String addressString = (String) value.get("address");
+                List<Byte> values = (List<Byte>) value.get("values");
+
+                try {
+                    Address address = plcConnection.parseAddress(addressString);
+                    writeRequestItems.add(new WriteRequestItem<>(Byte.class, address, values.toArray(new Byte[0])));
+                } catch (PlcException e) {
+                    // TODO: Do Something if the address string wasn't parsable by the current driver ...
+                    log.error("Error parsing address string " + addressString, e);
+                }
+            }
+            PlcWriteRequest writeRequest = new PlcWriteRequest(writeRequestItems);
+
+            // Send the write request to the PLC.
+            try {
+                PlcWriteResponse plcWriteResponse = writer.write(writeRequest).get();
+                for (WriteResponseItem<?> responseItem : plcWriteResponse.getResponseItems()) {
+                    if(responseItem.getResponseCode() != ResponseCode.OK) {
+                        // TODO: Do Something if writing this particular item wasn't successful ...
+                        log.error("Error writing a value to PLC");
+                    }
+                }
+            } catch (ExecutionException | InterruptedException e) {
+                log.error("Error writing values to PLC", e);
+            }
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceConfig.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceConfig.java
new file mode 100644
index 0000000..3904343
--- /dev/null
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceConfig.java
@@ -0,0 +1,49 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.source;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.plc4x.kafka.common.Plc4xConfig;
+
+import java.util.Map;
+
+
+public class Plc4xSourceConfig extends Plc4xConfig {
+
+    public static ConfigDef baseConfigDef() {
+        ConfigDef config = Plc4xConfig.baseConfigDef();
+        addPlcOptions(config);
+        return config;
+    }
+
+    private static final void addPlcOptions(ConfigDef config) {
+        // TODO: Add things needed here.
+    }
+
+    public static final ConfigDef CONFIG_DEF = baseConfigDef();
+
+    public Plc4xSourceConfig(ConfigDef config, Map<String, String> parsedConfig) {
+        super(config, parsedConfig);
+    }
+
+    public Plc4xSourceConfig(Map<String, String> parsedConfig) {
+        this(CONFIG_DEF, parsedConfig);
+    }
+
+}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
new file mode 100644
index 0000000..36e37a0
--- /dev/null
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
@@ -0,0 +1,120 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.source;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.connection.PlcReader;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.items.ReadResponseItem;
+import org.apache.plc4x.java.api.model.Address;
+import org.apache.plc4x.kafka.util.VersionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class Plc4xSourceTask extends SourceTask {
+
+    static final Logger log = LoggerFactory.getLogger(Plc4xSourceTask.class);
+
+    private Plc4xSourceConfig config;
+    private PlcConnection plcConnection;
+    private PlcReader reader;
+    private PlcReadRequest readRequest;
+    private AtomicBoolean running = new AtomicBoolean(false);
+
+    @Override
+    public String version() {
+        return VersionUtil.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> properties) {
+        try {
+            config = new Plc4xSourceConfig(properties);
+        } catch (ConfigException e) {
+            throw new ConnectException("Couldn't start JdbcSourceTask due to configuration error", e);
+        }
+        final String url = config.getString(Plc4xSourceConfig.PLC_CONNECTION_STRING_CONFIG);
+
+        try {
+            plcConnection = new PlcDriverManager().getConnection(url);
+            Optional<PlcReader> readerOptional = plcConnection.getReader();
+            if(!readerOptional.isPresent()) {
+                throw new ConnectException("PlcReader not available for this type of connection");
+            }
+            reader = readerOptional.get();
+            running.set(true);
+        } catch (PlcConnectionException e) {
+            throw new ConnectException("Caught exception while connecting to PLC", e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        if(plcConnection != null) {
+            running.set(false);
+            try {
+                plcConnection.close();
+            } catch (Exception e) {
+                throw new RuntimeException("Caught exception while closing connection to PLC", e);
+            }
+        }
+    }
+
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        if((plcConnection != null) && plcConnection.isConnected() && (reader != null)) {
+            final List<SourceRecord> results = new LinkedList<>();
+
+            try {
+                PlcReadResponse plcReadResponse = reader.read(readRequest).get();
+
+                for (ReadResponseItem<?> responseItem : plcReadResponse.getResponseItems()) {
+                    Address address = responseItem.getRequestItem().getAddress();
+                    List<?> values = responseItem.getValues();
+
+                    // TODO: Implement Sending this information to Kafka ...
+                    //results.add(new SourceRecord())
+                }
+            } catch (ExecutionException e) {
+                log.error("Error reading values from PLC", e);
+            }
+
+            return results;
+        }
+        return null;
+    }
+
+}
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/util/VersionUtil.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/util/VersionUtil.java
new file mode 100644
index 0000000..b94dee5
--- /dev/null
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/util/VersionUtil.java
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.util;
+
+/**
+ * Utility to access version information from the jars MANIFEST.MF file, which is automatically generated by the build.
+ */
+public class VersionUtil {
+    public static String getVersion() {
+        try {
+            return VersionUtil.class.getPackage().getImplementationVersion();
+        } catch (Exception ex) {
+            return "0.0.0.0";
+        }
+    }
+}
diff --git a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSinkConfigTest.java b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSinkConfigTest.java
new file mode 100644
index 0000000..b0ded1c
--- /dev/null
+++ b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSinkConfigTest.java
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka;
+
+import org.apache.plc4x.kafka.sink.Plc4xSinkConfig;
+import org.junit.Test;
+
+public class Plc4XSinkConfigTest {
+
+    @Test
+    public void doc() {
+        System.out.println(Plc4xSinkConfig.CONFIG_DEF.toRst());
+    }
+
+}
diff --git a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSourceConfigTest.java b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSourceConfigTest.java
new file mode 100644
index 0000000..2a969db
--- /dev/null
+++ b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSourceConfigTest.java
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka;
+
+import org.apache.plc4x.kafka.source.Plc4xSourceConfig;
+import org.junit.Test;
+
+public class Plc4XSourceConfigTest {
+
+    @Test
+    public void doc() {
+        System.out.println(Plc4xSourceConfig.CONFIG_DEF.toRst());
+    }
+
+}
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4xSinkConnectorTest.java b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4xSinkConnectorTest.java
new file mode 100644
index 0000000..2777ed5
--- /dev/null
+++ b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4xSinkConnectorTest.java
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka;
+
+import org.junit.Test;
+
+public class Plc4xSinkConnectorTest {
+
+    @Test
+    public void test() {
+        // Congrats on a passing test!
+    }
+
+}
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4xSinkTaskTest.java b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4xSinkTaskTest.java
new file mode 100644
index 0000000..9006e85
--- /dev/null
+++ b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4xSinkTaskTest.java
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka;
+
+import org.junit.Test;
+
+public class Plc4xSinkTaskTest {
+
+    @Test
+    public void test() {
+        // Congrats on a passing test!
+    }
+
+}
diff --git a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4xSourceConnectorTest.java b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4xSourceConnectorTest.java
new file mode 100644
index 0000000..27d158d
--- /dev/null
+++ b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4xSourceConnectorTest.java
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka;
+
+import org.junit.Test;
+
+public class Plc4xSourceConnectorTest {
+
+    @Test
+    public void test() {
+        // Congrats on a passing test!
+    }
+
+}
diff --git a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4xSourceTaskTest.java b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4xSourceTaskTest.java
new file mode 100644
index 0000000..3b5942b
--- /dev/null
+++ b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4xSourceTaskTest.java
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka;
+
+import org.junit.Test;
+
+public class Plc4xSourceTaskTest {
+
+    @Test
+    public void test() {
+        // Congrats on a passing test!
+    }
+
+}
\ No newline at end of file
diff --git a/integrations/pom.xml b/integrations/apache-kafka/src/test/resources/logback.xml
similarity index 52%
copy from integrations/pom.xml
copy to integrations/apache-kafka/src/test/resources/logback.xml
index 83c9ecc..c2bb522 100644
--- a/integrations/pom.xml
+++ b/integrations/apache-kafka/src/test/resources/logback.xml
@@ -17,28 +17,13 @@
   limitations under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.plc4x</groupId>
-    <artifactId>plc4x-parent</artifactId>
-    <version>0.0.1-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>integrations</artifactId>
-  <packaging>pom</packaging>
-
-  <name>Integrations</name>
-  <description>Parent of all product specific integration modules.</description>
-
-  <modules>
-    <module>apache-brooklyn</module>
-    <module>apache-camel</module>
-    <module>apache-edgent</module>
-    <module>apache-mynewt</module>
-  </modules>
-
-</project>
\ No newline at end of file
+<configuration>
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
+    </encoder>
+  </appender>
+  <root level="debug">
+    <appender-ref ref="STDOUT"/>
+  </root>
+</configuration>
\ No newline at end of file
diff --git a/integrations/pom.xml b/integrations/pom.xml
index 83c9ecc..1e37b30 100644
--- a/integrations/pom.xml
+++ b/integrations/pom.xml
@@ -38,6 +38,7 @@
     <module>apache-brooklyn</module>
     <module>apache-camel</module>
     <module>apache-edgent</module>
+    <module>apache-kafka</module>
     <module>apache-mynewt</module>
   </modules>
 
diff --git a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
index 3ce851b..b2d3f4f 100644
--- a/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
+++ b/plc4j/api/src/main/java/org/apache/plc4x/java/api/messages/PlcRequest.java
@@ -42,9 +42,9 @@ public abstract class PlcRequest<REQUEST_ITEM extends RequestItem> implements Pl
         this.requestItems = requestItems;
     }
 
-    public void addItem(REQUEST_ITEM readRequestItem) {
-        Objects.requireNonNull(readRequestItem, "Request item must not be null");
-        getRequestItems().add(readRequestItem);
+    public void addItem(REQUEST_ITEM requestItem) {
+        Objects.requireNonNull(requestItem, "Request item must not be null");
+        getRequestItems().add(requestItem);
     }
 
     public List<REQUEST_ITEM> getRequestItems() {
diff --git a/src/site/asciidoc/index.adoc b/src/site/asciidoc/index.adoc
index 7cdd7d6..31da326 100644
--- a/src/site/asciidoc/index.adoc
+++ b/src/site/asciidoc/index.adoc
@@ -37,11 +37,11 @@ This is where PLC4X comes in:
 In general we are trying to achieve the same goal OPC-UA is trying to address, but we try to do this by going a completely different path.
 
 While with OPC-UA every device has to be retrofitted with the ability to speak a new protocol and use a common client to speak with these devices,
-PLC4X tries to provide a unified API by also implementing drivers for communicating in the protocols native of most industrial controllers.
+PLC4X tries to provide a unified API by implementing drivers for communicating with most industrial controllers in the protocols they natively understand.
 
 Each of these drivers is being implemented from the specs or by reverse engineering protocols in order to be fully Apache 2.0 licensed.
 
-The major benefits of PLC4X over OPC-UA, from our point of view is:
+The major benefits of PLC4X over OPC-UA, from our point of view are:
 
 - No need to modify existing hardware
 - Activating OPC-UA support on existing PLCs greatly increases the load on the PLCs
@@ -53,8 +53,8 @@ At first we will be concentrating on providing adapters for the most widely used
 - icon:wrench[role=yellow] link:protocpls/delta-v/index.html[Emerson DeltaV (UDP)]
 - icon:wrench[role=yellow] link:protocols/ethernet-ip/index.html[EtherNet/IP (TCP)]
 - icon:check[role=green] link:protocols/modbus/index.html[Modbus (TCP)]
-- icon:times[role=red] link:protocols/opc-ua/index.html[OPC-UA]
-- icon:check[role=green] link:protocols/s7/index.html[S7 (TCP)]
+- icon:times[role=red] link:protocols/opc-ua/index.html[OPC-UA (TCP)]
+- icon:check[role=green] link:protocols/s7/index.html[Siemens S7 (TCP)]
 
 We are planning on providing support for the following programming languages: