You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2021/03/09 19:19:32 UTC

[incubator-streampipes-extensions] branch dev updated (64a331b -> 77f338f)

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a change to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git.


    from 64a331b  Merge branch 'dev' of github.com:apache/incubator-streampipes-extensions into dev
     new 0bdbf9e  [STREAMPIPES-301] adapt strings.en to OPC UA wording
     new d42c96a  [STREAMPIPES-301] add javadoc comments
     new 77f338f  [STREAMPIPES-306] integrate pull an subscription mode into one adapter

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../streampipes/connect/ConnectAdapterInit.java    |   6 +-
 .../connect/adapters/opcua/OpcNode.java            |  46 +++++-
 .../streampipes/connect/adapters/opcua/OpcUa.java  | 138 ++++++++++++----
 .../{OpcUaPullAdapter.java => OpcUaAdapter.java}   | 102 +++++++++---
 .../adapters/opcua/OpcUaSubscriptionAdapter.java   | 174 ---------------------
 .../adapters/opcua/utils/OpcUaNodeVariants.java    |   5 +
 .../connect/adapters/opcua/utils/OpcUaTypes.java   |   5 +
 .../connect/adapters/opcua/utils/OpcUaUtil.java    |  32 +++-
 .../icon.png                                       | Bin 5087 -> 0 bytes
 .../documentation.md                               |  66 --------
 .../strings.en                                     |  46 ------
 .../documentation.md                               |   0
 .../icon.png                                       | Bin
 .../strings.en                                     |  25 ++-
 .../extensions/all/jvm/AllExtensionsInit.java      |   6 +-
 15 files changed, 297 insertions(+), 354 deletions(-)
 rename streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/{OpcUaPullAdapter.java => OpcUaAdapter.java} (60%)
 delete mode 100644 streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaSubscriptionAdapter.java
 delete mode 100644 streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/icon.png
 delete mode 100644 streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/documentation.md
 delete mode 100644 streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/strings.en
 rename streampipes-connect-adapters/src/main/resources/{org.apache.streampipes.connect.adapters.opcua.pull => org.apache.streampipes.connect.adapters.opcua}/documentation.md (100%)
 rename streampipes-connect-adapters/src/main/resources/{org.apache.streampipes.connect.adapters.opcua.subscription => org.apache.streampipes.connect.adapters.opcua}/icon.png (100%)
 rename streampipes-connect-adapters/src/main/resources/{org.apache.streampipes.connect.adapters.opcua.pull => org.apache.streampipes.connect.adapters.opcua}/strings.en (57%)


[incubator-streampipes-extensions] 02/03: [STREAMPIPES-301] add javadoc comments

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit d42c96a1dc60d881fc93ad3c43dbff1416d5be5b
Author: bossenti <bo...@posteo.de>
AuthorDate: Tue Mar 9 13:27:17 2021 +0100

    [STREAMPIPES-301] add javadoc comments
---
 .../connect/adapters/opcua/OpcNode.java            | 46 ++++++++++-
 .../streampipes/connect/adapters/opcua/OpcUa.java  | 93 ++++++++++++++++++----
 .../adapters/opcua/utils/OpcUaNodeVariants.java    |  5 ++
 .../connect/adapters/opcua/utils/OpcUaTypes.java   |  5 ++
 .../connect/adapters/opcua/utils/OpcUaUtil.java    | 27 ++++++-
 5 files changed, 160 insertions(+), 16 deletions(-)

diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcNode.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcNode.java
index 6903a33..7a9fb82 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcNode.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcNode.java
@@ -21,12 +21,25 @@ package org.apache.streampipes.connect.adapters.opcua;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.apache.streampipes.sdk.utils.Datatypes;
 
