You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2020/01/29 12:57:38 UTC

[plc4x] 01/06: - Added an example wiring data to InfluxDB using PLC4X' subscription API - Fine-tuned the BACnet driver

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch fixes-mathi
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 0f4560e005f92ccf973a8fff47f1cb6617f65625
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Tue Jan 28 15:35:24 2020 +0100

    - Added an example wiring data to InfluxDB using PLC4X' subscription API
    - Fine-tuned the BACnet driver
---
 .../example.properties                             |  32 +++++
 .../examples/hello-influx-data-collection/pom.xml  | 118 +++++++++++++++++
 .../java/examples/helloinflux/HelloInflux.java     | 145 +++++++++++++++++++++
 .../src/main/resources/logback.xml                 |  34 +++++
 plc4j/examples/pom.xml                             |   1 +
 .../PassiveBacNetIpConfiguration.java              |   2 +-
 .../protocol/PassiveBacNetIpProtocolLogic.java     |   7 +-
 7 files changed, 335 insertions(+), 4 deletions(-)

diff --git a/plc4j/examples/hello-influx-data-collection/example.properties b/plc4j/examples/hello-influx-data-collection/example.properties
new file mode 100644
index 0000000..ce87372
--- /dev/null
+++ b/plc4j/examples/hello-influx-data-collection/example.properties
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Connection string for the indlux part
+influx.connectionString=http://127.0.0.1:9999
+influx.accessToken={yeaaaaahhh ... you'd really love that ... wouldn't you?}
+influx.org=My Cool Org
+influx.bucket=my-cool-bucket
+influx.measurement=my-cool-measurement
+
+# Connection string for a plc part.
+# Use a connection that supports subscriptions (BACnet/IP, KNXnet/IP, ADS).
+plc.connectionString=plc4x-connection-string
+# Field query (currently only one field)
+# Using BACnet/IP or KNXnet/IP this could be: */*/*
+plc.query=plc4x-field-query
\ No newline at end of file
diff --git a/plc4j/examples/hello-influx-data-collection/pom.xml b/plc4j/examples/hello-influx-data-collection/pom.xml
new file mode 100644
index 0000000..5a561c6
--- /dev/null
+++ b/plc4j/examples/hello-influx-data-collection/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.plc4x.examples</groupId>
+    <artifactId>plc4j-examples</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>plc4j-hello-influx-data-collection</artifactId>
+  <name>PLC4J: Examples: Hello InfluxDB Data Collection</name>
+  <description>Data Collection with PLC4X and InfluxDB.</description>
+
+  <properties>
+    <app.main.class>org.apache.plc4x.java.examples.helloinflux.HelloInflux</app.main.class>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-api</artifactId>
+      <version>0.7.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-spi</artifactId>
+      <version>0.7.0-SNAPSHOT</version>
+    </dependency>
+
+    <!-- InfluxDB dependencies -->
+    <dependency>
+      <groupId>com.influxdb</groupId>
+      <artifactId>influxdb-client-java</artifactId>
+      <version>1.4.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-configuration2</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>log4j-over-slf4j</artifactId>
+      <version>1.7.25</version>
+    </dependency>
+    <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+    </dependency>
+
+    <!-- Required driver implementation -->
+    <dependency>
+      <groupId>org.apache.plc4x.sandbox</groupId>
+      <artifactId>test-java-bacnetip-driver</artifactId>
+      <version>0.7.0-SNAPSHOT</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-transport-pcap-socket</artifactId>
+      <version>0.7.0-SNAPSHOT</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-transport-raw-socket</artifactId>
+      <version>0.7.0-SNAPSHOT</version>
+      <scope>runtime</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <configuration>
+          <usedDependencies combine.children="append">
+            <usedDependency>org.apache.plc4x.sandbox:test-java-bacnetip-driver</usedDependency>
+            <usedDependency>org.apache.plc4x:plc4j-transport-pcap-socket</usedDependency>
+            <usedDependency>org.apache.plc4x:plc4j-transport-raw-socket</usedDependency>
+            <usedDependency>org.slf4j:log4j-over-slf4j</usedDependency>
+          </usedDependencies>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
\ No newline at end of file
diff --git a/plc4j/examples/hello-influx-data-collection/src/main/java/org/apache/plc4x/java/examples/helloinflux/HelloInflux.java b/plc4j/examples/hello-influx-data-collection/src/main/java/org/apache/plc4x/java/examples/helloinflux/HelloInflux.java
new file mode 100644
index 0000000..57b4cbb
--- /dev/null
+++ b/plc4j/examples/hello-influx-data-collection/src/main/java/org/apache/plc4x/java/examples/helloinflux/HelloInflux.java
@@ -0,0 +1,145 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.examples.helloinflux;
+
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.InfluxDBClientFactory;
+import com.influxdb.client.WriteApi;
+import com.influxdb.client.domain.WritePrecision;
+import com.influxdb.client.write.Point;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.commons.configuration2.builder.fluent.Configurations;
+import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.api.value.*;
+import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class HelloInflux {
+
+    private static final Logger logger = LoggerFactory.getLogger(HelloInflux.class);
+
+    private Configuration configuration;
+
+    public HelloInflux(File configFile) {
+        Configurations configs = new Configurations();
+        try {
+            configuration = configs.properties(configFile);
+        } catch (ConfigurationException cex) {
+            throw new RuntimeException("Error reading configuration");
+        }
+    }
+
+    public void run() {
+        InfluxDBClient dbConnection = connectToDb();
+        WriteApi writeApi = dbConnection.getWriteApi();
+        try {
+            PlcConnection plcConnection = connectToPlc();
+
+            final PlcSubscriptionRequest subscriptionRequest =
+                plcConnection.subscriptionRequestBuilder().addChangeOfStateField("query",
+                    configuration.getString("plc.query")).build();
+            final PlcSubscriptionResponse subscriptionResponse =
+                subscriptionRequest.execute().get(10, TimeUnit.SECONDS);
+            subscriptionResponse.getSubscriptionHandle("query").register(plcSubscriptionEvent -> {
+                DefaultPlcSubscriptionEvent internalEvent = (DefaultPlcSubscriptionEvent) plcSubscriptionEvent;
+                final Point point = Point.measurement(configuration.getString("influx.measurement"))
+                    .time(plcSubscriptionEvent.getTimestamp().toEpochMilli(), WritePrecision.MS);
+                final Map<String, Pair<PlcResponseCode, PlcValue>> values = internalEvent.getValues();
+                values.forEach((fieldName, fieldResponsePair) -> {
+                    final PlcResponseCode responseCode = fieldResponsePair.getLeft();
+                    final PlcValue plcValue = fieldResponsePair.getRight();
+                    if(responseCode == PlcResponseCode.OK) {
+                        PlcStruct structValue = (PlcStruct) plcValue;
+                        for (String key : structValue.getKeys()) {
+                            PlcValue subValue = structValue.getValue(key);
+                            registerFields(point, key, subValue);
+                        }
+                    }
+                });
+                writeApi.writePoint(
+                    configuration.getString("influx.bucket"), configuration.getString("influx.org"), point);
+            });
+        } catch (PlcException e) {
+            logger.error("PLC Error", e);
+        } catch (Exception e) {
+            logger.error("General Error", e);
+        }
+    }
+
+    private void registerFields(Point point, String contextName, PlcValue plcValue) {
+        if (contextName.equals("address")) {
+            point.addTag(contextName, plcValue.getString());
+        } else {
+            if (plcValue instanceof PlcBoolean) {
+                point.addField(contextName, plcValue.getBoolean());
+            } else if (plcValue instanceof PlcInteger) {
+                point.addField(contextName, plcValue.getLong());
+            } else if (plcValue instanceof PlcFloat) {
+                point.addField(contextName, plcValue.getFloat());
+            } else if (plcValue instanceof PlcDouble) {
+                point.addField(contextName, plcValue.getDouble());
+            } else if (plcValue instanceof PlcString) {
+                point.addField(contextName, plcValue.getString());
+            } else if (plcValue instanceof PlcStruct) {
+                PlcStruct structValue = (PlcStruct) plcValue;
+                for (String key : structValue.getKeys()) {
+                    PlcValue subValue = structValue.getValue(key);
+                    registerFields(point, contextName + "-" + key, subValue);
+                }
+            }
+        }
+    }
+
+    private InfluxDBClient connectToDb() {
+        char[] token = configuration.getString("influx.accessToken").toCharArray();
+        return InfluxDBClientFactory.create(configuration.getString("influx.connectionString"), token);
+    }
+
+    private PlcConnection connectToPlc() throws PlcException {
+        final PlcConnection connection =
+            new PlcDriverManager().getConnection(configuration.getString("plc.connectionString"));
+        connection.connect();
+        return connection;
+    }
+
+    public static void main(String[] args) {
+        if(args.length != 1) {
+            System.out.println("Usage: HelloInflux {path-to-config-file}");
+        }
+        final File configFile = new File(args[0]);
+        if(!configFile.exists() || !configFile.isFile()) {
+            throw new PlcRuntimeException("Could not read config file");
+        }
+        new HelloInflux(configFile).run();
+    }
+
+}
diff --git a/plc4j/examples/hello-influx-data-collection/src/main/resources/logback.xml b/plc4j/examples/hello-influx-data-collection/src/main/resources/logback.xml
new file mode 100644
index 0000000..d52f9f6
--- /dev/null
+++ b/plc4j/examples/hello-influx-data-collection/src/main/resources/logback.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+<configuration xmlns="http://ch.qos.logback/xml/ns/logback"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="http://ch.qos.logback/xml/ns/logback https://raw.githubusercontent.com/enricopulatzo/logback-XSD/master/src/main/xsd/logback.xsd">
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="warn">
+    <appender-ref ref="STDOUT" />
+  </root>
+
+</configuration>
\ No newline at end of file
diff --git a/plc4j/examples/pom.xml b/plc4j/examples/pom.xml
index 5b4a5c1..11f70b8 100644
--- a/plc4j/examples/pom.xml
+++ b/plc4j/examples/pom.xml
@@ -44,6 +44,7 @@
     <module>hello-cloud-google</module>
     <module>hello-connectivity-kafka</module>
     <module>hello-connectivity-mqtt</module>
