You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/10/14 15:51:20 UTC

[incubator-plc4x] branch master updated: - Added a demo running InfluxDB as storage

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bf9c25  - Added a demo running InfluxDB as storage
4bf9c25 is described below

commit 4bf9c25064cfa7d345d365aa5312dabe4fee34c4
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Sun Oct 14 17:50:59 2018 +0200

    - Added a demo running InfluxDB as storage
---
 plc4j/protocols/delta-v/pom.xml                    |   9 +-
 .../plc4x/java/deltav/{PoC.java => PoCES.java}     |  10 +-
 .../plc4x/java/deltav/{PoC.java => PoCInflux.java} | 163 ++++++++-------------
 3 files changed, 74 insertions(+), 108 deletions(-)

diff --git a/plc4j/protocols/delta-v/pom.xml b/plc4j/protocols/delta-v/pom.xml
index 07c6e34..a7fa0f8 100644
--- a/plc4j/protocols/delta-v/pom.xml
+++ b/plc4j/protocols/delta-v/pom.xml
@@ -57,6 +57,13 @@
       <version>1.11</version>
     </dependency>
 
+    <!-- InfluxDB dependencies -->
+    <dependency>
+      <groupId>org.influxdb</groupId>
+      <artifactId>influxdb-java</artifactId>
+      <version>2.14</version>
+    </dependency>
+
     <!-- Elasticsearch dependencies -->
     <dependency>
       <groupId>org.elasticsearch</groupId>
@@ -86,7 +93,7 @@
         <version>1.6.0</version>
         <configuration>
           <classpathScope>test</classpathScope>
-          <mainClass>org.apache.plc4x.java.deltav.PoC</mainClass>
+          <mainClass>org.apache.plc4x.java.deltav.PoCInflux</mainClass>
         </configuration>
       </plugin>
     </plugins>
