You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2021/03/09 19:19:35 UTC

[incubator-streampipes-extensions] 03/03: [STREAMPIPES-306] integrate pull an subscription mode into one adapter

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 77f338f67c64edae0811b267c586b28465381568
Author: bossenti <bo...@posteo.de>
AuthorDate: Tue Mar 9 20:13:32 2021 +0100

    [STREAMPIPES-306] integrate pull an subscription mode into one adapter
---
 .../streampipes/connect/ConnectAdapterInit.java    |   6 +-
 .../streampipes/connect/adapters/opcua/OpcUa.java  |  53 +++++--
 .../{OpcUaPullAdapter.java => OpcUaAdapter.java}   | 102 +++++++++---
 .../adapters/opcua/OpcUaSubscriptionAdapter.java   | 174 ---------------------
 .../connect/adapters/opcua/utils/OpcUaUtil.java    |   5 +-
 .../icon.png                                       | Bin 5087 -> 0 bytes
 .../documentation.md                               |  66 --------
 .../strings.en                                     |  46 ------
 .../documentation.md                               |   0
 .../icon.png                                       | Bin
 .../strings.en                                     |  17 +-
 .../extensions/all/jvm/AllExtensionsInit.java      |   6 +-
 12 files changed, 137 insertions(+), 338 deletions(-)

diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
index 34b0e20..5190ff3 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
@@ -24,7 +24,7 @@ import org.apache.streampipes.connect.adapters.iss.IssAdapter;
 import org.apache.streampipes.connect.adapters.flic.FlicMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaPullAdapter;
+import org.apache.streampipes.connect.adapters.opcua.OpcUaAdapter;
 import org.apache.streampipes.connect.adapters.plc4x.modbus.Plc4xModbusAdapter;
 import org.apache.streampipes.connect.adapters.simulator.machine.MachineDataStreamAdapter;
 import org.apache.streampipes.connect.adapters.ti.TISensorTag;
@@ -37,7 +37,6 @@ import org.apache.streampipes.connect.adapters.influxdb.InfluxDbSetAdapter;
 import org.apache.streampipes.connect.adapters.influxdb.InfluxDbStreamAdapter;
 import org.apache.streampipes.connect.adapters.mysql.MySqlSetAdapter;
 import org.apache.streampipes.connect.adapters.mysql.MySqlStreamAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaSubscriptionAdapter;
 import org.apache.streampipes.connect.adapters.plc4x.s7.Plc4xS7Adapter;
 import org.apache.streampipes.connect.adapters.ros.RosBridgeAdapter;
 import org.apache.streampipes.connect.adapters.simulator.random.RandomDataSetAdapter;
