You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2021/03/02 14:25:04 UTC

[incubator-streampipes-extensions] 03/06: [STREAMPIPES-306] create OPC Pull Adapter

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 61396e5fae4c72de64f54c5bbc9f706794d25769
Author: bossenti <bo...@posteo.de>
AuthorDate: Tue Mar 2 15:00:02 2021 +0100

    [STREAMPIPES-306] create OPC Pull Adapter
---
 .../streampipes/connect/ConnectAdapterInit.java    |   6 +-
 .../connect/adapters/opcua/OpcUaPullAdapter.java   | 158 +++++++++++++++++++++
 .../extensions/all/jvm/AllExtensionsInit.java      |   6 +-
 3 files changed, 166 insertions(+), 4 deletions(-)

diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
index aa78a07..34b0e20 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/ConnectAdapterInit.java
@@ -24,6 +24,7 @@ import org.apache.streampipes.connect.adapters.iss.IssAdapter;
 import org.apache.streampipes.connect.adapters.flic.FlicMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
+import org.apache.streampipes.connect.adapters.opcua.OpcUaPullAdapter;
 import org.apache.streampipes.connect.adapters.plc4x.modbus.Plc4xModbusAdapter;
 import org.apache.streampipes.connect.adapters.simulator.machine.MachineDataStreamAdapter;
 import org.apache.streampipes.connect.adapters.ti.TISensorTag;
@@ -36,7 +37,7 @@ import org.apache.streampipes.connect.adapters.influxdb.InfluxDbSetAdapter;
 import org.apache.streampipes.connect.adapters.influxdb.InfluxDbStreamAdapter;
 import org.apache.streampipes.connect.adapters.mysql.MySqlSetAdapter;
 import org.apache.streampipes.connect.adapters.mysql.MySqlStreamAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaAdapter;
+import org.apache.streampipes.connect.adapters.opcua.OpcUaSubscriptionAdapter;
 import org.apache.streampipes.connect.adapters.plc4x.s7.Plc4xS7Adapter;
 import org.apache.streampipes.connect.adapters.ros.RosBridgeAdapter;
 import org.apache.streampipes.connect.adapters.simulator.random.RandomDataSetAdapter;
