You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2021/03/02 14:25:02 UTC

[incubator-streampipes-extensions] 01/06: [STREAMPIPES-301] move code to OpcUaUtil

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 2e465a3bb87d1a1b1386338a568b3c65b73382e4
Author: bossenti <bo...@posteo.de>
AuthorDate: Tue Mar 2 14:54:05 2021 +0100

    [STREAMPIPES-301] move code to OpcUaUtil
---
 .../streampipes/connect/adapters/opcua/OpcUa.java  |  12 +-
 .../connect/adapters/opcua/OpcUaAdapter.java       |  85 +-------------
 .../connect/adapters/opcua/utils/OpcUaConnect.java |  30 -----
 .../connect/adapters/opcua/utils/OpcUaUtil.java    | 130 +++++++++++++++++++++
 4 files changed, 141 insertions(+), 116 deletions(-)

diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
index 0e5c6a3..45c16b4 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
@@ -23,8 +23,8 @@ import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.
 import static org.eclipse.milo.opcua.stack.core.util.ConversionUtil.toList;
 
 import org.apache.streampipes.connect.adapter.exception.AdapterException;
-import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaConnect;
-import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaConnect.OpcUaLabels;
+import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil;
+import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil.OpcUaLabels;
 import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaNodeVariants;
 import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaTypes;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
@@ -129,13 +129,13 @@ public class OpcUa {
         if (useURL && unauthenticated){
 
             String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_URL.name(), String.class);
-            serverAddress = OpcUaConnect.formatServerAddress(serverAddress);
+            serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
 
             return new OpcUa(serverAddress, namespaceIndex, nodeId, selectedNodeNames);
 
         } else if(!useURL && unauthenticated){
             String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_HOST.name(), String.class);
-            serverAddress = OpcUaConnect.formatServerAddress(serverAddress);
+            serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
             int port = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_PORT.name(), int.class);
 
             return new OpcUa(serverAddress, port, namespaceIndex, nodeId, selectedNodeNames);
@@ -146,12 +146,12 @@ public class OpcUa {
 
             if (useURL) {
                 String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_URL.name(), String.class);
-                serverAddress = OpcUaConnect.formatServerAddress(serverAddress);
+                serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
 
                 return new OpcUa(serverAddress, namespaceIndex, nodeId, username, password, selectedNodeNames);
             } else {
                 String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_HOST.name(), String.class);
-                serverAddress = OpcUaConnect.formatServerAddress(serverAddress);
+                serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
                 int port = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_PORT.name(), int.class);
 
                 return new OpcUa(serverAddress, port, namespaceIndex, nodeId, username, password, selectedNodeNames);
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
index c79ddf7..d52c7d7 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
@@ -22,16 +22,14 @@ import org.apache.streampipes.connect.adapter.Adapter;
 import org.apache.streampipes.connect.adapter.exception.AdapterException;
 import org.apache.streampipes.connect.adapter.exception.ParseException;
 import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
-import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaConnect.OpcUaLabels;
+import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil;
+import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil.OpcUaLabels;
 import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.staticproperty.Option;
 import org.apache.streampipes.sdk.StaticProperties;
-import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
 import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.*;
@@ -40,7 +38,6 @@ import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 
-import java.net.URI;
 import java.util.*;
 
 public class OpcUaAdapter extends SpecificDataStreamAdapter implements ResolvesContainerProvidedOptions {
@@ -103,7 +100,7 @@ public class OpcUaAdapter extends SpecificDataStreamAdapter implements ResolvesC
 
     public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
 
-        String key = getRuntimeNameOfNode(item.getReadValueId().getNodeId());
+        String key = OpcUaUtil.getRuntimeNameOfNode(item.getReadValueId().getNodeId());
 
         OpcNode currNode = this.allNodes.stream()
                 .filter(node -> key.equals(node.getNodeId().getIdentifier().toString()))
@@ -161,44 +158,7 @@ public class OpcUaAdapter extends SpecificDataStreamAdapter implements ResolvesC
     @Override
     public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
 
-        GuessSchema guessSchema = new GuessSchema();
-        EventSchema eventSchema = new EventSchema();
-        List<EventProperty> allProperties = new ArrayList<>();
-
-        OpcUa opc = OpcUa.from(this.adapterDescription);
-
-        try {
-            opc.connect();
-            List<OpcNode> res =  opc.browseNode(true);
-
-            if (res.size() > 0) {
-                for (OpcNode opcNode : res) {
-                    if (opcNode.opcUnitId == 0) {
-                        allProperties.add(PrimitivePropertyBuilder
-                                .create(opcNode.getType(), opcNode.getLabel())
-                                .label(opcNode.getLabel())
-                                .build());
-                    } else {
-                        allProperties.add(PrimitivePropertyBuilder
-                                .create(opcNode.getType(), opcNode.getLabel())
-                                .label(opcNode.getLabel())
-                                .measurementUnit(new URI(OpcUa.mapUnitIdToQudt(opcNode.opcUnitId)))
-                                .build());
-                    }
-                }
-            }
-
-            opc.disconnect();
-        } catch (Exception e) {
-
-            throw new AdapterException("Could not guess schema for opc node! " + e.getMessage());
-
-        }
-
-        eventSchema.setEventProperties(allProperties);
-        guessSchema.setEventSchema(eventSchema);
-
-        return guessSchema;
+        return OpcUaUtil.getSchema(adapterDescription);
     }
 
     @Override