diff --git a/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoC.java b/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoCES.java
similarity index 99%
copy from plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoC.java
copy to plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoCES.java
index 7e0ec93..26bfb9a 100644
--- a/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoC.java
+++ b/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoCES.java
@@ -50,9 +50,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-public class PoC {
+public class PoCES {
 
-    private static final Logger logger = LoggerFactory.getLogger(PoC.class);
+    private static final Logger logger = LoggerFactory.getLogger(PoCES.class);
 
     private static final int SNAPLEN = 65536;
     private static final int READ_TIMEOUT = 10;
@@ -65,7 +65,7 @@ public class PoC {
     private Map<String, Map<Short, String>> testpointFieldNames = new HashMap<>();
     private Map<String, Map<Short, Object>> testpointFieldValues = new HashMap<>();
 
-    private PoC(String inputPath) throws Exception {
+    private PoCES(String inputPath) throws Exception {
         if(inputPath == null) {
             PcapNetworkInterface nif = null;
             for (PcapNetworkInterface dev : Pcaps.findAllDevs()) {
@@ -1608,9 +1608,9 @@ public class PoC {
 
     public static void main(String[] args) throws Exception {
         if(args.length == 0) {
-            new PoC(null);
+            new PoCES(null);
         } else {
-            new PoC(args[0]);
+            new PoCES(args[0]);
         }
     }
 
diff --git a/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoC.java b/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoCInflux.java
similarity index 95%
rename from plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoC.java
rename to plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoCInflux.java
index 7e0ec93..767dbfb 100644
--- a/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoC.java
+++ b/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoCInflux.java
@@ -1,48 +1,38 @@
 /*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
 
-  http://www.apache.org/licenses/LICENSE-2.0
+     http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
 
 package org.apache.plc4x.java.deltav;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import org.apache.commons.codec.binary.Hex;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.IndicesAdminClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.node.InternalSettingsPreparer;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeValidationException;
-import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.transport.Netty4Plugin;
+import org.influxdb.BatchOptions;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Point;
+import org.influxdb.dto.Pong;
 import org.pcap4j.core.*;
 import org.pcap4j.packet.UdpPacket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
@@ -50,22 +40,22 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-public class PoC {
+public class PoCInflux {
 
-    private static final Logger logger = LoggerFactory.getLogger(PoC.class);
+    private static final Logger logger = LoggerFactory.getLogger(PoCInflux.class);
 
     private static final int SNAPLEN = 65536;
     private static final int READ_TIMEOUT = 10;
 
     private PcapHandle receiveHandle;
-    private Client esClient;
+    private InfluxDB connection;
 
     private List<String> missingNames = new LinkedList<>();
     private Map<String, String> testpointNames = new HashMap<>();
     private Map<String, Map<Short, String>> testpointFieldNames = new HashMap<>();
     private Map<String, Map<Short, Object>> testpointFieldValues = new HashMap<>();
 
-    private PoC(String inputPath) throws Exception {
+    private PoCInflux(String inputPath) throws Exception {
         if(inputPath == null) {
             PcapNetworkInterface nif = null;
             for (PcapNetworkInterface dev : Pcaps.findAllDevs()) {
@@ -657,7 +647,7 @@ public class PoC {
                                             case (byte) 0x21: {
                                                 // From having a look at the byte values these could be 32bit floating point values with some sort of parameters
                                                 byte param = buf.readByte();
-                                                decodeParam(param);
+                                                //decodeParam(param);
                                                 float floatValue = buf.readFloat();
                                                 //floatValue = Math.round(floatValue * 100.0f) / 100.0f;
                                                 updateValue(testpoint, fieldId, floatValue);
@@ -666,7 +656,7 @@ public class PoC {
                                             case (byte) 0x22: {
                                                 // Parse boolean (From what I learnt, this could be a flagged boolean, where the first byte is some sort of param)
                                                 byte param = buf.readByte();
-                                                decodeParam(param);
+                                                //decodeParam(param);
                                                 byte booleanByteValue = buf.readByte();
                                                 boolean booleanValue = false;
                                                 switch (booleanByteValue) {
@@ -784,7 +774,7 @@ public class PoC {
                                                 // Found blocks:
                                                 // 80 00 00 06 0d
                                                 byte param = buf.readByte();
-                                                decodeParam(param);
+                                                //decodeParam(param);
                                                 int intValue = buf.readInt();
                                                 updateValue(testpoint, fieldId, intValue);
                                                 break;
@@ -1414,12 +1404,13 @@ public class PoC {
         };
 
         // Start an Elasticsearch node.
-        Node esNode = startElasticsearchNode();
-        esClient = esNode.client();
-        System.out.println("Started Elasticsearch node on port 9200");
-
+        connection = connect();
+        Pong pong = connection.ping();
+        if(pong != null) {
+            System.out.println("Connected to InfluxDB");
+        }
         // Make sure the indexes exist prior to writing to them.
-        prepareIndexes(esClient);
+//        prepareIndexes(connection);
 
         // Start the packet capturing.
         ExecutorService pool = Executors.newScheduledThreadPool(2);
@@ -1432,51 +1423,16 @@ public class PoC {
         });
     }
 
-    private static class MyNode extends Node {
-        private MyNode(Settings preparedSettings, Collection<Class<? extends Plugin>> classpathPlugins) {
-            super(InternalSettingsPreparer.prepareEnvironment(preparedSettings, null), classpathPlugins);
-        }
+    private InfluxDB connect() {
+        InfluxDB connection = InfluxDBFactory.connect("http://10.10.64.222:8086", "admin", "password");
+        connection.setLogLevel(InfluxDB.LogLevel.BASIC);
+        connection.createDatabase("delta-v-demo");
+        connection.setDatabase("delta-v-demo");
+        connection.enableBatch(BatchOptions.DEFAULTS);
+        return connection;
     }
 
-    private Node startElasticsearchNode() {
-        try {
-            Node node = new MyNode(Settings.builder()
-                .put("transport.type", "netty4")
-                .put("http.type", "netty4")
-                .put("http.enabled", "true")
-                .put("path.home", "elasticsearch-data")
-                .build(), Collections.singletonList(Netty4Plugin.class));
-            node.start();
-            return node;
-        } catch (NodeValidationException e) {
-            throw new RuntimeException("Could not start Elasticsearch node.", e);
-        }
-    }
-
-    private void prepareIndexes(Client esClient) {
-        IndicesAdminClient indicesAdminClient = esClient.admin().indices();
-
-        // Check if the factory-data index exists and create it, if it doesn't.
-        IndicesExistsRequest factoryDataIndexExistsRequest =
-            indicesAdminClient.prepareExists("delta-v-testpoints").request();
-        if(!indicesAdminClient.exists(factoryDataIndexExistsRequest).actionGet().isExists()) {
-            CreateIndexRequest createIndexRequest = new CreateIndexRequest("delta-v-testpoints");
-            createIndexRequest.mapping("Testpoint",
-                "{\n" +
-                    "            \"properties\": {\n" +
-                    "                \"time\": {\n" +
-                    "                    \"type\": \"date\"\n" +
-                    "                },\n" +
-                    "                \"testpoint\": {\n" +
-                    "                    \"type\": \"keyword\"\n" +
-                    "                }\n" +
-                    "            }\n" +
-                    "        }", XContentType.JSON);
-            CreateIndexResponse createIndexResponse = indicesAdminClient.create(createIndexRequest).actionGet();
-            if(!createIndexResponse.isAcknowledged()) {
-                throw new RuntimeException("Could not create index 'delta-v-messstellen'");
-            }
-        }
+    private void prepareIndexes(InfluxDB connection) {
     }
 
     private String getTestpointName(String testpointId, Short fieldId) {
@@ -1524,23 +1480,26 @@ public class PoC {
         }
 
         String testpointName = testpointNames.get(testpointId);
-        try {
-            // Prepare the JSON for of the data.
-            XContentBuilder testpointJson = XContentFactory.jsonBuilder()
-                .startObject()
-                .field("time", Calendar.getInstance().getTimeInMillis())
-                .field("testpoint", testpointName);
-            Map<Short, Object> values = testpointFieldValues.get(testpointId);
-            for (Map.Entry<Short, Object> fieldValues : values.entrySet()) {
-                testpointJson.field(getTestpointName(testpointId, fieldValues.getKey()), fieldValues.getValue());
-            }
-            testpointJson.endObject();
 
-            // Store it in Elastic
-            esClient.prepareIndex("delta-v-testpoints", "Testpoint").setSource(testpointJson).get();
-        } catch (IOException e) {
-            e.printStackTrace();
+        Point.Builder builder = Point.measurement("Messstelle")
+            .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
+            .tag("MESSSTELLE", testpointName);
+
+        Map<Short, Object> values = testpointFieldValues.get(testpointId);
+        for (Map.Entry<Short, Object> fieldValues : values.entrySet()) {
+            if(fieldValues.getValue() instanceof Number) {
+                builder.addField(getTestpointName(testpointId, fieldValues.getKey()), (Number) fieldValues.getValue());
+            } else if(fieldValues.getValue() instanceof Boolean) {
+                builder.addField(getTestpointName(testpointId, fieldValues.getKey()), (Boolean) fieldValues.getValue());
+            } else {
+                System.out.println("unknown type");
+            }
         }
+
+        Point point = builder.build();
+
+        // Write the data to the db.
+        connection.write(point);
     }
 
     protected void outputDetectedBlock(String name, ByteBuf byteBuf, int endOfLastBlock) {
@@ -1608,9 +1567,9 @@ public class PoC {
 
     public static void main(String[] args) throws Exception {
         if(args.length == 0) {
-            new PoC(null);
+            new PoCInflux(null);
         } else {
-            new PoC(args[0]);
+            new PoCInflux(args[0]);
         }
     }