You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/14 17:08:16 UTC

[plc4x] 01/04: - Working on refactoring the Kafka Connect configuration.

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch featule/kafka-connect-refactoring
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 39ccdcb5370cd1d1e0220e06fd2ee3853c765838
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Tue Aug 13 08:26:52 2019 +0200

    - Working on refactoring the Kafka Connect configuration.
---
 plc4j/integrations/apache-kafka/README.md          |  54 ++++++--
 .../apache-kafka/config/sink.properties            |  34 +++--
 .../apache-kafka/config/source.properties          |  59 ++++++---
 plc4j/integrations/apache-kafka/pom.xml            |  38 ++----
 .../org/apache/plc4x/kafka/Plc4xSinkConnector.java |   9 +-
 .../java/org/apache/plc4x/kafka/Plc4xSinkTask.java |   6 +-
 .../apache/plc4x/kafka/Plc4xSourceConnector.java   |  41 +++---
 .../org/apache/plc4x/kafka/Plc4xSourceTask.java    |   1 +
 .../org/apache/plc4x/kafka/config/Defaults.java    |  33 +++++
 .../java/org/apache/plc4x/kafka/config/Job.java    |  48 +++++++
 .../apache/plc4x/kafka/config/JobReference.java    |  45 +++++++
 .../java/org/apache/plc4x/kafka/config/Source.java |  45 +++++++
 .../apache/plc4x/kafka/config/SourceConfig.java    | 146 +++++++++++++++++++++
 .../kafka/exceptions/ConfigurationException.java   |  31 +++++
 .../plc4x/kafka/config/SourceConfigTest.java       |  78 +++++++++++
 15 files changed, 565 insertions(+), 103 deletions(-)

diff --git a/plc4j/integrations/apache-kafka/README.md b/plc4j/integrations/apache-kafka/README.md
index 2f9326a..a2a073e 100644
--- a/plc4j/integrations/apache-kafka/README.md
+++ b/plc4j/integrations/apache-kafka/README.md
@@ -31,33 +31,71 @@ See `config/sink.properties` for example configuration.
 
 ## Quickstart
 
+A Kafka Connect worker can be run in two modes: 
+- Standalone
+- Distributed
+
+Both modes require a Kafka Broker instance to be available.
+Kafka Connect is part of the Kafka distribution. 
+
+In order to start a Kafka Connect system the following steps have to be performed:
+
 1) Download the latest version of Apache Kafka binaries from here: https://kafka.apache.org/downloads
 2) Unpack the archive.
 3) Copy the target/apache-kafka-0.5.0-SNAPSHOT.jar to the Kafka "libs" directory.
 4) Copy the files in the "config" to Kafka's "configs" directory (maybe inside a "plc4x" subdirectory)
-5) Open 4 console windows and change directory into that directory
-6) Start Zookeeper: 
+
+### Start a Kafka Broker
+
+1) Open 4 console windows and change directory into that directory
+2) Start Zookeeper: 
         
         bin/zookeeper-server-start.sh config/zookeeper.properties 
-7) Start Kafka:
+
+3) Start Kafka:
         
         bin/kafka-server-start.sh config/server.properties
-8) Create the "test" topic:
+
+4) Create the "test" topic:
         
         bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
-9) Start the consumer:
+
+5) Start the consumer:
         
         bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
 
-**Note:** Not quite sure here ... have to continue working on this ...
+### Start a Kafka Connect Worker (Standalone)
+
+Ideal for testing. 
 
-10) Start Kafka connect:
+1) Start Kafka connect:
         
         bin/connect-standalone.sh config/connect-standalone.properties config/plc4x/source.properties
+
 Now watch the console window with the "kafka-console-consumer". 
 
 If you want to debug the connector, be sure to set some environment variables before starting Kafka-Connect:
 
         export KAFKA_DEBUG=y; export DEBUG_SUSPEND_FLAG=y;
 
