Posted to dev@plc4x.apache.org by GitBox <gi...@apache.org> on 2018/09/10 16:20:53 UTC

[GitHub] chrisdutz closed pull request #16: Implemented throttling in Kafka Source Connector

chrisdutz closed pull request #16: Implemented throttling in Kafka Source Connector
URL: https://github.com/apache/incubator-plc4x/pull/16
 
 
   

This is a PR merged from a forked repository. As GitHub does not show the
original diff of a foreign pull request (from a fork) once it has been merged,
it is reproduced below for the sake of provenance. Two short illustrative
sketches (one of the throttling mechanism, one of the new connector
configuration) follow the diff:

diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 021b0233f..45ae926eb 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -21,20 +21,26 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.plc4x.kafka.sink.Plc4xSinkConfig;
-import org.apache.plc4x.kafka.sink.Plc4xSinkTask;
 import org.apache.plc4x.kafka.util.VersionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class Plc4xSinkConnector extends SinkConnector {
-    private static Logger log = LoggerFactory.getLogger(Plc4xSinkConnector.class);
+    static final String URL_CONFIG = "url";
+    private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
 
-    private Map<String, String> configProperties;
+    static final String QUERY_CONFIG = "query";
+    private static final String QUERY_DOC = "Field query to be sent to the PLC";
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
+        .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC);
+
+    private String url;
+    private String query;
 
     @Override
     public Class<? extends Task> taskClass() {
@@ -43,27 +49,26 @@ Licensed to the Apache Software Foundation (ASF) under one
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
-        log.info("Setting task configurations for {} workers.", maxTasks);
-        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
-        for (int i = 0; i < maxTasks; ++i) {
-            configs.add(configProperties);
-        }
-        return configs;
+        Map<String, String> taskConfig = new HashMap<>();
+        taskConfig.put(URL_CONFIG, url);
+        taskConfig.put(QUERY_CONFIG, query);
+
+        // Only one task will be created; ignoring maxTasks for now
+        return Collections.singletonList(taskConfig);
     }
 
     @Override
     public void start(Map<String, String> props) {
-        configProperties = props;
+        url = props.get(URL_CONFIG);
+        query = props.get(QUERY_CONFIG);
     }
 
     @Override
