You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2020/11/05 11:39:54 UTC

[plc4x] branch feature/kafkasink updated: Removed request connectionstring, added expires field

This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch feature/kafkasink
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/feature/kafkasink by this push:
     new 4dc7a5a  Removed request connectionstring, added expires field
4dc7a5a is described below

commit 4dc7a5acb1aa3d92e99d572032315a284f56747c
Author: hutcheb <be...@gmail.com>
AuthorDate: Thu Nov 5 06:39:35 2020 -0500

    Removed request connectionstring, added expires field
    
    - Add an expires field which allows us to discard a write request if it
    is too old.
    
    - removed the connection string from the request, it is already defined
    in the config file.
    
    - Added handling/non handling of errors so that requests are discarded
    if an error occurs.
    
    - Added sample sink config file, need to fix topics field.
---
 .../apache-kafka/config/plc4x-sink.properties      | 36 +++++++++++++++
 .../org/apache/plc4x/kafka/Plc4xSinkConnector.java |  1 -
 .../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 53 +++++++++++++---------
 3 files changed, 67 insertions(+), 23 deletions(-)

diff --git a/plc4j/integrations/apache-kafka/config/plc4x-sink.properties b/plc4j/integrations/apache-kafka/config/plc4x-sink.properties
new file mode 100644
index 0000000..83ff4d2
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/config/plc4x-sink.properties
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+name=plc-1
+connector.class=org.apache.plc4x.kafka.Plc4xSinkConnector
+default-topic=machineData
+topics=machineData
+tasks.max=3
+
+sinks=machineA
+sinks.machineA.connectionString=modbus://localhost
+sinks.machineA.topc=machineData
+
+bootstrap.servers=127.0.0.1:9092
+key.converter=org.apache.kafka.connect.json.JsonConverter
+value.converter=org.apache.kafka.connect.json.JsonConverter
+key.converter.schemas.enable=true
+value.converter.schemas.enable=true
+offset.storage.file.filename=/tmp/connect.offsets
+offset.flush.interval.ms=10000
+plugin.path=/usr/local/share/kafka/plugins
+errors.log.enable = true
+errors.tolerance=all
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 253843a..8a83552 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -121,7 +121,6 @@ public class Plc4xSinkConnector extends SinkConnector {
                     sinkConnectionStringConfigValue.addErrorMessage(connectionStringConfig + " is mandatory");
                 } else {
                     // TODO: Check if the connection string is valid.
-
                     String sinkTopicConfig = SINK_CONFIG + "." + sinkName + "." + TOPIC_CONFIG;
                     final ConfigValue sinkTopicConfigValue = new ConfigValue(sinkTopicConfig);
                     config.configValues().add(sinkTopicConfigValue);
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index f63b37e..7ea7985 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -81,12 +81,13 @@ public class Plc4xSinkTask extends SinkTask {
 
     private PlcDriverManager driverManager;
     private Transformation<SinkRecord> transformation;
+    private String plc4xConnectionString;
 
     @Override
     public void start(Map<String, String> props) {
         AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
         String connectionName = config.getString(CONNECTION_NAME_CONFIG);
-        String plc4xConnectionString = config.getString(PLC4X_CONNECTION_STRING_CONFIG);
+        plc4xConnectionString = config.getString(PLC4X_CONNECTION_STRING_CONFIG);
         Map<String, String> topics = new HashMap<>();
         log.info("Creating Pooled PLC4x driver manager");
         driverManager = new PooledPlcDriverManager();
@@ -104,47 +105,55 @@ public class Plc4xSinkTask extends SinkTask {
         if (records.isEmpty()) {
             return;
         }
-        log.info(records.toString());
-        //ObjectMapper mapper = new ObjectMapper();
 
         for (SinkRecord r: records) {
             Struct record = (Struct) r.value();
-            String connectionString = (String) record.get("connectionString");
-            String address = (String) record.get("address");
-            String value = (String) record.get("value");
+            String address = record.getString("address");
+            String value = record.getString("value");
+            Long expires = record.getInt64("expires");
+
+            if ((System.currentTimeMillis() > expires) & !(expires == 0)) {
+                log.warn("Write request has expired {}, discarding {}", System.currentTimeMillis(), address);
+                return;
+            }
 
             PlcConnection connection = null;
             try {
-                connection = driverManager.getConnection(connectionString);
-            } catch (PlcConnectionException e) {
-                log.info("Failed to Open Connection {}" + connectionString);
+                connection = driverManager.getConnection(plc4xConnectionString);
+            } catch (PlcConnectionException e) {                
+                log.warn("Failed to Open Connection {}", plc4xConnectionString);
             }
 
             final PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
-
-            //If an array value is passed instead of a single value then convert to a String array
-            if ((value.charAt(0) == '[') && (value.charAt(value.length() - 1) == ']')) {
-                String[] values = value.substring(1,value.length() - 1).split(",");
-                log.info("Adding Tag " + Arrays.toString(values));
-                builder.addItem(address, address, values);
-            } else {
-                builder.addItem(address, address, value);
+            PlcWriteRequest writeRequest;
+            try {
+                //If an array value is passed instead of a single value then convert to a String array
+                if ((value.charAt(0) == '[') && (value.charAt(value.length() - 1) == ']')) {
+                    String[] values = value.substring(1,value.length() - 1).split(",");
+                    builder.addItem(address, address, values);
+                } else {
+                    builder.addItem(address, address, value);
+                }
+
+                writeRequest = builder.build();
+            } catch (Exception e) {
+                //When building a request we want to discard the write if there is an error.
+                log.warn("Failed to Write to {}", plc4xConnectionString);
+                return;
             }
 
-            PlcWriteRequest writeRequest = builder.build();
-
             try {
                 writeRequest.execute().get();
             } catch (InterruptedException | ExecutionException e) {
-                log.info("Failed to Write to {}" + connectionString);
+                log.warn("Failed to Write to {}", plc4xConnectionString);
             }
 
             try {
                 connection.close();
             } catch (Exception e) {
-                log.info("Failed to Close {}" + connectionString);
+                log.warn("Failed to Close {}", plc4xConnectionString);
             }
         }
         return;
     }
-}
\ No newline at end of file
+}