You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/04/25 10:12:55 UTC
[incubator-streampipes-extensions] 02/02: [STREAMPIPES-113]: Update OPC-UA adapter
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit d99640dfdd1adeea2d2048f7c69ff9d97a3bcfbd
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sat Apr 25 12:12:33 2020 +0200
[STREAMPIPES-113]: Update OPC-UA adapter
---
streampipes-connect-adapters/pom.xml | 7 +-
.../streampipes-connect-adapter/pom.xml | 26 +-
.../streampipes/connect/adapters/opcua/OpcUa.java | 69 ++---
.../connect/adapters/opcua/OpcUaTest.java | 341 ---------------------
4 files changed, 41 insertions(+), 402 deletions(-)
diff --git a/streampipes-connect-adapters/pom.xml b/streampipes-connect-adapters/pom.xml
index 5e5a4fe..85ebd23 100644
--- a/streampipes-connect-adapters/pom.xml
+++ b/streampipes-connect-adapters/pom.xml
@@ -38,7 +38,6 @@
<properties>
<animal-sniffer-annotations.version>1.17</animal-sniffer-annotations.version>
<bcprov.version>1.61</bcprov.version>
- <camel.version>3.2.0</camel.version>
<checker-qual.version>2.5.2</checker-qual.version>
<commons-compress.version>1.18</commons-compress.version>
<commons-text.version>1.4</commons-text.version>
@@ -49,6 +48,7 @@
<jsrosbridge.version>0.2.0</jsrosbridge.version>
<influxdb.java.version>2.14</influxdb.java.version>
<hadoop.version>3.2.1</hadoop.version>
+ <milo.version>0.4.0</milo.version>
<mysql-binlog-connector.version>0.18.1</mysql-binlog-connector.version>
<mysql-connector-java.version>8.0.15</mysql-connector-java.version>
<netty.version>4.1.39.Final</netty.version>
@@ -227,6 +227,11 @@
<version>${animal-sniffer-annotations.version}</version>
</dependency>
<dependency>
+ <groupId>org.eclipse.milo</groupId>
+ <artifactId>sdk-client</artifactId>
+ <version>${milo.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-sse</artifactId>
<version>${jersey.version}</version>
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/pom.xml b/streampipes-connect-adapters/streampipes-connect-adapter/pom.xml
index 09c763c..d941c37 100644
--- a/streampipes-connect-adapters/streampipes-connect-adapter/pom.xml
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/pom.xml
@@ -74,10 +74,6 @@
<version>${influxdb.java.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-milo</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
@@ -103,6 +99,10 @@
<artifactId>pulsar-client</artifactId>
</dependency>
<dependency>
+ <groupId>org.eclipse.milo</groupId>
+ <artifactId>sdk-client</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
</dependency>
@@ -115,24 +115,6 @@
<artifactId>Java-WebSocket</artifactId>
</dependency>
- <!-- dependency convergence -->
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-buffer</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-common</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-transport</artifactId>
- </dependency>
-
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
index 9185530..0d98d27 100644
--- a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
+++ b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
@@ -19,11 +19,16 @@
package org.apache.streampipes.connect.adapters.opcua;
+import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+import static org.eclipse.milo.opcua.stack.core.util.ConversionUtil.toList;
+
+import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.sdk.utils.Datatypes;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
-import org.eclipse.milo.opcua.stack.client.UaTcpStackClient;
+import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
@@ -32,26 +37,31 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.*;
-import org.eclipse.milo.opcua.stack.core.types.structured.*;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseResultMask;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.eclipse.milo.opcua.stack.core.types.structured.BrowseDescription;
+import org.eclipse.milo.opcua.stack.core.types.structured.BrowseResult;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
+import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReferenceDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
-import org.apache.streampipes.sdk.utils.Datatypes;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
-import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
-import static org.eclipse.milo.opcua.stack.core.util.ConversionUtil.toList;
-
public class OpcUa {
static Logger LOG = LoggerFactory.getLogger(OpcUa.class);
@@ -76,43 +86,26 @@ public class OpcUa {
}
public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId) {
-
-
- this.opcServerURL = "opc.tcp://" + opcServer + ":" + opcServerPort;
-
- if (isInteger(nodeId)) {
- int integerNodeId = Integer.parseInt(nodeId);
- this.node = new NodeId(namespaceIndex, integerNodeId);
- } else {
- this.node = new NodeId(namespaceIndex, nodeId);
- }
- }
-
- public OpcUa(int namespaceIndex, String nodeId) {
- if (isInteger(nodeId)) {
- int integerNodeId = Integer.parseInt(nodeId);
- this.node = new NodeId(namespaceIndex, integerNodeId);
- } else {
- this.node = new NodeId(namespaceIndex, nodeId);
- }
-
+ this("opc.tcp://" + opcServer + ":" + opcServerPort, namespaceIndex, nodeId);
}
public void connect() throws Exception {
- EndpointDescription[] endpoints = UaTcpStackClient.getEndpoints(this.opcServerURL).get();
+ List<EndpointDescription> endpoints = DiscoveryClient.getEndpoints(this.opcServerURL).get();
String host = this.opcServerURL.split("://")[1].split(":")[0];
- EndpointDescription tmpEndpoint = Arrays.stream(endpoints).filter(e ->
- e.getSecurityPolicyUri().equals(SecurityPolicy.None.getSecurityPolicyUri())
- ).findFirst().orElseThrow(() -> new Exception("No endpoint with security policy none"));
+ EndpointDescription tmpEndpoint = endpoints
+ .stream()
+ .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
+ .findFirst()
+ .orElseThrow(() -> new Exception("No endpoint with security policy none"));
-// EndpointDescription tmpEndpoint = endpoints[0];
tmpEndpoint = updateEndpointUrl(tmpEndpoint, host);
- endpoints = new EndpointDescription[]{tmpEndpoint};
+ endpoints = Collections.singletonList(tmpEndpoint);
- EndpointDescription endpoint = Arrays.stream(endpoints)
- .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getSecurityPolicyUri()))
+ EndpointDescription endpoint = endpoints
+ .stream()
+ .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
.findFirst().orElseThrow(() -> new Exception("no desired endpoints returned"));
OpcUaClientConfig config = OpcUaClientConfig.builder()
@@ -121,7 +114,7 @@ public class OpcUa {
.setEndpoint(endpoint)
.build();
- this.client = new OpcUaClient(config);
+ this.client = OpcUaClient.create(config);
client.connect().get();
}
diff --git a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaTest.java b/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaTest.java
deleted file mode 100644
index 184a933..0000000
--- a/streampipes-connect-adapters/streampipes-connect-adapter/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaTest.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.adapters.opcua;
-
-
-import com.github.jsonldjava.shaded.com.google.common.collect.Lists;
-import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
-import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
-import org.eclipse.milo.opcua.stack.client.UaTcpStackClient;
-import org.eclipse.milo.opcua.stack.core.AttributeId;
-import org.eclipse.milo.opcua.stack.core.Identifiers;
-import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
-import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
-import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
-import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
-import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
-import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.*;
-import org.eclipse.milo.opcua.stack.core.types.structured.*;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-
-import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
-import static org.eclipse.milo.opcua.stack.core.util.ConversionUtil.toList;
-
-public class OpcUaTest {
-
- // private OpcUaClient myClient;
- private static String opcServerURL = "opc.tcp://141.21.12.160:4840";
- // private static String opcServerURL = "opc.tcp://192.168.0.144:4840";
- private static final AtomicLong clientHandles = new AtomicLong(1L);
-
-
- public static void main(String... args) throws Exception {
-
- OpcUaClient client = init();
-// client.connect().get();
- client.connect().get();
-
- NodeId node1 = new NodeId(1, "Sensor");
-// NodeId node2 = new NodeId(4, "|var|CODESYS Control for Raspberry Pi SL.Application.PLC_PRG.auto_rot");
-// NodeId node3 = new NodeId(4, "|var|CODESYS Control for Raspberry Pi SL.Application.PLC_PRG.fuss_rot");
-// NodeId node4 = new NodeId(4, "|var|CODESYS Control for Raspberry Pi SL.Application.PLC_PRG");
-
- browseNodeTest("", client, Identifiers.RootFolder);
-
-// CompletableFuture<DataValue> va1 = client.readValue(0, TimestampsToReturn.Both, node1);
-// CompletableFuture<DataValue> va2 = client.readValue(0, TimestampsToReturn.Both, node2);
-// CompletableFuture<DataValue> va3 = client.readValue(0, TimestampsToReturn.Both, node3);
-//
-//
-// System.out.println("Auto grün: " + va1.get().getValue());
-// System.out.println("Auto rot: " + va2.get().getValue());
-// System.out.println("Fußgänger rot: " + va3.get().getValue());
-
- /* JSONParser parser = new JSONParser();
- JSONObject json = (JSONObject) parser.parse(exchange.getIn().getBody().toString());*/
-
-// createListSubscription(client, Arrays.asList(node1, node2));
-// createSubscription(client, node1);
-// createSubscription(client, node2);
-
- // let the example run for 10 seconds then terminate
- Thread.sleep(100000000);
-
-// client.disconnect();
-
- }
-
-
-
- private static OpcUaClient init() throws Exception{
- EndpointDescription[] endpoints = UaTcpStackClient.getEndpoints(opcServerURL).get();
-
- EndpointDescription tmpEndpoint = endpoints[0];
- tmpEndpoint = updateEndpointUrl(tmpEndpoint, "141.21.12.160");
- endpoints = new EndpointDescription[]{tmpEndpoint};
-
- EndpointDescription endpoint = Arrays.stream(endpoints)
- .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getSecurityPolicyUri()))
- .findFirst().orElseThrow(() -> new Exception("no desired endpoints returned"));
-
- OpcUaClientConfig config = OpcUaClientConfig.builder()
- .setApplicationName(LocalizedText.english("eclipse milo opc-ua client"))
- .setApplicationUri("urn:eclipse:milo:examples:client")
- .setEndpoint(endpoint)
- .build();
-
- return new OpcUaClient(config);
- }
-
- private static EndpointDescription updateEndpointUrl(
- EndpointDescription original, String hostname) throws URISyntaxException {
-
- URI uri = new URI(original.getEndpointUrl()).parseServerAuthority();
-
- String endpointUrl = String.format(
- "%s://%s:%s%s",
- uri.getScheme(),
- hostname,
- uri.getPort(),
- uri.getPath()
- );
-
- return new EndpointDescription(
- endpointUrl,
- original.getServer(),
- original.getServerCertificate(),
- original.getSecurityMode(),
- original.getSecurityPolicyUri(),
- original.getUserIdentityTokens(),
- original.getTransportProfileUri(),
- original.getSecurityLevel()
- );
- }
-
-
- private static void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
- System.out.println(
- "subscription value received: " + item.getReadValueId().toString() + " " + value.getValue().toString());
-
- }
-
- private static void createListSubscription(OpcUaClient client, List<NodeId> nodes) throws Exception {
- /*
- * create a subscription @ 1000ms
- */
- UaSubscription subscription = client.getSubscriptionManager().createSubscription(1000.0).get();
-
-
- List<CompletableFuture<DataValue>> values = new ArrayList<>();
-
- for (NodeId node : nodes) {
- values.add(client.readValue(0, TimestampsToReturn.Both, node));
- }
-
- for (CompletableFuture<DataValue> value : values) {
- if (value.get().getValue().toString().contains("null")) {
- System.out.println("Node has no value");
- }
- }
-
-
- List<ReadValueId> readValues = new ArrayList<>();
- // Read a specific value attribute
- for (NodeId node : nodes) {
- readValues.add(new ReadValueId(node, AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE));
- }
-
- List<MonitoredItemCreateRequest> requests = new ArrayList<>();
-
- for (ReadValueId readValue : readValues) {
- // important: client handle must be unique per item
- UInteger clientHandle = uint(clientHandles.getAndIncrement());
-
- MonitoringParameters parameters = new MonitoringParameters(
- clientHandle,
- 1000.0, // sampling interval
- null, // filter, null means use default
- uint(10), // queue size
- true // discard oldest
- );
-
- requests.add(new MonitoredItemCreateRequest(readValue, MonitoringMode.Reporting, parameters));
- }
-
- BiConsumer<UaMonitoredItem, Integer> onItemCreated =
- (item, id) -> {
- item.setValueConsumer(OpcUaTest::onSubscriptionValue);
- };
-
- List<UaMonitoredItem> items = subscription.createMonitoredItems(
- TimestampsToReturn.Both,
- requests,
- onItemCreated
- ).get();
-
- for (UaMonitoredItem item : items) {
- NodeId tagId = item.getReadValueId().getNodeId();
- if (item.getStatusCode().isGood()) {
- System.out.println("item created for nodeId="+ tagId);
- } else {
- System.out.println("failed to create item for " + item.getReadValueId().getNodeId() + item.getStatusCode());
- }
- }
-
- }
-
-
- /**
- * creates a subcription for the given node
- *
- * @param client
- * @param node
- * @throws Exception
- */
- private static void createSubscription(OpcUaClient client, NodeId node) throws Exception {
- /*
- * create a subscription @ 1000ms
- */
- UaSubscription subscription = client.getSubscriptionManager().createSubscription(1000.0).get();
-
- CompletableFuture<DataValue> value = client.readValue(0, TimestampsToReturn.Both, node);
-
- if (value.get().getValue().toString().contains("null")) {
- System.out.println("Node has no value");
- } else {
- // Read a specific value attribute
- ReadValueId readValue = new ReadValueId(node, AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE);
-
- // important: client handle must be unique per item
- UInteger clientHandle = uint(clientHandles.getAndIncrement());
-
- MonitoringParameters parameters = new MonitoringParameters(
- clientHandle,
- 1000.0, // sampling interval
- null, // filter, null means use default
- uint(10), // queue size
- true // discard oldest
- );
-
- MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValue, MonitoringMode.Reporting, parameters);
-
-
- BiConsumer<UaMonitoredItem, Integer> onItemCreated =
- (item, id) -> {
- System.out.println(id);
- item.setValueConsumer(OpcUaTest::onSubscriptionValue);
- };
-
-
- List<UaMonitoredItem> items = subscription.createMonitoredItems(
- TimestampsToReturn.Both,
- Lists.newArrayList(request),
- onItemCreated
- ).get();
-
- for (UaMonitoredItem item : items) {
- NodeId tagId = item.getReadValueId().getNodeId();
- if (item.getStatusCode().isGood()) {
- System.out.println("item created for nodeId="+ tagId);
- } else {
- System.out.println("failed to create item for " + item.getReadValueId().getNodeId() + item.getStatusCode());
- }
- }
-
- }
- }
-
- private static void browseNodeTest(String indent, OpcUaClient client, NodeId browseRoot) {
- BrowseDescription browse = new BrowseDescription(
- browseRoot,
- BrowseDirection.Forward,
- Identifiers.References,
- true,
- uint(NodeClass.Object.getValue() | NodeClass.Variable.getValue()),
- uint(BrowseResultMask.All.getValue())
- );
-
- try {
- BrowseResult browseResult = client.browse(browse).get();
-
- List<ReferenceDescription> references = toList(browseResult.getReferences());
-
- for (ReferenceDescription rd : references) {
- System.out.println("=====================================================================");
- System.out.println(rd.toString());
- System.out.println(rd.getNodeClass());
- System.out.println("Node={} " + indent + " " + rd.getBrowseName().getName());
- System.out.println("=====================================================================");
- // recursively browse to children
- rd.getNodeId().local().ifPresent(nodeId -> {
- System.out.println("NodeId: " + nodeId.getNamespaceIndex());
- System.out.println("NodeId: " + nodeId.getIdentifier());
- System.out.println("NodeId: " + nodeId.getType());
- browseNodeTest(indent + " ", client, nodeId);
-
- });
- }
- } catch (InterruptedException | ExecutionException e) {
- System.out.println("Browsing nodeId=" + browseRoot + " failed: " + e.getMessage());
- }
- }
-
-
- private List<ReferenceDescription> browseNode(String indent, OpcUaClient client, NodeId browseRoot) {
- List<ReferenceDescription> result = new ArrayList<>();
-
- BrowseDescription browse = new BrowseDescription(
- browseRoot,
- BrowseDirection.Forward,
- Identifiers.References,
- true,
- uint(NodeClass.Object.getValue() | NodeClass.Variable.getValue()),
- uint(BrowseResultMask.All.getValue())
- );
-
- try {
- BrowseResult browseResult = client.browse(browse).get();
-
- List<ReferenceDescription> references = toList(browseResult.getReferences());
-
- for (ReferenceDescription rd : references) {
- result.add(rd);
- rd.getNodeId().local().ifPresent(nodeId -> browseNode(indent + " ", client, nodeId));
- }
- } catch (InterruptedException | ExecutionException e) {
- System.out.println("Browsing nodeId=" + browseRoot + " failed: " + e.getMessage());
- }
-
- return result;
-
- }
-
-}