@@ -209,41 +169,6 @@ public class OpcUaAdapter extends SpecificDataStreamAdapter implements ResolvesC
     @Override
     public List<Option> resolveOptions(String requestId, StaticPropertyExtractor parameterExtractor) {
 
-        try {
-            parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.OPC_HOST_OR_URL.name());
-            parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.ACCESS_MODE.name());
-        } catch (NullPointerException npe){
-            return new ArrayList<>();
-        }
-
-        OpcUa opc = OpcUa.from(parameterExtractor);
-
-        List<Option> nodes = new ArrayList<>();
-        try {
-            opc.connect();
-            this.allNodes =  opc.browseNode(false);
-
-            for (OpcNode opcNode: this.allNodes) {
-                nodes.add(new Option(opcNode.getLabel(), opcNode.nodeId.getIdentifier().toString()));
-            }
-
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-        return nodes;
-    }
-
-    private String getRuntimeNameOfNode(NodeId nodeId) {
-        String[] keys = nodeId.getIdentifier().toString().split("\\.");
-        String key;
-
-        if (keys.length > 0) {
-            key = keys[keys.length - 1];
-        } else {
-            key = nodeId.getIdentifier().toString();
-        }
-
-        return key;
+        return OpcUaUtil.resolveOptions(requestId, parameterExtractor);
     }
 }
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaConnect.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaConnect.java
deleted file mode 100644
index d56bcbf..0000000
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaConnect.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.streampipes.connect.adapters.opcua.utils;
-
-public class OpcUaConnect {
-
-    public static String formatServerAddress(String serverAddress) {
-
-        if (!serverAddress.startsWith("opc.tcp://")) {
-            serverAddress = "opc.tcp://" + serverAddress;
-        }
-
-        return serverAddress;
-    }
-
-    public enum OpcUaLabels {
-        OPC_HOST_OR_URL,
-        OPC_URL,
-        OPC_HOST,
-        OPC_SERVER_URL,
-        OPC_SERVER_HOST,
-        OPC_SERVER_PORT,
-        NAMESPACE_INDEX,
-        NODE_ID,
-        ACCESS_MODE,
-        USERNAME_GROUP,
-        USERNAME,
-        PASSWORD,
-        UNAUTHENTICATED,
-        AVAILABLE_NODES;
-    }
-}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
new file mode 100644
index 0000000..4de5b79
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
@@ -0,0 +1,130 @@
+package org.apache.streampipes.connect.adapters.opcua.utils;
+
+import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.adapters.opcua.OpcNode;
+import org.apache.streampipes.connect.adapters.opcua.OpcUa;
+import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+public class OpcUaUtil {
+
+    public static String formatServerAddress(String serverAddress) {
+
+        if (!serverAddress.startsWith("opc.tcp://")) {
+            serverAddress = "opc.tcp://" + serverAddress;
+        }
+
+        return serverAddress;
+    }
+
+    public static GuessSchema getSchema(SpecificAdapterStreamDescription adapterStreamDescription) throws AdapterException, ParseException {
+        GuessSchema guessSchema = new GuessSchema();
+        EventSchema eventSchema = new EventSchema();
+        List<EventProperty> allProperties = new ArrayList<>();
+
+        OpcUa opcUa = OpcUa.from(adapterStreamDescription);
+
+        try {
+            opcUa.connect();
+            List<OpcNode> selectedNodes = opcUa.browseNode(true);
+
+            if (!selectedNodes.isEmpty()) {
+                for (OpcNode opcNode : selectedNodes) {
+                    if (opcNode.hasUnitId()) {
+                        allProperties.add(PrimitivePropertyBuilder
+                            .create(opcNode.getType(), opcNode.getLabel())
+                            .label(opcNode.getLabel())
+                            .measurementUnit(new URI(OpcUa.mapUnitIdToQudt(opcNode.getOpcUnitId())))
+                            .build());
+                    } else {
+                        allProperties.add(PrimitivePropertyBuilder
+                            .create(opcNode.getType(), opcNode.getLabel())
+                            .label(opcNode.getLabel())
+                            .build());
+                    }
+
+                }
+            }
+
+            opcUa.disconnect();
+
+        } catch (Exception e) {
+            throw new AdapterException("Could not guess schema for opc node! " + e.getMessage());
+        }
+
+        eventSchema.setEventProperties(allProperties);
+        guessSchema.setEventSchema(eventSchema);
+
+        return guessSchema;
+
+    }
+
+
+    public static List<Option> resolveOptions (String requestId, StaticPropertyExtractor parameterExtractor) {
+
+        try {
+            parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.OPC_HOST_OR_URL.name());
+            parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.ACCESS_MODE.name());
+        } catch (NullPointerException nullPointerException) {
+            return new ArrayList<>();
+        }
+
+        OpcUa opcUa = OpcUa.from(parameterExtractor);
+
+        List<Option> nodeOptions = new ArrayList<>();
+        try{
+            opcUa.connect();
+
+            for(OpcNode opcNode: opcUa.browseNode(false)) {
+                nodeOptions.add(new Option(opcNode.getLabel(), opcNode.getNodeId().getIdentifier().toString()));
+            }
+
+            opcUa.disconnect();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        return nodeOptions;
+    }
+
+    public static String getRuntimeNameOfNode(NodeId nodeId) {
+        String[] keys = nodeId.getIdentifier().toString().split("\\.");
+        String key;
+
+        if (keys.length > 0) {
+            key = keys[keys.length - 1];
+        } else {
+            key = nodeId.getIdentifier().toString();
+        }
+
+        return key;
+    }
+
+    public enum OpcUaLabels {
+        OPC_HOST_OR_URL,
+        OPC_URL,
+        OPC_HOST,
+        OPC_SERVER_URL,
+        OPC_SERVER_HOST,
+        OPC_SERVER_PORT,
+        NAMESPACE_INDEX,
+        NODE_ID,
+        ACCESS_MODE,
+        USERNAME_GROUP,
+        USERNAME,
+        PASSWORD,
+        UNAUTHENTICATED,
+        AVAILABLE_NODES;
+    }
+}