You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/09/06 21:14:54 UTC

[incubator-plc4x] 03/05: added configuration options

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/apache-kafka
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit b430e28a55684762e8ea93212c210677f6cd2448
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Thu Sep 6 11:59:19 2018 +0200

    added configuration options
---
 .../org/apache/plc4x/kafka/common/Plc4xConfig.java | 26 ++++++++++++-
 .../apache/plc4x/kafka/source/Plc4xSourceTask.java | 44 ++++++++++++++++------
 2 files changed, 57 insertions(+), 13 deletions(-)

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
index bceedfc..b906fab 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
@@ -33,6 +33,15 @@ public class Plc4xConfig extends AbstractConfig {
     public static final String PLC_CONNECTION_STRING_DISPLAY = "PLC Connection String";
     public static final String PLC_CONNECTION_STRING_DOC = "Connection string used by PLC4X to connect to the PLC.";
 
+    public static final String PLC_TOPIC = "topic";
+    public static final String PLC_TOPIC_DOC = "Kafka topic to publish messages to.";
+
+    public static final String PLC_DATATYPE_CONFIG = "type";
+    public static final String PLC_DATATYPE_DOC = "Data type of values sent or received by PLC.";
+
+    public static final String PLC_ADDRESS = "address";
+    public static final String PLC_ADDRESS_DOC = "PLC address to sent to or receive data from.";
+
     public static ConfigDef baseConfigDef() {
         ConfigDef config = new ConfigDef();
         addPlcOptions(config);
@@ -44,7 +53,22 @@ public class Plc4xConfig extends AbstractConfig {
             PLC_CONNECTION_STRING_CONFIG,
             Type.STRING,
             Importance.HIGH,
-            PLC_CONNECTION_STRING_DOC);
+            PLC_CONNECTION_STRING_DOC)
+        .define(
+            PLC_DATATYPE_CONFIG,
+            Type.CLASS,
+            Importance.HIGH,
+            PLC_DATATYPE_DOC)
+        .define(
+            PLC_TOPIC,
+            Type.STRING,
+            Importance.HIGH,
+            PLC_TOPIC_DOC)
+        .define(
+            PLC_ADDRESS,
+            Type.STRING,
+            Importance.HIGH,
+            PLC_ADDRESS_DOC);
     }
 
     public static final ConfigDef CONFIG_DEF = baseConfigDef();
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
index 36e37a0..040df2f 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
@@ -19,8 +19,7 @@ under the License.
 package org.apache.plc4x.kafka.source;
 
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -28,6 +27,7 @@ import org.apache.plc4x.java.PlcDriverManager;
 import org.apache.plc4x.java.api.connection.PlcConnection;
 import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcInvalidAddressException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.messages.items.ReadResponseItem;
@@ -36,11 +36,7 @@ import org.apache.plc4x.kafka.util.VersionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -53,6 +49,11 @@ public class Plc4xSourceTask extends SourceTask {
     private PlcReader reader;
     private PlcReadRequest readRequest;
     private AtomicBoolean running = new AtomicBoolean(false);
+    private String topic;
+    private Schema keySchema = Schema.STRING_SCHEMA;
+    private Schema valueSchema;
+    private long offset = 0;
+    private final Map<Class<?>, Schema> typeSchemas = initTypeSchemas();
 
     @Override
     public String version() {
@@ -64,7 +65,7 @@ public class Plc4xSourceTask extends SourceTask {
         try {
             config = new Plc4xSourceConfig(properties);
         } catch (ConfigException e) {
-            throw new ConnectException("Couldn't start JdbcSourceTask due to configuration error", e);
+            throw new ConnectException("Couldn't start Plc4xSourceTask due to configuration error", e);
         }
         final String url = config.getString(Plc4xSourceConfig.PLC_CONNECTION_STRING_CONFIG);
 
@@ -75,12 +76,28 @@ public class Plc4xSourceTask extends SourceTask {
                 throw new ConnectException("PlcReader not available for this type of connection");
             }
             reader = readerOptional.get();
+            Class<?> dataType = config.getClass(Plc4xSourceConfig.PLC_DATATYPE_CONFIG);
+            String addressString = config.getString(Plc4xSourceConfig.PLC_ADDRESS);
+            Address address = plcConnection.parseAddress(addressString);
+            readRequest = new PlcReadRequest(dataType, address);
+            topic = config.getString(Plc4xSourceConfig.PLC_TOPIC);
+            valueSchema = typeSchemas.get(dataType);
             running.set(true);
         } catch (PlcConnectionException e) {
             throw new ConnectException("Caught exception while connecting to PLC", e);
+        } catch (PlcInvalidAddressException e) {
+            throw new ConnectException("Invalid PLC address", e);
         }
     }
 
+    private Map<Class<?>, Schema> initTypeSchemas() {
+        Map<Class<?>, Schema> map = new HashMap<>();
+        map.put(Boolean.class, Schema.BOOLEAN_SCHEMA);
+        map.put(Integer.class, Schema.INT32_SCHEMA);
+        // TODO add other
+        return map;
+    }
+
     @Override
     public void stop() {
         if(plcConnection != null) {
@@ -103,10 +120,13 @@ public class Plc4xSourceTask extends SourceTask {
 
                 for (ReadResponseItem<?> responseItem : plcReadResponse.getResponseItems()) {
                     Address address = responseItem.getRequestItem().getAddress();
-                    List<?> values = responseItem.getValues();
-
-                    // TODO: Implement Sending this information to Kafka ...
-                    //results.add(new SourceRecord())
+                    for (Object value : responseItem.getValues()) {
+                        Map<String, String> sourcePartition = Collections.singletonMap("address", address.toString());
+                        Map<String, Long> sourceOffset = Collections.singletonMap("offset", offset);
+                        SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, keySchema, address.toString(), valueSchema, value);
+                        results.add(record);
+                        offset++; // TODO: figure out how to track offsets
+                    }
                 }
             } catch (ExecutionException e) {
                 log.error("Error reading values from PLC", e);