You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2018/10/14 15:51:20 UTC
[incubator-plc4x] branch master updated: - Added a demo running InfluxDB as storage
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
The following commit(s) were added to refs/heads/master by this push:
new 4bf9c25 - Added a demo running InfluxDB as storage
4bf9c25 is described below
commit 4bf9c25064cfa7d345d365aa5312dabe4fee34c4
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Sun Oct 14 17:50:59 2018 +0200
- Added a demo running InfluxDB as storage
---
plc4j/protocols/delta-v/pom.xml | 9 +-
.../plc4x/java/deltav/{PoC.java => PoCES.java} | 10 +-
.../plc4x/java/deltav/{PoC.java => PoCInflux.java} | 163 ++++++++-------------
3 files changed, 74 insertions(+), 108 deletions(-)
diff --git a/plc4j/protocols/delta-v/pom.xml b/plc4j/protocols/delta-v/pom.xml
index 07c6e34..a7fa0f8 100644
--- a/plc4j/protocols/delta-v/pom.xml
+++ b/plc4j/protocols/delta-v/pom.xml
@@ -57,6 +57,13 @@
<version>1.11</version>
</dependency>
+ <!-- InfluxDB dependencies -->
+ <dependency>
+ <groupId>org.influxdb</groupId>
+ <artifactId>influxdb-java</artifactId>
+ <version>2.14</version>
+ </dependency>
+
<!-- Elasticsearch dependencies -->
<dependency>
<groupId>org.elasticsearch</groupId>
@@ -86,7 +93,7 @@
<version>1.6.0</version>
<configuration>
<classpathScope>test</classpathScope>
- <mainClass>org.apache.plc4x.java.deltav.PoC</mainClass>
+ <mainClass>org.apache.plc4x.java.deltav.PoCInflux</mainClass>
</configuration>
</plugin>
</plugins>
diff --git a/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoC.java b/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoCES.java
similarity index 99%
copy from plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoC.java
copy to plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoCES.java
index 7e0ec93..26bfb9a 100644
--- a/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoC.java
+++ b/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoCES.java
@@ -50,9 +50,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-public class PoC {
+public class PoCES {
- private static final Logger logger = LoggerFactory.getLogger(PoC.class);
+ private static final Logger logger = LoggerFactory.getLogger(PoCES.class);
private static final int SNAPLEN = 65536;
private static final int READ_TIMEOUT = 10;
@@ -65,7 +65,7 @@ public class PoC {
private Map<String, Map<Short, String>> testpointFieldNames = new HashMap<>();
private Map<String, Map<Short, Object>> testpointFieldValues = new HashMap<>();
- private PoC(String inputPath) throws Exception {
+ private PoCES(String inputPath) throws Exception {
if(inputPath == null) {
PcapNetworkInterface nif = null;
for (PcapNetworkInterface dev : Pcaps.findAllDevs()) {
@@ -1608,9 +1608,9 @@ public class PoC {
public static void main(String[] args) throws Exception {
if(args.length == 0) {
- new PoC(null);
+ new PoCES(null);
} else {
- new PoC(args[0]);
+ new PoCES(args[0]);
}
}
diff --git a/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoC.java b/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoCInflux.java
similarity index 95%
rename from plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoC.java
rename to plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoCInflux.java
index 7e0ec93..767dbfb 100644
--- a/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoC.java
+++ b/plc4j/protocols/delta-v/src/test/java/org/apache/plc4x/java/deltav/PoCInflux.java
@@ -1,48 +1,38 @@
/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
package org.apache.plc4x.java.deltav;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.commons.codec.binary.Hex;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.IndicesAdminClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.node.InternalSettingsPreparer;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeValidationException;
-import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.transport.Netty4Plugin;
+import org.influxdb.BatchOptions;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Point;
+import org.influxdb.dto.Pong;
import org.pcap4j.core.*;
import org.pcap4j.packet.UdpPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
@@ -50,22 +40,22 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-public class PoC {
+public class PoCInflux {
- private static final Logger logger = LoggerFactory.getLogger(PoC.class);
+ private static final Logger logger = LoggerFactory.getLogger(PoCInflux.class);
private static final int SNAPLEN = 65536;
private static final int READ_TIMEOUT = 10;
private PcapHandle receiveHandle;
- private Client esClient;
+ private InfluxDB connection;
private List<String> missingNames = new LinkedList<>();
private Map<String, String> testpointNames = new HashMap<>();
private Map<String, Map<Short, String>> testpointFieldNames = new HashMap<>();
private Map<String, Map<Short, Object>> testpointFieldValues = new HashMap<>();
- private PoC(String inputPath) throws Exception {
+ private PoCInflux(String inputPath) throws Exception {
if(inputPath == null) {
PcapNetworkInterface nif = null;
for (PcapNetworkInterface dev : Pcaps.findAllDevs()) {
@@ -657,7 +647,7 @@ public class PoC {
case (byte) 0x21: {
// From having a look at the byte values these could be 32bit floating point values with some sort of parameters
byte param = buf.readByte();
- decodeParam(param);
+ //decodeParam(param);
float floatValue = buf.readFloat();
//floatValue = Math.round(floatValue * 100.0f) / 100.0f;
updateValue(testpoint, fieldId, floatValue);
@@ -666,7 +656,7 @@ public class PoC {
case (byte) 0x22: {
// Parse boolean (From what I learnt, this could be a flagged boolean, where the first byte is some sort of param)
byte param = buf.readByte();
- decodeParam(param);
+ //decodeParam(param);
byte booleanByteValue = buf.readByte();
boolean booleanValue = false;
switch (booleanByteValue) {
@@ -784,7 +774,7 @@ public class PoC {
// Found blocks:
// 80 00 00 06 0d
byte param = buf.readByte();
- decodeParam(param);
+ //decodeParam(param);
int intValue = buf.readInt();
updateValue(testpoint, fieldId, intValue);
break;
@@ -1414,12 +1404,13 @@ public class PoC {
};
// Start an Elasticsearch node.
- Node esNode = startElasticsearchNode();
- esClient = esNode.client();
- System.out.println("Started Elasticsearch node on port 9200");
-
+ connection = connect();
+ Pong pong = connection.ping();
+ if(pong != null) {
+ System.out.println("Connected to InfluxDB");
+ }
// Make sure the indexes exist prior to writing to them.
- prepareIndexes(esClient);
+// prepareIndexes(connection);
// Start the packet capturing.
ExecutorService pool = Executors.newScheduledThreadPool(2);
@@ -1432,51 +1423,16 @@ public class PoC {
});
}
- private static class MyNode extends Node {
- private MyNode(Settings preparedSettings, Collection<Class<? extends Plugin>> classpathPlugins) {
- super(InternalSettingsPreparer.prepareEnvironment(preparedSettings, null), classpathPlugins);
- }
+ private InfluxDB connect() {
+ InfluxDB connection = InfluxDBFactory.connect("http://10.10.64.222:8086", "admin", "password");
+ connection.setLogLevel(InfluxDB.LogLevel.BASIC);
+ connection.createDatabase("delta-v-demo");
+ connection.setDatabase("delta-v-demo");
+ connection.enableBatch(BatchOptions.DEFAULTS);
+ return connection;
}
- private Node startElasticsearchNode() {
- try {
- Node node = new MyNode(Settings.builder()
- .put("transport.type", "netty4")
- .put("http.type", "netty4")
- .put("http.enabled", "true")
- .put("path.home", "elasticsearch-data")
- .build(), Collections.singletonList(Netty4Plugin.class));
- node.start();
- return node;
- } catch (NodeValidationException e) {
- throw new RuntimeException("Could not start Elasticsearch node.", e);
- }
- }
-
- private void prepareIndexes(Client esClient) {
- IndicesAdminClient indicesAdminClient = esClient.admin().indices();
-
- // Check if the factory-data index exists and create it, if it doesn't.
- IndicesExistsRequest factoryDataIndexExistsRequest =
- indicesAdminClient.prepareExists("delta-v-testpoints").request();
- if(!indicesAdminClient.exists(factoryDataIndexExistsRequest).actionGet().isExists()) {
- CreateIndexRequest createIndexRequest = new CreateIndexRequest("delta-v-testpoints");
- createIndexRequest.mapping("Testpoint",
- "{\n" +
- " \"properties\": {\n" +
- " \"time\": {\n" +
- " \"type\": \"date\"\n" +
- " },\n" +
- " \"testpoint\": {\n" +
- " \"type\": \"keyword\"\n" +
- " }\n" +
- " }\n" +
- " }", XContentType.JSON);
- CreateIndexResponse createIndexResponse = indicesAdminClient.create(createIndexRequest).actionGet();
- if(!createIndexResponse.isAcknowledged()) {
- throw new RuntimeException("Could not create index 'delta-v-messstellen'");
- }
- }
+ private void prepareIndexes(InfluxDB connection) {
}
private String getTestpointName(String testpointId, Short fieldId) {
@@ -1524,23 +1480,26 @@ public class PoC {
}
String testpointName = testpointNames.get(testpointId);
- try {
- // Prepare the JSON for of the data.
- XContentBuilder testpointJson = XContentFactory.jsonBuilder()
- .startObject()
- .field("time", Calendar.getInstance().getTimeInMillis())
- .field("testpoint", testpointName);
- Map<Short, Object> values = testpointFieldValues.get(testpointId);
- for (Map.Entry<Short, Object> fieldValues : values.entrySet()) {
- testpointJson.field(getTestpointName(testpointId, fieldValues.getKey()), fieldValues.getValue());
- }
- testpointJson.endObject();
- // Store it in Elastic
- esClient.prepareIndex("delta-v-testpoints", "Testpoint").setSource(testpointJson).get();
- } catch (IOException e) {
- e.printStackTrace();
+ Point.Builder builder = Point.measurement("Messstelle")
+ .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
+ .tag("MESSSTELLE", testpointName);
+
+ Map<Short, Object> values = testpointFieldValues.get(testpointId);
+ for (Map.Entry<Short, Object> fieldValues : values.entrySet()) {
+ if(fieldValues.getValue() instanceof Number) {
+ builder.addField(getTestpointName(testpointId, fieldValues.getKey()), (Number) fieldValues.getValue());
+ } else if(fieldValues.getValue() instanceof Boolean) {
+ builder.addField(getTestpointName(testpointId, fieldValues.getKey()), (Boolean) fieldValues.getValue());
+ } else {
+ System.out.println("unknown type");
+ }
}
+
+ Point point = builder.build();
+
+ // Write the data to the db.
+ connection.write(point);
}
protected void outputDetectedBlock(String name, ByteBuf byteBuf, int endOfLastBlock) {
@@ -1608,9 +1567,9 @@ public class PoC {
public static void main(String[] args) throws Exception {
if(args.length == 0) {
- new PoC(null);
+ new PoCInflux(null);
} else {
- new PoC(args[0]);
+ new PoCInflux(args[0]);
}
}