You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/08 08:42:29 UTC

[incubator-streampipes] 02/04: [hotfix] Add more logging to OPC-UA adapter

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit d72d5989c9c0824f3511d9f559545f6beda8f0d9
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Aug 7 21:00:28 2022 +0200

    [hotfix] Add more logging to OPC-UA adapter
---
 .../connect/iiot/adapters/opcua/OpcUaAdapter.java  | 327 +++++++++++----------
 1 file changed, 172 insertions(+), 155 deletions(-)

diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUaAdapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUaAdapter.java
index 0faffd00e..0538a0c7d 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUaAdapter.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUaAdapter.java
@@ -41,7 +41,10 @@ import org.apache.streampipes.sdk.utils.Assets;
 import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
 import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
 import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
@@ -50,185 +53,199 @@ import java.util.concurrent.TimeUnit;
 
 public class OpcUaAdapter extends PullAdapter implements SupportsRuntimeConfig {
 
-    public static final String ID = "org.apache.streampipes.connect.iiot.adapters.opcua";
+  public static final String ID = "org.apache.streampipes.connect.iiot.adapters.opcua";
+  private static final Logger LOG = LoggerFactory.getLogger(OpcUaAdapter.class);
 
-    private int pullingIntervalMilliSeconds;
-    private SpOpcUaClient spOpcUaClient;
-    private List<OpcNode> allNodes;
-    private List<NodeId> allNodeIds;
-    private int numberProperties;
-    private Map<String, Object> event;
+  private int pullingIntervalMilliSeconds;
+  private SpOpcUaClient spOpcUaClient;
+  private List<OpcNode> allNodes;
+  private List<NodeId> allNodeIds;
+  private int numberProperties;
+  private Map<String, Object> event;
 
-    public OpcUaAdapter() {
-        super();
-        this.numberProperties = 0;
-        this.event = new HashMap<>();
-    }
+  public OpcUaAdapter() {
+    super();
+    this.numberProperties = 0;
+    this.event = new HashMap<>();
+  }
 
-    public OpcUaAdapter(SpecificAdapterStreamDescription adapterStreamDescription) {
-        super(adapterStreamDescription);
-        this.numberProperties = 0;
-        this.event = new HashMap<>();
-    }
+  public OpcUaAdapter(SpecificAdapterStreamDescription adapterStreamDescription) {
+    super(adapterStreamDescription);
+    this.numberProperties = 0;
+    this.event = new HashMap<>();
+  }
 
-    @Override
-    protected void before() throws AdapterException {
+  @Override
+  protected void before() throws AdapterException {
 
-        this.allNodeIds = new ArrayList<>();
-        try {
-            this.spOpcUaClient.connect();
-            OpcUaNodeBrowser browserClient = new OpcUaNodeBrowser(this.spOpcUaClient.getClient(), this.spOpcUaClient.getSpOpcConfig());
-            this.allNodes = browserClient.findNodes();
+    this.allNodeIds = new ArrayList<>();
+    try {
+      this.spOpcUaClient.connect();
+      OpcUaNodeBrowser browserClient = new OpcUaNodeBrowser(this.spOpcUaClient.getClient(), this.spOpcUaClient.getSpOpcConfig());
+      this.allNodes = browserClient.findNodes();
 
 
-                for (OpcNode node : this.allNodes) {
-                    this.allNodeIds.add(node.nodeId);
-                }
+      for (OpcNode node : this.allNodes) {
+        this.allNodeIds.add(node.nodeId);
+      }
 
-            if (spOpcUaClient.inPullMode()) {
-                this.pullingIntervalMilliSeconds = spOpcUaClient.getPullIntervalMilliSeconds();
-            } else {
-                this.numberProperties = this.allNodeIds.size();
-                this.spOpcUaClient.createListSubscription(this.allNodeIds, this);
-            }
+      if (spOpcUaClient.inPullMode()) {
+        this.pullingIntervalMilliSeconds = spOpcUaClient.getPullIntervalMilliSeconds();
+      } else {
+        this.numberProperties = this.allNodeIds.size();
+        this.spOpcUaClient.createListSubscription(this.allNodeIds, this);
+      }
 
 
-        } catch (Exception e) {
-            throw new AdapterException("The Connection to the OPC UA server could not be established.", e.getCause());
-        }
+    } catch (Exception e) {
+      throw new AdapterException("The Connection to the OPC UA server could not be established.", e.getCause());
     }
+  }
 
-        @Override
-    public void startAdapter() throws AdapterException {
-
-        this.spOpcUaClient = new SpOpcUaClient(SpOpcUaConfigBuilder.from(this.adapterDescription));
-
-        if (this.spOpcUaClient.inPullMode()) {
-            super.startAdapter();
-        } else {
-            this.before();
-        }
-    }
+  @Override
+  public void startAdapter() throws AdapterException {
 
-    @Override
-    public void stopAdapter() throws AdapterException {
-        // close connection
-        this.spOpcUaClient.disconnect();
+    this.spOpcUaClient = new SpOpcUaClient(SpOpcUaConfigBuilder.from(this.adapterDescription));
 
-        if (this.spOpcUaClient.inPullMode()){
-            super.stopAdapter();
-        }
+    if (this.spOpcUaClient.inPullMode()) {
+      super.startAdapter();
+    } else {
+      this.before();
     }
+  }
 
-    @Override
-    protected void pullData() {
-        CompletableFuture<List<DataValue>> response = this.spOpcUaClient.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
-        try {
-        List<DataValue> returnValues = response.get();
-            for (int i = 0; i<returnValues.size(); i++) {
-
-                Object value = returnValues.get(i).getValue().getValue();
-                this.event.put(this.allNodes.get(i).getLabel(), value);
-
-            }
-         } catch (InterruptedException | ExecutionException ie) {
-            ie.printStackTrace();
-         }
-
-        adapterPipeline.process(this.event);
+  @Override
+  public void stopAdapter() throws AdapterException {
+    // close connection
+    this.spOpcUaClient.disconnect();
 
+    if (this.spOpcUaClient.inPullMode()) {
+      super.stopAdapter();
     }
-
-    public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
-
-        String key = OpcUaUtil.getRuntimeNameOfNode(item.getReadValueId().getNodeId());
-
-        OpcNode currNode = this.allNodes.stream()
-                .filter(node -> key.equals(node.getNodeId().getIdentifier().toString()))
-                .findFirst()
-                .orElse(null);
-
-        event.put(currNode.getLabel(), value.getValue().getValue());
-
-        // ensure that event is complete and all opc ua subscriptions transmitted at least one value
-        if (event.keySet().size() >= this.numberProperties) {
-            Map <String, Object> newEvent = new HashMap<>();
-            // deep copy of event to prevent preprocessor error
-            for (String k : event.keySet()) {
-                newEvent.put(k, event.get(k));
-            }
-            adapterPipeline.process(newEvent);
+  }
+
+  @Override
+  protected void pullData() {
+    CompletableFuture<List<DataValue>> response = this.spOpcUaClient.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
+    boolean badStatusCodeReceived = false;
+    boolean emptyValueReceived = false;
+    try {
+      List<DataValue> returnValues = response.get();
+      if (returnValues.size() == 0) {
+        emptyValueReceived = true;
+        LOG.warn("Empty value object returned - event will not be sent");
+      } else {
+        for (int i = 0; i < returnValues.size(); i++) {
+          var status = returnValues.get(i).getStatusCode();
+          if (StatusCode.GOOD.equals(status)) {
+            Object value = returnValues.get(i).getValue().getValue();
+            this.event.put(this.allNodes.get(i).getLabel(), value);
+          } else {
+            badStatusCodeReceived = true;
+            LOG.warn("Received status code {} for node label: {} - event will not be sent",
+              status,
+              this.allNodes.get(i).getLabel());
+          }
         }
+      }
+      if (!badStatusCodeReceived && !emptyValueReceived) {
+        adapterPipeline.process(this.event);
+      }
+    } catch (InterruptedException | ExecutionException ie) {
+      LOG.error("Exception while reading data", ie);
     }
+  }
 
-    @Override
-    protected PollingSettings getPollingInterval() {
-        return PollingSettings.from(TimeUnit.MILLISECONDS, this.pullingIntervalMilliSeconds);
-    }
-
-    @Override
-    public SpecificAdapterStreamDescription declareModel() {
-
-        SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder
-                .create(ID)
-                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-                .withLocales(Locales.EN)
-                .category(AdapterType.Generic, AdapterType.Manufacturing)
-                .requiredAlternatives(Labels.withId(OpcUaLabels.ADAPTER_TYPE.name()),
-                        Alternatives.from(Labels.withId(OpcUaLabels.PULL_MODE.name()),
-                                StaticProperties.integerFreeTextProperty(Labels.withId(OpcUaLabels.PULLING_INTERVAL.name()))),
-                        Alternatives.from(Labels.withId(OpcUaLabels.SUBSCRIPTION_MODE.name())))
-                .requiredAlternatives(Labels.withId(OpcUaLabels.ACCESS_MODE.name()),
-                        Alternatives.from(Labels.withId(OpcUaLabels.UNAUTHENTICATED.name())),
-                        Alternatives.from(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
-                                StaticProperties.group(
-                                        Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
-                                        StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.USERNAME.name())),
-                                        StaticProperties.secretValue(Labels.withId(OpcUaLabels.PASSWORD.name()))
-                                ))
-                )
-                .requiredAlternatives(Labels.withId(OpcUaLabels.OPC_HOST_OR_URL.name()),
-                        Alternatives.from(
-                                Labels.withId(OpcUaLabels.OPC_URL.name()),
-                                StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_URL.name()), "opc.tcp://localhost:4840"))
-                        ,
-                        Alternatives.from(Labels.withId(OpcUaLabels.OPC_HOST.name()),
-                                StaticProperties.group(
-                                        Labels.withId("host-port"),
-                                        StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_HOST.name())),
-                                        StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_PORT.name()))
-                                ))
-                )
-                .requiredTextParameter(Labels.withId(OpcUaLabels.NAMESPACE_INDEX.name()))
-                .requiredTextParameter(Labels.withId(OpcUaLabels.NODE_ID.name()))
-                .requiredRuntimeResolvableTreeInput(
-                        Labels.withId(OpcUaLabels.AVAILABLE_NODES.name()),
-                        Arrays.asList(OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
-                )
-                .build();
-
-        description.setAppId(ID);
-
-        return description;
-    }
+  public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
 