-In this case the startup will suspend till an IDE is connected via a remote-debugging session.
\ No newline at end of file
+In this case the startup will suspend till an IDE is connected via a remote-debugging session.
+
+### Start Kafka Connect Worker (Distributed Mode)
+
+Ideal for production.
+
+In this case the state of the node is handled by Zookeeper and the configuration of the connectors are distributed via Kafka topics.
+
+    bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
+    bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
+    bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
+
+Starting the worker is then as simple as this:
+
+    bin /connect-distributed.sh config/connect-distributed.properties
+    
+The configuration of the Connectors is then provided via REST interface:
+
+    curl -X POST -H "Content-Type: application/json" --data '{"name": "plc-source-test", "config": {"connector.class":"org.apache.plc4x.kafka.Plc4xSourceConnector", 
+    // TODO: Continue here ...
+    "tasks.max":"1", "file":"test.sink.txt", "topics":"connect-test" }}' http://localhost:8083/connectors
diff --git a/plc4j/integrations/apache-kafka/config/sink.properties b/plc4j/integrations/apache-kafka/config/sink.properties
index ae9ccba..8da6eae 100644
--- a/plc4j/integrations/apache-kafka/config/sink.properties
+++ b/plc4j/integrations/apache-kafka/config/sink.properties
@@ -1,21 +1,19 @@
-<!--
-
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
--->
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 name=plc-sink-test
 connector.class=org.apache.plc4x.kafka.Plc4xSinkConnector
 topics=test
diff --git a/plc4j/integrations/apache-kafka/config/source.properties b/plc4j/integrations/apache-kafka/config/source.properties
index afa7e93..83d81dd 100644
--- a/plc4j/integrations/apache-kafka/config/source.properties
+++ b/plc4j/integrations/apache-kafka/config/source.properties
@@ -1,23 +1,44 @@
-<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+name=plc-source-test
+connector.class=org.apache.plc4x.kafka.Plc4xSourceConnector
 
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
+defaults.topic=some/default
 
-http://www.apache.org/licenses/LICENSE-2.0
+sources.machineA.connectionString=s7://1.2.3.4/1/1
+sources.machineA.jobReferences.s7-dashboard.enabled=true
+sources.machineA.jobReferences.s7-heartbeat.enabled=true
+sources.machineA.jobReferences.s7-heartbeat.topic=heartbeat
 
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
+sources.machineB.connectionString=s7://10.20.30.40/1/1
+sources.machineB.topic=heartbeat
+sources.machineB.jobReferences.s7-heartbeat.enabled=true
 
--->
-name=plc-source-test
-connector.class=org.apache.plc4x.kafka.Plc4xSourceConnector
-topic=test
-queries=test:unused#RANDOM/foo:INTEGER,test:another#RANDOM/bar:STRING
-rate=2000
\ No newline at end of file
+sources.machineC.connectionString=ads://1.2.3.4.5.6
+sources.machineC.topic=heartbeat
+sources.machineC.jobReferences.ads-heartbeat.enabled=true
+
+jobs.s7-dashboard.interval=500
+jobs.s7-dashboard.fields.inputPreasure=%DB.DB1.4:INT
+jobs.s7-dashboard.fields.outputPreasure=%Q1:BYT
+jobs.s7-dashboard.fields.temperature=%I3:INT
+
+jobs.s7-heartbeat.interval=1000
+jobs.s7-heartbeat.fields.active=%I0.2:BOOL
+
+jobs.ads-heartbeat.interval=1000
+jobs.ads-heartbeat.fields.active=Main.running
\ No newline at end of file
diff --git a/plc4j/integrations/apache-kafka/pom.xml b/plc4j/integrations/apache-kafka/pom.xml
index 33b698e..3e38ccb 100644
--- a/plc4j/integrations/apache-kafka/pom.xml
+++ b/plc4j/integrations/apache-kafka/pom.xml
@@ -32,15 +32,26 @@
   <description>Integration module for integrating PLC4X into Apache Kafka (Kafka-Connect-Plugin)</description>
 
   <properties>
-    <kafka.version>2.0.0</kafka.version>
+    <kafka.version>2.3.0</kafka.version>
   </properties>
 
   <dependencies>
     <dependency>
       <groupId>org.apache.plc4x</groupId>
       <artifactId>plc4j-api</artifactId>
-      <version>${project.version}</version>
+      <version>0.5.0-SNAPSHOT</version>
     </dependency>
+    <!--dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-scraper</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+    </dependency-->
+
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
@@ -48,15 +59,6 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
-    <!-- TODO: Maybe this can be removed (and the exclusion in the dependency plugin) -->
-    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>connect-api</artifactId>
       <version>${kafka.version}</version>
@@ -64,18 +66,4 @@
     </dependency>
   </dependencies>
 
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <configuration>
-          <ignoredDependencies combine.children="append">
-            <ignoredDependency>org.apache.kafka:connect-api</ignoredDependency>
-          </ignoredDependencies>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-
 </project>