+/**
+ * OpcNode is a StreamPipes internal model of an OPC UA node.
+ * Its main purpose is to ease the handling of nodes within StreamPipes.
+ */
+
 public class OpcNode {
     String label;
     Datatypes type;
     NodeId nodeId;
     int opcUnitId;
 
+    /**
+     * Constructor for class OpcNode without an OPC UA unit identifier. <br>
+     * Unit identifier is set to zero as default.
+     *
+     * @param label name of the OPC UA node
+     * @param type datatype of the OPC UA node
+     * @param nodeId identifier of the OPC UA node
+     */
     public OpcNode(String label, Datatypes type, NodeId nodeId) {
         this.label = label;
         this.type = type;
@@ -34,6 +47,16 @@ public class OpcNode {
         this.opcUnitId = 0;
     }
 
+    /**
+     * Constructor for class OpcNode with an OPC UA unit identifier. <br>
+     * This identifier refers to an OPC UA measurement unit, e.g. degree Celsius. <br>
+     * With {@link OpcNode#getQudtURI()} the OPC UA internal ID is mapped to the QUDT unit ontology <br>
+     *
+     * @param label name of the OPC UA node
+     * @param type datatype of the OPC UA node
+     * @param nodeId identifier of the OPC UA node
+     * @param opcUnitId OPC UA internal unit identifier
+     */
     public OpcNode(String label, Datatypes type, NodeId nodeId, Integer opcUnitId){
         this.label = label;
         this.type = type;
@@ -67,7 +90,28 @@ public class OpcNode {
 
     public int getOpcUnitId() {return this.opcUnitId;}
 
-    public boolean hasUnitId() {return this.opcUnitId !=0; }
+    public boolean hasUnitId() {
+        // zero is the default case when no unit id is present
+        return this.opcUnitId !=0;
+    }
 
+    /**
+     * Returns the corresponding QUDT URI if the {@code opcUnitId} is given,
+     * otherwise it returns an empty string. <br>
+     * Currently, there are only two examples added. <br>
+     * Other units have to be added manually, please have a look at the <a href="http://opcfoundation.org/UA/EngineeringUnits/UNECE/UNECE_to_OPCUA.csv"> OPC UA unitID mapping table</a>. <br>
+     *
+     * @return QUDT URI as string of the given unit
+     */
 
+    public String getQudtURI(){
+        switch (this.opcUnitId){
+            case 17476:
+                return "http://qudt.org/vocab/unit#DEG";
+            case 4408652:
+                return "http://qudt.org/vocab/unit#DegreeCelsius";
+            default:
+                return "";
+        }
+    }
 }
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
index 48bed89..5c723f9 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
@@ -61,6 +61,10 @@ import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+/***
+ * Wrapper class for all OPC UA specific stuff. <br>
+ * Is used both in the {@link OpcUaPullAdapter} and {@link OpcUaSubscriptionAdapter}.
+ */
 public class OpcUa {
 
     static Logger LOG = LoggerFactory.getLogger(OpcUa.class);
@@ -76,11 +80,23 @@ public class OpcUa {
 
     private static final AtomicLong clientHandles = new AtomicLong(1L);
 
+    /***
+     *
+     * @return current {@link org.eclipse.milo.opcua.sdk.client.OpcUaClient}
+     */
     public OpcUaClient getClient() {
         return this.client;
     }
 
 
+    /**
+     * Constructor for security level {@code None} and OPC server given by url
+     *
+     * @param opcServerURL complete OPC UA server url
+     * @param namespaceIndex namespace index of the given node
+     * @param nodeId node identifier
+     * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
+     */
     public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, List<String> selectedNodeNames) {
 
         this.opcServerURL = opcServerURL;
@@ -95,10 +111,29 @@ public class OpcUa {
         }
     }
 
+    /**
+     * Constructor for security level {@code None} and OPC server given by hostname and port number
+     *
+     * @param opcServer OPC UA hostname
+     * @param opcServerPort OPC UA port number
+     * @param namespaceIndex namespace index of the given node
+     * @param nodeId node identifier
+     * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
+     */
     public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, List<String> selectedNodeNames) {
         this( opcServer + ":" + opcServerPort, namespaceIndex, nodeId, selectedNodeNames);
     }
 
+    /**
+     * Constructor for security level {@code Sign} and OPC server given by url
+     *
+     * @param opcServerURL complete OPC UA server url
+     * @param namespaceIndex namespace index of the given node
+     * @param nodeId node identifier
+     * @param username username to authenticate at the OPC UA server
+     * @param password corresponding password to given user name
+     * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
+     */
     public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, String username, String password, List<String> selectedNodeNames) {
         this(opcServerURL, namespaceIndex, nodeId, selectedNodeNames);
         this.unauthenticated = false;
@@ -106,6 +141,17 @@ public class OpcUa {
         this.password = password;
     }
 
+    /**
+     * Constructor for OPC UA security level {@code Sign} and OPC server given by hostname and port number
+     *
+     * @param opcServer OPC UA hostname
+     * @param opcServerPort OPC UA port number
+     * @param namespaceIndex namespace index of the given node
+     * @param nodeId node identifier
+     * @param username username to authenticate at the OPC UA server
+     * @param password corresponding password to given user name
+     * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
+     */
     public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, String username, String password, List<String> selectedNodeNames) {
         this (opcServer, opcServerPort, namespaceIndex, nodeId, selectedNodeNames);
         this.unauthenticated = false;
@@ -113,6 +159,11 @@ public class OpcUa {
         this.password = password;
     }
 
+    /**
+     * Creates {@link OpcUa}  instance in accordance with the given {@link org.apache.streampipes.sdk.extractor.StaticPropertyExtractor}.
+     * @param extractor extractor for user inputs
+     * @return {@link OpcUa}  instance based on information from {@code extractor}
+     */
     public static OpcUa from(StaticPropertyExtractor extractor) {
 
         String selectedAlternativeConnection = extractor.selectedAlternativeInternalId(OpcUaLabels.OPC_HOST_OR_URL.name());
@@ -160,6 +211,11 @@ public class OpcUa {
 
     }
 
+    /***
+     * Creates {@link OpcUa}  instance in accordance with the given {@link org.apache.streampipes.model.connect.adapter.AdapterDescription}
+     * @param adapterDescription description of current adapter
+     * @return {@link OpcUa}  instance based on information from {@code adapterDescription}
+     */
     public static OpcUa from(AdapterDescription adapterDescription){
 
         StaticPropertyExtractor extractor =
@@ -168,6 +224,11 @@ public class OpcUa {
         return from(extractor);
     }
 
+    /***
+     * Establishes appropriate connection to OPC UA endpoint depending on the {@link OpcUa} instance
+     *
+     * @throws Exception
+     */
     public void connect() throws Exception {
 
         List<EndpointDescription> endpoints = DiscoveryClient.getEndpoints(this.opcServerURL).get();
@@ -235,6 +296,12 @@ public class OpcUa {
         );
     }
 
+    /***
+     * Search for related nodes relative to {@link OpcUa#node}
+     * @param selectNodes indicates whether only nodes of {@link OpcUa#selectedNodeNames} should be returned
+     * @return List of {@link OpcNode}s related to {@link OpcUa#node}
+     * @throws AdapterException
+     */
     public List<OpcNode> browseNode(boolean selectNodes) throws AdapterException {
         List<OpcNode> referenceDescriptions = browseNode(node, selectNodes);
 
@@ -282,6 +349,12 @@ public class OpcUa {
         return result;
     }
 
+    /***
+     * Search for related nodes relative to {@link OpcUa#node}
+     * @param selectNodes indicates whether only nodes of {@link OpcUa#selectedNodeNames} should be returned
+     * @return List of {@link OpcNode}s related to {@link OpcUa#node}
+     * @throws AdapterException
+     */
     private List<OpcNode> browseNode(NodeId browseRoot, boolean selectNodes) throws AdapterException {
         List<OpcNode> result = new ArrayList<>();
 
@@ -389,6 +462,12 @@ public class OpcUa {
     }
 
 
+    /***
+     * Register subscriptions for given OPC UA nodes
+     * @param nodes List of {@link org.eclipse.milo.opcua.stack.core.types.builtin.NodeId}
+     * @param opcUaSubscriptionAdapter current instance of {@link OpcUaSubscriptionAdapter}
+     * @throws Exception
+     */
     public void createListSubscription(List<NodeId> nodes, OpcUaSubscriptionAdapter opcUaSubscriptionAdapter) throws Exception {
         /*
          * create a subscription @ 1000ms
@@ -464,20 +543,6 @@ public class OpcUa {
         return true;
     }
 
-    // utility function for mapping UPC Unit Ids to QUDT entities
-    // has to be maintained manually
-    // information about OPC Unit IDs can be found under: opcfoundation.org/UA/EngineeringUnits/UNECE/UNECE_to_OPCUA.csv
-    public static String mapUnitIdToQudt(int unitId){
-        switch (unitId){
-            case 17476:
-                return "http://qudt.org/vocab/unit#DEG";
-            case 4408652:
-                return "http://qudt.org/vocab/unit#DegreeCelsius";
-            default:
-                return "";
-        }
-    }
-
 
     public List<String> getSelectedNodeNames() {
         return selectedNodeNames;
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaNodeVariants.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaNodeVariants.java
index 0361556..265d756 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaNodeVariants.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaNodeVariants.java
@@ -20,6 +20,11 @@ package org.apache.streampipes.connect.adapters.opcua.utils;
 
 import javax.annotation.Nullable;
 
+/**
+ * Enum that maintains different variants of OPC UA nodes. <br>
+ * Not yet completed. <br>
+ *
+ */
 public enum OpcUaNodeVariants {
     Property(68),
     EUInformation(887);
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaTypes.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaTypes.java
index 4ddab4d..aae7ae1 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaTypes.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaTypes.java
@@ -24,6 +24,11 @@ import org.apache.streampipes.sdk.utils.Datatypes;
 
 public class OpcUaTypes {
 
+    /**
+     * Maps OPC UA data types to internal StreamPipes data types
+     * @param o data type id as UInteger
+     * @return StreamPipes internal data type
+     */
     public static Datatypes getType(UInteger o) {
         if (UInteger.valueOf(4).equals(o) | UInteger.valueOf(6).equals(o) | UInteger.valueOf(8).equals(o) | UInteger.valueOf(27).equals(o)) {
             return Datatypes.Integer;
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
index 1fadfb6..77b8a82 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
@@ -17,8 +17,16 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
+/***
+ * Collection of several utility functions in context of OPC UA
+ */
 public class OpcUaUtil {
 
+    /***
+     * Ensures server address starts with {@code opc.tcp://}
+     * @param serverAddress server address as given by user
+     * @return correctly formatted server address
+     */
     public static String formatServerAddress(String serverAddress) {
 
         if (!serverAddress.startsWith("opc.tcp://")) {
@@ -28,6 +36,13 @@ public class OpcUaUtil {
         return serverAddress;
     }
 
+    /***
+     * OPC UA specific implementation of {@link org.apache.streampipes.connect.adapter.Adapter}
+     * @param adapterStreamDescription
+     * @return guess schema
+     * @throws AdapterException
+     * @throws ParseException
+     */
     public static GuessSchema getSchema(SpecificAdapterStreamDescription adapterStreamDescription) throws AdapterException, ParseException {
         GuessSchema guessSchema = new GuessSchema();
         EventSchema eventSchema = new EventSchema();
@@ -45,7 +60,7 @@ public class OpcUaUtil {
                         allProperties.add(PrimitivePropertyBuilder
                             .create(opcNode.getType(), opcNode.getLabel())
                             .label(opcNode.getLabel())
-                            .measurementUnit(new URI(OpcUa.mapUnitIdToQudt(opcNode.getOpcUnitId())))
+                            .measurementUnit(new URI(opcNode.getQudtURI()))
                             .build());
                     } else {
                         allProperties.add(PrimitivePropertyBuilder
@@ -71,8 +86,15 @@ public class OpcUaUtil {
     }
 
 
+    /***
+     * OPC UA specific implementation of {@link org.apache.streampipes.container.api.ResolvesContainerProvidedOptions#resolveOptions(String, StaticPropertyExtractor)}.
+     * @param requestId
+     * @param parameterExtractor
+     * @return {@code List<Option>} with available node names for the given OPC UA configuration
+     */
     public static List<Option> resolveOptions (String requestId, StaticPropertyExtractor parameterExtractor) {
 
+        // access mode and host/url have to be selected
         try {
             parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.OPC_HOST_OR_URL.name());
             parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.ACCESS_MODE.name());
@@ -111,6 +133,9 @@ public class OpcUaUtil {
         return key;
     }
 
+    /***
+     * Enum for all possible labels in the context of OPC UA adapters
+     */
     public enum OpcUaLabels {
         OPC_HOST_OR_URL,
         OPC_URL,


[incubator-streampipes-extensions] 03/03: [STREAMPIPES-306] integrate pull an subscription mode into one adapter

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 77f338f67c64edae0811b267c586b28465381568
Author: bossenti <bo...@posteo.de>
AuthorDate: Tue Mar 9 20:13:32 2021 +0100

    [STREAMPIPES-306] integrate pull an subscription mode into one adapter
---
 .../streampipes/connect/ConnectAdapterInit.java    |   6 +-
 .../streampipes/connect/adapters/opcua/OpcUa.java  |  53 +++++--
 .../{OpcUaPullAdapter.java => OpcUaAdapter.java}   | 102 +++++++++---
 .../adapters/opcua/OpcUaSubscriptionAdapter.java   | 174 ---------------------
 .../connect/adapters/opcua/utils/OpcUaUtil.java    |   5 +-
 .../icon.png                                       | Bin 5087 -> 0 bytes
 .../documentation.md                               |  66 --------
 .../strings.en                                     |  46 ------
 .../documentation.md                               |   0
 .../icon.png                                       | Bin
 .../strings.en                                     |  17 +-
 .../extensions/all/jvm/AllExtensionsInit.java      |   6 +-
 12 files changed, 137 insertions(+), 338 deletions(-)

diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
index 34b0e20..5190ff3 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
@@ -24,7 +24,7 @@ import org.apache.streampipes.connect.adapters.iss.IssAdapter;
 import org.apache.streampipes.connect.adapters.flic.FlicMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaPullAdapter;
+import org.apache.streampipes.connect.adapters.opcua.OpcUaAdapter;
 import org.apache.streampipes.connect.adapters.plc4x.modbus.Plc4xModbusAdapter;
 import org.apache.streampipes.connect.adapters.simulator.machine.MachineDataStreamAdapter;
 import org.apache.streampipes.connect.adapters.ti.TISensorTag;
@@ -37,7 +37,6 @@ import org.apache.streampipes.connect.adapters.influxdb.InfluxDbSetAdapter;
 import org.apache.streampipes.connect.adapters.influxdb.InfluxDbStreamAdapter;
 import org.apache.streampipes.connect.adapters.mysql.MySqlSetAdapter;
 import org.apache.streampipes.connect.adapters.mysql.MySqlStreamAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaSubscriptionAdapter;
 import org.apache.streampipes.connect.adapters.plc4x.s7.Plc4xS7Adapter;
 import org.apache.streampipes.connect.adapters.ros.RosBridgeAdapter;
 import org.apache.streampipes.connect.adapters.simulator.random.RandomDataSetAdapter;
@@ -83,8 +82,7 @@ public class ConnectAdapterInit extends AdapterWorkerContainer {
             .add(new WikipediaEditedArticlesAdapter())
             .add(new WikipediaNewArticlesAdapter())
             .add(new RosBridgeAdapter())
-            .add(new OpcUaSubscriptionAdapter())
-            .add(new OpcUaPullAdapter())
+            .add(new OpcUaAdapter())
             .add(new InfluxDbStreamAdapter())
             .add(new InfluxDbSetAdapter())
             .add(new TISensorTag())
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
index 5c723f9..efc84a0 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
@@ -62,8 +62,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /***
- * Wrapper class for all OPC UA specific stuff. <br>
- * Is used both in the {@link OpcUaPullAdapter} and {@link OpcUaSubscriptionAdapter}.
+ * Wrapper class for all OPC UA specific stuff.
  */
 public class OpcUa {
 
@@ -73,6 +72,7 @@ public class OpcUa {
     private String opcServerURL;
     private OpcUaClient client;
     private boolean unauthenticated;
+    private Double pullIntervalSeconds;
     private String user;
     private String password;
     private List<Map<String, Integer>> unitIDs = new ArrayList<>();
@@ -90,17 +90,19 @@ public class OpcUa {
 
 
     /**
-     * Constructor for security level {@code None} and OPC server given by url
+     * Constructor for security level {@code None}, OPC server given by url and subscription-based
      *
      * @param opcServerURL complete OPC UA server url
      * @param namespaceIndex namespace index of the given node
      * @param nodeId node identifier
+     * @param pullIntervalSeconds duration of pull interval in seconds, {@code null} if in subscription mode
      * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
      */
-    public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, List<String> selectedNodeNames) {
+    public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, Double pullIntervalSeconds, List<String> selectedNodeNames) {
 
         this.opcServerURL = opcServerURL;
         this.unauthenticated = true;
+        this.pullIntervalSeconds = pullIntervalSeconds;
         this.selectedNodeNames = selectedNodeNames;
 
         if (isInteger(nodeId)) {
@@ -118,10 +120,11 @@ public class OpcUa {
      * @param opcServerPort OPC UA port number
      * @param namespaceIndex namespace index of the given node
      * @param nodeId node identifier
+     * @param pullIntervalSeconds duration of pull interval in seconds, {@code null} if in subscription mode
      * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
      */
-    public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, List<String> selectedNodeNames) {
-        this( opcServer + ":" + opcServerPort, namespaceIndex, nodeId, selectedNodeNames);
+    public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, Double pullIntervalSeconds, List<String> selectedNodeNames) {
+        this( opcServer + ":" + opcServerPort, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
     }
 
     /**
@@ -132,10 +135,11 @@ public class OpcUa {
      * @param nodeId node identifier
      * @param username username to authenticate at the OPC UA server
      * @param password corresponding password to given user name
+     * @param pullIntervalSeconds duration of pull interval in seconds, {@code null} if in subscription mode
      * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
      */
-    public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, String username, String password, List<String> selectedNodeNames) {
-        this(opcServerURL, namespaceIndex, nodeId, selectedNodeNames);
+    public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, String username, String password, Double pullIntervalSeconds, List<String> selectedNodeNames) {
+        this(opcServerURL, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
         this.unauthenticated = false;
         this.user = username;
         this.password = password;
@@ -150,10 +154,11 @@ public class OpcUa {
      * @param nodeId node identifier
      * @param username username to authenticate at the OPC UA server
      * @param password corresponding password to given user name
+     * @param pullIntervalSeconds duration of pull interval in seconds, {@code null} if in subscription mode
      * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
      */
-    public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, String username, String password, List<String> selectedNodeNames) {
-        this (opcServer, opcServerPort, namespaceIndex, nodeId, selectedNodeNames);
+    public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, String username, String password, Double pullIntervalSeconds, List<String> selectedNodeNames) {
+        this (opcServer, opcServerPort, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
         this.unauthenticated = false;
         this.user = username;
         this.password = password;
@@ -172,9 +177,15 @@ public class OpcUa {
         int namespaceIndex = extractor.singleValueParameter(OpcUaLabels.NAMESPACE_INDEX.name(), int.class);
         String nodeId = extractor.singleValueParameter(OpcUaLabels.NODE_ID.name(), String.class);
 
+        boolean usePullMode = extractor.selectedAlternativeInternalId(OpcUaLabels.ADAPTER_TYPE.name()).equals(OpcUaLabels.PULL_MODE.name());
         boolean useURL = selectedAlternativeConnection.equals(OpcUaLabels.OPC_URL.name());
         boolean unauthenticated =  selectedAlternativeAuthentication.equals(OpcUaLabels.UNAUTHENTICATED.name());
 
+        Double pullIntervalSeconds = null;
+        if (usePullMode) {
+            pullIntervalSeconds = extractor.singleValueParameter(OpcUaLabels.PULLING_INTERVAL.name(), Double.class);
+        }
+
         List<String> selectedNodeNames = extractor.selectedMultiValues(OpcUaLabels.AVAILABLE_NODES.name(), String.class);
 
         if (useURL && unauthenticated){
@@ -182,14 +193,14 @@ public class OpcUa {
             String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_URL.name(), String.class);
             serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
 
-            return new OpcUa(serverAddress, namespaceIndex, nodeId, selectedNodeNames);
+            return new OpcUa(serverAddress, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
 
         } else if(!useURL && unauthenticated){
             String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_HOST.name(), String.class);
             serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
             int port = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_PORT.name(), int.class);
 
-            return new OpcUa(serverAddress, port, namespaceIndex, nodeId, selectedNodeNames);
+            return new OpcUa(serverAddress, port, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
         } else {
 
             String username = extractor.singleValueParameter(OpcUaLabels.USERNAME.name(), String.class);
@@ -199,13 +210,13 @@ public class OpcUa {
                 String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_URL.name(), String.class);
                 serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
 
-                return new OpcUa(serverAddress, namespaceIndex, nodeId, username, password, selectedNodeNames);
+                return new OpcUa(serverAddress, namespaceIndex, nodeId, username, password, pullIntervalSeconds, selectedNodeNames);
             } else {
                 String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_HOST.name(), String.class);
                 serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
                 int port = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_PORT.name(), int.class);
 
-                return new OpcUa(serverAddress, port, namespaceIndex, nodeId, username, password, selectedNodeNames);
+                return new OpcUa(serverAddress, port, namespaceIndex, nodeId, username, password, pullIntervalSeconds, selectedNodeNames);
             }
         }
 
@@ -465,10 +476,10 @@ public class OpcUa {
     /***
      * Register subscriptions for given OPC UA nodes
      * @param nodes List of {@link org.eclipse.milo.opcua.stack.core.types.builtin.NodeId}
-     * @param opcUaSubscriptionAdapter current instance of {@link OpcUaSubscriptionAdapter}
+     * @param opcUaAdapter current instance of {@link OpcUaAdapter}
      * @throws Exception
      */
-    public void createListSubscription(List<NodeId> nodes, OpcUaSubscriptionAdapter opcUaSubscriptionAdapter) throws Exception {
+    public void createListSubscription(List<NodeId> nodes, OpcUaAdapter opcUaAdapter) throws Exception {
         /*
          * create a subscription @ 1000ms
          */
@@ -513,7 +524,7 @@ public class OpcUa {
 
         BiConsumer<UaMonitoredItem, Integer> onItemCreated =
                 (item, id) -> {
-                    item.setValueConsumer(opcUaSubscriptionAdapter::onSubscriptionValue);
+                    item.setValueConsumer(opcUaAdapter::onSubscriptionValue);
                 };
 
         List<UaMonitoredItem> items = subscription.createMonitoredItems(
@@ -551,4 +562,12 @@ public class OpcUa {
     public String getOpcServerURL() {
         return opcServerURL;
     }
+
+    public boolean inPullMode() {
+        return this.pullIntervalSeconds != null; // pull mode is signalled by a non-null pull interval
+    }
+
+    public double getPullIntervalSeconds() {
+        return this.pullIntervalSeconds; // auto-unboxed: throws NullPointerException when inPullMode() is false
+    }
 }
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
similarity index 60%
rename from streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java
rename to streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
index f218040..a784997 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
@@ -19,7 +19,9 @@ import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
 
 import java.util.*;
@@ -27,66 +29,119 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-public class OpcUaPullAdapter extends PullAdapter implements ResolvesContainerProvidedOptions {
+public class OpcUaAdapter extends PullAdapter implements ResolvesContainerProvidedOptions {
 
-    public static final String ID = "org.apache.streampipes.connect.adapters.opcua.pull";
+    public static final String ID = "org.apache.streampipes.connect.adapters.opcua";
 
-    private int pollingIntervalInSeconds;
+    private double pullingIntervalInSeconds;
     private OpcUa opcUa;
     private List<OpcNode> allNodes;
+    private List<NodeId> allNodeIds;
+    private int numberProperties;
+    private Map<String, Object> event;
 
-    public OpcUaPullAdapter() {
+    public OpcUaAdapter() {
         super();
+        this.numberProperties = 0;
+        this.event = new HashMap<>();
     }
 
-    public OpcUaPullAdapter(SpecificAdapterStreamDescription adapterStreamDescription) {
+    public OpcUaAdapter(SpecificAdapterStreamDescription adapterStreamDescription) {
         super(adapterStreamDescription);
+        this.numberProperties = 0;
+        this.event = new HashMap<>();
     }
 
     @Override
     protected void before() throws AdapterException {
 
-        StaticPropertyExtractor extractor =
-                StaticPropertyExtractor.from(this.adapterDescription.getConfig(), new ArrayList<>());
-
-        this.pollingIntervalInSeconds = extractor.singleValueParameter(OpcUaLabels.POLLING_INTERVAL.name(), int.class);
-
         this.opcUa = OpcUa.from(this.adapterDescription);
 
+        this.allNodeIds = new ArrayList<>();
         try {
             this.opcUa.connect();
             this.allNodes = this.opcUa.browseNode(true);
+
+
+                for (OpcNode node : this.allNodes) {
+                    this.allNodeIds.add(node.nodeId);
+                }
+
+            if (opcUa.inPullMode()) {
+                this.pullingIntervalInSeconds = opcUa.getPullIntervalSeconds();
+            } else {
+                this.numberProperties = this.allNodeIds.size();
+                this.opcUa.createListSubscription(this.allNodeIds, this);
+            }
+
+
         } catch (Exception e) {
             throw new AdapterException("The Connection to the OPC UA server could not be established.");
         }
+    }
+
+        @Override
+    public void startAdapter() throws AdapterException {
+        this.before(); // NOTE(review): confirm super.startAdapter() does not invoke before() again (would connect twice)
+        super.startAdapter();
+    }
 
+    @Override
+    public void stopAdapter() throws AdapterException {
+        // close connection; NOTE(review): super.stopAdapter() is not called - confirm nothing started by super.startAdapter() must be stopped here
+        this.opcUa.disconnect();
     }
 
     @Override
     protected void pullData() {
 
-        Map<String, Object> event = new HashMap<>();
+        if (opcUa.inPullMode()) {
 
-        for (OpcNode opcNode : this.allNodes) {
-            CompletableFuture response = this.opcUa.getClient().readValue(0, TimestampsToReturn.Both, opcNode.getNodeId());
+            CompletableFuture<List<DataValue>> response = this.opcUa.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
             try {
 
-                Object value = ((DataValue) response.get()).getValue().getValue();
+            List<DataValue> returnValues = response.get();
+                for (int i = 0; i<returnValues.size(); i++) {
 
-                event.put(opcNode.getLabel(), value);
+                    Object value = returnValues.get(i).getValue().getValue();
+                    this.event.put(this.allNodes.get(i).getLabel(), value);
 
-            } catch (InterruptedException | ExecutionException ie) {
+                }
+             } catch (InterruptedException | ExecutionException ie) {
                 ie.printStackTrace();
-            }
+             }
+
+            adapterPipeline.process(this.event);
+
         }
 
-        adapterPipeline.process(event);
+    }
+
+    public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
 
+        String key = OpcUaUtil.getRuntimeNameOfNode(item.getReadValueId().getNodeId());
+
+        // a value for a node that is not part of allNodes has no label; skip it instead of failing with a NullPointerException
+        Optional<OpcNode> currNode = this.allNodes.stream()
+                .filter(node -> key.equals(node.getNodeId().getIdentifier().toString()))
+                .findFirst();
+        if (!currNode.isPresent()) {
+            return;
+        }
+
+        event.put(currNode.get().getLabel(), value.getValue().getValue());
+
+        // ensure that event is complete and all opc ua subscriptions transmitted at least one value
+        if (event.size() >= this.numberProperties) {
+            // pass on a shallow copy so that later updates of event do not affect the emitted event
+            Map<String, Object> newEvent = new HashMap<>(event);
+            adapterPipeline.process(newEvent);
+        }
     }
 
     @Override
     protected PollingSettings getPollingInterval() {
-        return PollingSettings.from(TimeUnit.SECONDS, this.pollingIntervalInSeconds);
+        return PollingSettings.from(TimeUnit.MILLISECONDS, (int) (this.pullingIntervalInSeconds * 1000)); // scale to ms before narrowing to int
     }
 
     @Override
@@ -97,7 +152,10 @@ public class OpcUaPullAdapter extends PullAdapter implements ResolvesContainerPr
                 .withAssets(Assets.DOCUMENTATION, Assets.ICON)
                 .withLocales(Locales.EN)
                 .category(AdapterType.Generic, AdapterType.Manufacturing)
-                .requiredIntegerParameter(Labels.withId(OpcUaLabels.POLLING_INTERVAL.name()))
+                .requiredAlternatives(Labels.withId(OpcUaLabels.ADAPTER_TYPE.name()),
+                        Alternatives.from(Labels.withId(OpcUaLabels.PULL_MODE.name()),
+                                StaticProperties.integerFreeTextProperty(Labels.withId(OpcUaLabels.PULLING_INTERVAL.name()))),
+                        Alternatives.from(Labels.withId(OpcUaLabels.SUBSCRIPTION_MODE.name())))
                 .requiredAlternatives(Labels.withId(OpcUaLabels.ACCESS_MODE.name()),
                         Alternatives.from(Labels.withId(OpcUaLabels.UNAUTHENTICATED.name())),
                         Alternatives.from(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
@@ -123,7 +181,7 @@ public class OpcUaPullAdapter extends PullAdapter implements ResolvesContainerPr
                 .requiredTextParameter(Labels.withId(OpcUaLabels.NODE_ID.name()))
                 .requiredMultiValueSelectionFromContainer(
                         Labels.withId(OpcUaLabels.AVAILABLE_NODES.name()),
-                        Arrays.asList(OpcUaLabels.POLLING_INTERVAL.name(), OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
+                        Arrays.asList(OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
                 )
                 .build();
 
@@ -134,7 +192,7 @@ public class OpcUaPullAdapter extends PullAdapter implements ResolvesContainerPr
 
     @Override
     public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
-        return new OpcUaPullAdapter(adapterDescription);
+        return new OpcUaAdapter(adapterDescription);
     }
 
     @Override
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaSubscriptionAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaSubscriptionAdapter.java
deleted file mode 100644
index 51a389a..0000000
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaSubscriptionAdapter.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.adapters.opcua;
-
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
-import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
-import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil;
-import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil.OpcUaLabels;
-import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
-import org.apache.streampipes.model.AdapterType;
-import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.staticproperty.Option;
-import org.apache.streampipes.sdk.StaticProperties;
-import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
-import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
-import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
-import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
-
-import java.util.*;
-
-public class OpcUaSubscriptionAdapter extends SpecificDataStreamAdapter implements ResolvesContainerProvidedOptions {
-
-    public static final String ID = "org.apache.streampipes.connect.adapters.opcua.subscription";
-
-    private Map<String, Object> event;
-    private List<OpcNode> allNodes;
-    private OpcUa opcUa;
-
-    private int numberProperties;
-
-    public OpcUaSubscriptionAdapter() {
-        this.event = new HashMap<>();
-        this.numberProperties = 0;
-    }
-
-    public OpcUaSubscriptionAdapter(SpecificAdapterStreamDescription adapterDescription) {
-        super(adapterDescription);
-
-        this.opcUa = OpcUa.from(this.adapterDescription);
-
-        this.event = new HashMap<>();
-        this.numberProperties = 0;
-    }
-
-    @Override
-    public SpecificAdapterStreamDescription declareModel() {
-
-        SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID)
-                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-                .withLocales(Locales.EN)
-                .category(AdapterType.Generic, AdapterType.Manufacturing)
-                .requiredAlternatives(Labels.withId(OpcUaLabels.ACCESS_MODE.name()),
-                        Alternatives.from(Labels.withId(OpcUaLabels.UNAUTHENTICATED.name())),
-                        Alternatives.from(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
-                                StaticProperties.group(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
-                                StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.USERNAME.name())),
-                                StaticProperties.secretValue(Labels.withId(OpcUaLabels.PASSWORD.name())))))
-                .requiredAlternatives(Labels.withId(OpcUaLabels.OPC_HOST_OR_URL.name()),
-                        Alternatives.from(Labels.withId(OpcUaLabels.OPC_URL.name()),
-                                StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_URL.name()))
-                        ),
-                        Alternatives.from(Labels.withId(OpcUaLabels.OPC_HOST.name()),
-                                StaticProperties.group(Labels.withId("host-port"),
-                                        StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_HOST.name())),
-                                        StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_PORT.name())))))
-                .requiredTextParameter(Labels.withId(OpcUaLabels.NAMESPACE_INDEX.name()))
-                .requiredTextParameter(Labels.withId(OpcUaLabels.NODE_ID.name()))
-                .requiredMultiValueSelectionFromContainer(
-                        Labels.withId(OpcUaLabels.AVAILABLE_NODES.name()),
-                        Arrays.asList(OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
-                )
-                .build();
-        description.setAppId(ID);
-
-
-        return  description;
-    }
-
-    public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
-
-        String key = OpcUaUtil.getRuntimeNameOfNode(item.getReadValueId().getNodeId());
-
-        OpcNode currNode = this.allNodes.stream()
-                .filter(node -> key.equals(node.getNodeId().getIdentifier().toString()))
-                .findFirst()
-                .orElse(null);
-
-        event.put(currNode.getLabel(), value.getValue().getValue());
-
-        // ensure that event is complete and all opc ua subscriptions transmitted at least one value
-        if (event.keySet().size() >= this.numberProperties) {
-            Map <String, Object> newEvent = new HashMap<>();
-            // deep copy of event to prevent preprocessor error
-            for (String k : event.keySet()) {
-              newEvent.put(k, event.get(k));
-            }
-            adapterPipeline.process(newEvent);
-        }
-    }
-
-
-    @Override
-    public void startAdapter() throws AdapterException {
-
-        this.opcUa = OpcUa.from(this.adapterDescription);
-
-        try {
-            this.opcUa.connect();
-
-            this.allNodes = this.opcUa.browseNode(true);
-
-            List<NodeId> nodeIds = new ArrayList<>();
-
-            for (OpcNode rd : this.allNodes) {
-                nodeIds.add(rd.nodeId);
-            }
-
-            this.numberProperties = nodeIds.size();
-            this.opcUa.createListSubscription(nodeIds, this);
-        } catch (Exception e) {
-            throw new AdapterException("Could not connect to OPC-UA server! Server: " + this.opcUa.getOpcServerURL());
-        }
-    }
-
-    @Override
-    public void stopAdapter() throws AdapterException {
-        // close connection
-        this.opcUa.disconnect();
-    }
-
-    @Override
-    public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
-        return new OpcUaSubscriptionAdapter(adapterDescription);
-    }
-
-    @Override
-    public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
-
-        return OpcUaUtil.getSchema(adapterDescription);
-    }
-
-    @Override
-    public String getId() {
-        return ID;
-    }
-
-    @Override
-    public List<Option> resolveOptions(String requestId, StaticPropertyExtractor parameterExtractor) {
-
-        return OpcUaUtil.resolveOptions(requestId, parameterExtractor);
-    }
-}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
index 77b8a82..55aecb3 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
@@ -151,6 +151,9 @@ public class OpcUaUtil {
         PASSWORD,
         UNAUTHENTICATED,
         AVAILABLE_NODES,
-        POLLING_INTERVAL;
+        PULLING_INTERVAL,
+        ADAPTER_TYPE,
+        PULL_MODE,
+        SUBSCRIPTION_MODE;
     }
 }
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/icon.png b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/icon.png
deleted file mode 100644
index ab68d43..0000000
Binary files a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/icon.png and /dev/null differ
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/documentation.md b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/documentation.md
deleted file mode 100644
index 8cc959a..0000000
--- a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/documentation.md
+++ /dev/null
@@ -1,66 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  ~
-  -->
-
-## OPC-UA Subscription Adapter
-
-<p align="center"> 
-    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
-</p>
-
-***
-
-## Description
-
-Read values from an OPC-UA server on value changes
-
-***
-
-## Required Input
-
-***
-
-## Configuration
-
-### Anonymous vs. Username/Password
-
-Choose whether you want to connect anonymously or authenticate using your credentials.
-
-&nbsp;&nbsp;&nbsp;&nbsp; **Anonymous**: No further information required <br>
-&nbsp;&nbsp;&nbsp;&nbsp; **Username/Password**: Insert your `username` and `password` to access the OPC UA server
-
-### OPC UA Server
-
-Where can the OPC UA server be found?
-
-&nbsp;&nbsp;&nbsp;&nbsp; **URL**: Specify the server's full `URL` (including port), can be with our without leading `opc.tcp://`<br>
-&nbsp;&nbsp;&nbsp;&nbsp; **Host/Port**: Insert the `host` address (with or without leading `opc.tcp://`) and the `port`<br>
-
-### Namespace Index
-
-Requires the index of the namespace you want to connect to.
-
-### Node ID
-
-The identifier of the node you want to read from, numbers and strings are both valid.
-
-### Available Nodes
-
-Shows all available nodes once namespace index and node ID are given.
-Select as much as you like to query.
-
-***
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/strings.en b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/strings.en
deleted file mode 100644
index 84c1974..0000000
--- a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/strings.en
+++ /dev/null
@@ -1,46 +0,0 @@
-org.apache.streampipes.connect.adapters.opcua.subscription.title=OPC UA Subscription
-org.apache.streampipes.connect.adapters.opcua.subscription.description=Reads values from an OPC-UA server only on changes
-
-
-OPC_HOST_OR_URL.title=OPC Server
-OPC_HOST_OR_URL.description=
-
-OPC_URL.title=URL
-OPC_URL.description=
-
-OPC_HOST.title=Host/Port
-OPC_HOST.description=
-
-OPC_SERVER_URL.title=URL
-OPC_SERVER_URL.description=Example: opc.tcp://test-server.com:4840,
-
-OPC_SERVER_HOST.title=Host
-OPC_SERVER_HOST.description=Example: test-server.com, opc.tcp://test-server.com)
-
-OPC_SERVER_PORT.title=Port
-OPC_SERVER_PORT.description=Example: 4840
-
-NAMESPACE_INDEX.title=Namespace Index
-NAMESPACE.INDEX.description=Example: 2
-
-NODE_ID.title=Node ID
-NODE_ID.description=Id of the Node to read the values from
-
-
-ACCESS_MODE.title=Security Level
-ACCESS_MODE.description=Select the OPC UA security level for the connection
-
-USERNAME_GROUP.title=Sign (username & password)
-USERNAME_GROUP.description=
-
-UNAUTHENTICATED.title=None
-UNAUTHENTICATED.description=
-
-USERNAME.title=Username
-USERNAME.description=
-
-PASSWORD.title=Password
-PASSWORD.description=
-
-AVAILABLE_NODES.title=Available Nodes
-AVAILABLE_NODES.description=Select the nodes that are relevant for you. Please ensure to select one option each for access mode and OPC UA server
\ No newline at end of file
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/documentation.md b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/documentation.md
similarity index 100%
rename from streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/documentation.md
rename to streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/documentation.md
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/icon.png b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/icon.png
similarity index 100%
rename from streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/icon.png
rename to streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/icon.png
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/strings.en b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/strings.en
similarity index 68%
rename from streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/strings.en
rename to streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/strings.en
index 72ec8f8..240ffdb 100644
--- a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/strings.en
+++ b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/strings.en
@@ -1,5 +1,5 @@
-org.apache.streampipes.connect.adapters.opcua.pull.title=OPC UA Pull
-org.apache.streampipes.connect.adapters.opcua.pull.description=Reads values from an OPC-UA server repeatedly
+org.apache.streampipes.connect.adapters.opcua.title=OPC UA
+org.apache.streampipes.connect.adapters.opcua.description=Reads values from an OPC-UA server
 
 
 OPC_HOST_OR_URL.title=OPC Server
@@ -45,5 +45,14 @@ PASSWORD.description=
 AVAILABLE_NODES.title=Available Nodes
 AVAILABLE_NODES.description=Select the nodes that are relevant for you. Please ensure to select one option each for access mode and OPC UA server and to specify the polling interval
 
-POLLING_INTERVAL.title=Polling Interval
-POLLING_INTERVAL.description=Timespan between data are polled, in seconds
\ No newline at end of file
+PULLING_INTERVAL.title=Pull Interval
+PULLING_INTERVAL.description=Time span between two consecutive pulls, in seconds
+
+ADAPTER_TYPE.title=Adapter Type
+ADAPTER_TYPE.description=Select whether the adapter pulls data regularly or listens to subscriptions
+
+PULL_MODE.title=Pull mode
+PULL_MODE.description=
+
+SUBSCRIPTION_MODE.title=Subscription mode
+SUBSCRIPTION_MODE.description=
\ No newline at end of file
diff --git a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
index 6e06b5d..dd67855 100644
--- a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
+++ b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
@@ -31,8 +31,7 @@ import org.apache.streampipes.connect.adapters.mysql.MySqlSetAdapter;
 import org.apache.streampipes.connect.adapters.mysql.MySqlStreamAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaSubscriptionAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaPullAdapter;
+import org.apache.streampipes.connect.adapters.opcua.OpcUaAdapter;
 import org.apache.streampipes.connect.adapters.plc4x.modbus.Plc4xModbusAdapter;
 import org.apache.streampipes.connect.adapters.plc4x.s7.Plc4xS7Adapter;
 import org.apache.streampipes.connect.adapters.ros.RosBridgeAdapter;
@@ -246,8 +245,7 @@ public class AllExtensionsInit extends ExtensionsModelSubmitter {
                 .add(new WikipediaEditedArticlesAdapter())
                 .add(new WikipediaNewArticlesAdapter())
                 .add(new RosBridgeAdapter())
-                .add(new OpcUaSubscriptionAdapter())
-                .add(new OpcUaPullAdapter())
+                .add(new OpcUaAdapter())
                 .add(new InfluxDbStreamAdapter())
                 .add(new InfluxDbSetAdapter())
                 .add(new TISensorTag())


[incubator-streampipes-extensions] 01/03: [STREAMPIPES-301] adapt strings.en to OPC UA wording

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 0bdbf9e548f2e2e66d254b5a1c47567285b264c9
Author: bossenti <bo...@posteo.de>
AuthorDate: Thu Mar 4 15:45:49 2021 +0100

    [STREAMPIPES-301] adapt strings.en to OPC UA wording
---
 .../org.apache.streampipes.connect.adapters.opcua.pull/strings.en | 8 ++++----
 .../strings.en                                                    | 8 ++++----
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/strings.en b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/strings.en
index 34f3a81..72ec8f8 100644
--- a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/strings.en
+++ b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/strings.en
@@ -27,13 +27,13 @@ NODE_ID.title=Node ID
 NODE_ID.description=Id of the Node to read the values from
 
 
-ACCESS_MODE.title=Access Mode
-ACCESS_MODE.description=
+ACCESS_MODE.title=Security Level
+ACCESS_MODE.description=Select the OPC UA security level for the connection
 
-USERNAME_GROUP.title=Username/Password
+USERNAME_GROUP.title=Sign (username & password)
 USERNAME_GROUP.description=
 
-UNAUTHENTICATED.title=Unauthenticated
+UNAUTHENTICATED.title=None
 UNAUTHENTICATED.description=
 
 USERNAME.title=Username
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/strings.en b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/strings.en
index 22262ab..84c1974 100644
--- a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/strings.en
+++ b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/strings.en
@@ -27,13 +27,13 @@ NODE_ID.title=Node ID
 NODE_ID.description=Id of the Node to read the values from
 
 
-ACCESS_MODE.title=Access Mode
-ACCESS_MODE.description=
+ACCESS_MODE.title=Security Level
+ACCESS_MODE.description=Select the OPC UA security level for the connection
 
-USERNAME_GROUP.title=Username/Password
+USERNAME_GROUP.title=Sign (username & password)
 USERNAME_GROUP.description=
 
-UNAUTHENTICATED.title=Unauthenticated
+UNAUTHENTICATED.title=None
 UNAUTHENTICATED.description=
 
 USERNAME.title=Username