You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/08 08:42:29 UTC
[incubator-streampipes] 02/04: [hotfix] Add more logging to OPC-UA adapter
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit d72d5989c9c0824f3511d9f559545f6beda8f0d9
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Aug 7 21:00:28 2022 +0200
[hotfix] Add more logging to OPC-UA adapter
---
.../connect/iiot/adapters/opcua/OpcUaAdapter.java | 327 +++++++++++----------
1 file changed, 172 insertions(+), 155 deletions(-)
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUaAdapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUaAdapter.java
index 0faffd00e..0538a0c7d 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUaAdapter.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUaAdapter.java
@@ -41,7 +41,10 @@ import org.apache.streampipes.sdk.utils.Assets;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@@ -50,185 +53,199 @@ import java.util.concurrent.TimeUnit;
public class OpcUaAdapter extends PullAdapter implements SupportsRuntimeConfig {
- public static final String ID = "org.apache.streampipes.connect.iiot.adapters.opcua";
+ public static final String ID = "org.apache.streampipes.connect.iiot.adapters.opcua";
+ private static final Logger LOG = LoggerFactory.getLogger(OpcUaAdapter.class);
- private int pullingIntervalMilliSeconds;
- private SpOpcUaClient spOpcUaClient;
- private List<OpcNode> allNodes;
- private List<NodeId> allNodeIds;
- private int numberProperties;
- private Map<String, Object> event;
+ private int pullingIntervalMilliSeconds;
+ private SpOpcUaClient spOpcUaClient;
+ private List<OpcNode> allNodes;
+ private List<NodeId> allNodeIds;
+ private int numberProperties;
+ private Map<String, Object> event;
- public OpcUaAdapter() {
- super();
- this.numberProperties = 0;
- this.event = new HashMap<>();
- }
+ public OpcUaAdapter() {
+ super();
+ this.numberProperties = 0;
+ this.event = new HashMap<>();
+ }
- public OpcUaAdapter(SpecificAdapterStreamDescription adapterStreamDescription) {
- super(adapterStreamDescription);
- this.numberProperties = 0;
- this.event = new HashMap<>();
- }
+ public OpcUaAdapter(SpecificAdapterStreamDescription adapterStreamDescription) {
+ super(adapterStreamDescription);
+ this.numberProperties = 0;
+ this.event = new HashMap<>();
+ }
- @Override
- protected void before() throws AdapterException {
+ @Override
+ protected void before() throws AdapterException {
- this.allNodeIds = new ArrayList<>();
- try {
- this.spOpcUaClient.connect();
- OpcUaNodeBrowser browserClient = new OpcUaNodeBrowser(this.spOpcUaClient.getClient(), this.spOpcUaClient.getSpOpcConfig());
- this.allNodes = browserClient.findNodes();
+ this.allNodeIds = new ArrayList<>();
+ try {
+ this.spOpcUaClient.connect();
+ OpcUaNodeBrowser browserClient = new OpcUaNodeBrowser(this.spOpcUaClient.getClient(), this.spOpcUaClient.getSpOpcConfig());
+ this.allNodes = browserClient.findNodes();
- for (OpcNode node : this.allNodes) {
- this.allNodeIds.add(node.nodeId);
- }
+ for (OpcNode node : this.allNodes) {
+ this.allNodeIds.add(node.nodeId);
+ }
- if (spOpcUaClient.inPullMode()) {
- this.pullingIntervalMilliSeconds = spOpcUaClient.getPullIntervalMilliSeconds();
- } else {
- this.numberProperties = this.allNodeIds.size();
- this.spOpcUaClient.createListSubscription(this.allNodeIds, this);
- }
+ if (spOpcUaClient.inPullMode()) {
+ this.pullingIntervalMilliSeconds = spOpcUaClient.getPullIntervalMilliSeconds();
+ } else {
+ this.numberProperties = this.allNodeIds.size();
+ this.spOpcUaClient.createListSubscription(this.allNodeIds, this);
+ }
- } catch (Exception e) {
- throw new AdapterException("The Connection to the OPC UA server could not be established.", e.getCause());
- }
+ } catch (Exception e) {
+ throw new AdapterException("The Connection to the OPC UA server could not be established.", e.getCause());
}
+ }
- @Override
- public void startAdapter() throws AdapterException {
-
- this.spOpcUaClient = new SpOpcUaClient(SpOpcUaConfigBuilder.from(this.adapterDescription));
-
- if (this.spOpcUaClient.inPullMode()) {
- super.startAdapter();
- } else {
- this.before();
- }
- }
+ @Override
+ public void startAdapter() throws AdapterException {
- @Override
- public void stopAdapter() throws AdapterException {
- // close connection
- this.spOpcUaClient.disconnect();
+ this.spOpcUaClient = new SpOpcUaClient(SpOpcUaConfigBuilder.from(this.adapterDescription));
- if (this.spOpcUaClient.inPullMode()){
- super.stopAdapter();
- }
+ if (this.spOpcUaClient.inPullMode()) {
+ super.startAdapter();
+ } else {
+ this.before();
}
+ }
- @Override
- protected void pullData() {
- CompletableFuture<List<DataValue>> response = this.spOpcUaClient.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
- try {
- List<DataValue> returnValues = response.get();
- for (int i = 0; i<returnValues.size(); i++) {
-
- Object value = returnValues.get(i).getValue().getValue();
- this.event.put(this.allNodes.get(i).getLabel(), value);
-
- }
- } catch (InterruptedException | ExecutionException ie) {
- ie.printStackTrace();
- }
-
- adapterPipeline.process(this.event);
+ @Override
+ public void stopAdapter() throws AdapterException {
+ // close connection
+ this.spOpcUaClient.disconnect();
+ if (this.spOpcUaClient.inPullMode()) {
+ super.stopAdapter();
}
-
- public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
-
- String key = OpcUaUtil.getRuntimeNameOfNode(item.getReadValueId().getNodeId());
-
- OpcNode currNode = this.allNodes.stream()
- .filter(node -> key.equals(node.getNodeId().getIdentifier().toString()))
- .findFirst()
- .orElse(null);
-
- event.put(currNode.getLabel(), value.getValue().getValue());
-
- // ensure that event is complete and all opc ua subscriptions transmitted at least one value
- if (event.keySet().size() >= this.numberProperties) {
- Map <String, Object> newEvent = new HashMap<>();
- // deep copy of event to prevent preprocessor error
- for (String k : event.keySet()) {
- newEvent.put(k, event.get(k));
- }
- adapterPipeline.process(newEvent);
+ }
+
+ @Override
+ protected void pullData() {
+ CompletableFuture<List<DataValue>> response = this.spOpcUaClient.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
+ boolean badStatusCodeReceived = false;
+ boolean emptyValueReceived = false;
+ try {
+ List<DataValue> returnValues = response.get();
+ if (returnValues.size() == 0) {
+ emptyValueReceived = true;
+ LOG.warn("Empty value object returned - event will not be sent");
+ } else {
+ for (int i = 0; i < returnValues.size(); i++) {
+ var status = returnValues.get(i).getStatusCode();
+ if (StatusCode.GOOD.equals(status)) {
+ Object value = returnValues.get(i).getValue().getValue();
+ this.event.put(this.allNodes.get(i).getLabel(), value);
+ } else {
+ badStatusCodeReceived = true;
+ LOG.warn("Received status code {} for node label: {} - event will not be sent",
+ status,
+ this.allNodes.get(i).getLabel());
+ }
}
+ }
+ if (!badStatusCodeReceived && !emptyValueReceived) {
+ adapterPipeline.process(this.event);
+ }
+ } catch (InterruptedException | ExecutionException ie) {
+ LOG.error("Exception while reading data", ie);
}
+ }
- @Override
- protected PollingSettings getPollingInterval() {
- return PollingSettings.from(TimeUnit.MILLISECONDS, this.pullingIntervalMilliSeconds);
- }
-
- @Override
- public SpecificAdapterStreamDescription declareModel() {
-
- SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder
- .create(ID)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .withLocales(Locales.EN)
- .category(AdapterType.Generic, AdapterType.Manufacturing)
- .requiredAlternatives(Labels.withId(OpcUaLabels.ADAPTER_TYPE.name()),
- Alternatives.from(Labels.withId(OpcUaLabels.PULL_MODE.name()),
- StaticProperties.integerFreeTextProperty(Labels.withId(OpcUaLabels.PULLING_INTERVAL.name()))),
- Alternatives.from(Labels.withId(OpcUaLabels.SUBSCRIPTION_MODE.name())))
- .requiredAlternatives(Labels.withId(OpcUaLabels.ACCESS_MODE.name()),
- Alternatives.from(Labels.withId(OpcUaLabels.UNAUTHENTICATED.name())),
- Alternatives.from(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
- StaticProperties.group(
- Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
- StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.USERNAME.name())),
- StaticProperties.secretValue(Labels.withId(OpcUaLabels.PASSWORD.name()))
- ))
- )
- .requiredAlternatives(Labels.withId(OpcUaLabels.OPC_HOST_OR_URL.name()),
- Alternatives.from(
- Labels.withId(OpcUaLabels.OPC_URL.name()),
- StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_URL.name()), "opc.tcp://localhost:4840"))
- ,
- Alternatives.from(Labels.withId(OpcUaLabels.OPC_HOST.name()),
- StaticProperties.group(
- Labels.withId("host-port"),
- StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_HOST.name())),
- StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_PORT.name()))
- ))
- )
- .requiredTextParameter(Labels.withId(OpcUaLabels.NAMESPACE_INDEX.name()))
- .requiredTextParameter(Labels.withId(OpcUaLabels.NODE_ID.name()))
- .requiredRuntimeResolvableTreeInput(
- Labels.withId(OpcUaLabels.AVAILABLE_NODES.name()),
- Arrays.asList(OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
- )
- .build();
-
- description.setAppId(ID);
-
- return description;
- }
+ public void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
- @Override
- public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
- return new OpcUaAdapter(adapterDescription);
- }
+ String key = OpcUaUtil.getRuntimeNameOfNode(item.getReadValueId().getNodeId());
- @Override
- public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
- return OpcUaUtil.getSchema(adapterDescription);
- }
+ OpcNode currNode = this.allNodes.stream()
+ .filter(node -> key.equals(node.getNodeId().getIdentifier().toString()))
+ .findFirst()
+ .orElse(null);
- @Override
- public String getId() {
- return ID;
- }
+ event.put(currNode.getLabel(), value.getValue().getValue());
- @Override
- public StaticProperty resolveConfiguration(String staticPropertyInternalName, StaticPropertyExtractor extractor) {
- return OpcUaUtil.resolveConfiguration(staticPropertyInternalName, extractor);
+ // ensure that event is complete and all opc ua subscriptions transmitted at least one value
+ if (event.keySet().size() >= this.numberProperties) {
+ Map<String, Object> newEvent = new HashMap<>();
+ // deep copy of event to prevent preprocessor error
+ for (String k : event.keySet()) {
+ newEvent.put(k, event.get(k));
+ }
+ adapterPipeline.process(newEvent);
}
+ }
+
+ @Override
+ protected PollingSettings getPollingInterval() {
+ return PollingSettings.from(TimeUnit.MILLISECONDS, this.pullingIntervalMilliSeconds);
+ }
+
+ @Override
+ public SpecificAdapterStreamDescription declareModel() {
+
+ SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder
+ .create(ID)
+ .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .withLocales(Locales.EN)
+ .category(AdapterType.Generic, AdapterType.Manufacturing)
+ .requiredAlternatives(Labels.withId(OpcUaLabels.ADAPTER_TYPE.name()),
+ Alternatives.from(Labels.withId(OpcUaLabels.PULL_MODE.name()),
+ StaticProperties.integerFreeTextProperty(Labels.withId(OpcUaLabels.PULLING_INTERVAL.name()))),
+ Alternatives.from(Labels.withId(OpcUaLabels.SUBSCRIPTION_MODE.name())))
+ .requiredAlternatives(Labels.withId(OpcUaLabels.ACCESS_MODE.name()),
+ Alternatives.from(Labels.withId(OpcUaLabels.UNAUTHENTICATED.name())),
+ Alternatives.from(Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
+ StaticProperties.group(
+ Labels.withId(OpcUaLabels.USERNAME_GROUP.name()),
+ StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.USERNAME.name())),
+ StaticProperties.secretValue(Labels.withId(OpcUaLabels.PASSWORD.name()))
+ ))
+ )
+ .requiredAlternatives(Labels.withId(OpcUaLabels.OPC_HOST_OR_URL.name()),
+ Alternatives.from(
+ Labels.withId(OpcUaLabels.OPC_URL.name()),
+ StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_URL.name()), "opc.tcp://localhost:4840"))
+ ,
+ Alternatives.from(Labels.withId(OpcUaLabels.OPC_HOST.name()),
+ StaticProperties.group(
+ Labels.withId("host-port"),
+ StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_HOST.name())),
+ StaticProperties.stringFreeTextProperty(Labels.withId(OpcUaLabels.OPC_SERVER_PORT.name()))
+ ))
+ )
+ .requiredTextParameter(Labels.withId(OpcUaLabels.NAMESPACE_INDEX.name()))
+ .requiredTextParameter(Labels.withId(OpcUaLabels.NODE_ID.name()))
+ .requiredRuntimeResolvableTreeInput(
+ Labels.withId(OpcUaLabels.AVAILABLE_NODES.name()),
+ Arrays.asList(OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
+ )
+ .build();
+
+ description.setAppId(ID);
+
+ return description;
+ }
+
+ @Override
+ public Adapter getInstance(SpecificAdapterStreamDescription adapterDescription) {
+ return new OpcUaAdapter(adapterDescription);
+ }
+
+ @Override
+ public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
+ return OpcUaUtil.getSchema(adapterDescription);
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ public StaticProperty resolveConfiguration(String staticPropertyInternalName, StaticPropertyExtractor extractor) {
+ return OpcUaUtil.resolveConfiguration(staticPropertyInternalName, extractor);
+ }
}