Posted to commits@plc4x.apache.org by hu...@apache.org on 2020/11/06 11:36:47 UTC

[plc4x] branch feature/kafkasink updated: Documentation and multiple tasks can be configured.

This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch feature/kafkasink
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/feature/kafkasink by this push:
     new 19ad493  Documentation and multiple tasks can be configured.
19ad493 is described below

commit 19ad493c5112420f0660b1736d3a29450badfc86
Author: hutcheb <be...@gmail.com>
AuthorDate: Fri Nov 6 06:35:53 2020 -0500

    Documentation and multiple tasks can be configured.
    
    Added a topic check so that, when multiple tasks are configured, each
    task only reads from one topic.
---
 plc4j/integrations/apache-kafka/README.md          | 61 ++++++++++++++++++++--
 .../apache-kafka/config/plc4x-sink.properties      |  3 +-
 .../org/apache/plc4x/kafka/Plc4xSinkConnector.java |  9 +---
 .../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 24 ++++++---
 .../java/org/apache/plc4x/kafka/config/Sink.java   |  8 ++-
 .../org/apache/plc4x/kafka/config/SinkConfig.java  |  7 ++-
 6 files changed, 86 insertions(+), 26 deletions(-)

diff --git a/plc4j/integrations/apache-kafka/README.md b/plc4j/integrations/apache-kafka/README.md
index 37e75868..b82f7dd 100644
--- a/plc4j/integrations/apache-kafka/README.md
+++ b/plc4j/integrations/apache-kafka/README.md
@@ -19,12 +19,18 @@
 
 # Kafka Connect PLC4X Connector
 
-The PLC4X Connector streams data from any device accessible through the PLC4X interface.
+The PLC4X Connector provides both a source and a sink connector.
+The source connector streams data from any device accessible through the PLC4X interface.
+The sink connector writes data from a Kafka topic to a device.
 
 ## Source Connector
 
 See `config/source.properties` for example configuration.
 
+## Sink Connector
+
+See `config/plc4x-sink.properties` for example configuration.
+
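+The sink connector is configured with a comma-separated list of named sinks. Each sink takes a PLC4X connection string and the Kafka topic it reads write requests from, for example (sink names, connection strings and topics below are only examples):
+
+    sinks=machineA,machineB
+    sinks.machineA.connectionString=modbus://localhost
+    sinks.machineA.topic=machineData
+    sinks.machineB.connectionString=modbus://192.168.0.2
+    sinks.machineB.topic=lineData
+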
 ## Quickstart
 
 A Kafka Connect worker can be run in two modes: 
@@ -60,7 +66,7 @@ In order to start a Kafka Connect system the following steps have to be performe
         
         bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
 
-### Start a Kafka Connect Worker (Standalone)
+### Start a Kafka Connect Source Worker (Standalone)
 
 Ideal for testing. 
 
@@ -76,7 +82,7 @@ If you want to debug the connector, be sure to set some environment variables be
 
 In this case the startup will suspend till an IDE is connected via a remote-debugging session.
 
-### Start Kafka Connect Worker (Distributed Mode)
+### Start a Kafka Connect Source Worker (Distributed Mode)
 
 Ideal for production.
 
@@ -95,3 +101,52 @@ The configuration of the Connectors is then provided via REST interface:
     curl -X POST -H "Content-Type: application/json" --data '{"name": "plc-source-test", "config": {"connector.class":"org.apache.plc4x.kafka.Plc4xSourceConnector", 
     // TODO: Continue here ...
     "tasks.max":"1", "file":"test.sink.txt", "topics":"connect-test" }}' http://localhost:8083/connectors
+
+
+### Start a Kafka Connect Sink Worker (Standalone)
+
+Ideal for testing. 
+
+1) Start Kafka Connect:
+        
+        bin/connect-standalone.sh config/connect-standalone.properties config/plc4x-sink.properties
+
+Now open a console window with `kafka-console-producer`.
+
+Producing the sample packet shown below to the Kafka topic should result in the array being written to the Modbus device.
+
+    {"schema":{"type":"struct",
+               "fields":[
+                   {"type":"string","optional":false,"field":"address"},
+                   {"type":"string","optional":false,"field":"value"},
+                   {"type":"int64","optional":true,"default":0,"field":"expires"}]},
+     "payload":{
+         "address":"400001:INT[2]",
+         "value":"[655,9]",
+         "expires":1605575611044}}
+
+If you want to debug the connector, be sure to set some environment variables before starting Kafka Connect:
+
+        export KAFKA_DEBUG=y; export DEBUG_SUSPEND_FLAG=y;
+
+In this case the startup will suspend until an IDE is connected via a remote-debugging session.
+
+### Start a Kafka Connect Sink Worker (Distributed Mode)
+
+Ideal for production.
+
+In this case the state of the node is handled by ZooKeeper and the configuration of the connectors is distributed via Kafka topics.
+
+    bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
+    bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
+    bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
+
+Starting the worker is then as simple as this:
+
+    bin/connect-distributed.sh config/connect-distributed.properties
+    
+The configuration of the Connectors is then provided via REST interface:
+
+    curl -X POST -H "Content-Type: application/json" --data '{"name": "plc-sink-test", "config": {"connector.class":"org.apache.plc4x.kafka.Plc4xSinkConnector", 
+    // TODO: Continue here ...
+    "tasks.max":"1", "file":"test.sink.txt", "topics":"connect-test" }}' http://localhost:8083/connectors
diff --git a/plc4j/integrations/apache-kafka/config/plc4x-sink.properties b/plc4j/integrations/apache-kafka/config/plc4x-sink.properties
index 83ff4d2..e680b5c 100644
--- a/plc4j/integrations/apache-kafka/config/plc4x-sink.properties
+++ b/plc4j/integrations/apache-kafka/config/plc4x-sink.properties
@@ -16,13 +16,12 @@
 #
 name=plc-1
 connector.class=org.apache.plc4x.kafka.Plc4xSinkConnector
-default-topic=machineData
 topics=machineData
 tasks.max=3
 
 sinks=machineA
 sinks.machineA.connectionString=modbus://localhost
-sinks.machineA.topc=machineData
+sinks.machineA.topic=machineData
 
 bootstrap.servers=127.0.0.1:9092
 key.converter=org.apache.kafka.connect.json.JsonConverter
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 8a83552..3a76cee 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -38,9 +38,6 @@ public class Plc4xSinkConnector extends SinkConnector {
 
     private static final Logger log = LoggerFactory.getLogger(Plc4xSinkConnector.class);
 
-    public static final String DEFAULT_TOPIC_CONFIG = "default-topic";
-    private static final String DEFAULT_TOPIC_DOC = "Default topic to be used, if not otherwise configured.";
-
     public static final String SINK_CONFIG = "sinks";
     private static final String SINK_DOC = "List of sink names that will be configured.";
 
@@ -76,6 +73,7 @@ public class Plc4xSinkConnector extends SinkConnector {
             Map<String, String> taskConfig = new HashMap<>();
             taskConfig.put(Plc4xSinkTask.CONNECTION_NAME_CONFIG, sink.getName());
             taskConfig.put(Plc4xSinkTask.PLC4X_CONNECTION_STRING_CONFIG, sink.getConnectionString());
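+            // Bind each task to its sink's topic; Plc4xSinkTask uses this to
+            // ignore records that arrive from other topics.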
+            taskConfig.put(Plc4xSinkTask.PLC4X_TOPIC_CONFIG, sink.getTopic());
             configs.add(taskConfig);
         }
         return configs;
@@ -92,14 +90,10 @@ public class Plc4xSinkConnector extends SinkConnector {
         // Add the dynamic parts of the config
 
         // Find the important config elements
-        String defaultTopic = null;
         ConfigValue sinks = null;
 
         for (ConfigValue configValue : config.configValues()) {
             switch (configValue.name()) {
-                case DEFAULT_TOPIC_CONFIG:
-                    defaultTopic = (String) configValue.value();
-                    break;
                 case SINK_CONFIG:
                     sinks = configValue;
                     break;
@@ -136,7 +130,6 @@ public class Plc4xSinkConnector extends SinkConnector {
     @Override
     public ConfigDef config() {
         return new ConfigDef()
-            .define(DEFAULT_TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, DEFAULT_TOPIC_DOC)
             .define(SINK_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, SINK_DOC);
     }
 
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index 7ea7985..922799d 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -58,21 +58,20 @@ public class Plc4xSinkTask extends SinkTask {
     static final String PLC4X_CONNECTION_STRING_CONFIG = "plc4x-connection-string";
     private static final String PLC4X_CONNECTION_STRING_DOC = "PLC4X Connection String";
 
+    static final String PLC4X_TOPIC_CONFIG = "topic";
+    private static final String PLC4X_TOPIC_DOC = "Task Topic";
+
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(CONNECTION_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, CONNECTION_NAME_STRING_DOC)
-        .define(PLC4X_CONNECTION_STRING_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, PLC4X_CONNECTION_STRING_DOC);
+        .define(PLC4X_CONNECTION_STRING_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, PLC4X_CONNECTION_STRING_DOC)
+        .define(PLC4X_TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, PLC4X_TOPIC_DOC);
 
     /*
      * Configuration of the output.
      */
     private static final String SINK_NAME_FIELD = "sink-name";
-    private static final String JOB_NAME_FIELD = "job-name";
+    private static final String SINK_TOPIC_FIELD = "topic";
 
-    private static final Schema KEY_SCHEMA =
-        new SchemaBuilder(Schema.Type.STRUCT)
-            .field(SINK_NAME_FIELD, Schema.STRING_SCHEMA)
-            .field(JOB_NAME_FIELD, Schema.STRING_SCHEMA)
-            .build();
 
     @Override
     public String version() {
@@ -82,12 +81,14 @@ public class Plc4xSinkTask extends SinkTask {
     private PlcDriverManager driverManager;
     private Transformation<SinkRecord> transformation;
     private String plc4xConnectionString;
+    private String plc4xTopic;
 
     @Override
     public void start(Map<String, String> props) {
         AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
         String connectionName = config.getString(CONNECTION_NAME_CONFIG);
         plc4xConnectionString = config.getString(PLC4X_CONNECTION_STRING_CONFIG);
+        plc4xTopic = config.getString(PLC4X_TOPIC_CONFIG);
         Map<String, String> topics = new HashMap<>();
         log.info("Creating Pooled PLC4x driver manager");
         driverManager = new PooledPlcDriverManager();
@@ -108,10 +109,16 @@ public class Plc4xSinkTask extends SinkTask {
 
         for (SinkRecord r: records) {
             Struct record = (Struct) r.value();
+            String topic = r.topic();
             String address = record.getString("address");
             String value = record.getString("value");
             Long expires = record.getInt64("expires");
 
+            if (!topic.equals(plc4xTopic)) {
+                // Each task is configured for a single topic; skip records
+                // that arrive on a different one.
+                log.debug("Ignoring write request received on wrong topic");
+                continue;
+            }
+
             if ((System.currentTimeMillis() > expires) & !(expires == 0)) {
                 log.warn("Write request has expired {}, discarding {}", System.currentTimeMillis(), address);
                 return;
@@ -120,7 +127,7 @@ public class Plc4xSinkTask extends SinkTask {
             PlcConnection connection = null;
             try {
                 connection = driverManager.getConnection(plc4xConnectionString);
-            } catch (PlcConnectionException e) {                
+            } catch (PlcConnectionException e) {
                 log.warn("Failed to Open Connection {}", plc4xConnectionString);
             }
 
@@ -144,6 +151,7 @@ public class Plc4xSinkTask extends SinkTask {
 
             try {
                 writeRequest.execute().get();
+                log.info("Wrote {} to device {}", address, plc4xConnectionString);
             } catch (InterruptedException | ExecutionException e) {
                 log.warn("Failed to Write to {}", plc4xConnectionString);
             }
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Sink.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Sink.java
index 07ba33f..35192d2 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Sink.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/Sink.java
@@ -22,10 +22,12 @@ public class Sink {
 
     private final String name;
     private final String connectionString;
+    private final String topic;
 
-    public Sink(String name, String connectionString) {
+    public Sink(String name, String connectionString, String topic) {
         this.name = name;
         this.connectionString = connectionString;
+        this.topic = topic;
     }
 
     public String getName() {
@@ -36,4 +38,8 @@ public class Sink {
         return connectionString;
     }
 
+    public String getTopic() {
+        return topic;
+    }
+
 }
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SinkConfig.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SinkConfig.java
index 29de508..dd95496 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SinkConfig.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/config/SinkConfig.java
@@ -33,16 +33,15 @@ public class SinkConfig {
     private final List<Sink> sinks;
 
     public static SinkConfig fromPropertyMap(Map<String, String> properties) {
-        String defaultTopic = properties.getOrDefault(Plc4xSinkConnector.DEFAULT_TOPIC_CONFIG, null);
 
         String[] sinkNames = properties.getOrDefault(Plc4xSinkConnector.SINK_CONFIG, "").split(",");
         List<Sink> sinks = new ArrayList<>(sinkNames.length);
         for (String sinkName : sinkNames) {
             String connectionString = properties.get(
                 Plc4xSinkConnector.SINK_CONFIG + "." + sinkName + "." + CONNECTION_STRING_CONFIG);
-            String sinkTopic = properties.getOrDefault(
-                Plc4xSinkConnector.SINK_CONFIG + "." + sinkName + "." + TOPIC_CONFIG, defaultTopic);
-            Sink sink = new Sink(sinkName, connectionString);
+            String sinkTopic = properties.get(
+                Plc4xSinkConnector.SINK_CONFIG + "." + sinkName + "." + TOPIC_CONFIG);
+            Sink sink = new Sink(sinkName, connectionString, sinkTopic);
             sinks.add(sink);
         }