You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/09/10 16:20:14 UTC

[incubator-plc4x] branch skorikov-feature/api-redesign-chris-c updated (398f4b9 -> aff5bb3)

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a change to branch skorikov-feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git.


    from 398f4b9  - Implemented getByte for booleanfield - Fixed the IotElasticsearchFactory example
     new d6d13cc  adjusted kafka source connector to new api
     new 1b0ca49  implemented throttling
     new 28a70be  refactoring
     new d74d905  removed unnecessary code
     new 334a4eb  added further connect schemas
     new 5ee9fa9  implemented kafka sink connector
     new 955ad74  implemented kafka sink connector
     new aff5bb3  Merge branch 'feature/api-redesign-chris-c' of https://github.com/skorikov/incubator-plc4x into skorikov-feature/api-redesign-chris-c

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/plc4x/kafka/Plc4xSinkConnector.java |  41 +++--
 .../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 101 +++++++++++
 .../apache/plc4x/kafka/Plc4xSourceConnector.java   |  50 ++++--
 .../org/apache/plc4x/kafka/Plc4xSourceTask.java    | 197 +++++++++++++++++++++
 .../org/apache/plc4x/kafka/common/Plc4xConfig.java |  92 ----------
 .../apache/plc4x/kafka/sink/Plc4xSinkConfig.java   |  49 -----
 .../org/apache/plc4x/kafka/sink/Plc4xSinkTask.java | 119 -------------
 .../plc4x/kafka/source/Plc4xSourceConfig.java      |  49 -----
 .../apache/plc4x/kafka/source/Plc4xSourceTask.java | 133 --------------
 .../org/apache/plc4x/kafka/util/VersionUtil.java   |   2 +-
 .../apache/plc4x/kafka/Plc4XSinkConfigTest.java    |  31 ----
 .../apache/plc4x/kafka/Plc4XSourceConfigTest.java  |  31 ----
 plc4j/protocols/pom.xml                            |   2 +-
 .../org/apache/plc4x/java/test/TestDevice.java     |   7 +-
 .../java/org/apache/plc4x/java/test/TestField.java |   5 +
 .../java/org/apache/plc4x/java/test/TestType.java  |   3 +-
 16 files changed, 371 insertions(+), 541 deletions(-)
 create mode 100644 integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
 create mode 100644 integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
 delete mode 100644 integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
 delete mode 100644 integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkConfig.java
 delete mode 100644 integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkTask.java
 delete mode 100644 integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceConfig.java
 delete mode 100644 integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
 delete mode 100644 integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSinkConfigTest.java
 delete mode 100644 integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSourceConfigTest.java


[incubator-plc4x] 08/08: Merge branch 'feature/api-redesign-chris-c' of https://github.com/skorikov/incubator-plc4x into skorikov-feature/api-redesign-chris-c

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch skorikov-feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit aff5bb300684f60032a7722482b9a0bc9f111069
Merge: 398f4b9 955ad74
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Mon Sep 10 18:17:50 2018 +0200

    Merge branch 'feature/api-redesign-chris-c' of https://github.com/skorikov/incubator-plc4x into skorikov-feature/api-redesign-chris-c

 .../org/apache/plc4x/kafka/Plc4xSinkConnector.java |  41 +++--
 .../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 101 +++++++++++
 .../apache/plc4x/kafka/Plc4xSourceConnector.java   |  50 ++++--
 .../org/apache/plc4x/kafka/Plc4xSourceTask.java    | 197 +++++++++++++++++++++
 .../org/apache/plc4x/kafka/common/Plc4xConfig.java |  92 ----------
 .../apache/plc4x/kafka/sink/Plc4xSinkConfig.java   |  49 -----
 .../org/apache/plc4x/kafka/sink/Plc4xSinkTask.java | 119 -------------
 .../plc4x/kafka/source/Plc4xSourceConfig.java      |  49 -----
 .../apache/plc4x/kafka/source/Plc4xSourceTask.java | 133 --------------
 .../org/apache/plc4x/kafka/util/VersionUtil.java   |   2 +-
 .../apache/plc4x/kafka/Plc4XSinkConfigTest.java    |  31 ----
 .../apache/plc4x/kafka/Plc4XSourceConfigTest.java  |  31 ----
 plc4j/protocols/pom.xml                            |   2 +-
 .../org/apache/plc4x/java/test/TestDevice.java     |   7 +-
 .../java/org/apache/plc4x/java/test/TestField.java |   5 +
 .../java/org/apache/plc4x/java/test/TestType.java  |   3 +-
 16 files changed, 371 insertions(+), 541 deletions(-)


[incubator-plc4x] 01/08: adjusted kafka source connector to new api

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch skorikov-feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit d6d13cc6ee608d2219674befb6ce93b9576890f2
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Mon Sep 10 11:05:38 2018 +0200

    adjusted kafka source connector to new api