+    <module>hello-influx-data-collection</module>
     <module>hello-integration-edgent</module>
     <module>hello-opm</module>
     <module>hello-storage-elasticsearch</module>
diff --git a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/configuration/PassiveBacNetIpConfiguration.java b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/configuration/PassiveBacNetIpConfiguration.java
index 76519f5..acbe68f 100644
--- a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/configuration/PassiveBacNetIpConfiguration.java
+++ b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/configuration/PassiveBacNetIpConfiguration.java
@@ -51,7 +51,7 @@ public class PassiveBacNetIpConfiguration implements Configuration, UdpTransport
 
     @Override
     public float getReplaySpeedFactor() {
-        return 0;
+        return 1.0F;
     }
 
     /**
diff --git a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/protocol/PassiveBacNetIpProtocolLogic.java b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/protocol/PassiveBacNetIpProtocolLogic.java
index 6973d6c..6e9a8c1 100644
--- a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/protocol/PassiveBacNetIpProtocolLogic.java
+++ b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/protocol/PassiveBacNetIpProtocolLogic.java
@@ -27,9 +27,7 @@ import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
 import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
 import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
-import org.apache.plc4x.java.api.value.PlcString;
-import org.apache.plc4x.java.api.value.PlcStruct;
-import org.apache.plc4x.java.api.value.PlcValue;
+import org.apache.plc4x.java.api.value.*;
 import org.apache.plc4x.java.bacnetip.configuration.PassiveBacNetIpConfiguration;
 import org.apache.plc4x.java.bacnetip.ede.EdeParser;
 import org.apache.plc4x.java.bacnetip.ede.model.Datapoint;
@@ -128,6 +126,9 @@ public class PassiveBacNetIpProtocolLogic extends Plc4xProtocolBase<BVLC> implem
 
                             // Initialize an enriched version of the PlcStruct.
                             final Map<String, PlcValue> enrichedPlcValue = new HashMap<>();
+                            enrichedPlcValue.put("deviceIdentifier", new PlcLong(deviceIdentifier));
+                            enrichedPlcValue.put("objectType", new PlcInteger(objectType));
+                            enrichedPlcValue.put("objectInstance", new PlcLong(objectInstance));
                             enrichedPlcValue.put("address", new PlcString(toString(curField)));
                             // Add all of the existing attributes.
                             enrichedPlcValue.putAll(plcValue.getStruct());