-    public void stop() {
-        // Nothing to do here ...
-    }
+    public void stop() {}
 
     @Override
     public ConfigDef config() {
-        return Plc4xSinkConfig.CONFIG_DEF;
+        return CONFIG_DEF;
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
new file mode 100644
index 000000000..b29418f18
--- /dev/null
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -0,0 +1,101 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.connection.PlcWriter;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.kafka.util.VersionUtil;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+public class Plc4xSinkTask extends SinkTask {
+    private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
+
+    private String url;
+    private String query;
+
+    private PlcConnection plcConnection;
+    private PlcWriter plcWriter;
+
+    @Override
+    public String version() {
+        return VersionUtil.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        url = props.get(Plc4xSinkConnector.URL_CONFIG);
+        query = props.get(Plc4xSinkConnector.QUERY_CONFIG);
+
+        openConnection();
+
+        plcWriter = plcConnection.getWriter()
+            .orElseThrow(() -> new ConnectException("PlcWriter not available for this type of connection"));
+    }
+
+    @Override
+    public void stop() {
+        closeConnection();
+    }
+
+    @Override
+    public void put(Collection<SinkRecord> records) {
+        for (SinkRecord record: records) {
+            String value = record.value().toString(); // TODO: implement other data types
+            PlcWriteRequest plcRequest = plcWriter.writeRequestBuilder().addItem(FIELD_KEY, query, value).build();
+            doWrite(plcRequest);
+        }
+    }
+
+    private void openConnection() {
+        try {
+            plcConnection = new PlcDriverManager().getConnection(url);
+            plcConnection.connect();
+        } catch (PlcConnectionException e) {
+            throw new ConnectException("Could not establish a PLC connection", e);
+        }
+    }
+
+    private void closeConnection() {
+        if (plcConnection != null) {
+            try {
+                plcConnection.close();
+            } catch (Exception e) {
+                throw new RuntimeException("Caught exception while closing connection to PLC", e);
+            }
+        }
+    }
+
+    private void doWrite(PlcWriteRequest request) {
+        try {
+            plcWriter.write(request).get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new ConnectException("Caught exception during write", e);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4de195b44..4d1d9d026 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -21,20 +21,37 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
-import org.apache.plc4x.kafka.source.Plc4xSourceConfig;
-import org.apache.plc4x.kafka.source.Plc4xSourceTask;
 import org.apache.plc4x.kafka.util.VersionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class Plc4xSourceConnector extends SourceConnector {
-    private static Logger log = LoggerFactory.getLogger(Plc4xSourceConnector.class);
+    static final String TOPIC_CONFIG = "topic";
+    private static final String TOPIC_DOC = "Kafka topic to publish to";
 
-    private Map<String, String> configProperties;
+    static final String URL_CONFIG = "url";
+    private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";
+
+    static final String QUERY_CONFIG = "query";
+    private static final String QUERY_DOC = "Field query to be sent to the PLC";
+
+    static final String RATE_CONFIG = "rate";
+    private static final Integer RATE_DEFAULT = 1000;
+    private static final String RATE_DOC = "Polling rate in milliseconds";
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
+        .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
+        .define(QUERY_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, QUERY_DOC)
+        .define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);
+
+    private String topic;
+    private String url;
+    private String query;
+    private Integer rate;
 
     @Override
     public Class<? extends Task> taskClass() {
@@ -43,27 +60,30 @@ Licensed to the Apache Software Foundation (ASF) under one
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
-        log.info("Setting task configurations for {} workers.", maxTasks);
+        Map<String, String> taskConfig = new HashMap<>();
+        taskConfig.put(TOPIC_CONFIG, topic);
+        taskConfig.put(URL_CONFIG, url);
+        taskConfig.put(QUERY_CONFIG, query);
+        taskConfig.put(RATE_CONFIG, rate.toString());
 
         // Only one task will be created; ignoring maxTasks for now
-        final List<Map<String, String>> configs = Collections.singletonList(configProperties);
-
-        return configs;
+        return Collections.singletonList(taskConfig);
     }
 
     @Override
     public void start(Map<String, String> props) {
-        configProperties = props;
+        topic = props.get(TOPIC_CONFIG);
+        url = props.get(URL_CONFIG);
+        query = props.get(QUERY_CONFIG);
+        rate = Integer.valueOf(props.get(RATE_CONFIG));
     }
 
     @Override
-    public void stop() {
-        // Nothing to do here ...
-    }
+    public void stop() {}
 
     @Override
     public ConfigDef config() {
-        return Plc4xSourceConfig.CONFIG_DEF;
+        return CONFIG_DEF;
     }
 
     @Override
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
new file mode 100644
index 000000000..798ae3113
--- /dev/null
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -0,0 +1,197 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.kafka;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.connection.PlcConnection;
+import org.apache.plc4x.java.api.connection.PlcReader;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.kafka.util.VersionUtil;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ * Source Connector Task polling the data source at a given rate.
+ * A timer thread is scheduled which sets the fetch flag to true every rate milliseconds.
+ * When poll() is invoked, the calling thread waits up to WAIT_LIMIT_MILLIS for the fetch flag to be set.
+ * If the flag does not become true, the method returns null, otherwise a fetch is performed.
+ */
+public class Plc4xSourceTask extends SourceTask {
+    private final static long WAIT_LIMIT_MILLIS = 100;
+    private final static long TIMEOUT_LIMIT_MILLIS = 5000;
+    private final static String FIELD_KEY = "key"; // TODO: is this really necessary?
+
+    private String topic;
+    private String url;
+    private String query;
+
+    private PlcConnection plcConnection;
+    private PlcReader plcReader;
+    private PlcReadRequest plcRequest;
+
+    // TODO: should we use shared (static) thread pool for this?
+    private ScheduledExecutorService scheduler;
+    private ScheduledFuture<?> timer;
+    private boolean fetch = true;
+
+    @Override
+    public String version() {
+        return VersionUtil.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        topic = props.get(Plc4xSourceConnector.TOPIC_CONFIG);
+        url = props.get(Plc4xSourceConnector.URL_CONFIG);
+        query = props.get(Plc4xSourceConnector.QUERY_CONFIG);
+
+        openConnection();
+
+        plcReader = plcConnection.getReader()
+            .orElseThrow(() -> new ConnectException("PlcReader not available for this type of connection"));
+
+        plcRequest = plcReader.readRequestBuilder().addItem(FIELD_KEY, query).build();
+
+        int rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
+        scheduler = Executors.newScheduledThreadPool(1);
+        timer = scheduler.scheduleAtFixedRate(Plc4xSourceTask.this::scheduleFetch, rate, rate, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void stop() {
+        timer.cancel(true);
+        scheduler.shutdown();
+        closeConnection();
+    }
+
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        return awaitFetch(WAIT_LIMIT_MILLIS) ? doFetch() : null;
+    }
+
+    private void openConnection() {
+        try {
+            plcConnection = new PlcDriverManager().getConnection(url);
+            plcConnection.connect();
+        } catch (PlcConnectionException e) {
+            throw new ConnectException("Could not establish a PLC connection", e);
+        }
+    }
+
+    private void closeConnection() {
+        if (plcConnection != null) {
+            try {
+                plcConnection.close();
+            } catch (Exception e) {
+                throw new RuntimeException("Caught exception while closing connection to PLC", e);
+            }
+        }
+    }
+
+    /**
+     * Schedule next fetch operation.
+     */
+    private synchronized void scheduleFetch() {
+        fetch = true;
+        notify();
+    }
+
+    /**
+     * Wait for next scheduled fetch operation.
+     * @param milliseconds maximum time to wait
+     * @throws InterruptedException if the thread is interrupted
+     * @return true if a fetch should be performed, false otherwise
+     */
+    private synchronized boolean awaitFetch(long milliseconds) throws InterruptedException {
+        if (!fetch) {
+            wait(milliseconds);
+        }
+        try {
+            return fetch;
+        } finally {
+            fetch = false;
+        }
+    }
+
+    private List<SourceRecord> doFetch() throws InterruptedException {
+        final CompletableFuture<PlcReadResponse<?>> response = plcReader.read(plcRequest);
+        try {
+            final PlcReadResponse<?> received = response.get(TIMEOUT_LIMIT_MILLIS, TimeUnit.MILLISECONDS);
+            return extractValues(received);
+        } catch (ExecutionException e) {
+            throw new ConnectException("Could not fetch data from source", e);
+        } catch (TimeoutException e) {
+            throw new ConnectException("Timed out waiting for data from source", e);
+        }
+    }
+
+    private List<SourceRecord> extractValues(PlcReadResponse<?> response) {
+        final PlcResponseCode rc = response.getResponseCode(FIELD_KEY);
+
+        if (!rc.equals(PlcResponseCode.OK))
+            return null; // TODO: should we really ignore this?
+
+        Object rawValue = response.getObject(FIELD_KEY);
+        Schema valueSchema = getSchema(rawValue.getClass());
+        Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue;
+        Long timestamp = System.currentTimeMillis();
+        Map<String, String> sourcePartition = Collections.singletonMap("url", url);
+        Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+        SourceRecord record =
+            new SourceRecord(
+                sourcePartition,
+                sourceOffset,
+                topic,
+                Schema.STRING_SCHEMA,
+                query,
+                valueSchema,
+                value
+            );
+
+        return Collections.singletonList(record); // TODO: what if there are multiple values?
+    }
+
+    private Schema getSchema(Class<?> type) {
+        if (type.equals(Byte.class))
+            return Schema.INT8_SCHEMA;
+
+        if (type.equals(Short.class))
+            return Schema.INT16_SCHEMA;
+
+        if (type.equals(Integer.class))
+            return Schema.INT32_SCHEMA;
+
+        if (type.equals(Long.class))
+            return Schema.INT64_SCHEMA;
+
+        return Schema.STRING_SCHEMA; // default case; invoke .toString on value
+    }
+
+}
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
deleted file mode 100644
index b906fab82..000000000
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/common/Plc4xConfig.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka.common;
-
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
-
-import java.util.Map;
-
-
-public class Plc4xConfig extends AbstractConfig {
-
-    public static final String PLC_CONNECTION_STRING_CONFIG = "my.setting";
-    public static final String PLC_CONNECTION_STRING_DISPLAY = "PLC Connection String";
-    public static final String PLC_CONNECTION_STRING_DOC = "Connection string used by PLC4X to connect to the PLC.";
-
-    public static final String PLC_TOPIC = "topic";
-    public static final String PLC_TOPIC_DOC = "Kafka topic to publish messages to.";
-
-    public static final String PLC_DATATYPE_CONFIG = "type";
-    public static final String PLC_DATATYPE_DOC = "Data type of values sent or received by PLC.";
-
-    public static final String PLC_ADDRESS = "address";
-    public static final String PLC_ADDRESS_DOC = "PLC address to sent to or receive data from.";
-
-    public static ConfigDef baseConfigDef() {
-        ConfigDef config = new ConfigDef();
-        addPlcOptions(config);
-        return config;
-    }
-
-    private static final void addPlcOptions(ConfigDef config) {
-        config.define(
-            PLC_CONNECTION_STRING_CONFIG,
-            Type.STRING,
-            Importance.HIGH,
-            PLC_CONNECTION_STRING_DOC)
-        .define(
-            PLC_DATATYPE_CONFIG,
-            Type.CLASS,
-            Importance.HIGH,
-            PLC_DATATYPE_DOC)
-        .define(
-            PLC_TOPIC,
-            Type.STRING,
-            Importance.HIGH,
-            PLC_TOPIC_DOC)
-        .define(
-            PLC_ADDRESS,
-            Type.STRING,
-            Importance.HIGH,
-            PLC_ADDRESS_DOC);
-    }
-
-    public static final ConfigDef CONFIG_DEF = baseConfigDef();
-
-    public Plc4xConfig(ConfigDef config, Map<String, String> parsedConfig) {
-        super(config, parsedConfig);
-        String plcConnectionString = getString(PLC_CONNECTION_STRING_CONFIG);
-        if (plcConnectionString == null) {
-            throw new ConfigException("'PLC Connection String' must be specified");
-        }
-    }
-
-    public Plc4xConfig(Map<String, String> parsedConfig) {
-        this(CONFIG_DEF, parsedConfig);
-    }
-
-    public String getPlcConnectionString() {
-        return this.getString(PLC_CONNECTION_STRING_CONFIG);
-    }
-
-}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkConfig.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkConfig.java
deleted file mode 100644
index b85cb0a8e..000000000
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkConfig.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka.sink;
-
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.plc4x.kafka.common.Plc4xConfig;
-
-import java.util.Map;
-
-
-public class Plc4xSinkConfig extends Plc4xConfig {
-
-    public static ConfigDef baseConfigDef() {
-        ConfigDef config = Plc4xConfig.baseConfigDef();
-        addPlcOptions(config);
-        return config;
-    }
-
-    private static final void addPlcOptions(ConfigDef config) {
-        // TODO: Add things needed here.
-    }
-
-    public static final ConfigDef CONFIG_DEF = baseConfigDef();
-
-    public Plc4xSinkConfig(ConfigDef config, Map<String, String> parsedConfig) {
-        super(config, parsedConfig);
-    }
-
-    public Plc4xSinkConfig(Map<String, String> parsedConfig) {
-        this(CONFIG_DEF, parsedConfig);
-    }
-
-}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkTask.java
deleted file mode 100644
index 12a746e43..000000000
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/sink/Plc4xSinkTask.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka.sink;
-
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.sink.SinkTask;
-import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcWriter;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.messages.PlcWriteRequest;
-import org.apache.plc4x.java.api.messages.PlcWriteResponse;
-import org.apache.plc4x.java.api.types.PlcResponseCode;
-import org.apache.plc4x.kafka.util.VersionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class Plc4xSinkTask extends SinkTask {
-
-    private static Logger log = LoggerFactory.getLogger(Plc4xSinkTask.class);
-
-    private Plc4xSinkConfig config;
-    private PlcConnection plcConnection;
-    private PlcWriter writer;
-    private AtomicBoolean running = new AtomicBoolean(false);
-
-    @Override
-    public String version() {
-        return VersionUtil.getVersion();
-    }
-
-    @Override
-    public void start(Map<String, String> properties) {
-        try {
-            config = new Plc4xSinkConfig(properties);
-        } catch (ConfigException e) {
-            throw new ConnectException("Couldn't start JdbcSourceTask due to configuration error", e);
-        }
-        final String url = config.getString(Plc4xSinkConfig.PLC_CONNECTION_STRING_CONFIG);
-
-        try {
-            plcConnection = new PlcDriverManager().getConnection(url);
-            Optional<PlcWriter> writerOptional = plcConnection.getWriter();
-            if(!writerOptional.isPresent()) {
-                throw new ConnectException("PlcWriter not available for this type of connection");
-            }
-            writer = writerOptional.get();
-            running.set(true);
-        } catch (PlcConnectionException e) {
-            throw new ConnectException("Caught exception while connecting to PLC", e);
-        }
-    }
-
-    @Override
-    public void stop() {
-        if(plcConnection != null) {
-            running.set(false);
-            try {
-                plcConnection.close();
-            } catch (Exception e) {
-                throw new RuntimeException("Caught exception while closing connection to PLC", e);
-            }
-        }
-    }
-
-    @Override
-    public void put(Collection<SinkRecord> records) {
-        if((plcConnection != null) && plcConnection.isConnected() && (writer != null)) {
-            // Prepare the write request.
-            PlcWriteRequest.Builder builder = writer.writeRequestBuilder();
-            for (SinkRecord record : records) {
-                // TODO: Somehow get the payload from the kafka SinkRecord and create a writeRequestItem from that ...
-                // TODO: Replace this dummy with something real ...
-                Map<String, Object> value = new HashMap<>(); //(Map<String, String>) record.value()
-                String addressString = (String) value.get("address");
-                List<Byte> values = (List<Byte>) value.get("values");
-
-                builder.addItem(addressString, addressString, values.toArray(new Byte[0]));
-            }
-            PlcWriteRequest writeRequest = builder.build();
-
-            // Send the write request to the PLC.
-            try {
-                PlcWriteResponse<?> plcWriteResponse = writer.write(writeRequest).get();
-                for (String fieldName : plcWriteResponse.getFieldNames()) {
-                    if(plcWriteResponse.getResponseCode(fieldName) != PlcResponseCode.OK) {
-                        // TODO: Do Something if writing this particular item wasn't successful ...
-                        log.error("Error writing a value to PLC");
-                    }
-                }
-            } catch (ExecutionException | InterruptedException e) {
-                log.error("Error writing values to PLC", e);
-            }
-        }
-    }
-
-}
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceConfig.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceConfig.java
deleted file mode 100644
index 39043430f..000000000
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceConfig.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka.source;
-
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.plc4x.kafka.common.Plc4xConfig;
-
-import java.util.Map;
-
-
-public class Plc4xSourceConfig extends Plc4xConfig {
-
-    public static ConfigDef baseConfigDef() {
-        ConfigDef config = Plc4xConfig.baseConfigDef();
-        addPlcOptions(config);
-        return config;
-    }
-
-    private static final void addPlcOptions(ConfigDef config) {
-        // TODO: Add things needed here.
-    }
-
-    public static final ConfigDef CONFIG_DEF = baseConfigDef();
-
-    public Plc4xSourceConfig(ConfigDef config, Map<String, String> parsedConfig) {
-        super(config, parsedConfig);
-    }
-
-    public Plc4xSourceConfig(Map<String, String> parsedConfig) {
-        this(CONFIG_DEF, parsedConfig);
-    }
-
-}
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
deleted file mode 100644
index c008ce579..000000000
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka.source;
-
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.apache.kafka.connect.source.SourceTask;
-import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.connection.PlcConnection;
-import org.apache.plc4x.java.api.connection.PlcReader;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
-import org.apache.plc4x.kafka.util.VersionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class Plc4xSourceTask extends SourceTask {
-
-    static final Logger log = LoggerFactory.getLogger(Plc4xSourceTask.class);
-
-    private Plc4xSourceConfig config;
-    private PlcConnection plcConnection;
-    private PlcReader reader;
-    private PlcReadRequest readRequest;
-    private AtomicBoolean running = new AtomicBoolean(false);
-    private String topic;
-    private Schema keySchema = Schema.STRING_SCHEMA;
-    private Schema valueSchema;
-    private long offset = 0;
-    private final Map<Class<?>, Schema> typeSchemas = initTypeSchemas();
-
-    @Override
-    public String version() {
-        return VersionUtil.getVersion();
-    }
-
-    @Override
-    public void start(Map<String, String> properties) {
-        try {
-            config = new Plc4xSourceConfig(properties);
-        } catch (ConfigException e) {
-            throw new ConnectException("Couldn't start Plc4xSourceTask due to configuration error", e);
-        }
-        final String url = config.getString(Plc4xSourceConfig.PLC_CONNECTION_STRING_CONFIG);
-
-        try {
-            plcConnection = new PlcDriverManager().getConnection(url);
-            Optional<PlcReader> readerOptional = plcConnection.getReader();
-            if(!readerOptional.isPresent()) {
-                throw new ConnectException("PlcReader not available for this type of connection");
-            }
-            reader = readerOptional.get();
-            Class<?> dataType = config.getClass(Plc4xSourceConfig.PLC_DATATYPE_CONFIG);
-            String addressString = config.getString(Plc4xSourceConfig.PLC_ADDRESS);
-            readRequest = reader.readRequestBuilder().addItem("value", addressString).build();
-            topic = config.getString(Plc4xSourceConfig.PLC_TOPIC);
-            valueSchema = typeSchemas.get(dataType);
-            running.set(true);
-        } catch (PlcConnectionException e) {
-            throw new ConnectException("Caught exception while connecting to PLC", e);
-        }
-    }
-
-    private Map<Class<?>, Schema> initTypeSchemas() {
-        Map<Class<?>, Schema> map = new HashMap<>();
-        map.put(Boolean.class, Schema.BOOLEAN_SCHEMA);
-        map.put(Integer.class, Schema.INT32_SCHEMA);
-        // TODO add other
-        return map;
-    }
-
-    @Override
-    public void stop() {
-        if(plcConnection != null) {
-            running.set(false);
-            try {
-                plcConnection.close();
-            } catch (Exception e) {
-                throw new RuntimeException("Caught exception while closing connection to PLC", e);
-            }
-        }
-    }
-
-    @Override
-    public List<SourceRecord> poll() throws InterruptedException {
-        if((plcConnection != null) && plcConnection.isConnected() && (reader != null)) {
-            final List<SourceRecord> results = new LinkedList<>();
-
-            try {
-                PlcReadResponse<?> plcReadResponse = reader.read(readRequest).get();
-                for (String fieldName : plcReadResponse.getFieldNames()) {
-                    for (int i = 0; i < plcReadResponse.getNumberOfValues(fieldName); i++) {
-                        Object value = plcReadResponse.getObject(fieldName, i);
-                        Map<String, String> sourcePartition = Collections.singletonMap("field-name", fieldName);
-                        Map<String, Long> sourceOffset = Collections.singletonMap("offset", offset);
-                        SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, keySchema, fieldName, valueSchema, value);
-                        results.add(record);
-                        offset++; // TODO: figure out how to track offsets
-                    }
-                }
-            } catch (ExecutionException e) {
-                log.error("Error reading values from PLC", e);
-            }
-
-            return results;
-        }
-        return null;
-    }
-
-}
\ No newline at end of file
diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/util/VersionUtil.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/util/VersionUtil.java
index b94dee595..20488de97 100644
--- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/util/VersionUtil.java
+++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/util/VersionUtil.java
@@ -25,7 +25,7 @@ Licensed to the Apache Software Foundation (ASF) under one
     public static String getVersion() {
         try {
             return VersionUtil.class.getPackage().getImplementationVersion();
-        } catch (Exception ex) {
+        } catch (Exception ignored) {
             return "0.0.0.0";
         }
     }
diff --git a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSinkConfigTest.java b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSinkConfigTest.java
deleted file mode 100644
index b0ded1c0d..000000000
--- a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSinkConfigTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka;
-
-import org.apache.plc4x.kafka.sink.Plc4xSinkConfig;
-import org.junit.Test;
-
-public class Plc4XSinkConfigTest {
-
-    @Test
-    public void doc() {
-        System.out.println(Plc4xSinkConfig.CONFIG_DEF.toRst());
-    }
-
-}
diff --git a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSourceConfigTest.java b/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSourceConfigTest.java
deleted file mode 100644
index 2a969db77..000000000
--- a/integrations/apache-kafka/src/test/java/org/apache/plc4x/kafka/Plc4XSourceConfigTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.plc4x.kafka;
-
-import org.apache.plc4x.kafka.source.Plc4xSourceConfig;
-import org.junit.Test;
-
-public class Plc4XSourceConfigTest {
-
-    @Test
-    public void doc() {
-        System.out.println(Plc4xSourceConfig.CONFIG_DEF.toRst());
-    }
-
-}
\ No newline at end of file
diff --git a/plc4j/protocols/pom.xml b/plc4j/protocols/pom.xml
index ffb7d3ef1..734c3e636 100644
--- a/plc4j/protocols/pom.xml
+++ b/plc4j/protocols/pom.xml
@@ -46,4 +46,4 @@
     <module>benchmarks</module>
   </modules>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
index c046610b6..65cce8aed 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java
@@ -43,6 +43,8 @@ Licensed to the Apache Software Foundation (ASF) under one
                 return Optional.ofNullable(state.get(field));
             case RANDOM:
                 return Optional.of(randomValue(field.getDataType()));
+            case STDOUT:
+                return Optional.empty();
         }
         throw new IllegalArgumentException("Unsupported field type: " + field.getType().name());
     }
@@ -52,9 +54,12 @@ void set(TestField field, FieldItem value) {
         switch(field.getType()) {
             case STATE:
                 state.put(field, value);
+                return;
+            case STDOUT:
+                System.out.printf("TEST PLC: %s%n", value.getObject(0).toString());
+                return;
             case RANDOM:
                 // Just ignore this ...
-                return;
         }
         throw new IllegalArgumentException("Unsupported field type: " + field.getType().name());
     }
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
index b1a6b2e8b..2488282c7 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestField.java
@@ -30,6 +30,11 @@ Licensed to the Apache Software Foundation (ASF) under one
  */
 class TestField implements PlcField {
 
+    /**
+     * Examples:
+     *  - {@code RANDOM/foo:INTEGER}
+     *  - {@code STDOUT/foo:STRING}
+     */
     private static final Pattern ADDRESS_PATTERN = Pattern.compile("^(?<type>\\w+)/(?<name>\\w+):(?<dataType>.+)(\\[(?<numElements>\\d)])?$");
 
     static boolean matches(String fieldString) {
diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java
index 6483780fc..6654bb0b4 100644
--- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java
+++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestType.java
@@ -22,6 +22,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 public enum TestType {
 
     RANDOM,
-    STATE
+    STATE,
+    STDOUT
 
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services