You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/25 14:38:30 UTC

[incubator-streampipes] 02/03: [hotfix] Improve resilience of OPC-UA adapter in subscription mode

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch rel/0.70.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 9f82908804d5ae563b52cba9d15d3b0039937b1e
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Aug 25 15:57:18 2022 +0200

    [hotfix] Improve resilience of OPC-UA adapter in subscription mode
---
 .../streampipes-connect-adapters-iiot/pom.xml      |  4 ----
 .../connect/iiot/adapters/opcua/SpOpcUaClient.java | 22 +++++++++++++++++++++-
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml b/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
index d89c75c85..5b1d7ad31 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/pom.xml
@@ -66,10 +66,6 @@
             <groupId>com.fasterxml.jackson.module</groupId>
             <artifactId>jackson-module-jaxb-annotations</artifactId>
         </dependency>
-        <dependency>
-            <groupId>com.github.shyiko</groupId>
-            <artifactId>mysql-binlog-connector-java</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.influxdb</groupId>
             <artifactId>influxdb-java</artifactId>
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/SpOpcUaClient.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/SpOpcUaClient.java
index 6e7b9af8f..299c4f04f 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/SpOpcUaClient.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/SpOpcUaClient.java
@@ -25,11 +25,13 @@ import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
 import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
 import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
 import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
+import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
 import org.eclipse.milo.opcua.stack.core.AttributeId;
 import org.eclipse.milo.opcua.stack.core.UaException;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
 import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
 import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
@@ -93,7 +95,25 @@ public class SpOpcUaClient {
      * @param opcUaAdapter current instance of {@link OpcUaAdapter}
      * @throws Exception
      */
-    public void createListSubscription(List<NodeId> nodes, OpcUaAdapter opcUaAdapter) throws Exception {
+    public void createListSubscription(List<NodeId> nodes,
+                                       OpcUaAdapter opcUaAdapter) throws Exception {
+        client.getSubscriptionManager().addSubscriptionListener(new UaSubscriptionManager.SubscriptionListener() {
+            @Override
+            public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
+                LOG.warn("Transfer for subscriptionId={} failed: {}", subscription.getSubscriptionId(), statusCode);
+                try {
+                    initSubscription(nodes, opcUaAdapter);
+                } catch (Exception e) {
+                    LOG.error("Re-creating the subscription failed", e);
+                }
+            }
+        });
+
+        initSubscription(nodes, opcUaAdapter);
+    }
+
+
+    public void initSubscription(List<NodeId> nodes, OpcUaAdapter opcUaAdapter) throws Exception {
         /*
          * create a subscription @ 1000ms
          */