\ No newline at end of file
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index c68a1a8..7e0b477 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -30,14 +30,13 @@ import java.util.List;
 import java.util.Map;
 
 public class Plc4xSinkConnector extends SinkConnector {
-    static final String URL_CONFIG = "url";
-    private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
 
-    static final ConfigDef CONFIG_DEF = new ConfigDef()
-        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC);
+    private static final String URL_CONFIG = "url";
+    private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
+    private static final ConfigDef CONFIG_DEF =
+        new ConfigDef().define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC);
 
     private String url;
-    private String query;
 
     @Override
     public Class<? extends Task> taskClass() {
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index 424e857..855e759 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -18,7 +18,6 @@ under the License.
 */
 package org.apache.plc4x.kafka;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -34,6 +33,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 public class Plc4xSinkTask extends SinkTask {
+
     private String url;
 
     private PlcConnection plcConnection;
@@ -45,8 +45,8 @@ public class Plc4xSinkTask extends SinkTask {
 
     @Override
     public void start(Map<String, String> props) {
-        AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
-        url = config.getString(Plc4xSinkConnector.URL_CONFIG);
+        /*AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+        url = config.getString(Plc4xSinkConnector.URL_CONFIG);*/
 
         openConnection();
 
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index cddef09..200dff5 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -18,21 +18,16 @@ under the License.
 */
 package org.apache.plc4x.kafka;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.plc4x.kafka.config.SourceConfig;
+import org.apache.plc4x.kafka.exceptions.ConfigurationException;
 import org.apache.plc4x.kafka.util.VersionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
 import java.util.*;
-import java.util.stream.Collectors;
 
 public class Plc4xSourceConnector extends SourceConnector {
 
@@ -58,10 +53,18 @@ public class Plc4xSourceConnector extends SourceConnector {
         .define(JSON_CONFIG, ConfigDef.Type.STRING, JSON_DEFAULT, ConfigDef.Importance.HIGH, JSON_DOC)
         .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
 
-    private String topic;
-    private List<String> queries;
-    private String json;
-    private Integer rate;
+    private SourceConfig sourceConfig;
+
+    @Override
+    public void start(Map<String, String> props) {
+        try {
+            sourceConfig = SourceConfig.fromPropertyMap(props);
+        } catch (ConfigurationException e) {
+        }
+    }
+
+    @Override
+    public void stop() {}
 
     @Override
     public Class<? extends Task> taskClass() {
@@ -72,7 +75,7 @@ public class Plc4xSourceConnector extends SourceConnector {
     @SuppressWarnings("unchecked")
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         List<Map<String, String>> configs = new LinkedList<>();
-        if (json.isEmpty()) {
+        /*if (json.isEmpty()) {
             Map<String, List<String>> groupedByHost = new HashMap<>();
             queries.stream().map(query -> query.split("#", 2)).collect(Collectors.groupingBy(parts -> parts[0])).forEach((host, q) ->
                 groupedByHost.put(host, q.stream().map(parts -> parts[1]).collect(Collectors.toList())));
@@ -121,23 +124,11 @@ public class Plc4xSourceConnector extends SourceConnector {
             } catch (IOException e) {
                 log.error("ERROR CONFIGURING TASK", e);
             }
-        }
+        }*/
         return configs;
     }
 
     @Override
-    public void start(Map<String, String> props) {
-        AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
-        topic = config.getString(TOPIC_CONFIG);
-        queries = config.getList(QUERIES_CONFIG);
-        rate = config.getInt(RATE_CONFIG);
-        json = config.getString(JSON_CONFIG);
-    }
-
-    @Override
-    public void stop() {}
-
-    @Override
     public ConfigDef config() {
         return CONFIG_DEF;
     }
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index d353e54..dfc1c04 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -45,6 +45,7 @@ import java.util.concurrent.*;
  * If the flag does not become true, the method returns null, otherwise a fetch is performed.
  */
 public class Plc4xSourceTask extends SourceTask {
+
     static final String TOPIC_CONFIG = "topic";
     private static final String TOPIC_DOC = "Kafka topic to publish to";
 
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Defaults.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Defaults.java
new file mode 100644
index 0000000..fcdb99e
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Defaults.java
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.config;
+
+public class Defaults {
+
+    private final String topic;
+
+    public Defaults(String topic) {
+        this.topic = topic;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Job.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Job.java
new file mode 100644
index 0000000..eb67052
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Job.java
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.config;
+
+import java.time.Duration;
+import java.util.Map;
+
+public class Job {
+
+    private final String name;
+    private final int interval;
+    private final Map<String, String> fields;
+
+    public Job(String name, int interval, Map<String, String> fields) {
+        this.name = name;
+        this.interval = interval;
+        this.fields = fields;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public int getInterval() {
+        return interval;
+    }
+
+    public Map<String, String> getFields() {
+        return fields;
+    }
+
+}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/JobReference.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/JobReference.java
new file mode 100644
index 0000000..dc58ce0
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/JobReference.java
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.config;
+
+public class JobReference {
+
+    private final String name;
+    private final boolean enabled;
+    private final String topic;
+
+    public JobReference(String name, boolean enabled, String topic) {
+        this.name = name;
+        this.enabled = enabled;
+        this.topic = topic;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public boolean isEnabled() {
+        return enabled;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Source.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Source.java
new file mode 100644
index 0000000..837e9b5
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Source.java
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.config;
+
+public class Source {
+
+    private final String name;
+    private final String connectionString;
+    private final JobReference[] jobReferences;
+
+    public Source(String name, String connectionString, JobReference[] jobReferences) {
+        this.name = name;
+        this.connectionString = connectionString;
+        this.jobReferences = jobReferences;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getConnectionString() {
+        return connectionString;
+    }
+
+    public JobReference[] getJobReferences() {
+        return jobReferences;
+    }
+
+}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SourceConfig.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SourceConfig.java
new file mode 100644
index 0000000..08d133e
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SourceConfig.java
@@ -0,0 +1,146 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.config;
+
+import com.google.gson.*;
+import org.apache.plc4x.kafka.exceptions.ConfigurationException;
+
+import java.util.List;
+import java.util.Map;
+
+public class SourceConfig {
+
+    private final List<Source> sources;
+    private final List<Job> jobs;
+
+    public static SourceConfig fromPropertyMap(Map<String, String> properties) throws ConfigurationException {
+        try {
+            JsonObject jsonConfig = propertiesToJson(properties);
+            return new Gson().fromJson(jsonConfig, SourceConfig.class);
+        } catch (Exception e) {
+            throw new ConfigurationException("Error configuring.", e);
+        }
+    }
+
+    private static JsonObject propertiesToJson(Map<String, String> properties) {
+        JsonObject config = new JsonObject();
+
+        String defaultTopic = properties.getOrDefault("defaults.topic", null);
+        // Generally create the JSON tree structure.
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            String key = entry.getKey();
+            if("defaults.topic".equals(key)) {
+                continue;
+            }
+            JsonObject context = config;
+            String[] segments = key.split("\\.");
+            for (int i = 0; i < segments.length; i++) {
+                String segment = segments[i];
+                // If this is the leaf segment, set the value.
+                if(i == segments.length - 1) {
+                    context.add(segment, new JsonPrimitive(properties.get(key)));
+                }
+                // if it's not a leaf segment, ensure the structure exists.
+                else {
+                    if (!context.has(segment)) {
+                        context.add(segment, new JsonObject());
+                    }
+                    context = (JsonObject) context.get(segment);
+                }
+            }
+        }
+
+        // Transform the structure for the sources into a real JSON (Jackson serializable) form.
+        JsonObject oldSources = config.getAsJsonObject("sources");
+        JsonArray sources = new JsonArray();
+        for (String sourceName : oldSources.keySet()) {
+            JsonObject oldSource = oldSources.getAsJsonObject(sourceName);
+
+            // Create a new JSON object with the correct structure.
+            JsonObject source = new JsonObject();
+            source.add("name", new JsonPrimitive(sourceName));
+            String sourceTopic = oldSource.has("topic") ? oldSource.get("topic").getAsString() : defaultTopic;
+            // Copy all the other properties over.
+            for (String sourceProperty : oldSource.keySet()) {
+                if("topic".equals(sourceProperty)) {
+                    // Filter out the topic setting as this is set on jobReference level.
+                } else if("jobReferences".equals(sourceProperty)) {
+                    JsonObject oldJobReferences = oldSource.getAsJsonObject(sourceProperty);
+                    JsonArray jobReferences = new JsonArray();
+                    for (String jobReferenceName : oldJobReferences.keySet()) {
+                        JsonObject oldJobReference = oldJobReferences.getAsJsonObject(jobReferenceName);
+                        JsonObject jobReference = new JsonObject();
+                        jobReference.add("name", new JsonPrimitive(jobReferenceName));
+                        for (String jobReferenceProperty : oldJobReference.keySet()) {
+                            jobReference.add(jobReferenceProperty, oldJobReference.get(jobReferenceProperty));
+                        }
+                        if(!jobReference.has("topic")) {
+                            jobReference.add("topic", new JsonPrimitive(sourceTopic));
+                        }
+                        jobReferences.add(jobReference);
+                    }
+                    source.add(sourceProperty, jobReferences);
+                } else {
+                    source.add(sourceProperty, oldSource.get(sourceProperty));
+                }
+            }
+
+            // Add the new source to the existing sources.
+            sources.add(source);
+        }
+        config.remove("sources");
+        config.add("sources", sources);
+
+        // Transform the structure of the jobs into a real JSON form.
+        JsonObject oldJobs = config.getAsJsonObject("jobs");
+        JsonArray jobs = new JsonArray();
+        for (String jobName : oldJobs.keySet()) {
+            JsonObject oldJob = oldJobs.getAsJsonObject(jobName);
+
+            // Create a new JSON object with the correct structure.
+            JsonObject job = new JsonObject();
+            job.add("name", new JsonPrimitive(jobName));
+            // Copy all the other properties over.
+            for (String jobProperty : oldJob.keySet()) {
+                job.add(jobProperty, oldJob.get(jobProperty));
+            }
+
+            // Add the new source to the existing sources.
+            jobs.add(job);
+        }
+        config.remove("jobs");
+        config.add("jobs", jobs);
+
+        return config;
+    }
+
+    public SourceConfig(List<Source> sources, List<Job> jobs) {
+        this.sources = sources;
+        this.jobs = jobs;
+    }
+
+    public List<Source> getSources() {
+        return sources;
+    }
+
+    public List<Job> getJobs() {
+        return jobs;
+    }
+
+}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/exceptions/ConfigurationException.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/exceptions/ConfigurationException.java
new file mode 100644
index 0000000..c41578a
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/exceptions/ConfigurationException.java
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.exceptions;
+
+public class ConfigurationException extends Exception {
+
+    public ConfigurationException(String message) {
+        super(message);
+    }
+
+    public ConfigurationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
diff --git a/plc4j/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/config/SourceConfigTest.java b/plc4j/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/config/SourceConfigTest.java
new file mode 100644
index 0000000..1fe0d5b
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/config/SourceConfigTest.java
@@ -0,0 +1,78 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.config;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class SourceConfigTest {
+
+    @Test
+    public void parseConfig() throws Exception {
+        Properties properties = new Properties();
+        properties.load(new StringReader("name=plc-source-test\n" +
+            "connector.class=org.apache.plc4x.kafka.Plc4xSourceConnector\n" +
+            "\n" +
+            "defaults.topic=some/default\n" +
+            "\n" +
+            "sources.machineA.connectionString=s7://1.2.3.4/1/1\n" +
+            "sources.machineA.jobReferences.s7-dashboard.enabled=true\n" +
+            "sources.machineA.jobReferences.s7-heartbeat.enabled=true\n" +
+            "sources.machineA.jobReferences.s7-heartbeat.topic=heartbeat\n" +
+            "\n" +
+            "sources.machineB.connectionString=s7://10.20.30.40/1/1\n" +
+            "sources.machineB.topic=heartbeat\n" +
+            "sources.machineB.jobReferences.s7-heartbeat.enabled=true\n" +
+            "\n" +
+            "sources.machineC.connectionString=ads://1.2.3.4.5.6\n" +
+            "sources.machineC.topic=heartbeat\n" +
+            "sources.machineC.jobReferences.ads-heartbeat.enabled=true\n" +
+            "\n" +
+            "jobs.s7-dashboard.interval=500\n" +
+            "jobs.s7-dashboard.fields.inputPreasure=%DB.DB1.4:INT\n" +
+            "jobs.s7-dashboard.fields.outputPreasure=%Q1:BYT\n" +
+            "jobs.s7-dashboard.fields.temperature=%I3:INT\n" +
+            "\n" +
+            "jobs.s7-heartbeat.interval=1000\n" +
+            "jobs.s7-heartbeat.fields.active=%I0.2:BOOL\n" +
+            "\n" +
+            "jobs.ads-heartbeat.interval=1000\n" +
+            "jobs.ads-heartbeat.fields.active=Main.running\n"));
+        SourceConfig sourceConfig = SourceConfig.fromPropertyMap(toStringMap(properties));
+
+        assertNotNull(sourceConfig);
+        assertEquals(3, sourceConfig.getSources().size(), "Expected 3 sources");
+        assertEquals(3, sourceConfig.getJobs().size(), "Expected 3 jobs");
+    }
+
+    private static Map<String, String> toStringMap(Properties properties) {
+        Map<String, String> map = new HashMap<>();
+        for (String stringPropertyName : properties.stringPropertyNames()) {
+            map.put(stringPropertyName, properties.getProperty(stringPropertyName));
+        }
+        return map;
+    }
+
+}