@@ -82,7 +83,8 @@ public class ConnectAdapterInit extends AdapterWorkerContainer {
             .add(new WikipediaEditedArticlesAdapter())
             .add(new WikipediaNewArticlesAdapter())
             .add(new RosBridgeAdapter())
-            .add(new OpcUaAdapter())
+            .add(new OpcUaSubscriptionAdapter())
+            .add(new OpcUaPullAdapter())
             .add(new InfluxDbStreamAdapter())
             .add(new InfluxDbSetAdapter())
             .add(new TISensorTag())
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java
new file mode 100644
index 0000000..f218040
--- /dev/null
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/opcua/OpcUaPullAdapter.java
@@ -0,0 +1,158 @@
+package org.apache.streampipes.connect.adapters.opcua;
+
+import org.apache.streampipes.connect.adapter.Adapter;
+import org.apache.streampipes.connect.adapter.exception.AdapterException;
+import org.apache.streampipes.connect.adapter.exception.ParseException;
+import org.apache.streampipes.connect.adapter.util.PollingSettings;
+import org.apache.streampipes.connect.adapters.PullAdapter;
+import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil;
+import org.apache.streampipes.connect.adapters.opcua.utils.OpcUaUtil.OpcUaLabels;
+import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
+import org.apache.streampipes.model.AdapterType;
+import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
+import org.apache.streampipes.model.connect.guess.GuessSchema;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.helpers.Alternatives;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pull-based OPC UA adapter.
+ *
+ * <p>The nodes to read are browsed once in {@link #before()}. Each call to {@link #pullData()}
+ * reads the current value of every browsed node and hands the collected values to the adapter
+ * pipeline as a single event. The polling interval reported by {@link #getPollingInterval()} is
+ * taken from the adapter configuration.
+ */
+public class OpcUaPullAdapter extends PullAdapter implements ResolvesContainerProvidedOptions {
+
+    /** Identifier of this adapter; used as adapter id and as app id of the description. */
+    public static final String ID = "org.apache.streampipes.connect.adapters.opcua.pull";
+
+    /** Polling interval in seconds, extracted from the configuration in {@link #before()}. */
+    private int pollingIntervalInSeconds;
+
+    /** OPC UA connection wrapper, created and connected in {@link #before()}. */
+    private OpcUa opcUa;
+
+    /** Nodes returned by browsing the server; these are the nodes read on every pull. */
+    private List<OpcNode> allNodes;
+
+    /** Creates an adapter without a description. */
+    public OpcUaPullAdapter() {
+        super();
+    }
+
+    /**
+     * Creates an adapter for the given description.
+     *
+     * @param adapterStreamDescription description carrying the user-provided configuration
+     */
+    public OpcUaPullAdapter(SpecificAdapterStreamDescription adapterStreamDescription) {
+        super(adapterStreamDescription);
+    }
+
+    /**
+     * Extracts the polling interval from the adapter configuration, connects to the OPC UA
+     * server and browses the nodes that are read on every pull.
+     *
+     * @throws AdapterException if connecting to the server or browsing its nodes fails; the
+     *                          underlying exception is attached as the cause
+     */
+    @Override
+    protected void before() throws AdapterException {
+        StaticPropertyExtractor extractor =
+                StaticPropertyExtractor.from(this.adapterDescription.getConfig(), new ArrayList<>());
+
+        this.pollingIntervalInSeconds = extractor.singleValueParameter(OpcUaLabels.POLLING_INTERVAL.name(), int.class);
+        this.opcUa = OpcUa.from(this.adapterDescription);
+
+        try {
+            this.opcUa.connect();
+            this.allNodes = this.opcUa.browseNode(true);
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // The broad catch must not swallow an interruption request.
+                Thread.currentThread().interrupt();
+            }
+            // Attach the original failure so the root problem stays diagnosable for the caller.
+            throw new AdapterException("The Connection to the OPC UA server could not be established.", e);
+        }
+    }
+
+    /**
+     * Reads the current value of every browsed node and forwards the values, keyed by node
+     * label, to the adapter pipeline as one event.
+     *
+     * <p>A node whose read fails is left out of the event. If the calling thread is interrupted
+     * while waiting for a value, the interrupt flag is restored and no event is forwarded.
+     */
+    @Override
+    protected void pullData() {
+        Map<String, Object> event = new HashMap<>();
+
+        for (OpcNode opcNode : this.allNodes) {
+            CompletableFuture<DataValue> response =
+                    this.opcUa.getClient().readValue(0, TimestampsToReturn.Both, opcNode.getNodeId());
+            try {
+                event.put(opcNode.getLabel(), response.get().getValue().getValue());
+            } catch (InterruptedException e) {
+                // Restore the flag for the code driving this thread and drop the half-filled
+                // event instead of publishing it; further blocking reads would fail as well.
+                Thread.currentThread().interrupt();
+                return;
+            } catch (ExecutionException e) {
+                // Best effort: the failed node is skipped, the remaining nodes are still read.
+                // No logger is in scope in this class, so the trace goes to stderr as before.
+                e.printStackTrace();
+            }
+        }
+
+        adapterPipeline.process(event);
+    }
+
+    /**
+     * Returns the polling settings built from the configured interval.
+     *
+     * @return polling settings in seconds, using the value extracted in {@link #before()}
+     */
+    @Override
+    protected PollingSettings getPollingInterval() {
+        return PollingSettings.from(TimeUnit.SECONDS, this.pollingIntervalInSeconds);
+    }
+
+    /**
+     * Declares the adapter description: polling interval, access mode (unauthenticated or
+     * username/password), server address (URL or host and port), namespace index, node id and
+     * a multi-value selection of available nodes that is resolved by the container.
+     *
+     * @return the description with {@link #ID} set as app id
+     */
+    @Override
+    public SpecificAdapterStreamDescription declareModel() {
+
+        SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder
+                .create(ID)
+                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+                .withLocales(Locales.EN)
+                .category(AdapterType.Generic, AdapterType.Manufacturing)
+                .requiredIntegerParameter(Labels.withId(OpcUaLabels.POLLING_INTERVAL.name()))
+                // Authentication: either anonymous access or a username/password group.
+                .requiredAlternatives(Labels.withId(OpcUaLabels.ACCESS_MODE.name()),
+                        Alternatives.from(Labels.withId(OpcUaLabels.UNAUTHENTICATED.name())),
+                        Alternatives.from(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
+                                StaticProperties.group(
+                                        Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
+                                        StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.USERNAME.name())),
+                                        StaticProperties.secretValue(Labels.withId(OpcUaLabels.PASSWORD.name()))
+                                ))
+                )
+                // Server address: either a complete URL or a host/port pair.
+                .requiredAlternatives(Labels.withId(OpcUaLabels.OPC_HOST_OR_URL.name()),
+                        Alternatives.from(
+                                Labels.withId(OpcUaLabels.OPC_URL.name()),
+                                StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_URL.name()))),
+                        Alternatives.from(Labels.withId(OpcUaLabels.OPC_HOST.name()),
+                                StaticProperties.group(
+                                        Labels.withId("host-port"),
+                                        StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_HOST.name())),
+                                        StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_PORT.name()))
+                                ))
+                )
+                .requiredTextParameter(Labels.withId(OpcUaLabels.NAMESPACE_INDEX.name()))
+                .requiredTextParameter(Labels.withId(OpcUaLabels.NODE_ID.name()))
+                // The node selection is declared as depending on the three parameters listed here.
+                .requiredMultiValueSelectionFromContainer(
+                        Labels.withId(OpcUaLabels.AVAILABLE_NODES.name()),
+                        Arrays.asList(OpcUaLabels.POLLING_INTERVAL.name(), OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
+                )
+                .build();
+
+        description.setAppId(ID);
+
+        return description;
+    }
+
+    /**
+     * Creates a new adapter instance for the given description.
+     *
+     * @param adapterDescription description the new instance is built from
+     * @return a new {@code OpcUaPullAdapter}
+     */
+    @Override
+    public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
+        return new OpcUaPullAdapter(adapterDescription);
+    }
+
+    /**
+     * Delegates schema guessing to {@link OpcUaUtil#getSchema}.
+     *
+     * @param adapterDescription description to derive the schema for
+     * @return the schema produced by {@code OpcUaUtil}
+     * @throws AdapterException propagated from {@code OpcUaUtil.getSchema}
+     * @throws ParseException   propagated from {@code OpcUaUtil.getSchema}
+     */
+    @Override
+    public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
+        return OpcUaUtil.getSchema(adapterDescription);
+    }
+
+    /**
+     * Returns the identifier of this adapter.
+     *
+     * @return {@link #ID}
+     */
+    @Override
+    public String getId() {
+        return ID;
+    }
+
+    /**
+     * Delegates the resolution of container-provided options to {@link OpcUaUtil#resolveOptions}.
+     *
+     * @param requestId          id of the static property whose options are requested
+     * @param parameterExtractor extractor giving access to the values entered so far
+     * @return the options produced by {@code OpcUaUtil}
+     */
+    @Override
+    public List<Option> resolveOptions(String requestId, StaticPropertyExtractor parameterExtractor) {
+        return OpcUaUtil.resolveOptions(requestId, parameterExtractor);
+    }
+
+}
diff --git a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
index dd67855..6e06b5d 100644
--- a/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
+++ b/streampipes-extensions-all-jvm/src/main/java/org/apache/streampipes/extensions/all/jvm/AllExtensionsInit.java
@@ -31,7 +31,8 @@ import org.apache.streampipes.connect.adapters.mysql.MySqlSetAdapter;
 import org.apache.streampipes.connect.adapters.mysql.MySqlStreamAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioMQTTAdapter;
 import org.apache.streampipes.connect.adapters.netio.NetioRestAdapter;
-import org.apache.streampipes.connect.adapters.opcua.OpcUaAdapter;
+import org.apache.streampipes.connect.adapters.opcua.OpcUaSubscriptionAdapter;
+import org.apache.streampipes.connect.adapters.opcua.OpcUaPullAdapter;
 import org.apache.streampipes.connect.adapters.plc4x.modbus.Plc4xModbusAdapter;
 import org.apache.streampipes.connect.adapters.plc4x.s7.Plc4xS7Adapter;
 import org.apache.streampipes.connect.adapters.ros.RosBridgeAdapter;
@@ -245,7 +246,8 @@ public class AllExtensionsInit extends ExtensionsModelSubmitter {
                 .add(new WikipediaEditedArticlesAdapter())
                 .add(new WikipediaNewArticlesAdapter())
                 .add(new RosBridgeAdapter())
-                .add(new OpcUaAdapter())
+                .add(new OpcUaSubscriptionAdapter())
+                .add(new OpcUaPullAdapter())
                 .add(new InfluxDbStreamAdapter())
                 .add(new InfluxDbSetAdapter())
                 .add(new TISensorTag())