Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/14 17:08:16 UTC
[plc4x] 01/04: - Working on refactoring the Kafka Connect configuration.
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch feature/kafka-connect-refactoring
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 39ccdcb5370cd1d1e0220e06fd2ee3853c765838
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Tue Aug 13 08:26:52 2019 +0200
- Working on refactoring the Kafka Connect configuration.
---
plc4j/integrations/apache-kafka/README.md | 54 ++++++--
.../apache-kafka/config/sink.properties | 34 +++--
.../apache-kafka/config/source.properties | 59 ++++++---
plc4j/integrations/apache-kafka/pom.xml | 38 ++----
.../org/apache/plc4x/kafka/Plc4xSinkConnector.java | 9 +-
.../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 6 +-
.../apache/plc4x/kafka/Plc4xSourceConnector.java | 41 +++---
.../org/apache/plc4x/kafka/Plc4xSourceTask.java | 1 +
.../org/apache/plc4x/kafka/config/Defaults.java | 33 +++++
.../java/org/apache/plc4x/kafka/config/Job.java | 48 +++++++
.../apache/plc4x/kafka/config/JobReference.java | 45 +++++++
.../java/org/apache/plc4x/kafka/config/Source.java | 45 +++++++
.../apache/plc4x/kafka/config/SourceConfig.java | 146 +++++++++++++++++++++
.../kafka/exceptions/ConfigurationException.java | 31 +++++
.../plc4x/kafka/config/SourceConfigTest.java | 78 +++++++++++
15 files changed, 565 insertions(+), 103 deletions(-)
diff --git a/plc4j/integrations/apache-kafka/README.md b/plc4j/integrations/apache-kafka/README.md
index 2f9326a..a2a073e 100644
--- a/plc4j/integrations/apache-kafka/README.md
+++ b/plc4j/integrations/apache-kafka/README.md
@@ -31,33 +31,71 @@ See `config/sink.properties` for example configuration.
## Quickstart
+A Kafka Connect worker can be run in two modes:
+- Standalone
+- Distributed
+
+Both modes require a Kafka Broker instance to be available.
+Kafka Connect is part of the Kafka distribution.
+
+In order to start a Kafka Connect system the following steps have to be performed:
+
1) Download the latest version of Apache Kafka binaries from here: https://kafka.apache.org/downloads
2) Unpack the archive.
3) Copy the target/apache-kafka-0.5.0-SNAPSHOT.jar to the Kafka "libs" directory.
4) Copy the files in the "config" directory to Kafka's "config" directory (maybe inside a "plc4x" subdirectory)
-5) Open 4 console windows and change directory into that directory
-6) Start Zookeeper:
+
+### Start a Kafka Broker
+
+1) Open 4 console windows and change into the unpacked Kafka directory in each of them
+2) Start Zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
-7) Start Kafka:
+
+3) Start Kafka:
bin/kafka-server-start.sh config/server.properties
-8) Create the "test" topic:
+
+4) Create the "test" topic:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
-9) Start the consumer:
+
+5) Start the consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
-**Note:** Not quite sure here ... have to continue working on this ...
+### Start a Kafka Connect Worker (Standalone)
+
+Ideal for testing.
-10) Start Kafka connect:
+1) Start Kafka Connect:
bin/connect-standalone.sh config/connect-standalone.properties config/plc4x/source.properties
+
Now watch the console window with the "kafka-console-consumer".
If you want to debug the connector, be sure to set some environment variables before starting Kafka-Connect:
export KAFKA_DEBUG=y; export DEBUG_SUSPEND_FLAG=y;
-In this case the startup will suspend till an IDE is connected via a remote-debugging session.
\ No newline at end of file
+In this case the startup will suspend till an IDE is connected via a remote-debugging session.
+
+### Start a Kafka Connect Worker (Distributed)
+
+Ideal for production.
+
+In this case the state of the node is handled by Zookeeper and the configuration of the connectors is distributed via Kafka topics.
+
+ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
+ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
+ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
+
+Starting the worker is then as simple as this:
+
+ bin/connect-distributed.sh config/connect-distributed.properties
+
+The configuration of the connectors is then provided via the REST interface:
+
+ curl -X POST -H "Content-Type: application/json" --data '{"name": "plc-source-test", "config": {"connector.class":"org.apache.plc4x.kafka.Plc4xSourceConnector",
+ // TODO: Continue here ...
+ "tasks.max":"1", "file":"test.sink.txt", "topics":"connect-test" }}' http://localhost:8083/connectors
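Note on the REST call above: the same request could also be issued from Java. The following is only a sketch, assuming Java 11+ (for java.net.http) and the endpoint from the curl call. The class name ConnectorRegistration is hypothetical, and the body contains only the keys that are already settled in the README; the remaining connector settings are still marked as TODO there and are deliberately left out.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class ConnectorRegistration {

    // Assumption: the Connect worker listens on the URL used in the curl example.
    static final String CONNECTORS_URL = "http://localhost:8083/connectors";

    // JSON body with only the keys that the README already fixes.
    static String requestBody() {
        return "{\"name\": \"plc-source-test\", \"config\": {"
            + "\"connector.class\": \"org.apache.plc4x.kafka.Plc4xSourceConnector\", "
            + "\"tasks.max\": \"1\"}}";
    }

    // Builds (but does not send) the POST request.
    static HttpRequest buildRequest() {
        return HttpRequest.newBuilder(URI.create(CONNECTORS_URL))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(requestBody()))
            .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest();
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request would then be a matter of passing it to HttpClient.newHttpClient().send(...) with a running worker; that part is omitted here because it needs a live Kafka Connect instance.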
diff --git a/plc4j/integrations/apache-kafka/config/sink.properties b/plc4j/integrations/apache-kafka/config/sink.properties
index ae9ccba..8da6eae 100644
--- a/plc4j/integrations/apache-kafka/config/sink.properties
+++ b/plc4j/integrations/apache-kafka/config/sink.properties
@@ -1,21 +1,19 @@
-<!--
-
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
--->
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
name=plc-sink-test
connector.class=org.apache.plc4x.kafka.Plc4xSinkConnector
topics=test
diff --git a/plc4j/integrations/apache-kafka/config/source.properties b/plc4j/integrations/apache-kafka/config/source.properties
index afa7e93..83d81dd 100644
--- a/plc4j/integrations/apache-kafka/config/source.properties
+++ b/plc4j/integrations/apache-kafka/config/source.properties
@@ -1,23 +1,44 @@
-<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+name=plc-source-test
+connector.class=org.apache.plc4x.kafka.Plc4xSourceConnector
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
+defaults.topic=some/default
-http://www.apache.org/licenses/LICENSE-2.0
+sources.machineA.connectionString=s7://1.2.3.4/1/1
+sources.machineA.jobReferences.s7-dashboard.enabled=true
+sources.machineA.jobReferences.s7-heartbeat.enabled=true
+sources.machineA.jobReferences.s7-heartbeat.topic=heartbeat
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
+sources.machineB.connectionString=s7://10.20.30.40/1/1
+sources.machineB.topic=heartbeat
+sources.machineB.jobReferences.s7-heartbeat.enabled=true
--->
-name=plc-source-test
-connector.class=org.apache.plc4x.kafka.Plc4xSourceConnector
-topic=test
-queries=test:unused#RANDOM/foo:INTEGER,test:another#RANDOM/bar:STRING
-rate=2000
\ No newline at end of file
+sources.machineC.connectionString=ads://1.2.3.4.5.6
+sources.machineC.topic=heartbeat
+sources.machineC.jobReferences.ads-heartbeat.enabled=true
+
+jobs.s7-dashboard.interval=500
+jobs.s7-dashboard.fields.inputPreasure=%DB.DB1.4:INT
+jobs.s7-dashboard.fields.outputPreasure=%Q1:BYT
+jobs.s7-dashboard.fields.temperature=%I3:INT
+
+jobs.s7-heartbeat.interval=1000
+jobs.s7-heartbeat.fields.active=%I0.2:BOOL
+
+jobs.ads-heartbeat.interval=1000
+jobs.ads-heartbeat.fields.active=Main.running
\ No newline at end of file
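Note on the topic settings in source.properties: judging from SourceConfig.propertiesToJson in this commit, the topic of a job reference appears to be resolved in three steps: the job reference's own topic, then the topic of its source, then defaults.topic. The following sketch illustrates that lookup order with plain java.util.Properties. The class TopicResolution and its methods are hypothetical and not part of the commit.

```java
import java.util.Properties;

public class TopicResolution {

    // Looks up the topic for one job reference of one source.
    // Order: job reference topic, then source topic, then defaults.topic.
    static String resolveTopic(Properties props, String source, String job) {
        String jobTopic = props.getProperty(
            "sources." + source + ".jobReferences." + job + ".topic");
        if (jobTopic != null) {
            return jobTopic;
        }
        String sourceTopic = props.getProperty("sources." + source + ".topic");
        if (sourceTopic != null) {
            return sourceTopic;
        }
        return props.getProperty("defaults.topic");
    }

    // A subset of the example configuration from source.properties.
    static Properties sample() {
        Properties props = new Properties();
        props.setProperty("defaults.topic", "some/default");
        props.setProperty("sources.machineA.jobReferences.s7-dashboard.enabled", "true");
        props.setProperty("sources.machineA.jobReferences.s7-heartbeat.enabled", "true");
        props.setProperty("sources.machineA.jobReferences.s7-heartbeat.topic", "heartbeat");
        props.setProperty("sources.machineB.topic", "heartbeat");
        props.setProperty("sources.machineB.jobReferences.s7-heartbeat.enabled", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = sample();
        System.out.println(resolveTopic(props, "machineA", "s7-dashboard"));
        System.out.println(resolveTopic(props, "machineA", "s7-heartbeat"));
        System.out.println(resolveTopic(props, "machineB", "s7-heartbeat"));
    }
}
```

With the sample configuration, machineA's s7-dashboard job falls back to the default topic, while both heartbeat jobs end up on "heartbeat": one through the job reference, the other through its source.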
diff --git a/plc4j/integrations/apache-kafka/pom.xml b/plc4j/integrations/apache-kafka/pom.xml
index 33b698e..3e38ccb 100644
--- a/plc4j/integrations/apache-kafka/pom.xml
+++ b/plc4j/integrations/apache-kafka/pom.xml
@@ -32,15 +32,26 @@
<description>Integration module for integrating PLC4X into Apache Kafka (Kafka-Connect-Plugin)</description>
<properties>
- <kafka.version>2.0.0</kafka.version>
+ <kafka.version>2.3.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-api</artifactId>
- <version>${project.version}</version>
+ <version>0.5.0-SNAPSHOT</version>
</dependency>
+ <!--dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-scraper</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ </dependency-->
+
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
@@ -48,15 +59,6 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
- <!-- TODO: Maybe this can be removed (and the exclusion in the dependency plugin) -->
- <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version}</version>
@@ -64,18 +66,4 @@
</dependency>
</dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <configuration>
- <ignoredDependencies combine.children="append">
- <ignoredDependency>org.apache.kafka:connect-api</ignoredDependency>
- </ignoredDependencies>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
</project>
\ No newline at end of file
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index c68a1a8..7e0b477 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -30,14 +30,13 @@ import java.util.List;
import java.util.Map;
public class Plc4xSinkConnector extends SinkConnector {
- static final String URL_CONFIG = "url";
- private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
- static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC);
+ private static final String URL_CONFIG = "url";
+ private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
+ private static final ConfigDef CONFIG_DEF =
+ new ConfigDef().define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC);
private String url;
- private String query;
@Override
public Class<? extends Task> taskClass() {
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index 424e857..855e759 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -18,7 +18,6 @@ under the License.
*/
package org.apache.plc4x.kafka;
-import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
@@ -34,6 +33,7 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
public class Plc4xSinkTask extends SinkTask {
+
private String url;
private PlcConnection plcConnection;
@@ -45,8 +45,8 @@ public class Plc4xSinkTask extends SinkTask {
@Override
public void start(Map<String, String> props) {
- AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
- url = config.getString(Plc4xSinkConnector.URL_CONFIG);
+ /*AbstractConfig config = new AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+ url = config.getString(Plc4xSinkConnector.URL_CONFIG);*/
openConnection();
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index cddef09..200dff5 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -18,21 +18,16 @@ under the License.
*/
package org.apache.plc4x.kafka;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.plc4x.kafka.config.SourceConfig;
+import org.apache.plc4x.kafka.exceptions.ConfigurationException;
import org.apache.plc4x.kafka.util.VersionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
import java.util.*;
-import java.util.stream.Collectors;
public class Plc4xSourceConnector extends SourceConnector {
@@ -58,10 +53,18 @@ public class Plc4xSourceConnector extends SourceConnector {
.define(JSON_CONFIG, ConfigDef.Type.STRING, JSON_DEFAULT, ConfigDef.Importance.HIGH, JSON_DOC)
.define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
- private String topic;
- private List<String> queries;
- private String json;
- private Integer rate;
+ private SourceConfig sourceConfig;
+
+ @Override
+ public void start(Map<String, String> props) {
+ try {
+ sourceConfig = SourceConfig.fromPropertyMap(props);
+ } catch (ConfigurationException e) {
+ }
+ }
+
+ @Override
+ public void stop() {}
@Override
public Class<? extends Task> taskClass() {
@@ -72,7 +75,7 @@ public class Plc4xSourceConnector extends SourceConnector {
@SuppressWarnings("unchecked")
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> configs = new LinkedList<>();
- if (json.isEmpty()) {
+ /*if (json.isEmpty()) {
Map<String, List<String>> groupedByHost = new HashMap<>();
queries.stream().map(query -> query.split("#", 2)).collect(Collectors.groupingBy(parts -> parts[0])).forEach((host, q) ->
groupedByHost.put(host, q.stream().map(parts -> parts[1]).collect(Collectors.toList())));
@@ -121,23 +124,11 @@ public class Plc4xSourceConnector extends SourceConnector {
} catch (IOException e) {
log.error("ERROR CONFIGURING TASK", e);
}
- }
+ }*/
return configs;
}
@Override
- public void start(Map<String, String> props) {
- AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
- topic = config.getString(TOPIC_CONFIG);
- queries = config.getList(QUERIES_CONFIG);
- rate = config.getInt(RATE_CONFIG);
- json = config.getString(JSON_CONFIG);
- }
-
- @Override
- public void stop() {}
-
- @Override
public ConfigDef config() {
return CONFIG_DEF;
}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index d353e54..dfc1c04 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -45,6 +45,7 @@ import java.util.concurrent.*;
* If the flag does not become true, the method returns null, otherwise a fetch is performed.
*/
public class Plc4xSourceTask extends SourceTask {
+
static final String TOPIC_CONFIG = "topic";
private static final String TOPIC_DOC = "Kafka topic to publish to";
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Defaults.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Defaults.java
new file mode 100644
index 0000000..fcdb99e
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Defaults.java
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.config;
+
+public class Defaults {
+
+ private final String topic;
+
+ public Defaults(String topic) {
+ this.topic = topic;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Job.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Job.java
new file mode 100644
index 0000000..eb67052
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Job.java
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.config;
+
+import java.time.Duration;
+import java.util.Map;
+
+public class Job {
+
+ private final String name;
+ private final int interval;
+ private final Map<String, String> fields;
+
+ public Job(String name, int interval, Map<String, String> fields) {
+ this.name = name;
+ this.interval = interval;
+ this.fields = fields;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getInterval() {
+ return interval;
+ }
+
+ public Map<String, String> getFields() {
+ return fields;
+ }
+
+}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/JobReference.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/JobReference.java
new file mode 100644
index 0000000..dc58ce0
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/JobReference.java
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.config;
+
+public class JobReference {
+
+ private final String name;
+ private final boolean enabled;
+ private final String topic;
+
+ public JobReference(String name, boolean enabled, String topic) {
+ this.name = name;
+ this.enabled = enabled;
+ this.topic = topic;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Source.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Source.java
new file mode 100644
index 0000000..837e9b5
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Source.java
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.config;
+
+public class Source {
+
+ private final String name;
+ private final String connectionString;
+ private final JobReference[] jobReferences;
+
+ public Source(String name, String connectionString, JobReference[] jobReferences) {
+ this.name = name;
+ this.connectionString = connectionString;
+ this.jobReferences = jobReferences;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getConnectionString() {
+ return connectionString;
+ }
+
+ public JobReference[] getJobReferences() {
+ return jobReferences;
+ }
+
+}
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SourceConfig.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SourceConfig.java
new file mode 100644
index 0000000..08d133e
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SourceConfig.java
@@ -0,0 +1,146 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.config;
+
+import com.google.gson.*;
+import org.apache.plc4x.kafka.exceptions.ConfigurationException;
+
+import java.util.List;
+import java.util.Map;
+
+public class SourceConfig {
+
+ private final List<Source> sources;
+ private final List<Job> jobs;
+
+ public static SourceConfig fromPropertyMap(Map<String, String> properties) throws ConfigurationException {
+ try {
+ JsonObject jsonConfig = propertiesToJson(properties);
+ return new Gson().fromJson(jsonConfig, SourceConfig.class);
+ } catch (Exception e) {
+ throw new ConfigurationException("Error configuring.", e);
+ }
+ }
+
+ private static JsonObject propertiesToJson(Map<String, String> properties) {
+ JsonObject config = new JsonObject();
+
+ String defaultTopic = properties.getOrDefault("defaults.topic", null);
+ // Generally create the JSON tree structure.
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ String key = entry.getKey();
+ if("defaults.topic".equals(key)) {
+ continue;
+ }
+ JsonObject context = config;
+ String[] segments = key.split("\\.");
+ for (int i = 0; i < segments.length; i++) {
+ String segment = segments[i];
+ // If this is the leaf segment, set the value.
+ if(i == segments.length - 1) {
+ context.add(segment, new JsonPrimitive(properties.get(key)));
+ }
+ // if it's not a leaf segment, ensure the structure exists.
+ else {
+ if (!context.has(segment)) {
+ context.add(segment, new JsonObject());
+ }
+ context = (JsonObject) context.get(segment);
+ }
+ }
+ }
+
+ // Transform the structure for the sources into a real JSON (Gson deserializable) form.
+ JsonObject oldSources = config.getAsJsonObject("sources");
+ JsonArray sources = new JsonArray();
+ for (String sourceName : oldSources.keySet()) {
+ JsonObject oldSource = oldSources.getAsJsonObject(sourceName);
+
+ // Create a new JSON object with the correct structure.
+ JsonObject source = new JsonObject();
+ source.add("name", new JsonPrimitive(sourceName));
+ String sourceTopic = oldSource.has("topic") ? oldSource.get("topic").getAsString() : defaultTopic;
+ // Copy all the other properties over.
+ for (String sourceProperty : oldSource.keySet()) {
+ if("topic".equals(sourceProperty)) {
+ // Filter out the topic setting as this is set on jobReference level.
+ } else if("jobReferences".equals(sourceProperty)) {
+ JsonObject oldJobReferences = oldSource.getAsJsonObject(sourceProperty);
+ JsonArray jobReferences = new JsonArray();
+ for (String jobReferenceName : oldJobReferences.keySet()) {
+ JsonObject oldJobReference = oldJobReferences.getAsJsonObject(jobReferenceName);
+ JsonObject jobReference = new JsonObject();
+ jobReference.add("name", new JsonPrimitive(jobReferenceName));
+ for (String jobReferenceProperty : oldJobReference.keySet()) {
+ jobReference.add(jobReferenceProperty, oldJobReference.get(jobReferenceProperty));
+ }
+ if(!jobReference.has("topic")) {
+ jobReference.add("topic", new JsonPrimitive(sourceTopic));
+ }
+ jobReferences.add(jobReference);
+ }
+ source.add(sourceProperty, jobReferences);
+ } else {
+ source.add(sourceProperty, oldSource.get(sourceProperty));
+ }
+ }
+
+ // Add the new source to the existing sources.
+ sources.add(source);
+ }
+ config.remove("sources");
+ config.add("sources", sources);
+
+ // Transform the structure of the jobs into a real JSON form.
+ JsonObject oldJobs = config.getAsJsonObject("jobs");
+ JsonArray jobs = new JsonArray();
+ for (String jobName : oldJobs.keySet()) {
+ JsonObject oldJob = oldJobs.getAsJsonObject(jobName);
+
+ // Create a new JSON object with the correct structure.
+ JsonObject job = new JsonObject();
+ job.add("name", new JsonPrimitive(jobName));
+ // Copy all the other properties over.
+ for (String jobProperty : oldJob.keySet()) {
+ job.add(jobProperty, oldJob.get(jobProperty));
+ }
+
+ // Add the new job to the existing jobs.
+ jobs.add(job);
+ }
+ config.remove("jobs");
+ config.add("jobs", jobs);
+
+ return config;
+ }
+
+ public SourceConfig(List<Source> sources, List<Job> jobs) {
+ this.sources = sources;
+ this.jobs = jobs;
+ }
+
+ public List<Source> getSources() {
+ return sources;
+ }
+
+ public List<Job> getJobs() {
+ return jobs;
+ }
+
+}
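Note on the first loop of propertiesToJson: it splits every property key at the dots and creates one nested object per segment, with the last segment holding the value. The sketch below shows the same idea with plain java.util maps instead of Gson's JsonObject, so it runs without extra dependencies. The class DottedKeyTree is hypothetical and only meant as an illustration; unlike the commit it does not skip defaults.topic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DottedKeyTree {

    // Turns flat "a.b.c=value" entries into nested maps: {a={b={c=value}}}.
    @SuppressWarnings("unchecked")
    static Map<String, Object> toTree(Map<String, String> properties) {
        Map<String, Object> root = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            String[] segments = entry.getKey().split("\\.");
            Map<String, Object> context = root;
            // All segments but the last are branches; create them on demand.
            for (int i = 0; i < segments.length - 1; i++) {
                context = (Map<String, Object>) context.computeIfAbsent(
                    segments[i], k -> new LinkedHashMap<String, Object>());
            }
            // The last segment is the leaf carrying the property value.
            context.put(segments[segments.length - 1], entry.getValue());
        }
        return root;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("jobs.s7-heartbeat.interval", "1000");
        properties.put("jobs.s7-heartbeat.fields.active", "%I0.2:BOOL");
        System.out.println(toTree(properties));
    }
}
```

As in the Gson version, a key that is used both as a leaf and as a branch (for example "a=1" together with "a.b=2") is not handled and would fail with a ClassCastException, so the configuration has to avoid such overlaps.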
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/exceptions/ConfigurationException.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/exceptions/ConfigurationException.java
new file mode 100644
index 0000000..c41578a
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/exceptions/ConfigurationException.java
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.exceptions;
+
+public class ConfigurationException extends Exception {
+
+ public ConfigurationException(String message) {
+ super(message);
+ }
+
+ public ConfigurationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/plc4j/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/config/SourceConfigTest.java b/plc4j/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/config/SourceConfigTest.java
new file mode 100644
index 0000000..1fe0d5b
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/config/SourceConfigTest.java
@@ -0,0 +1,78 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka.config;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class SourceConfigTest {
+
+ @Test
+ public void parseConfig() throws Exception {
+ Properties properties = new Properties();
+ properties.load(new StringReader("name=plc-source-test\n" +
+ "connector.class=org.apache.plc4x.kafka.Plc4xSourceConnector\n" +
+ "\n" +
+ "defaults.topic=some/default\n" +
+ "\n" +
+ "sources.machineA.connectionString=s7://1.2.3.4/1/1\n" +
+ "sources.machineA.jobReferences.s7-dashboard.enabled=true\n" +
+ "sources.machineA.jobReferences.s7-heartbeat.enabled=true\n" +
+ "sources.machineA.jobReferences.s7-heartbeat.topic=heartbeat\n" +
+ "\n" +
+ "sources.machineB.connectionString=s7://10.20.30.40/1/1\n" +
+ "sources.machineB.topic=heartbeat\n" +
+ "sources.machineB.jobReferences.s7-heartbeat.enabled=true\n" +
+ "\n" +
+ "sources.machineC.connectionString=ads://1.2.3.4.5.6\n" +
+ "sources.machineC.topic=heartbeat\n" +
+ "sources.machineC.jobReferences.ads-heartbeat.enabled=true\n" +
+ "\n" +
+ "jobs.s7-dashboard.interval=500\n" +
+ "jobs.s7-dashboard.fields.inputPreasure=%DB.DB1.4:INT\n" +
+ "jobs.s7-dashboard.fields.outputPreasure=%Q1:BYT\n" +
+ "jobs.s7-dashboard.fields.temperature=%I3:INT\n" +
+ "\n" +
+ "jobs.s7-heartbeat.interval=1000\n" +
+ "jobs.s7-heartbeat.fields.active=%I0.2:BOOL\n" +
+ "\n" +
+ "jobs.ads-heartbeat.interval=1000\n" +
+ "jobs.ads-heartbeat.fields.active=Main.running\n"));
+ SourceConfig sourceConfig = SourceConfig.fromPropertyMap(toStringMap(properties));
+
+ assertNotNull(sourceConfig);
+ assertEquals(3, sourceConfig.getSources().size(), "Expected 3 sources");
+ assertEquals(3, sourceConfig.getJobs().size(), "Expected 3 jobs");
+ }
+
+ private static Map<String, String> toStringMap(Properties properties) {
+ Map<String, String> map = new HashMap<>();
+ for (String stringPropertyName : properties.stringPropertyNames()) {
+ map.put(stringPropertyName, properties.getProperty(stringPropertyName));
+ }
+ return map;
+ }
+
+}