You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2020/11/05 11:39:54 UTC
[plc4x] branch feature/kafkasink updated: Removed request
connectionstring, added expires field
This is an automated email from the ASF dual-hosted git repository.
hutcheb pushed a commit to branch feature/kafkasink
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/feature/kafkasink by this push:
new 4dc7a5a Removed request connectionstring, added expires field
4dc7a5a is described below
commit 4dc7a5acb1aa3d92e99d572032315a284f56747c
Author: hutcheb <be...@gmail.com>
AuthorDate: Thu Nov 5 06:39:35 2020 -0500
Removed request connectionstring, added expires field
- Added an expires field (a timestamp in epoch milliseconds; 0 disables the
check) which allows us to discard a write request if it is too old.
- Removed the connection string from the request; it is already defined
in the config file.
- Added error handling so that requests are discarded
if an error occurs.
- Added a sample sink config file; the topics field still needs to be fixed.
---
.../apache-kafka/config/plc4x-sink.properties | 36 +++++++++++++++
.../org/apache/plc4x/kafka/Plc4xSinkConnector.java | 1 -
.../java/org/apache/plc4x/kafka/Plc4xSinkTask.java | 53 +++++++++++++---------
3 files changed, 67 insertions(+), 23 deletions(-)
diff --git a/plc4j/integrations/apache-kafka/config/plc4x-sink.properties b/plc4j/integrations/apache-kafka/config/plc4x-sink.properties
new file mode 100644
index 0000000..83ff4d2
--- /dev/null
+++ b/plc4j/integrations/apache-kafka/config/plc4x-sink.properties
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+name=plc-1
+connector.class=org.apache.plc4x.kafka.Plc4xSinkConnector
+default-topic=machineData
+topics=machineData
+tasks.max=3
+
+sinks=machineA
+sinks.machineA.connectionString=modbus://localhost
+sinks.machineA.topic=machineData
+
+bootstrap.servers=127.0.0.1:9092
+key.converter=org.apache.kafka.connect.json.JsonConverter
+value.converter=org.apache.kafka.connect.json.JsonConverter
+key.converter.schemas.enable=true
+value.converter.schemas.enable=true
+offset.storage.file.filename=/tmp/connect.offsets
+offset.flush.interval.ms=10000
+plugin.path=/usr/local/share/kafka/plugins
+errors.log.enable = true
+errors.tolerance=all
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 253843a..8a83552 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -121,7 +121,6 @@ public class Plc4xSinkConnector extends SinkConnector {
sinkConnectionStringConfigValue.addErrorMessage(connectionStringConfig + " is mandatory");
} else {
// TODO: Check if the connection string is valid.
-
String sinkTopicConfig = SINK_CONFIG + "." + sinkName + "." + TOPIC_CONFIG;
final ConfigValue sinkTopicConfigValue = new ConfigValue(sinkTopicConfig);
config.configValues().add(sinkTopicConfigValue);
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index f63b37e..7ea7985 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -81,12 +81,13 @@ public class Plc4xSinkTask extends SinkTask {
private PlcDriverManager driverManager;
private Transformation<SinkRecord> transformation;
+ private String plc4xConnectionString;
@Override
public void start(Map<String, String> props) {
AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
String connectionName = config.getString(CONNECTION_NAME_CONFIG);
- String plc4xConnectionString = config.getString(PLC4X_CONNECTION_STRING_CONFIG);
+ plc4xConnectionString = config.getString(PLC4X_CONNECTION_STRING_CONFIG);
Map<String, String> topics = new HashMap<>();
log.info("Creating Pooled PLC4x driver manager");
driverManager = new PooledPlcDriverManager();
@@ -104,47 +105,55 @@ public class Plc4xSinkTask extends SinkTask {
if (records.isEmpty()) {
return;
}
- log.info(records.toString());
- //ObjectMapper mapper = new ObjectMapper();
for (SinkRecord r: records) {
Struct record = (Struct) r.value();
- String connectionString = (String) record.get("connectionString");
- String address = (String) record.get("address");
- String value = (String) record.get("value");
+ String address = record.getString("address");
+ String value = record.getString("value");
+ Long expires = record.getInt64("expires");
+
+ if ((System.currentTimeMillis() > expires) & !(expires == 0)) {
+ log.warn("Write request has expired {}, discarding {}", System.currentTimeMillis(), address);
+ return;
+ }
PlcConnection connection = null;
try {
- connection = driverManager.getConnection(connectionString);
- } catch (PlcConnectionException e) {
- log.info("Failed to Open Connection {}" + connectionString);
+ connection = driverManager.getConnection(plc4xConnectionString);
+ } catch (PlcConnectionException e) {
+ log.warn("Failed to Open Connection {}", plc4xConnectionString);
}
final PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
-
- //If an array value is passed instead of a single value then convert to a String array
- if ((value.charAt(0) == '[') && (value.charAt(value.length() - 1) == ']')) {
- String[] values = value.substring(1,value.length() - 1).split(",");
- log.info("Adding Tag " + Arrays.toString(values));
- builder.addItem(address, address, values);
- } else {
- builder.addItem(address, address, value);
+ PlcWriteRequest writeRequest;
+ try {
+ //If an array value is passed instead of a single value then convert to a String array
+ if ((value.charAt(0) == '[') && (value.charAt(value.length() - 1) == ']')) {
+ String[] values = value.substring(1,value.length() - 1).split(",");
+ builder.addItem(address, address, values);
+ } else {
+ builder.addItem(address, address, value);
+ }
+
+ writeRequest = builder.build();
+ } catch (Exception e) {
+ //When building a request we want to discard the write if there is an error.
+ log.warn("Failed to Write to {}", plc4xConnectionString);
+ return;
}
- PlcWriteRequest writeRequest = builder.build();
-
try {
writeRequest.execute().get();
} catch (InterruptedException | ExecutionException e) {
- log.info("Failed to Write to {}" + connectionString);
+ log.warn("Failed to Write to {}", plc4xConnectionString);
}
try {
connection.close();
} catch (Exception e) {
- log.info("Failed to Close {}" + connectionString);
+ log.warn("Failed to Close {}", plc4xConnectionString);
}
}
return;
}
-}
\ No newline at end of file
+}