@@ -83,8 +82,7 @@ public class ConnectAdapterInit extends AdapterWorkerContainer {
             .add(new WikipediaEditedArticlesAdapter())
             .add(new WikipediaNewArticlesAdapter())
             .add(new RosBridgeAdapter())
-            .add(new OpcUaSubscriptionAdapter())
-            .add(new OpcUaPullAdapter())
+            .add(new OpcUaAdapter())
             .add(new InfluxDbStreamAdapter())
             .add(new InfluxDbSetAdapter())
             .add(new TISensorTag())
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
index 5c723f9..efc84a0 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
@@ -62,8 +62,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /***
- * Wrapper class for all OPC UA specific stuff. <br>
- * Is used both in the {@link OpcUaPullAdapter} and {@link OpcUaSubscriptionAdapter}.
+ * Wrapper class for all OPC UA specific stuff.
  */
 public class OpcUa {
 
@@ -73,6 +72,7 @@ public class OpcUa {
     private String opcServerURL;
     private OpcUaClient client;
     private boolean unauthenticated;
+    private Double pullIntervalSeconds;
     private String user;
     private String password;
     private List<Map<String, Integer>> unitIDs = new ArrayList<>();
@@ -90,17 +90,19 @@ public class OpcUa {
 
 
     /**
-     * Constructor for security level {@code None} and OPC server given by url
+     * Constructor for security level {@code None}, OPC server given by url and subscription-based
      *
      * @param opcServerURL complete OPC UA server url
      * @param namespaceIndex namespace index of the given node
      * @param nodeId node identifier
+     * @param pullIntervalSeconds duration of pull interval in seconds, {@code null} if in subscription mode
      * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
      */
-    public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, List<String> selectedNodeNames) {
+    public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, Double pullIntervalSeconds, List<String> selectedNodeNames) {
 
         this.opcServerURL = opcServerURL;
         this.unauthenticated = true;
+        this.pullIntervalSeconds = pullIntervalSeconds;
         this.selectedNodeNames = selectedNodeNames;
 
         if (isInteger(nodeId)) {
@@ -118,10 +120,11 @@ public class OpcUa {
      * @param opcServerPort OPC UA port number
      * @param namespaceIndex namespace index of the given node
      * @param nodeId node identifier
+     * @param pullIntervalSeconds duration of pull interval in seconds, {@code null} if in subscription mode
      * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
      */
-    public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, List<String> selectedNodeNames) {
-        this( opcServer + ":" + opcServerPort, namespaceIndex, nodeId, selectedNodeNames);
+    public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, Double pullIntervalSeconds, List<String> selectedNodeNames) {
+        this( opcServer + ":" + opcServerPort, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
     }
 
     /**
@@ -132,10 +135,11 @@ public class OpcUa {
      * @param nodeId node identifier
      * @param username username to authenticate at the OPC UA server
      * @param password corresponding password to given user name
+     * @param pullIntervalSeconds duration of pull interval in seconds, {@code null} if in subscription mode
      * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
      */
-    public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, String username, String password, List<String> selectedNodeNames) {
-        this(opcServerURL, namespaceIndex, nodeId, selectedNodeNames);
+    public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, String username, String password, Double pullIntervalSeconds, List<String> selectedNodeNames) {
+        this(opcServerURL, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
         this.unauthenticated = false;
         this.user = username;
         this.password = password;
@@ -150,10 +154,11 @@ public class OpcUa {
      * @param nodeId node identifier
      * @param username username to authenticate at the OPC UA server
      * @param password corresponding password to given user name
+     * @param pullIntervalSeconds duration of pull interval in seconds, {@code null} if in subscription mode
      * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
      */
-    public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, String username, String password, List<String> selectedNodeNames) {
-        this (opcServer, opcServerPort, namespaceIndex, nodeId, selectedNodeNames);
+    public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, String username, String password, Double pullIntervalSeconds, List<String> selectedNodeNames) {
+        this (opcServer, opcServerPort, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
         this.unauthenticated = false;
         this.user = username;
         this.password = password;
@@ -172,9 +177,15 @@ public class OpcUa {
         int namespaceIndex = extractor.singleValueParameter(OpcUaLabels.NAMESPACE_INDEX.name(), int.class);
         String nodeId = extractor.singleValueParameter(OpcUaLabels.NODE_ID.name(), String.class);
 
+        boolean usePullMode = extractor.selectedAlternativeInternalId(OpcUaLabels.ADAPTER_TYPE.name()).equals(OpcUaLabels.PULL_MODE.name());
         boolean useURL = selectedAlternativeConnection.equals(OpcUaLabels.OPC_URL.name());
         boolean unauthenticated =  selectedAlternativeAuthentication.equals(OpcUaLabels.UNAUTHENTICATED.name());
 
+        Double pullIntervalSeconds = null;
+        if (usePullMode) {
+            pullIntervalSeconds = extractor.singleValueParameter(OpcUaLabels.PULLING_INTERVAL.name(), Double.class);
+        }
+
         List<String> selectedNodeNames = extractor.selectedMultiValues(OpcUaLabels.AVAILABLE_NODES.name(), String.class);
 
         if (useURL && unauthenticated){
@@ -182,14 +193,14 @@ public class OpcUa {
             String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_URL.name(), String.class);
             serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
 
-            return new OpcUa(serverAddress, namespaceIndex, nodeId, selectedNodeNames);
+            return new OpcUa(serverAddress, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
 
         } else if(!useURL && unauthenticated){
             String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_HOST.name(), String.class);
             serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
             int port = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_PORT.name(), int.class);
 
-            return new OpcUa(serverAddress, port, namespaceIndex, nodeId, selectedNodeNames);
+            return new OpcUa(serverAddress, port, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
         } else {
 
             String username = extractor.singleValueParameter(OpcUaLabels.USERNAME.name(), String.class);
@@ -199,13 +210,13 @@ public class OpcUa {
                 String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_URL.name(), String.class);
                 serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
 
-                return new OpcUa(serverAddress, namespaceIndex, nodeId, username, password, selectedNodeNames);
+                return new OpcUa(serverAddress, namespaceIndex, nodeId, username, password, pullIntervalSeconds, selectedNodeNames);
             } else {
                 String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_HOST.name(), String.class);
                 serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
                 int port = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_PORT.name(), int.class);
 
-                return new OpcUa(serverAddress, port, namespaceIndex, nodeId, username, password, selectedNodeNames);
+                return new OpcUa(serverAddress, port, namespaceIndex, nodeId, username, password, pullIntervalSeconds, selectedNodeNames);
             }
         }
 
@@ -465,10 +476,10 @@ public class OpcUa {
     /***
      * Register subscriptions for given OPC UA nodes
      * @param nodes List of {@link org.eclipse.milo.opcua.stack.core.types.builtin.NodeId}
-     * @param opcUaSubscriptionAdapter current instance of {@link OpcUaSubscriptionAdapter}
+     * @param opcUaAdapter current instance of {@link OpcUaAdapter}
      * @throws Exception
      */
-    public void createListSubscription(List<NodeId> nodes, OpcUaSubscriptionAdapter opcUaSubscriptionAdapter) throws Exception {
+    public void createListSubscription(List<NodeId> nodes, OpcUaAdapter opcUaAdapter) throws Exception {
         /*
          * create a subscription @ 1000ms
          */
@@ -513,7 +524,7 @@ public class OpcUa {
 
         BiConsumer<UaMonitoredItem, Integer> onItemCreated =
                 (item, id) -> {
-                    item.setValueConsumer(opcUaSubscriptionAdapter::onSubscriptionValue);
+                    item.setValueConsumer(opcUaAdapter::onSubscriptionValue);
                 };
 
         List<UaMonitoredItem> items = subscription.createMonitoredItems(
@@ -551,4 +562,12 @@ public class OpcUa {
     public String getOpcServerURL() {
         return opcServerURL;
     }
+
+    public boolean inPullMode() {
+        return !(this.pullIntervalSeconds == null);
+    }
+
+    public double getPullIntervalSeconds() {
+        return this.pullIntervalSeconds;
+    }
 }
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
similarity index 60%
rename from streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java
rename to streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
index f218040..a784997 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
@@ -19,7 +19,9 @@ import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
 
 import java.util.*;
@@ -27,66 +29,119 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-public class OpcUaPullAdapter extends PullAdapter implements ResolvesContainerProvidedOptions {
+public class OpcUaAdapter extends PullAdapter implements ResolvesContainerProvidedOptions {
 
-    public static final String ID = "org.apache.streampipes.connect.adapters.opcua.pull";
+    public static final String ID = "org.apache.streampipes.connect.adapters.opcua";
 
-    private int pollingIntervalInSeconds;
+    private double pullingIntervalInSeconds;
     private OpcUa opcUa;
     private List<OpcNode> allNodes;
+    private List<NodeId> allNodeIds;
+    private int numberProperties;
+    private Map<String, Object> event;
 
-    public OpcUaPullAdapter() {
+    public OpcUaAdapter() {
         super();
+        this.numberProperties = 0;
+        this.event = new HashMap<>();
     }
 
-    public OpcUaPullAdapter(SpecificAdapterStreamDescription adapterStreamDescription) {
+    public OpcUaAdapter(SpecificAdapterStreamDescription adapterStreamDescription) {
         super(adapterStreamDescription);
+        this.numberProperties = 0;
+        this.event = new HashMap<>();
     }
 
     @Override
     protected void before() throws AdapterException {
 
-        StaticPropertyExtractor extractor =
-                StaticPropertyExtractor.from(this.adapterDescription.getConfig(), new ArrayList<>());
-
-        this.pollingIntervalInSeconds = extractor.singleValueParameter(OpcUaLabels.POLLING_INTERVAL.name(), int.class);
-
         this.opcUa = OpcUa.from(this.adapterDescription);
 
+        this.allNodeIds = new ArrayList<>();
         try {
             this.opcUa.connect();
             this.allNodes = this.opcUa.browseNode(true);
+
+
+                for (OpcNode node : this.allNodes) {
+                    this.allNodeIds.add(node.nodeId);
+                }
+
+            if (opcUa.inPullMode()) {
+                this.pullingIntervalInSeconds = opcUa.getPullIntervalSeconds();
+            } else {
+                this.numberProperties = this.allNodeIds.size();
+                this.opcUa.createListSubscription(this.allNodeIds, this);
+            }
+
+
         } catch (Exception e) {
             throw new AdapterException("The Connection to the OPC UA server could not be established.");
         }
+    }
+
+        @Override
+    public void startAdapter() throws AdapterException {
+        this.before();
+        super.startAdapter();
+    }
 
+    @Override
+    public void stopAdapter() throws AdapterException {
+        // close connection
+        this.opcUa.disconnect();
     }
 
     @Override
     protected void pullData() {
 
-        Map<String, Object> event = new HashMap<>();
+        if (opcUa.inPullMode()) {
 
-        for (OpcNode opcNode : this.allNodes) {
-            CompletableFuture response = this.opcUa.getClient().readValue(0, TimestampsToReturn.Both, opcNode.getNodeId());
+            CompletableFuture<List<DataValue>> response = this.opcUa.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
             try {
 
-                Object value = ((DataValue) response.get()).getValue().getValue();
+            List<DataValue> returnValues = response.get();
+                for (int i = 0; i<returnValues.size(); i++) {
 
-                event.put(opcNode.getLabel(), value);
+                    Object value = returnValues.get(i).getValue().getValue();
+                    this.event.put(this.allNodes.get(i).getLabel(), value);
 
-            } catch (InterruptedException | ExecutionException ie) {
+                }
+             } catch (InterruptedException | ExecutionException ie) {
                 ie.printStackTrace();
-            }
+             }
+
+            adapterPipeline.process(this.event);
+
         }
 
-        adapterPipeline.process(event);
+    }
+
+    public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
 
+        String key = OpcUaUtil.getRuntimeNameOfNode(item.getReadValueId().getNodeId());
+
+        OpcNode currNode = this.allNodes.stream()
+                .filter(node -> key.equals(node.getNodeId().getIdentifier().toString()))
+                .findFirst()
+                .orElse(null);
+
+        event.put(currNode.getLabel(), value.getValue().getValue());
+
+        // ensure that event is complete and all opc ua subscriptions transmitted at least one value
+        if (event.keySet().size() >= this.numberProperties) {
+            Map <String, Object> newEvent = new HashMap<>();
+            // deep copy of event to prevent preprocessor error
+            for (String k : event.keySet()) {
+                newEvent.put(k, event.get(k));
+            }
+            adapterPipeline.process(newEvent);
+        }
     }
 
     @Override
     protected PollingSettings getPollingInterval() {
-        return PollingSettings.from(TimeUnit.SECONDS, this.pollingIntervalInSeconds);
+        return PollingSettings.from(TimeUnit.MILLISECONDS, (int) this.pullingIntervalInSeconds * 1000);
     }
 
     @Override
@@ -97,7 +152,10 @@ public class OpcUaPullAdapter extends PullAdapter implements ResolvesContainerPr
                 .withAssets(Assets.DOCUMENTATION, Assets.ICON)
                 .withLocales(Locales.EN)
                 .category(AdapterType.Generic, AdapterType.Manufacturing)
-                .requiredIntegerParameter(Labels.withId(OpcUaLabels.POLLING_INTERVAL.name()))
+                .requiredAlternatives(Labels.withId(OpcUaLabels.ADAPTER_TYPE.name()),
+                        Alternatives.from(Labels.withId(OpcUaLabels.PULL_MODE.name()),
+                                StaticProperties.integerFreeTextProperty(Labels.withId(OpcUaLabels.PULLING_INTERVAL.name()))),
+                        Alternatives.from(Labels.withId(OpcUaLabels.SUBSCRIPTION_MODE.name())))
                 .requiredAlternatives(Labels.withId(OpcUaLabels.ACCESS_MODE.name()),
                         Alternatives.from(Labels.withId(OpcUaLabels.UNAUTHENTICATED.name())),
                         Alternatives.from(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
@@ -123,7 +181,7 @@ public class OpcUaPullAdapter extends PullAdapter implements ResolvesContainerPr
                 .requiredTextParameter(Labels.withId(OpcUaLabels.NODE_ID.name()))
                 .requiredMultiValueSelectionFromContainer(
                         Labels.withId(OpcUaLabels.AVAILABLE_NODES.name()),
-                        Arrays.asList(OpcUaLabels.POLLING_INTERVAL.name(), OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
+                        Arrays.asList(OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
                 )
                 .build();
 
@@ -134,7 +192,7 @@ public class OpcUaPullAdapter extends PullAdapter implements ResolvesContainerPr
 
     @Override
     public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
-        return new OpcUaPullAdapter(adapterDescription);
+        return new OpcUaAdapter(adapterDescription);
     }
 
     @Override
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaSubscriptionAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaSubscriptionAdapter.java
deleted file mode 100644
index 51a389a..0000000
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaSubscriptionAdapter.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.adapters.opcua;
-
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
-import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
-import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil;
-import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil.OpcUaLabels;
-import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
-import org.apache.streampipes.model.AdapterType;
-import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.staticproperty.Option;
-import org.apache.streampipes.sdk.StaticProperties;
-import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
-import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
-import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
-import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
-
-import java.util.*;
-
-public class OpcUaSubscriptionAdapter extends SpecificDataStreamAdapter implements ResolvesContainerProvidedOptions {
-
-    public static final String ID = "org.apache.streampipes.connect.adapters.opcua.subscription";
-
-    private Map<String, Object> event;
-    private List<OpcNode> allNodes;
-    private OpcUa opcUa;
-
-    private int numberProperties;
-
-    public OpcUaSubscriptionAdapter() {
-        this.event = new HashMap<>();
-        this.numberProperties = 0;
-    }
-
-    public OpcUaSubscriptionAdapter(SpecificAdapterStreamDescription adapterDescription) {
-        super(adapterDescription);
-
-        this.opcUa = OpcUa.from(this.adapterDescription);
-
-        this.event = new HashMap<>();
-        this.numberProperties = 0;
-    }
-
-    @Override
-    public SpecificAdapterStreamDescription declareModel() {
-
-        SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID)
-                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-                .withLocales(Locales.EN)
-                .category(AdapterType.Generic, AdapterType.Manufacturing)
-                .requiredAlternatives(Labels.withId(OpcUaLabels.ACCESS_MODE.name()),
-                        Alternatives.from(Labels.withId(OpcUaLabels.UNAUTHENTICATED.name())),
-                        Alternatives.from(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
-                                StaticProperties.group(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
-                                StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.USERNAME.name())),
-                                StaticProperties.secretValue(Labels.withId(OpcUaLabels.PASSWORD.name())))))
-                .requiredAlternatives(Labels.withId(OpcUaLabels.OPC_HOST_OR_URL.name()),
-                        Alternatives.from(Labels.withId(OpcUaLabels.OPC_URL.name()),
-                                StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_URL.name()))
-                        ),
-                        Alternatives.from(Labels.withId(OpcUaLabels.OPC_HOST.name()),
-                                StaticProperties.group(Labels.withId("host-port"),
-                                        StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_HOST.name())),
-                                        StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_PORT.name())))))
-                .requiredTextParameter(Labels.withId(OpcUaLabels.NAMESPACE_INDEX.name()))
-                .requiredTextParameter(Labels.withId(OpcUaLabels.NODE_ID.name()))
-                .requiredMultiValueSelectionFromContainer(
-                        Labels.withId(OpcUaLabels.AVAILABLE_NODES.name()),
-                        Arrays.asList(OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
-                )
-                .build();
-        description.setAppId(ID);
-
-
-        return  description;
-    }
-
-    public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
-
-        String key = OpcUaUtil.getRuntimeNameOfNode(item.getReadValueId().getNodeId());
-
-        OpcNode currNode = this.allNodes.stream()
-                .filter(node -> key.equals(node.getNodeId().getIdentifier().toString()))
-                .findFirst()
-                .orElse(null);
-
-        event.put(currNode.getLabel(), value.getValue().getValue());
-
-        // ensure that event is complete and all opc ua subscriptions transmitted at least one value
-        if (event.keySet().size() >= this.numberProperties) {
-            Map <String, Object> newEvent = new HashMap<>();
-            // deep copy of event to prevent preprocessor error
-            for (String k : event.keySet()) {
-              newEvent.put(k, event.get(k));
-            }
-            adapterPipeline.process(newEvent);
-        }
-    }
-
-
-    @Override
-    public void startAdapter() throws AdapterException {
-
-        this.opcUa = OpcUa.from(this.adapterDescription);
-
-        try {
-            this.opcUa.connect();
-
-            this.allNodes = this.opcUa.browseNode(true);
-
-            List<NodeId> nodeIds = new ArrayList<>();
-
-            for (OpcNode rd : this.allNodes) {
-                nodeIds.add(rd.nodeId);
-            }
-
-            this.numberProperties = nodeIds.size();
-            this.opcUa.createListSubscription(nodeIds, this);
-        } catch (Exception e) {
-            throw new AdapterException("Could not connect to OPC-UA server! Server: " + this.opcUa.getOpcServerURL());
-        }
-    }
-
-    @Override
-    public void stopAdapter() throws AdapterException {
-        // close connection
-        this.opcUa.disconnect();
-    }
-
-    @Override
-    public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
-        return new OpcUaSubscriptionAdapter(adapterDescription);
-    }
-
-    @Override
-    public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
-
-        return OpcUaUtil.getSchema(adapterDescription);
-    }
-
-    @Override
-    public String getId() {
-        return ID;
-    }
-
-    @Override
-    public List<Option> resolveOptions(String requestId, StaticPropertyExtractor parameterExtractor) {
-
-        return OpcUaUtil.resolveOptions(requestId, parameterExtractor);
-    }
-}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
index 77b8a82..55aecb3 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
@@ -151,6 +151,9 @@ public class OpcUaUtil {
         PASSWORD,
         UNAUTHENTICATED,
         AVAILABLE_NODES,
-        POLLING_INTERVAL;
+        PULLING_INTERVAL,
+        ADAPTER_TYPE,
+        PULL_MODE,
+        SUBSCRIPTION_MODE;
     }
 }
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/icon.png b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/icon.png
deleted file mode 100644
index ab68d43..0000000
Binary files a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/icon.png and /dev/null differ
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/documentation.md b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/documentation.md
deleted file mode 100644
index 8cc959a..0000000
--- a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/documentation.md
+++ /dev/null
@@ -1,66 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  ~
-  -->
-
-## OPC-UA Subscription Adapter
-
-<p align="center"> 
-    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
-</p>
-
-***
-
-## Description
-
-Read values from an OPC-UA server on value changes
-
-***
-
-## Required Input
-
-***
-
-## Configuration
-
-### Anonymous vs. Username/Password
-
-Choose whether you want to connect anonymously or authenticate using your credentials.
-
-&nbsp;&nbsp;&nbsp;&nbsp; **Anonymous**: No further information required <br>
-&nbsp;&nbsp;&nbsp;&nbsp; **Username/Password**: Insert your `username` and `password` to access the OPC UA server
-
-### OPC UA Server
-
-Where can the OPC UA server be found?
-
-&nbsp;&nbsp;&nbsp;&nbsp; **URL**: Specify the server's full `URL` (including port), can be with our without leading `opc.tcp://`<br>
-&nbsp;&nbsp;&nbsp;&nbsp; **Host/Port**: Insert the `host` address (with or without leading `opc.tcp://`) and the `port`<br>
-
-### Namespace Index
-
-Requires the index of the namespace you want to connect to.
-
-### Node ID
-
-The identifier of the node you want to read from, numbers and strings are both valid.
-
-### Available Nodes
-
-Shows all available nodes once namespace index and node ID are given.
-Select as much as you like to query.
-
-***
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/strings.en b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/strings.en
deleted file mode 100644
index 84c1974..0000000
--- a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/strings.en
+++ /dev/null
@@ -1,46 +0,0 @@
-org.apache.streampipes.connect.adapters.opcua.subscription.title=OPC UA Subscription
-org.apache.streampipes.connect.adapters.opcua.subscription.description=Reads values from an OPC-UA server only on changes
-
-
-OPC_HOST_OR_URL.title=OPC Server
-OPC_HOST_OR_URL.description=
-
-OPC_URL.title=URL
-OPC_URL.description=
-
-OPC_HOST.title=Host/Port
-OPC_HOST.description=
-
-OPC_SERVER_URL.title=URL
-OPC_SERVER_URL.description=Example: opc.tcp://test-server.com:4840,
-
-OPC_SERVER_HOST.title=Host
-OPC_SERVER_HOST.description=Example: test-server.com, opc.tcp://test-server.com)
-
-OPC_SERVER_PORT.title=Port
-OPC_SERVER_PORT.description=Example: 4840
-
-NAMESPACE_INDEX.title=Namespace Index
-NAMESPACE.INDEX.description=Example: 2
-
-NODE_ID.title=Node ID
-NODE_ID.description=Id of the Node to read the values from
-
-
-ACCESS_MODE.title=Security Level
-ACCESS_MODE.description=Select the OPC UA security level for the connection
-
-USERNAME_GROUP.title=Sign (username & password)
-USERNAME_GROUP.description=
-
-UNAUTHENTICATED.title=None
-UNAUTHENTICATED.description=
-
-USERNAME.title=Username
-USERNAME.description=
-
-PASSWORD.title=Password
-PASSWORD.description=
-
-AVAILABLE_NODES.title=Available Nodes
-AVAILABLE_NODES.description=Select the nodes that are relevant for you. Please ensure to select one option each for access mode and OPC UA server
\ No newline at end of file
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/documentation.md b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/documentation.md
similarity index 100%
rename from streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/documentation.md
rename to streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/documentation.md
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/icon.png b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/icon.png
similarity index 100%
rename from streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/icon.png
rename to streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/icon.png
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/strings.en b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/strings.en
similarity index 68%
rename from streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/strings.en
rename to streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/strings.en
index 72ec8f8..240ffdb 100644
--- a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/strings.en
+++ b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/strings.en
@@ -1,5 +1,5 @@
-org.apache.streampipes.connect.adapters.opcua.pull.title=OPC UA Pull
-org.apache.streampipes.connect.adapters.opcua.pull.description=Reads values from an OPC-UA server repeatedly
+org.apache.streampipes.connect.adapters.opcua.title=OPC UA
+org.apache.streampipes.connect.adapters.opcua.description=Reads values from an OPC-UA server
 
 
 OPC_HOST_OR_URL.title=OPC Server
@@ -45,5 +45,14 @@ PASSWORD.description=
 AVAILABLE_NODES.title=Available Nodes
 AVAILABLE_NODES.description=Select the nodes that are relevant for you. Please ensure to select one option each for access mode and OPC UA server and to specify the polling interval
 
-POLLING_INTERVAL.title=Polling Interval
-POLLING_INTERVAL.description=Timespan between data are polled, in seconds
\ No newline at end of file
+PULLING_INTERVAL.title=Pull Interval
+PULLING_INTERVAL.description=Timespan between data are pulled, in seconds
+
+ADAPTER_TYPE.title=Adapter Type
+ADAPTER_TYPE.description=Select whether the adapter pulls data regularly or listens to subscriptions
+
+PULL_MODE.title=Pull mode
+PULL_MODE.description=
+
+SUBSCRIPTION_MODE.title=Subscription mode
+SUBSCRIPTION_MODE.description=
\ No newline at end of file
diff --git a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
index 6e06b5d..dd67855 100644
--- a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
+++ b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
@@ -31,8 +31,7 @@ import org.apache.streampipes.connect.adapters.mysql.MySqlSetAdapter;
 import org.apache.streampipes.connect.adapters.mysql.MySqlStreamAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaSubscriptionAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaPullAdapter;
+import org.apache.streampipes.connect.adapters.opcua.OpcUaAdapter;
 import org.apache.streampipes.connect.adapters.plc4x.modbus.Plc4xModbusAdapter;
 import org.apache.streampipes.connect.adapters.plc4x.s7.Plc4xS7Adapter;
 import org.apache.streampipes.connect.adapters.ros.RosBridgeAdapter;
@@ -246,8 +245,7 @@ public class AllExtensionsInit extends ExtensionsModelSubmitter {
                 .add(new WikipediaEditedArticlesAdapter())
                 .add(new WikipediaNewArticlesAdapter())
                 .add(new RosBridgeAdapter())
-                .add(new OpcUaSubscriptionAdapter())
-                .add(new OpcUaPullAdapter())
+                .add(new OpcUaAdapter())
                 .add(new InfluxDbStreamAdapter())
                 .add(new InfluxDbSetAdapter())
                 .add(new TISensorTag())