You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2021/03/02 14:25:02 UTC
[incubator-streampipes-extensions] 01/06: [STREAMPIPES-301] move code to OpcUaUtil
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit 2e465a3bb87d1a1b1386338a568b3c65b73382e4
Author: bossenti <bo...@posteo.de>
AuthorDate: Tue Mar 2 14:54:05 2021 +0100
[STREAMPIPES-301] move code to OpcUaUtil
---
.../streampipes/connect/adapters/opcua/OpcUa.java | 12 +-
.../connect/adapters/opcua/OpcUaAdapter.java | 85 +-------------
.../connect/adapters/opcua/utils/OpcUaConnect.java | 30 -----
.../connect/adapters/opcua/utils/OpcUaUtil.java | 130 +++++++++++++++++++++
4 files changed, 141 insertions(+), 116 deletions(-)
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
index 0e5c6a3..45c16b4 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
@@ -23,8 +23,8 @@ import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.
import static org.eclipse.milo.opcua.stack.core.util.ConversionUtil.toList;
import org.apache.streampipes.connect.adapter.exception.AdapterException;
-import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaConnect;
-import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaConnect.OpcUaLabels;
+import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil;
+import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil.OpcUaLabels;
import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaNodeVariants;
import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaTypes;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
@@ -129,13 +129,13 @@ public class OpcUa {
if (useURL && unauthenticated){
String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_URL.name(), String.class);
- serverAddress = OpcUaConnect.formatServerAddress(serverAddress);
+ serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
return new OpcUa(serverAddress, namespaceIndex, nodeId, selectedNodeNames);
} else if(!useURL && unauthenticated){
String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_HOST.name(), String.class);
- serverAddress = OpcUaConnect.formatServerAddress(serverAddress);
+ serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
int port = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_PORT.name(), int.class);
return new OpcUa(serverAddress, port, namespaceIndex, nodeId, selectedNodeNames);
@@ -146,12 +146,12 @@ public class OpcUa {
if (useURL) {
String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_URL.name(), String.class);
- serverAddress = OpcUaConnect.formatServerAddress(serverAddress);
+ serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
return new OpcUa(serverAddress, namespaceIndex, nodeId, username, password, selectedNodeNames);
} else {
String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_HOST.name(), String.class);
- serverAddress = OpcUaConnect.formatServerAddress(serverAddress);
+ serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
int port = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_PORT.name(), int.class);
return new OpcUa(serverAddress, port, namespaceIndex, nodeId, username, password, selectedNodeNames);
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
index c79ddf7..d52c7d7 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
@@ -22,16 +22,14 @@ import org.apache.streampipes.connect.adapter.Adapter;
import org.apache.streampipes.connect.adapter.exception.AdapterException;
import org.apache.streampipes.connect.adapter.exception.ParseException;
import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
-import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaConnect.OpcUaLabels;
+import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil;
+import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil.OpcUaLabels;
import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.staticproperty.Option;
import org.apache.streampipes.sdk.StaticProperties;
-import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.apache.streampipes.sdk.helpers.*;
@@ -40,7 +38,6 @@ import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
-import java.net.URI;
import java.util.*;
public class OpcUaAdapter extends SpecificDataStreamAdapter implements ResolvesContainerProvidedOptions {
@@ -103,7 +100,7 @@ public class OpcUaAdapter extends SpecificDataStreamAdapter implements ResolvesC
public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
- String key = getRuntimeNameOfNode(item.getReadValueId().getNodeId());
+ String key = OpcUaUtil.getRuntimeNameOfNode(item.getReadValueId().getNodeId());
OpcNode currNode = this.allNodes.stream()
.filter(node -> key.equals(node.getNodeId().getIdentifier().toString()))
@@ -161,44 +158,7 @@ public class OpcUaAdapter extends SpecificDataStreamAdapter implements ResolvesC
@Override
public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
- GuessSchema guessSchema = new GuessSchema();
- EventSchema eventSchema = new EventSchema();
- List<EventProperty> allProperties = new ArrayList<>();
-
- OpcUa opc = OpcUa.from(this.adapterDescription);
-
- try {
- opc.connect();
- List<OpcNode> res = opc.browseNode(true);
-
- if (res.size() > 0) {
- for (OpcNode opcNode : res) {
- if (opcNode.opcUnitId == 0) {
- allProperties.add(PrimitivePropertyBuilder
- .create(opcNode.getType(), opcNode.getLabel())
- .label(opcNode.getLabel())
- .build());
- } else {
- allProperties.add(PrimitivePropertyBuilder
- .create(opcNode.getType(), opcNode.getLabel())
- .label(opcNode.getLabel())
- .measurementUnit(new URI(OpcUa.mapUnitIdToQudt(opcNode.opcUnitId)))
- .build());
- }
- }
- }
-
- opc.disconnect();
- } catch (Exception e) {
-
- throw new AdapterException("Could not guess schema for opc node! " + e.getMessage());
-
- }
-
- eventSchema.setEventProperties(allProperties);
- guessSchema.setEventSchema(eventSchema);
-
- return guessSchema;
+ return OpcUaUtil.getSchema(adapterDescription);
}
@Override
@@ -209,41 +169,6 @@ public class OpcUaAdapter extends SpecificDataStreamAdapter implements ResolvesC
@Override
public List<Option> resolveOptions(String requestId, StaticPropertyExtractor parameterExtractor) {
- try {
- parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.OPC_HOST_OR_URL.name());
- parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.ACCESS_MODE.name());
- } catch (NullPointerException npe){
- return new ArrayList<>();
- }
-
- OpcUa opc = OpcUa.from(parameterExtractor);
-
- List<Option> nodes = new ArrayList<>();
- try {
- opc.connect();
- this.allNodes = opc.browseNode(false);
-
- for (OpcNode opcNode: this.allNodes) {
- nodes.add(new Option(opcNode.getLabel(), opcNode.nodeId.getIdentifier().toString()));
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- return nodes;
- }
-
- private String getRuntimeNameOfNode(NodeId nodeId) {
- String[] keys = nodeId.getIdentifier().toString().split("\\.");
- String key;
-
- if (keys.length > 0) {
- key = keys[keys.length - 1];
- } else {
- key = nodeId.getIdentifier().toString();
- }
-
- return key;
+ return OpcUaUtil.resolveOptions(requestId, parameterExtractor);
}
}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaConnect.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaConnect.java
deleted file mode 100644
index d56bcbf..0000000
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaConnect.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.streampipes.connect.adapters.opcua.utils;
-
-public class OpcUaConnect {
-
- public static String formatServerAddress(String serverAddress) {
-
- if (!serverAddress.startsWith("opc.tcp://")) {
- serverAddress = "opc.tcp://" + serverAddress;
- }
-
- return serverAddress;
- }
-
- public enum OpcUaLabels {
- OPC_HOST_OR_URL,
- OPC_URL,
- OPC_HOST,
- OPC_SERVER_URL,
- OPC_SERVER_HOST,
- OPC_SERVER_PORT,
- NAMESPACE_INDEX,
- NODE_ID,
- ACCESS_MODE,
- USERNAME_GROUP,
- USERNAME,
- PASSWORD,
- UNAUTHENTICATED,
- AVAILABLE_NODES;
- }
-}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
new file mode 100644
index 0000000..4de5b79
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
@@ -0,0 +1,130 @@
+package org.apache.streampipes.connect.adapters.opcua.utils;
+
+import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.adapters.opcua.OpcNode;
+import org.apache.streampipes.connect.adapters.opcua.OpcUa;
+import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+public class OpcUaUtil {
+
+ public static String formatServerAddress(String serverAddress) {
+
+ if (!serverAddress.startsWith("opc.tcp://")) {
+ serverAddress = "opc.tcp://" + serverAddress;
+ }
+
+ return serverAddress;
+ }
+
+ public static GuessSchema getSchema(SpecificAdapterStreamDescription adapterStreamDescription) throws AdapterException, ParseException {
+ GuessSchema guessSchema = new GuessSchema();
+ EventSchema eventSchema = new EventSchema();
+ List<EventProperty> allProperties = new ArrayList<>();
+
+ OpcUa opcUa = OpcUa.from(adapterStreamDescription);
+
+ try {
+ opcUa.connect();
+ List<OpcNode> selectedNodes = opcUa.browseNode(true);
+
+ if (!selectedNodes.isEmpty()) {
+ for (OpcNode opcNode : selectedNodes) {
+ if (opcNode.hasUnitId()) {
+ allProperties.add(PrimitivePropertyBuilder
+ .create(opcNode.getType(), opcNode.getLabel())
+ .label(opcNode.getLabel())
+ .measurementUnit(new URI(OpcUa.mapUnitIdToQudt(opcNode.getOpcUnitId())))
+ .build());
+ } else {
+ allProperties.add(PrimitivePropertyBuilder
+ .create(opcNode.getType(), opcNode.getLabel())
+ .label(opcNode.getLabel())
+ .build());
+ }
+
+ }
+ }
+
+ opcUa.disconnect();
+
+ } catch (Exception e) {
+ throw new AdapterException("Could not guess schema for opc node! " + e.getMessage());
+ }
+
+ eventSchema.setEventProperties(allProperties);
+ guessSchema.setEventSchema(eventSchema);
+
+ return guessSchema;
+
+ }
+
+
+ public static List<Option> resolveOptions (String requestId, StaticPropertyExtractor parameterExtractor) {
+
+ try {
+ parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.OPC_HOST_OR_URL.name());
+ parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.ACCESS_MODE.name());
+ } catch (NullPointerException nullPointerException) {
+ return new ArrayList<>();
+ }
+
+ OpcUa opcUa = OpcUa.from(parameterExtractor);
+
+ List<Option> nodeOptions = new ArrayList<>();
+ try{
+ opcUa.connect();
+
+ for(OpcNode opcNode: opcUa.browseNode(false)) {
+ nodeOptions.add(new Option(opcNode.getLabel(), opcNode.getNodeId().getIdentifier().toString()));
+ }
+
+ opcUa.disconnect();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ return nodeOptions;
+ }
+
+ public static String getRuntimeNameOfNode(NodeId nodeId) {
+ String[] keys = nodeId.getIdentifier().toString().split("\\.");
+ String key;
+
+ if (keys.length > 0) {
+ key = keys[keys.length - 1];
+ } else {
+ key = nodeId.getIdentifier().toString();
+ }
+
+ return key;
+ }
+
+ public enum OpcUaLabels {
+ OPC_HOST_OR_URL,
+ OPC_URL,
+ OPC_HOST,
+ OPC_SERVER_URL,
+ OPC_SERVER_HOST,
+ OPC_SERVER_PORT,
+ NAMESPACE_INDEX,
+ NODE_ID,
+ ACCESS_MODE,
+ USERNAME_GROUP,
+ USERNAME,
+ PASSWORD,
+ UNAUTHENTICATED,
+ AVAILABLE_NODES;
+ }
+}