-    @Override
-    public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
-        return new OpcUaAdapter(adapterDescription);
-    }
+    String key = OpcUaUtil.getRuntimeNameOfNode(item.getReadValueId().getNodeId());
 
-    @Override
-    public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
-        return OpcUaUtil.getSchema(adapterDescription);
-    }
+    OpcNode currNode = this.allNodes.stream()
+      .filter(node -> key.equals(node.getNodeId().getIdentifier().toString()))
+      .findFirst()
+      .orElse(null);
 
-    @Override
-    public String getId() {
-        return ID;
-    }
+    event.put(currNode.getLabel(), value.getValue().getValue());
 
-    @Override
-    public StaticProperty resolveConfiguration(String staticPropertyInternalName, StaticPropertyExtractor extractor) {
-        return OpcUaUtil.resolveConfiguration(staticPropertyInternalName, extractor);
+    // ensure that event is complete and all opc ua subscriptions transmitted at least one value
+    if (event.keySet().size() >= this.numberProperties) {
+      Map<String, Object> newEvent = new HashMap<>();
+      // deep copy of event to prevent preprocessor error
+      for (String k : event.keySet()) {
+        newEvent.put(k, event.get(k));
+      }
+      adapterPipeline.process(newEvent);
     }
+  }
+
+  @Override
+  protected PollingSettings getPollingInterval() {
+    return PollingSettings.from(TimeUnit.MILLISECONDS, this.pullingIntervalMilliSeconds);
+  }
+
+  @Override
+  public SpecificAdapterStreamDescription declareModel() {
+
+    SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder
+      .create(ID)
+      .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+      .withLocales(Locales.EN)
+      .category(AdapterType.Generic, AdapterType.Manufacturing)
+      .requiredAlternatives(Labels.withId(OpcUaLabels.ADAPTER_TYPE.name()),
+        Alternatives.from(Labels.withId(OpcUaLabels.PULL_MODE.name()),
+          StaticProperties.integerFreeTextProperty(Labels.withId(OpcUaLabels.PULLING_INTERVAL.name()))),
+        Alternatives.from(Labels.withId(OpcUaLabels.SUBSCRIPTION_MODE.name())))
+      .requiredAlternatives(Labels.withId(OpcUaLabels.ACCESS_MODE.name()),
+        Alternatives.from(Labels.withId(OpcUaLabels.UNAUTHENTICATED.name())),
+        Alternatives.from(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
+          StaticProperties.group(
+            Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
+            StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.USERNAME.name())),
+            StaticProperties.secretValue(Labels.withId(OpcUaLabels.PASSWORD.name()))
+          ))
+      )
+      .requiredAlternatives(Labels.withId(OpcUaLabels.OPC_HOST_OR_URL.name()),
+        Alternatives.from(
+          Labels.withId(OpcUaLabels.OPC_URL.name()),
+          StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_URL.name()), "opc.tcp://localhost:4840"))
+        ,
+        Alternatives.from(Labels.withId(OpcUaLabels.OPC_HOST.name()),
+          StaticProperties.group(
+            Labels.withId("host-port"),
+            StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_HOST.name())),
+            StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_PORT.name()))
+          ))
+      )
+      .requiredTextParameter(Labels.withId(OpcUaLabels.NAMESPACE_INDEX.name()))
+      .requiredTextParameter(Labels.withId(OpcUaLabels.NODE_ID.name()))
+      .requiredRuntimeResolvableTreeInput(
+        Labels.withId(OpcUaLabels.AVAILABLE_NODES.name()),
+        Arrays.asList(OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
+      )
+      .build();
+
+    description.setAppId(ID);
+
+    return description;
+  }
+
+  @Override
+  public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
+    return new OpcUaAdapter(adapterDescription);
+  }
+
+  @Override
+  public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
+    return OpcUaUtil.getSchema(adapterDescription);
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public StaticProperty resolveConfiguration(String staticPropertyInternalName, StaticPropertyExtractor extractor) {
+    return OpcUaUtil.resolveConfiguration(staticPropertyInternalName, extractor);
+  }
 }