You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2020/01/29 12:57:38 UTC
[plc4x] 01/06: - Added an example wiring data to InfluxDB using
PLC4X' subscription API - Fine-tuned the BACnet driver
This is an automated email from the ASF dual-hosted git repository.
jfeinauer pushed a commit to branch fixes-mathi
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 0f4560e005f92ccf973a8fff47f1cb6617f65625
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Tue Jan 28 15:35:24 2020 +0100
- Added an example wiring data to InfluxDB using PLC4X' subscription API
- Fine-tuned the BACnet driver
---
.../example.properties | 32 +++++
.../examples/hello-influx-data-collection/pom.xml | 118 +++++++++++++++++
.../java/examples/helloinflux/HelloInflux.java | 145 +++++++++++++++++++++
.../src/main/resources/logback.xml | 34 +++++
plc4j/examples/pom.xml | 1 +
.../PassiveBacNetIpConfiguration.java | 2 +-
.../protocol/PassiveBacNetIpProtocolLogic.java | 7 +-
7 files changed, 335 insertions(+), 4 deletions(-)
diff --git a/plc4j/examples/hello-influx-data-collection/example.properties b/plc4j/examples/hello-influx-data-collection/example.properties
new file mode 100644
index 0000000..ce87372
--- /dev/null
+++ b/plc4j/examples/hello-influx-data-collection/example.properties
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Connection string for the indlux part
+influx.connectionString=http://127.0.0.1:9999
+influx.accessToken={yeaaaaahhh ... you'd really love that ... wouldn't you?}
+influx.org=My Cool Org
+influx.bucket=my-cool-bucket
+influx.measurement=my-cool-measurement
+
+# Connection string for a plc part.
+# Use a connection that supports subscriptions (BACnet/IP, KNXnet/IP, ADS).
+plc.connectionString=plc4x-connection-string
+# Field query (currently only one field)
+# Using BACnet/IP or KNXnet/IP this could be: */*/*
+plc.query=plc4x-field-query
\ No newline at end of file
diff --git a/plc4j/examples/hello-influx-data-collection/pom.xml b/plc4j/examples/hello-influx-data-collection/pom.xml
new file mode 100644
index 0000000..5a561c6
--- /dev/null
+++ b/plc4j/examples/hello-influx-data-collection/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.plc4x.examples</groupId>
+ <artifactId>plc4j-examples</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>plc4j-hello-influx-data-collection</artifactId>
+ <name>PLC4J: Examples: Hello InfluxDB Data Collection</name>
+ <description>Data Collection with PLC4X and InfluxDB.</description>
+
+ <properties>
+ <app.main.class>org.apache.plc4x.java.examples.helloinflux.HelloInflux</app.main.class>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-api</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-spi</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ </dependency>
+
+ <!-- InfluxDB dependencies -->
+ <dependency>
+ <groupId>com.influxdb</groupId>
+ <artifactId>influxdb-client-java</artifactId>
+ <version>1.4.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-configuration2</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+
+ <!-- Required driver implementation -->
+ <dependency>
+ <groupId>org.apache.plc4x.sandbox</groupId>
+ <artifactId>test-java-bacnetip-driver</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-transport-pcap-socket</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-transport-raw-socket</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <usedDependencies combine.children="append">
+ <usedDependency>org.apache.plc4x.sandbox:test-java-bacnetip-driver</usedDependency>
+ <usedDependency>org.apache.plc4x:plc4j-transport-pcap-socket</usedDependency>
+ <usedDependency>org.apache.plc4x:plc4j-transport-raw-socket</usedDependency>
+ <usedDependency>org.slf4j:log4j-over-slf4j</usedDependency>
+ </usedDependencies>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/plc4j/examples/hello-influx-data-collection/src/main/java/org/apache/plc4x/java/examples/helloinflux/HelloInflux.java b/plc4j/examples/hello-influx-data-collection/src/main/java/org/apache/plc4x/java/examples/helloinflux/HelloInflux.java
new file mode 100644
index 0000000..57b4cbb
--- /dev/null
+++ b/plc4j/examples/hello-influx-data-collection/src/main/java/org/apache/plc4x/java/examples/helloinflux/HelloInflux.java
@@ -0,0 +1,145 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.examples.helloinflux;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.InfluxDBClientFactory;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.commons.configuration2.builder.fluent.Configurations;
+import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.api.value.*;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class HelloInflux {
+
+ private static final Logger logger = LoggerFactory.getLogger(HelloInflux.class);
+
+ private Configuration configuration;
+
+ public HelloInflux(File configFile) {
+ Configurations configs = new Configurations();
+ try {
+ configuration = configs.properties(configFile);
+ } catch (ConfigurationException cex) {
+ throw new RuntimeException("Error reading configuration");
+ }
+ }
+
+ public void run() {
+ InfluxDBClient dbConnection = connectToDb();
+ WriteApi writeApi = dbConnection.getWriteApi();
+ try {
+ PlcConnection plcConnection = connectToPlc();
+
+ final PlcSubscriptionRequest subscriptionRequest =
+ plcConnection.subscriptionRequestBuilder().addChangeOfStateField("query",
+ configuration.getString("plc.query")).build();
+ final PlcSubscriptionResponse subscriptionResponse =
+ subscriptionRequest.execute().get(10, TimeUnit.SECONDS);
+ subscriptionResponse.getSubscriptionHandle("query").register(plcSubscriptionEvent -> {
+ DefaultPlcSubscriptionEvent internalEvent = (DefaultPlcSubscriptionEvent) plcSubscriptionEvent;
+ final Point point = Point.measurement(configuration.getString("influx.measurement"))
+ .time(plcSubscriptionEvent.getTimestamp().toEpochMilli(), WritePrecision.MS);
+ final Map<String, Pair<PlcResponseCode, PlcValue>> values = internalEvent.getValues();
+ values.forEach((fieldName, fieldResponsePair) -> {
+ final PlcResponseCode responseCode = fieldResponsePair.getLeft();
+ final PlcValue plcValue = fieldResponsePair.getRight();
+ if(responseCode == PlcResponseCode.OK) {
+ PlcStruct structValue = (PlcStruct) plcValue;
+ for (String key : structValue.getKeys()) {
+ PlcValue subValue = structValue.getValue(key);
+ registerFields(point, key, subValue);
+ }
+ }
+ });
+ writeApi.writePoint(
+ configuration.getString("influx.bucket"), configuration.getString("influx.org"), point);
+ });
+ } catch (PlcException e) {
+ logger.error("PLC Error", e);
+ } catch (Exception e) {
+ logger.error("General Error", e);
+ }
+ }
+
+ private void registerFields(Point point, String contextName, PlcValue plcValue) {
+ if (contextName.equals("address")) {
+ point.addTag(contextName, plcValue.getString());
+ } else {
+ if (plcValue instanceof PlcBoolean) {
+ point.addField(contextName, plcValue.getBoolean());
+ } else if (plcValue instanceof PlcInteger) {
+ point.addField(contextName, plcValue.getLong());
+ } else if (plcValue instanceof PlcFloat) {
+ point.addField(contextName, plcValue.getFloat());
+ } else if (plcValue instanceof PlcDouble) {
+ point.addField(contextName, plcValue.getDouble());
+ } else if (plcValue instanceof PlcString) {
+ point.addField(contextName, plcValue.getString());
+ } else if (plcValue instanceof PlcStruct) {
+ PlcStruct structValue = (PlcStruct) plcValue;
+ for (String key : structValue.getKeys()) {
+ PlcValue subValue = structValue.getValue(key);
+ registerFields(point, contextName + "-" + key, subValue);
+ }
+ }
+ }
+ }
+
+ private InfluxDBClient connectToDb() {
+ char[] token = configuration.getString("influx.accessToken").toCharArray();
+ return InfluxDBClientFactory.create(configuration.getString("influx.connectionString"), token);
+ }
+
+ private PlcConnection connectToPlc() throws PlcException {
+ final PlcConnection connection =
+ new PlcDriverManager().getConnection(configuration.getString("plc.connectionString"));
+ connection.connect();
+ return connection;
+ }
+
+ public static void main(String[] args) {
+ if(args.length != 1) {
+ System.out.println("Usage: HelloInflux {path-to-config-file}");
+ }
+ final File configFile = new File(args[0]);
+ if(!configFile.exists() || !configFile.isFile()) {
+ throw new PlcRuntimeException("Could not read config file");
+ }
+ new HelloInflux(configFile).run();
+ }
+
+}
diff --git a/plc4j/examples/hello-influx-data-collection/src/main/resources/logback.xml b/plc4j/examples/hello-influx-data-collection/src/main/resources/logback.xml
new file mode 100644
index 0000000..d52f9f6
--- /dev/null
+++ b/plc4j/examples/hello-influx-data-collection/src/main/resources/logback.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<configuration xmlns="http://ch.qos.logback/xml/ns/logback"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://ch.qos.logback/xml/ns/logback https://raw.githubusercontent.com/enricopulatzo/logback-XSD/master/src/main/xsd/logback.xsd">
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="warn">
+ <appender-ref ref="STDOUT" />
+ </root>
+
+</configuration>
\ No newline at end of file
diff --git a/plc4j/examples/pom.xml b/plc4j/examples/pom.xml
index 5b4a5c1..11f70b8 100644
--- a/plc4j/examples/pom.xml
+++ b/plc4j/examples/pom.xml
@@ -44,6 +44,7 @@
<module>hello-cloud-google</module>
<module>hello-connectivity-kafka</module>
<module>hello-connectivity-mqtt</module>
+ <module>hello-influx-data-collection</module>
<module>hello-integration-edgent</module>
<module>hello-opm</module>
<module>hello-storage-elasticsearch</module>
diff --git a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/configuration/PassiveBacNetIpConfiguration.java b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/configuration/PassiveBacNetIpConfiguration.java
index 76519f5..acbe68f 100644
--- a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/configuration/PassiveBacNetIpConfiguration.java
+++ b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/configuration/PassiveBacNetIpConfiguration.java
@@ -51,7 +51,7 @@ public class PassiveBacNetIpConfiguration implements Configuration, UdpTransport
@Override
public float getReplaySpeedFactor() {
- return 0;
+ return 1.0F;
}
/**
diff --git a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/protocol/PassiveBacNetIpProtocolLogic.java b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/protocol/PassiveBacNetIpProtocolLogic.java
index 6973d6c..6e9a8c1 100644
--- a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/protocol/PassiveBacNetIpProtocolLogic.java
+++ b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/protocol/PassiveBacNetIpProtocolLogic.java
@@ -27,9 +27,7 @@ import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.types.PlcResponseCode;
-import org.apache.plc4x.java.api.value.PlcString;
-import org.apache.plc4x.java.api.value.PlcStruct;
-import org.apache.plc4x.java.api.value.PlcValue;
+import org.apache.plc4x.java.api.value.*;
import org.apache.plc4x.java.bacnetip.configuration.PassiveBacNetIpConfiguration;
import org.apache.plc4x.java.bacnetip.ede.EdeParser;
import org.apache.plc4x.java.bacnetip.ede.model.Datapoint;
@@ -128,6 +126,9 @@ public class PassiveBacNetIpProtocolLogic extends Plc4xProtocolBase<BVLC> implem
// Initialize an enriched version of the PlcStruct.
final Map<String, PlcValue> enrichedPlcValue = new HashMap<>();
+ enrichedPlcValue.put("deviceIdentifier", new PlcLong(deviceIdentifier));
+ enrichedPlcValue.put("objectType", new PlcInteger(objectType));
+ enrichedPlcValue.put("objectInstance", new PlcLong(objectInstance));
enrichedPlcValue.put("address", new PlcString(toString(curField)));
// Add all of the existing attributes.
enrichedPlcValue.putAll(plcValue.getStruct());