You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2021/03/09 19:19:35 UTC
[incubator-streampipes-extensions] 03/03: [STREAMPIPES-306]
integrate pull an subscription mode into one adapter
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit 77f338f67c64edae0811b267c586b28465381568
Author: bossenti <bo...@posteo.de>
AuthorDate: Tue Mar 9 20:13:32 2021 +0100
[STREAMPIPES-306] integrate pull an subscription mode into one adapter
---
.../streampipes/connect/ConnectAdapterInit.java | 6 +-
.../streampipes/connect/adapters/opcua/OpcUa.java | 53 +++++--
.../{OpcUaPullAdapter.java => OpcUaAdapter.java} | 102 +++++++++---
.../adapters/opcua/OpcUaSubscriptionAdapter.java | 174 ---------------------
.../connect/adapters/opcua/utils/OpcUaUtil.java | 5 +-
.../icon.png | Bin 5087 -> 0 bytes
.../documentation.md | 66 --------
.../strings.en | 46 ------
.../documentation.md | 0
.../icon.png | Bin
.../strings.en | 17 +-
.../extensions/all/jvm/AllExtensionsInit.java | 6 +-
12 files changed, 137 insertions(+), 338 deletions(-)
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
index 34b0e20..5190ff3 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
@@ -24,7 +24,7 @@ import org.apache.streampipes.connect.adapters.iss.IssAdapter;
import org.apache.streampipes.connect.adapters.flic.FlicMQTTAdapter;
import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaPullAdapter;
+import org.apache.streampipes.connect.adapters.opcua.OpcUaAdapter;
import org.apache.streampipes.connect.adapters.plc4x.modbus.Plc4xModbusAdapter;
import org.apache.streampipes.connect.adapters.simulator.machine.MachineDataStreamAdapter;
import org.apache.streampipes.connect.adapters.ti.TISensorTag;
@@ -37,7 +37,6 @@ import org.apache.streampipes.connect.adapters.influxdb.InfluxDbSetAdapter;
import org.apache.streampipes.connect.adapters.influxdb.InfluxDbStreamAdapter;
import org.apache.streampipes.connect.adapters.mysql.MySqlSetAdapter;
import org.apache.streampipes.connect.adapters.mysql.MySqlStreamAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaSubscriptionAdapter;
import org.apache.streampipes.connect.adapters.plc4x.s7.Plc4xS7Adapter;
import org.apache.streampipes.connect.adapters.ros.RosBridgeAdapter;
import org.apache.streampipes.connect.adapters.simulator.random.RandomDataSetAdapter;
@@ -83,8 +82,7 @@ public class ConnectAdapterInit extends AdapterWorkerContainer {
.add(new WikipediaEditedArticlesAdapter())
.add(new WikipediaNewArticlesAdapter())
.add(new RosBridgeAdapter())
- .add(new OpcUaSubscriptionAdapter())
- .add(new OpcUaPullAdapter())
+ .add(new OpcUaAdapter())
.add(new InfluxDbStreamAdapter())
.add(new InfluxDbSetAdapter())
.add(new TISensorTag())
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
index 5c723f9..efc84a0 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUa.java
@@ -62,8 +62,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
/***
- * Wrapper class for all OPC UA specific stuff. <br>
- * Is used both in the {@link OpcUaPullAdapter} and {@link OpcUaSubscriptionAdapter}.
+ * Wrapper class for all OPC UA specific stuff.
*/
public class OpcUa {
@@ -73,6 +72,7 @@ public class OpcUa {
private String opcServerURL;
private OpcUaClient client;
private boolean unauthenticated;
+ private Double pullIntervalSeconds;
private String user;
private String password;
private List<Map<String, Integer>> unitIDs = new ArrayList<>();
@@ -90,17 +90,19 @@ public class OpcUa {
/**
- * Constructor for security level {@code None} and OPC server given by url
+ * Constructor for security level {@code None}, OPC server given by url and subscription-based
*
* @param opcServerURL complete OPC UA server url
* @param namespaceIndex namespace index of the given node
* @param nodeId node identifier
+ * @param pullIntervalSeconds duration of pull interval in seconds, {@code null} if in subscription mode
* @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
*/
- public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, List<String> selectedNodeNames) {
+ public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, Double pullIntervalSeconds, List<String> selectedNodeNames) {
this.opcServerURL = opcServerURL;
this.unauthenticated = true;
+ this.pullIntervalSeconds = pullIntervalSeconds;
this.selectedNodeNames = selectedNodeNames;
if (isInteger(nodeId)) {
@@ -118,10 +120,11 @@ public class OpcUa {
* @param opcServerPort OPC UA port number
* @param namespaceIndex namespace index of the given node
* @param nodeId node identifier
+ * @param pullIntervalSeconds duration of pull interval in seconds, {@code null} if in subscription mode
* @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
*/
- public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, List<String> selectedNodeNames) {
- this( opcServer + ":" + opcServerPort, namespaceIndex, nodeId, selectedNodeNames);
+ public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, Double pullIntervalSeconds, List<String> selectedNodeNames) {
+ this( opcServer + ":" + opcServerPort, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
}
/**
@@ -132,10 +135,11 @@ public class OpcUa {
* @param nodeId node identifier
* @param username username to authenticate at the OPC UA server
* @param password corresponding password to given user name
+ * @param pullIntervalSeconds duration of pull interval in seconds, {@code null} if in subscription mode
* @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
*/
- public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, String username, String password, List<String> selectedNodeNames) {
- this(opcServerURL, namespaceIndex, nodeId, selectedNodeNames);
+ public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, String username, String password, Double pullIntervalSeconds, List<String> selectedNodeNames) {
+ this(opcServerURL, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
this.unauthenticated = false;
this.user = username;
this.password = password;
@@ -150,10 +154,11 @@ public class OpcUa {
* @param nodeId node identifier
* @param username username to authenticate at the OPC UA server
* @param password corresponding password to given user name
+ * @param pullIntervalSeconds duration of pull interval in seconds, {@code null} if in subscription mode
* @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
*/
- public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, String username, String password, List<String> selectedNodeNames) {
- this (opcServer, opcServerPort, namespaceIndex, nodeId, selectedNodeNames);
+ public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, String username, String password, Double pullIntervalSeconds, List<String> selectedNodeNames) {
+ this (opcServer, opcServerPort, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
this.unauthenticated = false;
this.user = username;
this.password = password;
@@ -172,9 +177,15 @@ public class OpcUa {
int namespaceIndex = extractor.singleValueParameter(OpcUaLabels.NAMESPACE_INDEX.name(), int.class);
String nodeId = extractor.singleValueParameter(OpcUaLabels.NODE_ID.name(), String.class);
+ boolean usePullMode = extractor.selectedAlternativeInternalId(OpcUaLabels.ADAPTER_TYPE.name()).equals(OpcUaLabels.PULL_MODE.name());
boolean useURL = selectedAlternativeConnection.equals(OpcUaLabels.OPC_URL.name());
boolean unauthenticated = selectedAlternativeAuthentication.equals(OpcUaLabels.UNAUTHENTICATED.name());
+ Double pullIntervalSeconds = null;
+ if (usePullMode) {
+ pullIntervalSeconds = extractor.singleValueParameter(OpcUaLabels.PULLING_INTERVAL.name(), Double.class);
+ }
+
List<String> selectedNodeNames = extractor.selectedMultiValues(OpcUaLabels.AVAILABLE_NODES.name(), String.class);
if (useURL && unauthenticated){
@@ -182,14 +193,14 @@ public class OpcUa {
String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_URL.name(), String.class);
serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
- return new OpcUa(serverAddress, namespaceIndex, nodeId, selectedNodeNames);
+ return new OpcUa(serverAddress, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
} else if(!useURL && unauthenticated){
String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_HOST.name(), String.class);
serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
int port = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_PORT.name(), int.class);
- return new OpcUa(serverAddress, port, namespaceIndex, nodeId, selectedNodeNames);
+ return new OpcUa(serverAddress, port, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
} else {
String username = extractor.singleValueParameter(OpcUaLabels.USERNAME.name(), String.class);
@@ -199,13 +210,13 @@ public class OpcUa {
String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_URL.name(), String.class);
serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
- return new OpcUa(serverAddress, namespaceIndex, nodeId, username, password, selectedNodeNames);
+ return new OpcUa(serverAddress, namespaceIndex, nodeId, username, password, pullIntervalSeconds, selectedNodeNames);
} else {
String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_HOST.name(), String.class);
serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
int port = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_PORT.name(), int.class);
- return new OpcUa(serverAddress, port, namespaceIndex, nodeId, username, password, selectedNodeNames);
+ return new OpcUa(serverAddress, port, namespaceIndex, nodeId, username, password, pullIntervalSeconds, selectedNodeNames);
}
}
@@ -465,10 +476,10 @@ public class OpcUa {
/***
* Register subscriptions for given OPC UA nodes
* @param nodes List of {@link org.eclipse.milo.opcua.stack.core.types.builtin.NodeId}
- * @param opcUaSubscriptionAdapter current instance of {@link OpcUaSubscriptionAdapter}
+ * @param opcUaAdapter current instance of {@link OpcUaAdapter}
* @throws Exception
*/
- public void createListSubscription(List<NodeId> nodes, OpcUaSubscriptionAdapter opcUaSubscriptionAdapter) throws Exception {
+ public void createListSubscription(List<NodeId> nodes, OpcUaAdapter opcUaAdapter) throws Exception {
/*
* create a subscription @ 1000ms
*/
@@ -513,7 +524,7 @@ public class OpcUa {
BiConsumer<UaMonitoredItem, Integer> onItemCreated =
(item, id) -> {
- item.setValueConsumer(opcUaSubscriptionAdapter::onSubscriptionValue);
+ item.setValueConsumer(opcUaAdapter::onSubscriptionValue);
};
List<UaMonitoredItem> items = subscription.createMonitoredItems(
@@ -551,4 +562,12 @@ public class OpcUa {
public String getOpcServerURL() {
return opcServerURL;
}
+
+ public boolean inPullMode() {
+ return !(this.pullIntervalSeconds == null);
+ }
+
+ public double getPullIntervalSeconds() {
+ return this.pullIntervalSeconds;
+ }
}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
similarity index 60%
rename from streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java
rename to streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
index f218040..a784997 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaAdapter.java
@@ -19,7 +19,9 @@ import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import java.util.*;
@@ -27,66 +29,119 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-public class OpcUaPullAdapter extends PullAdapter implements ResolvesContainerProvidedOptions {
+public class OpcUaAdapter extends PullAdapter implements ResolvesContainerProvidedOptions {
- public static final String ID = "org.apache.streampipes.connect.adapters.opcua.pull";
+ public static final String ID = "org.apache.streampipes.connect.adapters.opcua";
- private int pollingIntervalInSeconds;
+ private double pullingIntervalInSeconds;
private OpcUa opcUa;
private List<OpcNode> allNodes;
+ private List<NodeId> allNodeIds;
+ private int numberProperties;
+ private Map<String, Object> event;
- public OpcUaPullAdapter() {
+ public OpcUaAdapter() {
super();
+ this.numberProperties = 0;
+ this.event = new HashMap<>();
}
- public OpcUaPullAdapter(SpecificAdapterStreamDescription adapterStreamDescription) {
+ public OpcUaAdapter(SpecificAdapterStreamDescription adapterStreamDescription) {
super(adapterStreamDescription);
+ this.numberProperties = 0;
+ this.event = new HashMap<>();
}
@Override
protected void before() throws AdapterException {
- StaticPropertyExtractor extractor =
- StaticPropertyExtractor.from(this.adapterDescription.getConfig(), new ArrayList<>());
-
- this.pollingIntervalInSeconds = extractor.singleValueParameter(OpcUaLabels.POLLING_INTERVAL.name(), int.class);
-
this.opcUa = OpcUa.from(this.adapterDescription);
+ this.allNodeIds = new ArrayList<>();
try {
this.opcUa.connect();
this.allNodes = this.opcUa.browseNode(true);
+
+
+ for (OpcNode node : this.allNodes) {
+ this.allNodeIds.add(node.nodeId);
+ }
+
+ if (opcUa.inPullMode()) {
+ this.pullingIntervalInSeconds = opcUa.getPullIntervalSeconds();
+ } else {
+ this.numberProperties = this.allNodeIds.size();
+ this.opcUa.createListSubscription(this.allNodeIds, this);
+ }
+
+
} catch (Exception e) {
throw new AdapterException("The Connection to the OPC UA server could not be established.");
}
+ }
+
+ @Override
+ public void startAdapter() throws AdapterException {
+ this.before();
+ super.startAdapter();
+ }
+ @Override
+ public void stopAdapter() throws AdapterException {
+ // close connection
+ this.opcUa.disconnect();
}
@Override
protected void pullData() {
- Map<String, Object> event = new HashMap<>();
+ if (opcUa.inPullMode()) {
- for (OpcNode opcNode : this.allNodes) {
- CompletableFuture response = this.opcUa.getClient().readValue(0, TimestampsToReturn.Both, opcNode.getNodeId());
+ CompletableFuture<List<DataValue>> response = this.opcUa.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
try {
- Object value = ((DataValue) response.get()).getValue().getValue();
+ List<DataValue> returnValues = response.get();
+ for (int i = 0; i<returnValues.size(); i++) {
- event.put(opcNode.getLabel(), value);
+ Object value = returnValues.get(i).getValue().getValue();
+ this.event.put(this.allNodes.get(i).getLabel(), value);
- } catch (InterruptedException | ExecutionException ie) {
+ }
+ } catch (InterruptedException | ExecutionException ie) {
ie.printStackTrace();
- }
+ }
+
+ adapterPipeline.process(this.event);
+
}
- adapterPipeline.process(event);
+ }
+
+ public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
+ String key = OpcUaUtil.getRuntimeNameOfNode(item.getReadValueId().getNodeId());
+
+ OpcNode currNode = this.allNodes.stream()
+ .filter(node -> key.equals(node.getNodeId().getIdentifier().toString()))
+ .findFirst()
+ .orElse(null);
+
+ event.put(currNode.getLabel(), value.getValue().getValue());
+
+ // ensure that event is complete and all opc ua subscriptions transmitted at least one value
+ if (event.keySet().size() >= this.numberProperties) {
+ Map <String, Object> newEvent = new HashMap<>();
+ // deep copy of event to prevent preprocessor error
+ for (String k : event.keySet()) {
+ newEvent.put(k, event.get(k));
+ }
+ adapterPipeline.process(newEvent);
+ }
}
@Override
protected PollingSettings getPollingInterval() {
- return PollingSettings.from(TimeUnit.SECONDS, this.pollingIntervalInSeconds);
+ return PollingSettings.from(TimeUnit.MILLISECONDS, (int) this.pullingIntervalInSeconds * 1000);
}
@Override
@@ -97,7 +152,10 @@ public class OpcUaPullAdapter extends PullAdapter implements ResolvesContainerPr
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.withLocales(Locales.EN)
.category(AdapterType.Generic, AdapterType.Manufacturing)
- .requiredIntegerParameter(Labels.withId(OpcUaLabels.POLLING_INTERVAL.name()))
+ .requiredAlternatives(Labels.withId(OpcUaLabels.ADAPTER_TYPE.name()),
+ Alternatives.from(Labels.withId(OpcUaLabels.PULL_MODE.name()),
+ StaticProperties.integerFreeTextProperty(Labels.withId(OpcUaLabels.PULLING_INTERVAL.name()))),
+ Alternatives.from(Labels.withId(OpcUaLabels.SUBSCRIPTION_MODE.name())))
.requiredAlternatives(Labels.withId(OpcUaLabels.ACCESS_MODE.name()),
Alternatives.from(Labels.withId(OpcUaLabels.UNAUTHENTICATED.name())),
Alternatives.from(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
@@ -123,7 +181,7 @@ public class OpcUaPullAdapter extends PullAdapter implements ResolvesContainerPr
.requiredTextParameter(Labels.withId(OpcUaLabels.NODE_ID.name()))
.requiredMultiValueSelectionFromContainer(
Labels.withId(OpcUaLabels.AVAILABLE_NODES.name()),
- Arrays.asList(OpcUaLabels.POLLING_INTERVAL.name(), OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
+ Arrays.asList(OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
)
.build();
@@ -134,7 +192,7 @@ public class OpcUaPullAdapter extends PullAdapter implements ResolvesContainerPr
@Override
public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
- return new OpcUaPullAdapter(adapterDescription);
+ return new OpcUaAdapter(adapterDescription);
}
@Override
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaSubscriptionAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaSubscriptionAdapter.java
deleted file mode 100644
index 51a389a..0000000
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaSubscriptionAdapter.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.connect.adapters.opcua;
-
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.connect.adapter.exception.AdapterException;
-import org.apache.streampipes.connect.adapter.exception.ParseException;
-import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
-import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil;
-import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil.OpcUaLabels;
-import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
-import org.apache.streampipes.model.AdapterType;
-import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.staticproperty.Option;
-import org.apache.streampipes.sdk.StaticProperties;
-import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
-import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
-import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
-import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
-
-import java.util.*;
-
-public class OpcUaSubscriptionAdapter extends SpecificDataStreamAdapter implements ResolvesContainerProvidedOptions {
-
- public static final String ID = "org.apache.streampipes.connect.adapters.opcua.subscription";
-
- private Map<String, Object> event;
- private List<OpcNode> allNodes;
- private OpcUa opcUa;
-
- private int numberProperties;
-
- public OpcUaSubscriptionAdapter() {
- this.event = new HashMap<>();
- this.numberProperties = 0;
- }
-
- public OpcUaSubscriptionAdapter(SpecificAdapterStreamDescription adapterDescription) {
- super(adapterDescription);
-
- this.opcUa = OpcUa.from(this.adapterDescription);
-
- this.event = new HashMap<>();
- this.numberProperties = 0;
- }
-
- @Override
- public SpecificAdapterStreamDescription declareModel() {
-
- SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .withLocales(Locales.EN)
- .category(AdapterType.Generic, AdapterType.Manufacturing)
- .requiredAlternatives(Labels.withId(OpcUaLabels.ACCESS_MODE.name()),
- Alternatives.from(Labels.withId(OpcUaLabels.UNAUTHENTICATED.name())),
- Alternatives.from(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
- StaticProperties.group(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
- StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.USERNAME.name())),
- StaticProperties.secretValue(Labels.withId(OpcUaLabels.PASSWORD.name())))))
- .requiredAlternatives(Labels.withId(OpcUaLabels.OPC_HOST_OR_URL.name()),
- Alternatives.from(Labels.withId(OpcUaLabels.OPC_URL.name()),
- StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_URL.name()))
- ),
- Alternatives.from(Labels.withId(OpcUaLabels.OPC_HOST.name()),
- StaticProperties.group(Labels.withId("host-port"),
- StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_HOST.name())),
- StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_PORT.name())))))
- .requiredTextParameter(Labels.withId(OpcUaLabels.NAMESPACE_INDEX.name()))
- .requiredTextParameter(Labels.withId(OpcUaLabels.NODE_ID.name()))
- .requiredMultiValueSelectionFromContainer(
- Labels.withId(OpcUaLabels.AVAILABLE_NODES.name()),
- Arrays.asList(OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
- )
- .build();
- description.setAppId(ID);
-
-
- return description;
- }
-
- public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
-
- String key = OpcUaUtil.getRuntimeNameOfNode(item.getReadValueId().getNodeId());
-
- OpcNode currNode = this.allNodes.stream()
- .filter(node -> key.equals(node.getNodeId().getIdentifier().toString()))
- .findFirst()
- .orElse(null);
-
- event.put(currNode.getLabel(), value.getValue().getValue());
-
- // ensure that event is complete and all opc ua subscriptions transmitted at least one value
- if (event.keySet().size() >= this.numberProperties) {
- Map <String, Object> newEvent = new HashMap<>();
- // deep copy of event to prevent preprocessor error
- for (String k : event.keySet()) {
- newEvent.put(k, event.get(k));
- }
- adapterPipeline.process(newEvent);
- }
- }
-
-
- @Override
- public void startAdapter() throws AdapterException {
-
- this.opcUa = OpcUa.from(this.adapterDescription);
-
- try {
- this.opcUa.connect();
-
- this.allNodes = this.opcUa.browseNode(true);
-
- List<NodeId> nodeIds = new ArrayList<>();
-
- for (OpcNode rd : this.allNodes) {
- nodeIds.add(rd.nodeId);
- }
-
- this.numberProperties = nodeIds.size();
- this.opcUa.createListSubscription(nodeIds, this);
- } catch (Exception e) {
- throw new AdapterException("Could not connect to OPC-UA server! Server: " + this.opcUa.getOpcServerURL());
- }
- }
-
- @Override
- public void stopAdapter() throws AdapterException {
- // close connection
- this.opcUa.disconnect();
- }
-
- @Override
- public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
- return new OpcUaSubscriptionAdapter(adapterDescription);
- }
-
- @Override
- public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
-
- return OpcUaUtil.getSchema(adapterDescription);
- }
-
- @Override
- public String getId() {
- return ID;
- }
-
- @Override
- public List<Option> resolveOptions(String requestId, StaticPropertyExtractor parameterExtractor) {
-
- return OpcUaUtil.resolveOptions(requestId, parameterExtractor);
- }
-}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
index 77b8a82..55aecb3 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/utils/OpcUaUtil.java
@@ -151,6 +151,9 @@ public class OpcUaUtil {
PASSWORD,
UNAUTHENTICATED,
AVAILABLE_NODES,
- POLLING_INTERVAL;
+ PULLING_INTERVAL,
+ ADAPTER_TYPE,
+ PULL_MODE,
+ SUBSCRIPTION_MODE;
}
}
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/icon.png b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/icon.png
deleted file mode 100644
index ab68d43..0000000
Binary files a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/icon.png and /dev/null differ
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/documentation.md b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/documentation.md
deleted file mode 100644
index 8cc959a..0000000
--- a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/documentation.md
+++ /dev/null
@@ -1,66 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- ~
- -->
-
-## OPC-UA Subscription Adapter
-
-<p align="center">
- <img src="icon.png" width="150px;" class="pe-image-documentation"/>
-</p>
-
-***
-
-## Description
-
-Read values from an OPC-UA server on value changes
-
-***
-
-## Required Input
-
-***
-
-## Configuration
-
-### Anonymous vs. Username/Password
-
-Choose whether you want to connect anonymously or authenticate using your credentials.
-
- **Anonymous**: No further information required <br>
- **Username/Password**: Insert your `username` and `password` to access the OPC UA server
-
-### OPC UA Server
-
-Where can the OPC UA server be found?
-
- **URL**: Specify the server's full `URL` (including port), can be with our without leading `opc.tcp://`<br>
- **Host/Port**: Insert the `host` address (with or without leading `opc.tcp://`) and the `port`<br>
-
-### Namespace Index
-
-Requires the index of the namespace you want to connect to.
-
-### Node ID
-
-The identifier of the node you want to read from, numbers and strings are both valid.
-
-### Available Nodes
-
-Shows all available nodes once namespace index and node ID are given.
-Select as much as you like to query.
-
-***
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/strings.en b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/strings.en
deleted file mode 100644
index 84c1974..0000000
--- a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/strings.en
+++ /dev/null
@@ -1,46 +0,0 @@
-org.apache.streampipes.connect.adapters.opcua.subscription.title=OPC UA Subscription
-org.apache.streampipes.connect.adapters.opcua.subscription.description=Reads values from an OPC-UA server only on changes
-
-
-OPC_HOST_OR_URL.title=OPC Server
-OPC_HOST_OR_URL.description=
-
-OPC_URL.title=URL
-OPC_URL.description=
-
-OPC_HOST.title=Host/Port
-OPC_HOST.description=
-
-OPC_SERVER_URL.title=URL
-OPC_SERVER_URL.description=Example: opc.tcp://test-server.com:4840,
-
-OPC_SERVER_HOST.title=Host
-OPC_SERVER_HOST.description=Example: test-server.com, opc.tcp://test-server.com)
-
-OPC_SERVER_PORT.title=Port
-OPC_SERVER_PORT.description=Example: 4840
-
-NAMESPACE_INDEX.title=Namespace Index
-NAMESPACE.INDEX.description=Example: 2
-
-NODE_ID.title=Node ID
-NODE_ID.description=Id of the Node to read the values from
-
-
-ACCESS_MODE.title=Security Level
-ACCESS_MODE.description=Select the OPC UA security level for the connection
-
-USERNAME_GROUP.title=Sign (username & password)
-USERNAME_GROUP.description=
-
-UNAUTHENTICATED.title=None
-UNAUTHENTICATED.description=
-
-USERNAME.title=Username
-USERNAME.description=
-
-PASSWORD.title=Password
-PASSWORD.description=
-
-AVAILABLE_NODES.title=Available Nodes
-AVAILABLE_NODES.description=Select the nodes that are relevant for you. Please ensure to select one option each for access mode and OPC UA server
\ No newline at end of file
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/documentation.md b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/documentation.md
similarity index 100%
rename from streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/documentation.md
rename to streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/documentation.md
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/icon.png b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/icon.png
similarity index 100%
rename from streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.subscription/icon.png
rename to streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/icon.png
diff --git a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/strings.en b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/strings.en
similarity index 68%
rename from streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/strings.en
rename to streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/strings.en
index 72ec8f8..240ffdb 100644
--- a/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua.pull/strings.en
+++ b/streampipes-connect-adapters/src/main/resources/org.apache.streampipes.connect.adapters.opcua/strings.en
@@ -1,5 +1,5 @@
-org.apache.streampipes.connect.adapters.opcua.pull.title=OPC UA Pull
-org.apache.streampipes.connect.adapters.opcua.pull.description=Reads values from an OPC-UA server repeatedly
+org.apache.streampipes.connect.adapters.opcua.title=OPC UA
+org.apache.streampipes.connect.adapters.opcua.description=Reads values from an OPC-UA server
OPC_HOST_OR_URL.title=OPC Server
@@ -45,5 +45,14 @@ PASSWORD.description=
AVAILABLE_NODES.title=Available Nodes
AVAILABLE_NODES.description=Select the nodes that are relevant for you. Please ensure to select one option each for access mode and OPC UA server and to specify the polling interval
-POLLING_INTERVAL.title=Polling Interval
-POLLING_INTERVAL.description=Timespan between data are polled, in seconds
\ No newline at end of file
+PULLING_INTERVAL.title=Pull Interval
+PULLING_INTERVAL.description=Timespan between data are pulled, in seconds
+
+ADAPTER_TYPE.title=Adapter Type
+ADAPTER_TYPE.description=Select whether the adapter pulls data regularly or listens to subscriptions
+
+PULL_MODE.title=Pull mode
+PULL_MODE.description=
+
+SUBSCRIPTION_MODE.title=Subscription mode
+SUBSCRIPTION_MODE.description=
\ No newline at end of file
diff --git a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
index 6e06b5d..dd67855 100644
--- a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
+++ b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
@@ -31,8 +31,7 @@ import org.apache.streampipes.connect.adapters.mysql.MySqlSetAdapter;
import org.apache.streampipes.connect.adapters.mysql.MySqlStreamAdapter;
import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaSubscriptionAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaPullAdapter;
+import org.apache.streampipes.connect.adapters.opcua.OpcUaAdapter;
import org.apache.streampipes.connect.adapters.plc4x.modbus.Plc4xModbusAdapter;
import org.apache.streampipes.connect.adapters.plc4x.s7.Plc4xS7Adapter;
import org.apache.streampipes.connect.adapters.ros.RosBridgeAdapter;
@@ -246,8 +245,7 @@ public class AllExtensionsInit extends ExtensionsModelSubmitter {
.add(new WikipediaEditedArticlesAdapter())
.add(new WikipediaNewArticlesAdapter())
.add(new RosBridgeAdapter())
- .add(new OpcUaSubscriptionAdapter())
- .add(new OpcUaPullAdapter())
+ .add(new OpcUaAdapter())
.add(new InfluxDbStreamAdapter())
.add(new InfluxDbSetAdapter())
.add(new TISensorTag())