You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2021/03/02 14:25:04 UTC
[incubator-streampipes-extensions] 03/06: [STREAMPIPES-306] create
OPC Pull Adapter
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit 61396e5fae4c72de64f54c5bbc9f706794d25769
Author: bossenti <bo...@posteo.de>
AuthorDate: Tue Mar 2 15:00:02 2021 +0100
[STREAMPIPES-306] create OPC Pull Adapter
---
.../streampipes/connect/ConnectAdapterInit.java | 6 +-
.../connect/adapters/opcua/OpcUaPullAdapter.java | 158 +++++++++++++++++++++
.../extensions/all/jvm/AllExtensionsInit.java | 6 +-
3 files changed, 166 insertions(+), 4 deletions(-)
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
index aa78a07..34b0e20 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.connect.adapters.iss.IssAdapter;
import org.apache.streampipes.connect.adapters.flic.FlicMQTTAdapter;
import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
+import org.apache.streampipes.connect.adapters.opcua.OpcUaPullAdapter;
import org.apache.streampipes.connect.adapters.plc4x.modbus.Plc4xModbusAdapter;
import org.apache.streampipes.connect.adapters.simulator.machine.MachineDataStreamAdapter;
import org.apache.streampipes.connect.adapters.ti.TISensorTag;
@@ -36,7 +37,7 @@ import org.apache.streampipes.connect.adapters.influxdb.InfluxDbSetAdapter;
import org.apache.streampipes.connect.adapters.influxdb.InfluxDbStreamAdapter;
import org.apache.streampipes.connect.adapters.mysql.MySqlSetAdapter;
import org.apache.streampipes.connect.adapters.mysql.MySqlStreamAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaAdapter;
+import org.apache.streampipes.connect.adapters.opcua.OpcUaSubscriptionAdapter;
import org.apache.streampipes.connect.adapters.plc4x.s7.Plc4xS7Adapter;
import org.apache.streampipes.connect.adapters.ros.RosBridgeAdapter;
import org.apache.streampipes.connect.adapters.simulator.random.RandomDataSetAdapter;
@@ -82,7 +83,8 @@ public class ConnectAdapterInit extends AdapterWorkerContainer {
.add(new WikipediaEditedArticlesAdapter())
.add(new WikipediaNewArticlesAdapter())
.add(new RosBridgeAdapter())
- .add(new OpcUaAdapter())
+ .add(new OpcUaSubscriptionAdapter())
+ .add(new OpcUaPullAdapter())
.add(new InfluxDbStreamAdapter())
.add(new InfluxDbSetAdapter())
.add(new TISensorTag())
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java
new file mode 100644
index 0000000..f218040
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java
@@ -0,0 +1,158 @@
+package org.apache.streampipes.connect.adapters.opcua;
+
+import org.apache.streampipes.connect.adapter.Adapter;
+import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.adapter.util.PollingSettings;
+import org.apache.streampipes.connect.adapters.PullAdapter;
+import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil;
+import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil.OpcUaLabels;
+import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
+import org.apache.streampipes.model.AdapterType;
+import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.helpers.Alternatives;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class OpcUaPullAdapter extends PullAdapter implements ResolvesContainerProvidedOptions {
+
+ public static final String ID = "org.apache.streampipes.connect.adapters.opcua.pull";
+
+ private int pollingIntervalInSeconds;
+ private OpcUa opcUa;
+ private List<OpcNode> allNodes;
+
+ public OpcUaPullAdapter() {
+ super();
+ }
+
+ public OpcUaPullAdapter(SpecificAdapterStreamDescription adapterStreamDescription) {
+ super(adapterStreamDescription);
+ }
+
+ @Override
+ protected void before() throws AdapterException {
+
+ StaticPropertyExtractor extractor =
+ StaticPropertyExtractor.from(this.adapterDescription.getConfig(), new ArrayList<>());
+
+ this.pollingIntervalInSeconds = extractor.singleValueParameter(OpcUaLabels.POLLING_INTERVAL.name(), int.class);
+
+ this.opcUa = OpcUa.from(this.adapterDescription);
+
+ try {
+ this.opcUa.connect();
+ this.allNodes = this.opcUa.browseNode(true);
+ } catch (Exception e) {
+ throw new AdapterException("The Connection to the OPC UA server could not be established.");
+ }
+
+ }
+
+ @Override
+ protected void pullData() {
+
+ Map<String, Object> event = new HashMap<>();
+
+ for (OpcNode opcNode : this.allNodes) {
+ CompletableFuture response = this.opcUa.getClient().readValue(0, TimestampsToReturn.Both, opcNode.getNodeId());
+ try {
+
+ Object value = ((DataValue) response.get()).getValue().getValue();
+
+ event.put(opcNode.getLabel(), value);
+
+ } catch (InterruptedException | ExecutionException ie) {
+ ie.printStackTrace();
+ }
+ }
+
+ adapterPipeline.process(event);
+
+ }
+
+ @Override
+ protected PollingSettings getPollingInterval() {
+ return PollingSettings.from(TimeUnit.SECONDS, this.pollingIntervalInSeconds);
+ }
+
+ @Override
+ public SpecificAdapterStreamDescription declareModel() {
+
+ SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder
+ .create(ID)
+ .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .withLocales(Locales.EN)
+ .category(AdapterType.Generic, AdapterType.Manufacturing)
+ .requiredIntegerParameter(Labels.withId(OpcUaLabels.POLLING_INTERVAL.name()))
+ .requiredAlternatives(Labels.withId(OpcUaLabels.ACCESS_MODE.name()),
+ Alternatives.from(Labels.withId(OpcUaLabels.UNAUTHENTICATED.name())),
+ Alternatives.from(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
+ StaticProperties.group(
+ Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
+ StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.USERNAME.name())),
+ StaticProperties.secretValue(Labels.withId(OpcUaLabels.PASSWORD.name()))
+ ))
+ )
+ .requiredAlternatives(Labels.withId(OpcUaLabels.OPC_HOST_OR_URL.name()),
+ Alternatives.from(
+ Labels.withId(OpcUaLabels.OPC_URL.name()),
+ StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_URL.name())))
+ ,
+ Alternatives.from(Labels.withId(OpcUaLabels.OPC_HOST.name()),
+ StaticProperties.group(
+ Labels.withId("host-port"),
+ StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_HOST.name())),
+ StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_PORT.name()))
+ ))
+ )
+ .requiredTextParameter(Labels.withId(OpcUaLabels.NAMESPACE_INDEX.name()))
+ .requiredTextParameter(Labels.withId(OpcUaLabels.NODE_ID.name()))
+ .requiredMultiValueSelectionFromContainer(
+ Labels.withId(OpcUaLabels.AVAILABLE_NODES.name()),
+ Arrays.asList(OpcUaLabels.POLLING_INTERVAL.name(), OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
+ )
+ .build();
+
+ description.setAppId(ID);
+
+ return description;
+ }
+
+ @Override
+ public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
+ return new OpcUaPullAdapter(adapterDescription);
+ }
+
+ @Override
+ public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
+
+ return OpcUaUtil.getSchema(adapterDescription);
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ public List<Option> resolveOptions(String requestId, StaticPropertyExtractor parameterExtractor) {
+
+ return OpcUaUtil.resolveOptions(requestId, parameterExtractor);
+
+ }
+
+}
diff --git a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
index dd67855..6e06b5d 100644
--- a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
+++ b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
@@ -31,7 +31,8 @@ import org.apache.streampipes.connect.adapters.mysql.MySqlSetAdapter;
import org.apache.streampipes.connect.adapters.mysql.MySqlStreamAdapter;
import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaAdapter;
+import org.apache.streampipes.connect.adapters.opcua.OpcUaSubscriptionAdapter;
+import org.apache.streampipes.connect.adapters.opcua.OpcUaPullAdapter;
import org.apache.streampipes.connect.adapters.plc4x.modbus.Plc4xModbusAdapter;
import org.apache.streampipes.connect.adapters.plc4x.s7.Plc4xS7Adapter;
import org.apache.streampipes.connect.adapters.ros.RosBridgeAdapter;
@@ -245,7 +246,8 @@ public class AllExtensionsInit extends ExtensionsModelSubmitter {
.add(new WikipediaEditedArticlesAdapter())
.add(new WikipediaNewArticlesAdapter())
.add(new RosBridgeAdapter())
- .add(new OpcUaAdapter())
+ .add(new OpcUaSubscriptionAdapter())
+ .add(new OpcUaPullAdapter())
.add(new InfluxDbStreamAdapter())
.add(new InfluxDbSetAdapter())
.add(new TISensorTag())