---
 .../apache/plc4x/kafka/Plc4xSourceConnector.java   |  41 ++++---
 .../plc4x/kafka/source/Plc4xSourceConfig.java      |  49 --------
 .../apache/plc4x/kafka/source/Plc4xSourceTask.java | 126 ++++++++++-----------
 .../org/apache/plc4x/kafka/util/VersionUtil.java   |   2 +-
 .../apache/plc4x/kafka/Plc4XSourceConfigTest.java  |   9 +-
 .../java/org/apache/plc4x/java/test/TestField.java |   3 +
 6 files changed, 98 insertions(+), 132 deletions(-)

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4de195b..4885771 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -21,20 +21,32 @@ package org.apache.plc4x.kafka;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
-import org.apache.plc4x.kafka.source.Plc4xSourceConfig;
 import org.apache.plc4x.kafka.source.Plc4xSourceTask;
 import org.apache.plc4x.kafka.util.VersionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class Plc4xSourceConnector extends SourceConnector {
-    private static Logger log = LoggerFactory.getLogger(Plc4xSourceConnector.class);
+    public static final String TOPIC_CONFIG = "topic";
+    private static final String TOPIC_DOC = "Kafka topic to publish to";
 
-    private Map<String, String> configProperties;
+    public static final String URL_CONFIG = "url";
+    private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
+
+    public static final String QUERY_CONFIG = "query";
+    private static final String QUERY_DOC = "Field query to be sent to the PLC";
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
+        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
+        .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC);
+
+    private String topic;
+    private String url;
+    private String query;
 
     @Override
     public Class<? extends Task> taskClass() {
@@ -43,27 +55,28 @@ public class Plc4xSourceConnector extends SourceConnector {
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
-        log.info("Setting task configurations for {} workers.", maxTasks);
+        Map<String, String> taskConfig = new HashMap<>();
+        taskConfig.put(TOPIC_CONFIG, topic);
+        taskConfig.put(URL_CONFIG, url);
+        taskConfig.put(QUERY_CONFIG, query);
 
         // Only one task will be created; ignoring maxTasks for now
-        final List<Map<String, String>> configs = Collections.singletonList(configProperties);
-
-        return configs;
+        return Collections.singletonList(taskConfig);
     }
 
     @Override
     public void start(Map<String, String> props) {
-        configProperties = props;
+        topic = props.get(TOPIC_CONFIG);
+        url = props.get(URL_CONFIG);
+        query = props.get(QUERY_CONFIG);
     }
 
     @Override
-    public void stop() {
-        // Nothing to do here ...
-    }
+    public void stop() {}
 
     @Override
     public ConfigDef config() {
-        return Plc4xSourceConfig.CONFIG_DEF;
+        return CONFIG_DEF;
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceConfig.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceConfig.java
deleted file mode 100644
index 3904343..0000000
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceConfig.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka.source;
-
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.plc4x.kafka.common.Plc4xConfig;
-
-import java.util.Map;
-
-
-public class Plc4xSourceConfig extends Plc4xConfig {
-
-    public static ConfigDef baseConfigDef() {
-        ConfigDef config = Plc4xConfig.baseConfigDef();
-        addPlcOptions(config);
-        return config;
-    }
-
-    private static final void addPlcOptions(ConfigDef config) {
-        // TODO: Add things needed here.
-    }
-
-    public static final ConfigDef CONFIG_DEF = baseConfigDef();
-
-    public Plc4xSourceConfig(ConfigDef config, Map<String, String> parsedConfig) {
-        super(config, parsedConfig);
-    }
-
-    public Plc4xSourceConfig(Map<String, String> parsedConfig) {
-        this(CONFIG_DEF, parsedConfig);
-    }
-
-}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
index c008ce5..9c3014b 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
@@ -18,7 +18,6 @@ under the License.
 */
 package org.apache.plc4x.kafka.source;
 
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -29,28 +28,27 @@ import org.apache.plc4x.java.api.connection.PlcReader;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.kafka.Plc4xSourceConnector;
 import org.apache.plc4x.kafka.util.VersionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 public class Plc4xSourceTask extends SourceTask {
+    private final static String FIELD_KEY = "key";
 
-    static final Logger log = LoggerFactory.getLogger(Plc4xSourceTask.class);
+    private String topic;
+    private String url;
+    private String query;
 
-    private Plc4xSourceConfig config;
+    private volatile boolean running = false;
     private PlcConnection plcConnection;
-    private PlcReader reader;
-    private PlcReadRequest readRequest;
-    private AtomicBoolean running = new AtomicBoolean(false);
-    private String topic;
-    private Schema keySchema = Schema.STRING_SCHEMA;
-    private Schema valueSchema;
-    private long offset = 0;
-    private final Map<Class<?>, Schema> typeSchemas = initTypeSchemas();
+    private PlcReader plcReader;
+    private PlcReadRequest plcRequest;
 
     @Override
     public String version() {
@@ -58,44 +56,30 @@ public class Plc4xSourceTask extends SourceTask {
     }
 
     @Override
-    public void start(Map<String, String> properties) {
-        try {
-            config = new Plc4xSourceConfig(properties);
-        } catch (ConfigException e) {
-            throw new ConnectException("Couldn't start Plc4xSourceTask due to configuration error", e);
-        }
-        final String url = config.getString(Plc4xSourceConfig.PLC_CONNECTION_STRING_CONFIG);
+    public void start(Map<String, String> props) {
+        topic = props.get(Plc4xSourceConnector.TOPIC_CONFIG);
+        url = props.get(Plc4xSourceConnector.URL_CONFIG);
+        query = props.get(Plc4xSourceConnector.QUERY_CONFIG);
 
         try {
             plcConnection = new PlcDriverManager().getConnection(url);
-            Optional<PlcReader> readerOptional = plcConnection.getReader();
-            if(!readerOptional.isPresent()) {
-                throw new ConnectException("PlcReader not available for this type of connection");
-            }
-            reader = readerOptional.get();
-            Class<?> dataType = config.getClass(Plc4xSourceConfig.PLC_DATATYPE_CONFIG);
-            String addressString = config.getString(Plc4xSourceConfig.PLC_ADDRESS);
-            readRequest = reader.readRequestBuilder().addItem("value", addressString).build();
-            topic = config.getString(Plc4xSourceConfig.PLC_TOPIC);
-            valueSchema = typeSchemas.get(dataType);
-            running.set(true);
+            plcConnection.connect();
         } catch (PlcConnectionException e) {
-            throw new ConnectException("Caught exception while connecting to PLC", e);
+            throw new ConnectException("Could not establish a PLC connection", e);
         }
-    }
 
-    private Map<Class<?>, Schema> initTypeSchemas() {
-        Map<Class<?>, Schema> map = new HashMap<>();
-        map.put(Boolean.class, Schema.BOOLEAN_SCHEMA);
-        map.put(Integer.class, Schema.INT32_SCHEMA);
-        // TODO add other
-        return map;
+        plcReader = plcConnection.getReader()
+            .orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection"));
+
+        plcRequest = plcReader.readRequestBuilder().addItem(FIELD_KEY, query).build();
+
+        running = true;
     }
 
     @Override
     public void stop() {
-        if(plcConnection != null) {
-            running.set(false);
+        running = false;
+        if (plcConnection != null) {
             try {
                 plcConnection.close();
             } catch (Exception e) {
@@ -105,29 +89,45 @@ public class Plc4xSourceTask extends SourceTask {
     }
 
     @Override
-    public List<SourceRecord> poll() throws InterruptedException {
-        if((plcConnection != null) && plcConnection.isConnected() && (reader != null)) {
-            final List<SourceRecord> results = new LinkedList<>();
+    public List<SourceRecord> poll() {
+        if (!running)
+            return null;
 
-            try {
-                PlcReadResponse<?> plcReadResponse = reader.read(readRequest).get();
-                for (String fieldName : plcReadResponse.getFieldNames()) {
-                    for (int i = 0; i < plcReadResponse.getNumberOfValues(fieldName); i++) {
-                        Object value = plcReadResponse.getObject(fieldName, i);
-                        Map<String, String> sourcePartition = Collections.singletonMap("field-name", fieldName);
-                        Map<String, Long> sourceOffset = Collections.singletonMap("offset", offset);
-                        SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, keySchema, fieldName, valueSchema, value);
-                        results.add(record);
-                        offset++; // TODO: figure out how to track offsets
-                    }
-                }
-            } catch (ExecutionException e) {
-                log.error("Error reading values from PLC", e);
+        List<SourceRecord> result = new LinkedList<>();
+        try {
+            PlcReadResponse<?> response = plcReader.read(plcRequest).get();
+            if (response.getResponseCode(FIELD_KEY).equals(PlcResponseCode.OK)) {
+                Object rawValue = response.getObject(FIELD_KEY);
+                Schema valueSchema = getSchema(rawValue.getClass());
+                Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
+                Long timestamp = System.currentTimeMillis();
+                Map<String, String> sourcePartition = Collections.singletonMap("url", url);
+                Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+                SourceRecord record =
+                    new SourceRecord(
+                        sourcePartition,
+                        sourceOffset,
+                        topic,
+                        Schema.STRING_SCHEMA,
+                        query,
+                        valueSchema,
+                        value
+                    );
+
+                result.add(record);
             }
-
-            return results;
+            return result;
+        } catch (InterruptedException | ExecutionException e) {
+            return null;
         }
-        return null;
+    }
+
+    private Schema getSchema(Class<?> type) {
+        if (type.equals(Integer.class))
+            return Schema.INT32_SCHEMA;
+
+        return Schema.STRING_SCHEMA;
     }
 
 }
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/util/VersionUtil.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/util/VersionUtil.java
index b94dee5..20488de 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/util/VersionUtil.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/util/VersionUtil.java
@@ -25,7 +25,7 @@ public class VersionUtil {
     public static String getVersion() {
         try {
             return VersionUtil.class.getPackage().getImplementationVersion();
-        } catch (Exception ex) {
+        } catch (Exception ignored) {
             return "0.0.0.0";
         }
     }
diff --git a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSourceConfigTest.java b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSourceConfigTest.java
index 2a969db..dbefc62 100644
--- a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSourceConfigTest.java
+++ b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSourceConfigTest.java
@@ -18,14 +18,13 @@ under the License.
 */
 package org.apache.plc4x.kafka;
 
-import org.apache.plc4x.kafka.source.Plc4xSourceConfig;
 import org.junit.Test;
 
 public class Plc4XSourceConfigTest {
 
-    @Test
-    public void doc() {
-        System.out.println(Plc4xSourceConfig.CONFIG_DEF.toRst());
-    }
+    //@Test
+    //public void doc() {
+    //    System.out.println(Plc4xSourceConfig.CONFIG_DEF.toRst());
+    //}
 
 }
\ No newline at end of file
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
index b1a6b2e..c8215dc 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
@@ -30,6 +30,9 @@ import java.util.regex.Pattern;
  */
 class TestField implements PlcField {
 
+    /**
+     * Example: {@code RANDOM/foo:INTEGER}
+     */
     private static final Pattern ADDRESS_PATTERN = Pattern.compile("^(?<type>\\w+)/(?<name>\\w+):(?<dataType>.+)(\\[(?<numElements>\\d)])?$");
 
     static boolean matches(String fieldString) {


[incubator-plc4x] 02/08: implemented throttling

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch skorikov-feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 1b0ca49c45bb91cebab09d820003ad8f89c4b040
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Mon Sep 10 11:50:00 2018 +0200

    implemented throttling
---
 .../apache/plc4x/kafka/Plc4xSourceConnector.java   | 10 ++-
 .../apache/plc4x/kafka/source/Plc4xSourceTask.java | 81 ++++++++++++++--------
 2 files changed, 62 insertions(+), 29 deletions(-)

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4885771..cc91267 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -39,14 +39,20 @@ public class Plc4xSourceConnector extends SourceConnector {
     public static final String QUERY_CONFIG = "query";
     private static final String QUERY_DOC = "Field query to be sent to the PLC";
 
+    public static final String RATE_CONFIG = "rate";
+    private static final Integer RATE_DEFAULT = 1000;
+    private static final String RATE_DOC = "Polling rate";
+
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
         .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
-        .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC);
+        .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC)
+        .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
 
     private String topic;
     private String url;
     private String query;
+    private Integer rate;
 
     @Override
     public Class<? extends Task> taskClass() {
@@ -59,6 +65,7 @@ public class Plc4xSourceConnector extends SourceConnector {
         taskConfig.put(TOPIC_CONFIG, topic);
         taskConfig.put(URL_CONFIG, url);
         taskConfig.put(QUERY_CONFIG, query);
+        taskConfig.put(RATE_CONFIG, rate.toString());
 
         // Only one task will be created; ignoring maxTasks for now
         return Collections.singletonList(taskConfig);
@@ -69,6 +76,7 @@ public class Plc4xSourceConnector extends SourceConnector {
         topic = props.get(TOPIC_CONFIG);
         url = props.get(URL_CONFIG);
         query = props.get(QUERY_CONFIG);
+        rate = Integer.valueOf(props.get(RATE_CONFIG));
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
index 9c3014b..7c373fe 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
@@ -36,7 +36,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.*;
 
 public class Plc4xSourceTask extends SourceTask {
     private final static String FIELD_KEY = "key";
@@ -44,12 +44,17 @@ public class Plc4xSourceTask extends SourceTask {
     private String topic;
     private String url;
     private String query;
+    private Integer rate;
 
     private volatile boolean running = false;
     private PlcConnection plcConnection;
     private PlcReader plcReader;
     private PlcReadRequest plcRequest;
 
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+    private ScheduledFuture<?> timer;
+    private boolean fetch = true;
+
     @Override
     public String version() {
         return VersionUtil.getVersion();
@@ -60,6 +65,7 @@ public class Plc4xSourceTask extends SourceTask {
         topic = props.get(Plc4xSourceConnector.TOPIC_CONFIG);
         url = props.get(Plc4xSourceConnector.URL_CONFIG);
         query = props.get(Plc4xSourceConnector.QUERY_CONFIG);
+        rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
 
         try {
             plcConnection = new PlcDriverManager().getConnection(url);
@@ -73,12 +79,20 @@ public class Plc4xSourceTask extends SourceTask {
 
         plcRequest = plcReader.readRequestBuilder().addItem(FIELD_KEY, query).build();
 
+        timer = scheduler.scheduleAtFixedRate(() -> {
+            synchronized (Plc4xSourceTask.this) {
+                Plc4xSourceTask.this.fetch = true;
+                notify();
+            }
+        }, 0, rate, TimeUnit.MILLISECONDS);
+
         running = true;
     }
 
     @Override
     public void stop() {
         running = false;
+        timer.cancel(true);
         if (plcConnection != null) {
             try {
                 plcConnection.close();
@@ -93,33 +107,44 @@ public class Plc4xSourceTask extends SourceTask {
         if (!running)
             return null;
 
-        List<SourceRecord> result = new LinkedList<>();
-        try {
-            PlcReadResponse<?> response = plcReader.read(plcRequest).get();
-            if (response.getResponseCode(FIELD_KEY).equals(PlcResponseCode.OK)) {
-                Object rawValue = response.getObject(FIELD_KEY);
-                Schema valueSchema = getSchema(rawValue.getClass());
-                Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
-                Long timestamp = System.currentTimeMillis();
-                Map<String, String> sourcePartition = Collections.singletonMap("url", url);
-                Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
-
-                SourceRecord record =
-                    new SourceRecord(
-                        sourcePartition,
-                        sourceOffset,
-                        topic,
-                        Schema.STRING_SCHEMA,
-                        query,
-                        valueSchema,
-                        value
-                    );
-
-                result.add(record);
+        synchronized (this) {
+            while (!fetch) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    // continue
+                }
+            }
+            List<SourceRecord> result = new LinkedList<>();
+            try {
+                PlcReadResponse<?> response = plcReader.read(plcRequest).get();
+                if (response.getResponseCode(FIELD_KEY).equals(PlcResponseCode.OK)) {
+                    Object rawValue = response.getObject(FIELD_KEY);
+                    Schema valueSchema = getSchema(rawValue.getClass());
+                    Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
+                    Long timestamp = System.currentTimeMillis();
+                    Map<String, String> sourcePartition = Collections.singletonMap("url", url);
+                    Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+                    SourceRecord record =
+                        new SourceRecord(
+                            sourcePartition,
+                            sourceOffset,
+                            topic,
+                            Schema.STRING_SCHEMA,
+                            query,
+                            valueSchema,
+                            value
+                        );
+
+                    result.add(record);
+                }
+                return result;
+            } catch (InterruptedException | ExecutionException e) {
+                return null;
+            } finally {
+                fetch = false;
             }
-            return result;
-        } catch (InterruptedException | ExecutionException e) {
-            return null;
         }
     }
 
@@ -127,7 +152,7 @@ public class Plc4xSourceTask extends SourceTask {
         if (type.equals(Integer.class))
             return Schema.INT32_SCHEMA;
 
-        return Schema.STRING_SCHEMA;
+        return Schema.STRING_SCHEMA; // default schema
     }
 
 }
\ No newline at end of file


[incubator-plc4x] 05/08: added further connect schemas

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch skorikov-feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 334a4eb23b78c812b907569f9943a076c8283f41
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Mon Sep 10 14:57:09 2018 +0200

    added further connect schemas
---
 .../main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
index 2c04256..6f73276 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
@@ -180,9 +180,18 @@ public class Plc4xSourceTask extends SourceTask {
     }
 
     private Schema getSchema(Class<?> type) {
+        if (type.equals(Byte.class))
+            return Schema.INT8_SCHEMA;
+
+        if (type.equals(Short.class))
+            return Schema.INT16_SCHEMA;
+
         if (type.equals(Integer.class))
             return Schema.INT32_SCHEMA;
 
+        if (type.equals(Long.class))
+            return Schema.INT64_SCHEMA;
+
         return Schema.STRING_SCHEMA; // default case; invoke .toString on value
     }
 


[incubator-plc4x] 06/08: implemented kafka sink connector

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch skorikov-feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 5ee9fa9c3e3dd1993c4ffbf4d67cf73549db599d
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Mon Sep 10 15:57:55 2018 +0200

    implemented kafka sink connector
---
 .../org/apache/plc4x/kafka/Plc4xSinkConnector.java |  34 +++---
 .../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 100 +++++++++++++++++
 .../apache/plc4x/kafka/Plc4xSourceConnector.java   |   9 +-
 .../plc4x/kafka/{source => }/Plc4xSourceTask.java  |   3 +-
 .../org/apache/plc4x/kafka/common/Plc4xConfig.java |  92 ----------------
 .../apache/plc4x/kafka/sink/Plc4xSinkConfig.java   |  49 ---------
 .../org/apache/plc4x/kafka/sink/Plc4xSinkTask.java | 119 ---------------------
 .../apache/plc4x/kafka/Plc4XSinkConfigTest.java    |  31 ------
 .../apache/plc4x/kafka/Plc4XSourceConfigTest.java  |  30 ------
 9 files changed, 121 insertions(+), 346 deletions(-)

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 021b023..6646dac 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -21,20 +21,21 @@ package org.apache.plc4x.kafka;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.plc4x.kafka.sink.Plc4xSinkConfig;
-import org.apache.plc4x.kafka.sink.Plc4xSinkTask;
 import org.apache.plc4x.kafka.util.VersionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class Plc4xSinkConnector extends SinkConnector {
-    private static Logger log = LoggerFactory.getLogger(Plc4xSinkConnector.class);
+    static final String URL_CONFIG = "url";
+    private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
 
-    private Map<String, String> configProperties;
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC);
+
+    private String url;
 
     @Override
     public Class<? extends Task> taskClass() {
@@ -43,27 +44,24 @@ public class Plc4xSinkConnector extends SinkConnector {
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
-        log.info("Setting task configurations for {} workers.", maxTasks);
-        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
-        for (int i = 0; i < maxTasks; ++i) {
-            configs.add(configProperties);
-        }
-        return configs;
+        Map<String, String> taskConfig = new HashMap<>();
+        taskConfig.put(URL_CONFIG, url);
+
+        // Only one task will be created; ignoring maxTasks for now
+        return Collections.singletonList(taskConfig);
     }
 
     @Override
     public void start(Map<String, String> props) {
-        configProperties = props;
+        url = props.get(URL_CONFIG);
     }
 
     @Override
-    public void stop() {
-        // Nothing to do here ...
-    }
+    public void stop() {}
 
     @Override
     public ConfigDef config() {
-        return Plc4xSinkConfig.CONFIG_DEF;
+        return CONFIG_DEF;
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
new file mode 100644
index 0000000..a27b08a
--- /dev/null
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -0,0 +1,100 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.connection.PlcWriter;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.kafka.util.VersionUtil;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+public class Plc4xSinkTask extends SinkTask {
+    private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
+
+    private String url;
+
+    private PlcConnection plcConnection;
+    private PlcWriter plcWriter;
+
+    @Override
+    public String version() {
+        return VersionUtil.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        url = props.get(Plc4xSinkConnector.URL_CONFIG);
+        // Connect before asking for a writer: getWriter() is called on the opened connection.
+        openConnection();
+        // A sink can only work with a writer, so fail task startup if the connection offers none.
+        plcWriter = plcConnection.getWriter()
+            .orElseThrow(() -> new ConnectException("PlcWriter not available for this type of connection"));
+    }
+
+    @Override
+    public void stop() {
+        closeConnection();
+    }
+
+    @Override
+    public void put(Collection<SinkRecord> records) {
+        for (SinkRecord record: records) {
+            String query = record.key().toString();
+            String value = record.value().toString(); // TODO: implement other data types
+            PlcWriteRequest plcRequest = plcWriter.writeRequestBuilder().addItem(FIELD_KEY, query, value).build();
+            doWrite(plcRequest);
+        }
+    }
+
+    private void openConnection() {
+        try {
+            plcConnection = new PlcDriverManager().getConnection(url);
+            plcConnection.connect();
+        } catch (PlcConnectionException e) {
+            throw new ConnectException("Could not establish a PLC connection", e);
+        }
+    }
+
+    private void closeConnection() {
+        if (plcConnection != null) {
+            try {
+                plcConnection.close();
+            } catch (Exception e) {
+                throw new RuntimeException("Caught exception while closing connection to PLC", e);
+            }
+        }
+    }
+
+    private void doWrite(PlcWriteRequest request) {
+        try {
+            plcWriter.write(request).get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new ConnectException("Caught exception during write", e);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index cc91267..4d1d9d0 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -21,7 +21,6 @@ package org.apache.plc4x.kafka;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
-import org.apache.plc4x.kafka.source.Plc4xSourceTask;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
 import java.util.Collections;
@@ -30,16 +29,16 @@ import java.util.List;
 import java.util.Map;
 
 public class Plc4xSourceConnector extends SourceConnector {
-    public static final String TOPIC_CONFIG = "topic";
+    static final String TOPIC_CONFIG = "topic";
     private static final String TOPIC_DOC = "Kafka topic to publish to";
 
-    public static final String URL_CONFIG = "url";
+    static final String URL_CONFIG = "url";
     private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
 
-    public static final String QUERY_CONFIG = "query";
+    static final String QUERY_CONFIG = "query";
     private static final String QUERY_DOC = "Field query to be sent to the PLC";
 
-    public static final String RATE_CONFIG = "rate";
+    static final String RATE_CONFIG = "rate";
     private static final Integer RATE_DEFAULT = 1000;
     private static final String RATE_DOC = "Polling rate";
 
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
similarity index 98%
rename from integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
rename to integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 6f73276..798ae31 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -16,7 +16,7 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.plc4x.kafka.source;
+package org.apache.plc4x.kafka;
 
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -29,7 +29,6 @@ import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
-import org.apache.plc4x.kafka.Plc4xSourceConnector;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
 import java.util.Collections;
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
deleted file mode 100644
index b906fab..0000000
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka.common;
-
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
-
-import java.util.Map;
-
-
-public class Plc4xConfig extends AbstractConfig {
-
-    public static final String PLC_CONNECTION_STRING_CONFIG = "my.setting";
-    public static final String PLC_CONNECTION_STRING_DISPLAY = "PLC Connection String";
-    public static final String PLC_CONNECTION_STRING_DOC = "Connection string used by PLC4X to connect to the PLC.";
-
-    public static final String PLC_TOPIC = "topic";
-    public static final String PLC_TOPIC_DOC = "Kafka topic to publish messages to.";
-
-    public static final String PLC_DATATYPE_CONFIG = "type";
-    public static final String PLC_DATATYPE_DOC = "Data type of values sent or received by PLC.";
-
-    public static final String PLC_ADDRESS = "address";
-    public static final String PLC_ADDRESS_DOC = "PLC address to sent to or receive data from.";
-
-    public static ConfigDef baseConfigDef() {
-        ConfigDef config = new ConfigDef();
-        addPlcOptions(config);
-        return config;
-    }
-
-    private static final void addPlcOptions(ConfigDef config) {
-        config.define(
-            PLC_CONNECTION_STRING_CONFIG,
-            Type.STRING,
-            Importance.HIGH,
-            PLC_CONNECTION_STRING_DOC)
-        .define(
-            PLC_DATATYPE_CONFIG,
-            Type.CLASS,
-            Importance.HIGH,
-            PLC_DATATYPE_DOC)
-        .define(
-            PLC_TOPIC,
-            Type.STRING,
-            Importance.HIGH,
-            PLC_TOPIC_DOC)
-        .define(
-            PLC_ADDRESS,
-            Type.STRING,
-            Importance.HIGH,
-            PLC_ADDRESS_DOC);
-    }
-
-    public static final ConfigDef CONFIG_DEF = baseConfigDef();
-
-    public Plc4xConfig(ConfigDef config, Map<String, String> parsedConfig) {
-        super(config, parsedConfig);
-        String plcConnectionString = getString(PLC_CONNECTION_STRING_CONFIG);
-        if (plcConnectionString == null) {
-            throw new ConfigException("'PLC Connection String' must be specified");
-        }
-    }
-
-    public Plc4xConfig(Map<String, String> parsedConfig) {
-        this(CONFIG_DEF, parsedConfig);
-    }
-
-    public String getPlcConnectionString() {
-        return this.getString(PLC_CONNECTION_STRING_CONFIG);
-    }
-
-}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkConfig.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkConfig.java
deleted file mode 100644
index b85cb0a..0000000
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkConfig.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka.sink;
-
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.plc4x.kafka.common.Plc4xConfig;
-
-import java.util.Map;
-
-
-public class Plc4xSinkConfig extends Plc4xConfig {
-
-    public static ConfigDef baseConfigDef() {
-        ConfigDef config = Plc4xConfig.baseConfigDef();
-        addPlcOptions(config);
-        return config;
-    }
-
-    private static final void addPlcOptions(ConfigDef config) {
-        // TODO: Add things needed here.
-    }
-
-    public static final ConfigDef CONFIG_DEF = baseConfigDef();
-
-    public Plc4xSinkConfig(ConfigDef config, Map<String, String> parsedConfig) {
-        super(config, parsedConfig);
-    }
-
-    public Plc4xSinkConfig(Map<String, String> parsedConfig) {
-        this(CONFIG_DEF, parsedConfig);
-    }
-
-}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkTask.java
deleted file mode 100644
index 12a746e..0000000
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkTask.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka.sink;
-
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.sink.SinkTask;
-import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcWriter;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.messages.PlcWriteRequest;
-import org.apache.plc4x.java.api.messages.PlcWriteResponse;
-import org.apache.plc4x.java.api.types.PlcResponseCode;
-import org.apache.plc4x.kafka.util.VersionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class Plc4xSinkTask extends SinkTask {
-
-    private static Logger log = LoggerFactory.getLogger(Plc4xSinkTask.class);
-
-    private Plc4xSinkConfig config;
-    private PlcConnection plcConnection;
-    private PlcWriter writer;
-    private AtomicBoolean running = new AtomicBoolean(false);
-
-    @Override
-    public String version() {
-        return VersionUtil.getVersion();
-    }
-
-    @Override
-    public void start(Map<String, String> properties) {
-        try {
-            config = new Plc4xSinkConfig(properties);
-        } catch (ConfigException e) {
-            throw new ConnectException("Couldn't start JdbcSourceTask due to configuration error", e);
-        }
-        final String url = config.getString(Plc4xSinkConfig.PLC_CONNECTION_STRING_CONFIG);
-
-        try {
-            plcConnection = new PlcDriverManager().getConnection(url);
-            Optional<PlcWriter> writerOptional = plcConnection.getWriter();
-            if(!writerOptional.isPresent()) {
-                throw new ConnectException("PlcWriter not available for this type of connection");
-            }
-            writer = writerOptional.get();
-            running.set(true);
-        } catch (PlcConnectionException e) {
-            throw new ConnectException("Caught exception while connecting to PLC", e);
-        }
-    }
-
-    @Override
-    public void stop() {
-        if(plcConnection != null) {
-            running.set(false);
-            try {
-                plcConnection.close();
-            } catch (Exception e) {
-                throw new RuntimeException("Caught exception while closing connection to PLC", e);
-            }
-        }
-    }
-
-    @Override
-    public void put(Collection<SinkRecord> records) {
-        if((plcConnection != null) && plcConnection.isConnected() && (writer != null)) {
-            // Prepare the write request.
-            PlcWriteRequest.Builder builder = writer.writeRequestBuilder();
-            for (SinkRecord record : records) {
-                // TODO: Somehow get the payload from the kafka SinkRecord and create a writeRequestItem from that ...
-                // TODO: Replace this dummy with something real ...
-                Map<String, Object> value = new HashMap<>(); //(Map<String, String>) record.value()
-                String addressString = (String) value.get("address");
-                List<Byte> values = (List<Byte>) value.get("values");
-
-                builder.addItem(addressString, addressString, values.toArray(new Byte[0]));
-            }
-            PlcWriteRequest writeRequest = builder.build();
-
-            // Send the write request to the PLC.
-            try {
-                PlcWriteResponse<?> plcWriteResponse = writer.write(writeRequest).get();
-                for (String fieldName : plcWriteResponse.getFieldNames()) {
-                    if(plcWriteResponse.getResponseCode(fieldName) != PlcResponseCode.OK) {
-                        // TODO: Do Something if writing this particular item wasn't successful ...
-                        log.error("Error writing a value to PLC");
-                    }
-                }
-            } catch (ExecutionException | InterruptedException e) {
-                log.error("Error writing values to PLC", e);
-            }
-        }
-    }
-
-}
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSinkConfigTest.java b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSinkConfigTest.java
deleted file mode 100644
index b0ded1c..0000000
--- a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSinkConfigTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka;
-
-import org.apache.plc4x.kafka.sink.Plc4xSinkConfig;
-import org.junit.Test;
-
-public class Plc4XSinkConfigTest {
-
-    @Test
-    public void doc() {
-        System.out.println(Plc4xSinkConfig.CONFIG_DEF.toRst());
-    }
-
-}
diff --git a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSourceConfigTest.java b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSourceConfigTest.java
deleted file mode 100644
index dbefc62..0000000
--- a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSourceConfigTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka;
-
-import org.junit.Test;
-
-public class Plc4XSourceConfigTest {
-
-    //@Test
-    //public void doc() {
-    //    System.out.println(Plc4xSourceConfig.CONFIG_DEF.toRst());
-    //}
-
-}
\ No newline at end of file


[incubator-plc4x] 04/08: removed unnecessary code

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch skorikov-feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit d74d905c4b30c9de4ca49f8574da204b9b5e2395
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Mon Sep 10 14:22:56 2018 +0200

    removed unnecessary code
---
 .../src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java    | 2 --
 1 file changed, 2 deletions(-)

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
index c1d1c47..2c04256 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
@@ -111,8 +111,6 @@ public class Plc4xSourceTask extends SourceTask {
                 plcConnection.close();
             } catch (Exception e) {
                 throw new RuntimeException("Caught exception while closing connection to PLC", e);
-            } finally {
-                plcConnection = null;
             }
         }
     }


[incubator-plc4x] 03/08: refactoring

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch skorikov-feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 28a70be675efff73215de8f72d3dce764ea67a6a
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Mon Sep 10 14:16:32 2018 +0200

    refactoring
---
 .../apache/plc4x/kafka/source/Plc4xSourceTask.java | 163 +++++++++++++--------
 1 file changed, 98 insertions(+), 65 deletions(-)

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
index 7c373fe..c1d1c47 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
@@ -33,25 +33,31 @@ import org.apache.plc4x.kafka.Plc4xSourceConnector;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.*;
 
+/**
+ * Source Connector Task polling the data source at a given rate.
+ * A timer thread is scheduled which sets the fetch flag to true every {@code rate} milliseconds.
+ * When poll() is invoked, the calling thread waits up to WAIT_LIMIT_MILLIS for the fetch flag to be set.
+ * If the flag is still not set when the wait ends, poll() returns null; otherwise a fetch is performed.
+ */
 public class Plc4xSourceTask extends SourceTask {
-    private final static String FIELD_KEY = "key";
+    private final static long WAIT_LIMIT_MILLIS = 100;
+    private final static long TIMEOUT_LIMIT_MILLIS = 5000;
+    private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
 
     private String topic;
     private String url;
     private String query;
-    private Integer rate;
 
-    private volatile boolean running = false;
     private PlcConnection plcConnection;
     private PlcReader plcReader;
     private PlcReadRequest plcRequest;
 
-    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+    // TODO: should we use shared (static) thread pool for this?
+    private ScheduledExecutorService scheduler;
     private ScheduledFuture<?> timer;
     private boolean fetch = true;
 
@@ -65,94 +71,121 @@ public class Plc4xSourceTask extends SourceTask {
         topic = props.get(Plc4xSourceConnector.TOPIC_CONFIG);
         url = props.get(Plc4xSourceConnector.URL_CONFIG);
         query = props.get(Plc4xSourceConnector.QUERY_CONFIG);
-        rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
 
-        try {
-            plcConnection = new PlcDriverManager().getConnection(url);
-            plcConnection.connect();
-        } catch (PlcConnectionException e) {
-            throw new ConnectException("Could not establish a PLC connection", e);
-        }
+        openConnection();
 
         plcReader = plcConnection.getReader()
             .orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection"));
 
         plcRequest = plcReader.readRequestBuilder().addItem(FIELD_KEY, query).build();
 
-        timer = scheduler.scheduleAtFixedRate(() -> {
-            synchronized (Plc4xSourceTask.this) {
-                Plc4xSourceTask.this.fetch = true;
-                notify();
-            }
-        }, 0, rate, TimeUnit.MILLISECONDS);
-
-        running = true;
+        int rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
+        scheduler = Executors.newScheduledThreadPool(1);
+        timer = scheduler.scheduleAtFixedRate(Plc4xSourceTask.this::scheduleFetch, rate, rate, TimeUnit.MILLISECONDS);
     }
 
     @Override
     public void stop() {
-        running = false;
         timer.cancel(true);
+        scheduler.shutdown();
+        closeConnection();
+    }
+
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        return awaitFetch(WAIT_LIMIT_MILLIS) ? doFetch() : null;
+    }
+
+    private void openConnection() {
+        try {
+            plcConnection = new PlcDriverManager().getConnection(url);
+            plcConnection.connect();
+        } catch (PlcConnectionException e) {
+            throw new ConnectException("Could not establish a PLC connection", e);
+        }
+    }
+
+    private void closeConnection() {
         if (plcConnection != null) {
             try {
                 plcConnection.close();
             } catch (Exception e) {
                 throw new RuntimeException("Caught exception while closing connection to PLC", e);
+            } finally {
+                plcConnection = null;
             }
         }
     }
 
-    @Override
-    public List<SourceRecord> poll() {
-        if (!running)
-            return null;
-
-        synchronized (this) {
-            while (!fetch) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                    // continue
-                }
-            }
-            List<SourceRecord> result = new LinkedList<>();
-            try {
-                PlcReadResponse<?> response = plcReader.read(plcRequest).get();
-                if (response.getResponseCode(FIELD_KEY).equals(PlcResponseCode.OK)) {
-                    Object rawValue = response.getObject(FIELD_KEY);
-                    Schema valueSchema = getSchema(rawValue.getClass());
-                    Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
-                    Long timestamp = System.currentTimeMillis();
-                    Map<String, String> sourcePartition = Collections.singletonMap("url", url);
-                    Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
-
-                    SourceRecord record =
-                        new SourceRecord(
-                            sourcePartition,
-                            sourceOffset,
-                            topic,
-                            Schema.STRING_SCHEMA,
-                            query,
-                            valueSchema,
-                            value
-                        );
-
-                    result.add(record);
-                }
-                return result;
-            } catch (InterruptedException | ExecutionException e) {
-                return null;
-            } finally {
-                fetch = false;
-            }
+    /**
+     * Schedule next fetch operation.
+     */
+    private synchronized void scheduleFetch() {
+        fetch = true;
+        notify();
+    }
+
+    /**
+     * Wait for next scheduled fetch operation.
+     * @param milliseconds maximum time to wait
+     * @throws InterruptedException if the thread is interrupted
+     * @return true if a fetch should be performed, false otherwise
+     */
+    private synchronized boolean awaitFetch(long milliseconds) throws InterruptedException {
+        if (!fetch) {
+            wait(milliseconds);
+        }
+        // Read the flag once, then clear it, so a pending fetch request is
+        // reported to exactly one caller; the value read is what gets returned.
+        final boolean due = fetch;
+        fetch = false;
+        return due;
+    }
 
+    private List<SourceRecord> doFetch() throws InterruptedException {
+        final CompletableFuture<PlcReadResponse<?>> pending = plcReader.read(plcRequest);
+        try {
+            // Block for the response, but no longer than TIMEOUT_LIMIT_MILLIS.
+            return extractValues(pending.get(TIMEOUT_LIMIT_MILLIS, TimeUnit.MILLISECONDS));
+        } catch (TimeoutException e) {
+            throw new ConnectException("Timed out waiting for data from source", e);
+        } catch (ExecutionException e) {
+            throw new ConnectException("Could not fetch data from source", e);
+        }
+    }
+
+    private List<SourceRecord> extractValues(PlcReadResponse<?> response) {
+        final PlcResponseCode rc = response.getResponseCode(FIELD_KEY);
+
+        if (!rc.equals(PlcResponseCode.OK))
+            return null; // TODO: should we really ignore this?
+
+        Object rawValue = response.getObject(FIELD_KEY);
+        Schema valueSchema = getSchema(rawValue.getClass());
+        Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
+        Long timestamp = System.currentTimeMillis();
+        Map<String, String> sourcePartition = Collections.singletonMap("url", url);
+        Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+        SourceRecord record =
+            new SourceRecord(
+                sourcePartition,
+                sourceOffset,
+                topic,
+                Schema.STRING_SCHEMA,
+                query,
+                valueSchema,
+                value
+            );
+
+        return Collections.singletonList(record); // TODO: what if there are multiple values?
+    }
+
     private Schema getSchema(Class<?> type) {
         if (type.equals(Integer.class))
             return Schema.INT32_SCHEMA;
 
-        return Schema.STRING_SCHEMA; // default schema
+        return Schema.STRING_SCHEMA; // default case; invoke .toString on value
     }
 
 }
\ No newline at end of file


[incubator-plc4x] 07/08: implemented kafka sink connector

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch skorikov-feature/api-redesign-chris-c
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 955ad7432c1e47e5850e516fde01f4a9476ee3b4
Author: Andrey Skorikov <an...@codecentric.de>
AuthorDate: Mon Sep 10 16:26:54 2018 +0200

    implemented kafka sink connector
---
 .../src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java | 9 ++++++++-
 .../src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java      | 3 ++-
 plc4j/protocols/pom.xml                                          | 2 +-
 .../src/main/java/org/apache/plc4x/java/test/TestDevice.java     | 7 ++++++-
 .../test/src/main/java/org/apache/plc4x/java/test/TestField.java | 4 +++-
 .../test/src/main/java/org/apache/plc4x/java/test/TestType.java  | 3 ++-
 6 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 6646dac..45ae926 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -32,10 +32,15 @@ public class Plc4xSinkConnector extends SinkConnector {
     static final String URL_CONFIG = "url";
     private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
 
+    static final String QUERY_CONFIG = "query";
+    private static final String QUERY_DOC = "Field query to be sent to the PLC";
+
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
-        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC);
+        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
+        .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC);
 
     private String url;
+    private String query;
 
     @Override
     public Class<? extends Task> taskClass() {
@@ -46,6 +51,7 @@ public class Plc4xSinkConnector extends SinkConnector {
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         Map<String, String> taskConfig = new HashMap<>();
         taskConfig.put(URL_CONFIG, url);
+        taskConfig.put(QUERY_CONFIG, query);
 
         // Only one task will be created; ignoring maxTasks for now
         return Collections.singletonList(taskConfig);
@@ -54,6 +60,7 @@ public class Plc4xSinkConnector extends SinkConnector {
     @Override
     public void start(Map<String, String> props) {
         url = props.get(URL_CONFIG);
+        query = props.get(QUERY_CONFIG);
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index a27b08a..b29418f 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -36,6 +36,7 @@ public class Plc4xSinkTask extends SinkTask {
     private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
 
     private String url;
+    private String query;
 
     private PlcConnection plcConnection;
     private PlcWriter plcWriter;
@@ -48,6 +49,7 @@ public class Plc4xSinkTask extends SinkTask {
     @Override
     public void start(Map<String, String> props) {
         url = props.get(Plc4xSinkConnector.URL_CONFIG);
+        query = props.get(Plc4xSinkConnector.QUERY_CONFIG);
 
         openConnection();
 
@@ -63,7 +65,6 @@ public class Plc4xSinkTask extends SinkTask {
     @Override
     public void put(Collection<SinkRecord> records) {
         for (SinkRecord record: records) {
-            String query = record.key().toString();
             String value = record.value().toString(); // TODO: implement other data types
             PlcWriteRequest plcRequest = plcWriter.writeRequestBuilder().addItem(FIELD_KEY, query, value).build();
             doWrite(plcRequest);
diff --git a/plc4j/protocols/pom.xml b/plc4j/protocols/pom.xml
index ffb7d3e..734c3e6 100644
--- a/plc4j/protocols/pom.xml
+++ b/plc4j/protocols/pom.xml
@@ -46,4 +46,4 @@
     <module>benchmarks</module>
   </modules>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
index c046610..65cce8a 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
@@ -43,6 +43,8 @@ class TestDevice {
                 return Optional.ofNullable(state.get(field));
             case RANDOM:
                 return Optional.of(randomValue(field.getDataType()));
+            case STDOUT:
+                return Optional.empty();
         }
         throw new IllegalArgumentException("Unsupported field type: " + field.getType().name());
     }
@@ -52,9 +54,12 @@ class TestDevice {
         switch(field.getType()) {
             case STATE:
                 state.put(field, value);
+                return;
+            case STDOUT:
+                System.out.printf("TEST PLC: %s%n", value.getObject(0).toString());
+                return;
             case RANDOM:
                 // Just ignore this ...
-                return;
         }
         throw new IllegalArgumentException("Unsupported field type: " + field.getType().name());
     }
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
index c8215dc..2488282 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
@@ -31,7 +31,9 @@ import java.util.regex.Pattern;
 class TestField implements PlcField {
 
     /**
-     * Example: {@code RANDOM/foo:INTEGER}
+     * Examples:
+     *  - {@code RANDOM/foo:INTEGER}
+     *  - {@code STDOUT/foo:STRING}
      */
     private static final Pattern ADDRESS_PATTERN = Pattern.compile("^(?<type>\\w+)/(?<name>\\w+):(?<dataType>.+)(\\[(?<numElements>\\d)])?$");
 
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java
index 6483780..6654bb0 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java
@@ -22,6 +22,7 @@ package org.apache.plc4x.java.test;
 public enum TestType {
 
     RANDOM,
-    STATE
+    STATE,
+    STDOUT
 
 }