You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/10 21:38:27 UTC
[incubator-streampipes] 05/08: [STREAMPIPES-505] Improve runtime-resolvable configuration of adapters, split OPC adapter
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch STREAMPIPES-505
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit dbe82cafe380bebf5f6152e6715c44c8b690d8b7
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Mon Jan 10 10:53:49 2022 +0100
[STREAMPIPES-505] Improve runtime-resolvable configuration of adapters, split OPC adapter
---
.../worker/management/RuntimeResovable.java | 25 +-
.../worker/rest/RuntimeResolvableResource.java | 17 +-
.../opcua/MiloOpcUaConfigurationProvider.java | 100 +++++++
.../connect/iiot/adapters/opcua/OpcNode.java | 9 +
.../connect/iiot/adapters/opcua/OpcUa.java | 296 +++------------------
.../connect/iiot/adapters/opcua/OpcUaAdapter.java | 44 ++-
.../opcua/configuration/SpOpcUaConfig.java | 210 +++++++++++++++
.../opcua/configuration/SpOpcUaConfigBuilder.java | 99 +++++++
.../adapters/opcua/utils/OpcUaNodeVariants.java | 2 +-
.../iiot/adapters/opcua/utils/OpcUaUtil.java | 37 ++-
.../org/apache/streampipes/model/util/Cloner.java | 2 +
.../sdk/extractor/AbstractParameterExtractor.java | 52 +++-
12 files changed, 568 insertions(+), 325 deletions(-)
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/RuntimeResovable.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/RuntimeResovable.java
index a4252ce..fe4b003 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/RuntimeResovable.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/RuntimeResovable.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.connect.container.worker.management;
import org.apache.streampipes.connect.adapter.AdapterRegistry;
+import org.apache.streampipes.connect.api.Connector;
import org.apache.streampipes.connect.api.IAdapter;
import org.apache.streampipes.connect.api.IFormat;
import org.apache.streampipes.connect.api.IProtocol;
@@ -42,17 +43,17 @@ public class RuntimeResovable {
}
}
- public static ResolvesContainerProvidedOptions getRuntimeResolvableAdapter(String id) throws IllegalArgumentException {
- id = id.replaceAll("sp:", SP_NS);
- Map<String, IAdapter> allAdapters = DeclarersSingleton.getInstance().getAllAdaptersMap();
- Map<String, IProtocol> allProtocols = DeclarersSingleton.getInstance().getAllProtocolsMap();
-
- if (allAdapters.containsKey(id)) {
- return (ResolvesContainerProvidedOptions) allAdapters.get(id);
- } else if (allProtocols.containsKey(id)) {
- return (ResolvesContainerProvidedOptions) allProtocols.get(id);
- } else {
- throw new IllegalArgumentException("Could not find adapter with id " + id);
- }
+ public static Connector getAdapterOrProtocol(String id) {
+ id = id.replaceAll("sp:", SP_NS);
+ Map<String, IAdapter> allAdapters = DeclarersSingleton.getInstance().getAllAdaptersMap();
+ Map<String, IProtocol> allProtocols = DeclarersSingleton.getInstance().getAllProtocolsMap();
+
+ if (allAdapters.containsKey(id)) {
+ return allAdapters.get(id);
+ } else if (allProtocols.containsKey(id)) {
+ return allProtocols.get(id);
+ } else {
+ throw new IllegalArgumentException("Could not find adapter with id " + id);
+ }
}
}
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/RuntimeResolvableResource.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/RuntimeResolvableResource.java
index d9c9c6e..92a81fd 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/RuntimeResolvableResource.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/rest/RuntimeResolvableResource.java
@@ -18,9 +18,11 @@
package org.apache.streampipes.connect.container.worker.rest;
+import org.apache.streampipes.connect.api.Connector;
import org.apache.streampipes.connect.container.worker.management.RuntimeResovable;
import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
import org.apache.streampipes.container.api.RuntimeResolvableRequestHandler;
+import org.apache.streampipes.container.api.SupportsRuntimeConfig;
import org.apache.streampipes.model.runtime.RuntimeOptionsRequest;
import org.apache.streampipes.model.runtime.RuntimeOptionsResponse;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -41,10 +43,17 @@ public class RuntimeResolvableResource extends AbstractSharedRestInterface {
public Response fetchConfigurations(@PathParam("id") String elementId,
RuntimeOptionsRequest runtimeOptionsRequest) {
- ResolvesContainerProvidedOptions adapterClass =
- RuntimeResovable.getRuntimeResolvableAdapter(elementId);
-
- RuntimeOptionsResponse response = new RuntimeResolvableRequestHandler().handleRuntimeResponse(adapterClass, runtimeOptionsRequest);
+ Connector connector = RuntimeResovable.getAdapterOrProtocol(elementId);
+ RuntimeOptionsResponse response;
+ RuntimeResolvableRequestHandler handler = new RuntimeResolvableRequestHandler();
+
+ if (connector instanceof ResolvesContainerProvidedOptions) {
+ response = handler.handleRuntimeResponse((ResolvesContainerProvidedOptions) connector, runtimeOptionsRequest);
+ } else if (connector instanceof SupportsRuntimeConfig) {
+ response = handler.handleRuntimeResponse((SupportsRuntimeConfig) connector, runtimeOptionsRequest);
+ } else {
+ throw new WebApplicationException(javax.ws.rs.core.Response.Status.BAD_REQUEST);
+ }
return ok(response);
}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/MiloOpcUaConfigurationProvider.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/MiloOpcUaConfigurationProvider.java
new file mode 100644
index 0000000..e4336d8
--- /dev/null
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/MiloOpcUaConfigurationProvider.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.iiot.adapters.opcua;
+
+import org.apache.streampipes.connect.iiot.adapters.opcua.configuration.SpOpcUaConfig;
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
+import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
+import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+
+public class MiloOpcUaConfigurationProvider {
+
+ public OpcUaClientConfig makeClientConfig(SpOpcUaConfig spOpcConfig) throws Exception {
+ String opcServerUrl = spOpcConfig.getOpcServerURL();
+ List<EndpointDescription> endpoints = DiscoveryClient.getEndpoints(opcServerUrl).get();
+ String host = opcServerUrl.split("://")[1].split(":")[0];
+
+ EndpointDescription tmpEndpoint = endpoints
+ .stream()
+ .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
+ .findFirst()
+ .orElseThrow(() -> new Exception("No endpoint with security policy none"));
+
+ tmpEndpoint = updateEndpointUrl(tmpEndpoint, host);
+ endpoints = Collections.singletonList(tmpEndpoint);
+
+ EndpointDescription endpoint = endpoints
+ .stream()
+ .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
+ .findFirst().orElseThrow(() -> new Exception("no desired endpoints returned"));
+
+ return buildConfig(endpoint, spOpcConfig);
+ }
+
+ private OpcUaClientConfig buildConfig(EndpointDescription endpoint,
+ SpOpcUaConfig spOpcConfig) {
+
+ OpcUaClientConfigBuilder builder = defaultBuilder(endpoint);
+ if (!spOpcConfig.isUnauthenticated()) {
+ builder.setIdentityProvider(new UsernameProvider(spOpcConfig.getUsername(), spOpcConfig.getPassword()));
+ }
+ return builder.build();
+ }
+
+ private OpcUaClientConfigBuilder defaultBuilder(EndpointDescription endpoint) {
+ return OpcUaClientConfig.builder()
+ .setApplicationName(LocalizedText.english("eclipse milo opc-ua client"))
+ .setApplicationUri("urn:eclipse:milo:examples:client")
+ .setEndpoint(endpoint);
+ }
+
+ private EndpointDescription updateEndpointUrl(
+ EndpointDescription original, String hostname) throws URISyntaxException {
+
+ URI uri = new URI(original.getEndpointUrl()).parseServerAuthority();
+
+ String endpointUrl = String.format(
+ "%s://%s:%s%s",
+ uri.getScheme(),
+ hostname,
+ uri.getPort(),
+ uri.getPath()
+ );
+
+ return new EndpointDescription(
+ endpointUrl,
+ original.getServer(),
+ original.getServerCertificate(),
+ original.getSecurityMode(),
+ original.getSecurityPolicyUri(),
+ original.getUserIdentityTokens(),
+ original.getTransportProfileUri(),
+ original.getSecurityLevel()
+ );
+ }
+}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcNode.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcNode.java
index 3c91249..99c8da6 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcNode.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcNode.java
@@ -31,6 +31,7 @@ public class OpcNode {
Datatypes type;
NodeId nodeId;
int opcUnitId;
+ private boolean readable;
/**
* Constructor for class OpcNode without an OPC UA unit identifier. <br>
@@ -95,6 +96,14 @@ public class OpcNode {
return this.opcUnitId !=0;
}
+ public boolean isReadable() {
+ return readable;
+ }
+
+ public void setReadable(boolean readable) {
+ this.readable = readable;
+ }
+
/**
* Returns the corresponding QUDT URI if the {@code opcUnitId} is given,
* otherwise it returns an empty string. <br>
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUa.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUa.java
index 8fabc9d..a947421 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUa.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUa.java
@@ -19,42 +19,29 @@
package org.apache.streampipes.connect.iiot.adapters.opcua;
-import static org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.retrieveDataTypesFromServer;
-import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
-import static org.eclipse.milo.opcua.stack.core.util.ConversionUtil.toList;
-
import org.apache.streampipes.connect.api.exception.AdapterException;
-import org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil;
-import org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.OpcUaLabels;
+import org.apache.streampipes.connect.iiot.adapters.opcua.configuration.SpOpcUaConfig;
import org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaNodeVariants;
import org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaTypes;
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
-import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.apache.streampipes.sdk.utils.Datatypes;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
-import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
-import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.UaException;
-import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.*;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseResultMask;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
-import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.*;
import org.eclipse.milo.opcua.stack.core.types.structured.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
@@ -62,25 +49,27 @@ import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.retrieveDataTypesFromServer;
+import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+import static org.eclipse.milo.opcua.stack.core.util.ConversionUtil.toList;
+
/***
* Wrapper class for all OPC UA specific stuff.
*/
public class OpcUa {
- static Logger LOG = LoggerFactory.getLogger(OpcUa.class);
+ private static final Logger LOG = LoggerFactory.getLogger(OpcUa.class);
- private NodeId originNodeId;
- private String opcServerURL;
private OpcUaClient client;
- private boolean unauthenticated;
- private Integer pullIntervalMilliSeconds;
- private String user;
- private String password;
+ private final SpOpcUaConfig spOpcConfig;
private List<Map<String, Integer>> unitIDs = new ArrayList<>();
- private List<String> selectedNodeNames;
private static final AtomicLong clientHandles = new AtomicLong(1L);
+ public OpcUa(SpOpcUaConfig config) {
+ this.spOpcConfig = config;
+ }
+
/***
*
* @return current {@link org.eclipse.milo.opcua.sdk.client.OpcUaClient}
@@ -89,193 +78,14 @@ public class OpcUa {
return this.client;
}
-
- /**
- * Constructor for security level {@code None}, OPC server given by url and subscription-based
- *
- * @param opcServerURL complete OPC UA server url
- * @param namespaceIndex namespace index of the given node
- * @param nodeId node identifier
- * @param pullIntervalMilliSeconds duration of pull interval in milliseconds, {@code null} if in subscription mode
- * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
- */
- public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, int pullIntervalMilliSeconds, List<String> selectedNodeNames) {
-
- this.opcServerURL = opcServerURL;
- this.unauthenticated = true;
- this.pullIntervalMilliSeconds = pullIntervalMilliSeconds;
- this.selectedNodeNames = selectedNodeNames;
-
- if (isInteger(nodeId)) {
- int integerNodeId = Integer.parseInt(nodeId);
- this.originNodeId = new NodeId(namespaceIndex, integerNodeId);
- } else {
- this.originNodeId = new NodeId(namespaceIndex, nodeId);
- }
- }
-
- /**
- * Constructor for security level {@code None} and OPC server given by hostname and port number
- *
- * @param opcServer OPC UA hostname
- * @param opcServerPort OPC UA port number
- * @param namespaceIndex namespace index of the given node
- * @param nodeId node identifier
- * @param pullIntervalMilliSeconds duration of pull interval in milliseconds, {@code null} if in subscription mode
- * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
- */
- public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, int pullIntervalMilliSeconds, List<String> selectedNodeNames) {
- this( opcServer + ":" + opcServerPort, namespaceIndex, nodeId, pullIntervalMilliSeconds, selectedNodeNames);
- }
-
- /**
- * Constructor for security level {@code Sign} and OPC server given by url
- *
- * @param opcServerURL complete OPC UA server url
- * @param namespaceIndex namespace index of the given node
- * @param nodeId node identifier
- * @param username username to authenticate at the OPC UA server
- * @param password corresponding password to given user name
- * @param pullIntervalMilliSeconds duration of pull interval in milliseconds, {@code null} if in subscription mode
- * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
- */
- public OpcUa(String opcServerURL, int namespaceIndex, String nodeId, String username, String password, int pullIntervalMilliSeconds, List<String> selectedNodeNames) {
- this(opcServerURL, namespaceIndex, nodeId, pullIntervalMilliSeconds, selectedNodeNames);
- this.unauthenticated = false;
- this.user = username;
- this.password = password;
- }
-
- /**
- * Constructor for OPC UA security level {@code Sign} and OPC server given by hostname and port number
- *
- * @param opcServer OPC UA hostname
- * @param opcServerPort OPC UA port number
- * @param namespaceIndex namespace index of the given node
- * @param nodeId node identifier
- * @param username username to authenticate at the OPC UA server
- * @param password corresponding password to given user name
- * @param pullIntervalMilliSeconds duration of pull interval in milliseconds, {@code null} if in subscription mode
- * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveOptions(String, StaticPropertyExtractor)}
- */
- public OpcUa(String opcServer, int opcServerPort, int namespaceIndex, String nodeId, String username, String password, int pullIntervalMilliSeconds, List<String> selectedNodeNames) {
- this (opcServer, opcServerPort, namespaceIndex, nodeId, pullIntervalMilliSeconds, selectedNodeNames);
- this.unauthenticated = false;
- this.user = username;
- this.password = password;
- }
-
- /**
- * Creates {@link OpcUa} instance in accordance with the given {@link org.apache.streampipes.sdk.extractor.StaticPropertyExtractor}.
- * @param extractor extractor for user inputs
- * @return {@link OpcUa} instance based on information from {@code extractor}
- */
- public static OpcUa from(StaticPropertyExtractor extractor) {
-
- String selectedAlternativeConnection = extractor.selectedAlternativeInternalId(OpcUaLabels.OPC_HOST_OR_URL.name());
- String selectedAlternativeAuthentication = extractor.selectedAlternativeInternalId(OpcUaLabels.ACCESS_MODE.name());
-
- int namespaceIndex = extractor.singleValueParameter(OpcUaLabels.NAMESPACE_INDEX.name(), int.class);
- String nodeId = extractor.singleValueParameter(OpcUaLabels.NODE_ID.name(), String.class);
-
- boolean usePullMode = extractor.selectedAlternativeInternalId(OpcUaLabels.ADAPTER_TYPE.name()).equals(OpcUaLabels.PULL_MODE.name());
- boolean useURL = selectedAlternativeConnection.equals(OpcUaLabels.OPC_URL.name());
- boolean unauthenticated = selectedAlternativeAuthentication.equals(OpcUaLabels.UNAUTHENTICATED.name());
-
- Integer pullIntervalSeconds = null;
- if (usePullMode) {
- pullIntervalSeconds = extractor.singleValueParameter(OpcUaLabels.PULLING_INTERVAL.name(), Integer.class);
- }
-
- List<String> selectedNodeNames = extractor.selectedMultiValues(OpcUaLabels.AVAILABLE_NODES.name(), String.class);
-
- if (useURL && unauthenticated){
-
- String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_URL.name(), String.class);
- serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
-
- return new OpcUa(serverAddress, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
-
- } else if(!useURL && unauthenticated){
- String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_HOST.name(), String.class);
- serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
- int port = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_PORT.name(), int.class);
-
- return new OpcUa(serverAddress, port, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
- } else {
-
- String username = extractor.singleValueParameter(OpcUaLabels.USERNAME.name(), String.class);
- String password = extractor.secretValue(OpcUaLabels.PASSWORD.name());
-
- if (useURL) {
- String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_URL.name(), String.class);
- serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
-
- return new OpcUa(serverAddress, namespaceIndex, nodeId, username, password, pullIntervalSeconds, selectedNodeNames);
- } else {
- String serverAddress = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_HOST.name(), String.class);
- serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
- int port = extractor.singleValueParameter(OpcUaLabels.OPC_SERVER_PORT.name(), int.class);
-
- return new OpcUa(serverAddress, port, namespaceIndex, nodeId, username, password, pullIntervalSeconds, selectedNodeNames);
- }
- }
-
- }
-
- /***
- * Creates {@link OpcUa} instance in accordance with the given {@link org.apache.streampipes.model.connect.adapter.AdapterDescription}
- * @param adapterDescription description of current adapter
- * @return {@link OpcUa} instance based on information from {@code adapterDescription}
- */
- public static OpcUa from(AdapterDescription adapterDescription){
-
- StaticPropertyExtractor extractor =
- StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
-
- return from(extractor);
- }
-
/***
* Establishes appropriate connection to OPC UA endpoint depending on the {@link OpcUa} instance
*
- * @throws Exception
+ * @throws Exception An exception occurring during OPC connection
*/
public void connect() throws Exception {
-
- List<EndpointDescription> endpoints = DiscoveryClient.getEndpoints(this.opcServerURL).get();
- String host = this.opcServerURL.split("://")[1].split(":")[0];
-
- EndpointDescription tmpEndpoint = endpoints
- .stream()
- .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
- .findFirst()
- .orElseThrow(() -> new Exception("No endpoint with security policy none"));
-
- tmpEndpoint = updateEndpointUrl(tmpEndpoint, host);
- endpoints = Collections.singletonList(tmpEndpoint);
-
- EndpointDescription endpoint = endpoints
- .stream()
- .filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
- .findFirst().orElseThrow(() -> new Exception("no desired endpoints returned"));
-
- OpcUaClientConfig config;
- if (this.unauthenticated) {
- config = OpcUaClientConfig.builder()
- .setApplicationName(LocalizedText.english("eclipse milo opc-ua client"))
- .setApplicationUri("urn:eclipse:milo:examples:client")
- .setEndpoint(endpoint)
- .build();
- } else {
- config = OpcUaClientConfig.builder()
- .setIdentityProvider(new UsernameProvider(this.user, this.password))
- .setApplicationName(LocalizedText.english("eclipse milo opc-ua client"))
- .setApplicationUri("urn:eclipse:milo:examples:client")
- .setEndpoint(endpoint)
- .build();
- }
- this.client = OpcUaClient.create(config);
+ OpcUaClientConfig clientConfig = new MiloOpcUaConfigurationProvider().makeClientConfig(spOpcConfig);
+ this.client = OpcUaClient.create(clientConfig);
client.connect().get();
}
@@ -283,38 +93,14 @@ public class OpcUa {
client.disconnect();
}
- private EndpointDescription updateEndpointUrl(
- EndpointDescription original, String hostname) throws URISyntaxException {
-
- URI uri = new URI(original.getEndpointUrl()).parseServerAuthority();
-
- String endpointUrl = String.format(
- "%s://%s:%s%s",
- uri.getScheme(),
- hostname,
- uri.getPort(),
- uri.getPath()
- );
-
- return new EndpointDescription(
- endpointUrl,
- original.getServer(),
- original.getServerCertificate(),
- original.getSecurityMode(),
- original.getSecurityPolicyUri(),
- original.getUserIdentityTokens(),
- original.getTransportProfileUri(),
- original.getSecurityLevel()
- );
- }
-
/***
- * Search for related nodes relative to {@link OpcUa#originNodeId}
- * @param selectNodes indicates whether only nodes of {@link OpcUa#selectedNodeNames} should be returned
- * @return List of {@link OpcNode}s related to {@link OpcUa#originNodeId}
+ * Search for related nodes relative to {@link SpOpcUaConfig#getOriginNodeId()}
+ * @param selectNodes indicates whether only nodes of {@link SpOpcUaConfig#getSelectedNodeNames()} should be returned
+ * @return List of {@link OpcNode}s related to {@link SpOpcUaConfig#getOriginNodeId()}
* @throws AdapterException
*/
public List<OpcNode> browseNode(boolean selectNodes) throws AdapterException {
+ NodeId originNodeId = spOpcConfig.getOriginNodeId();
List<OpcNode> discoveredNodes = browseNode(originNodeId, selectNodes);
if (discoveredNodes.size() == 0) {
@@ -367,9 +153,9 @@ public class OpcUa {
}
/***
- * Search for related nodes relative to {@link OpcUa#originNodeId}
- * @param selectNodes indicates whether only nodes of {@link OpcUa#selectedNodeNames} should be returned
- * @return List of {@link OpcNode}s related to {@link OpcUa#originNodeId}
+ * Search for related nodes relative to {@link SpOpcUaConfig#getOriginNodeId()}
+ * @param selectNodes indicates whether only nodes of {@link SpOpcUaConfig#getSelectedNodeNames()} should be returned
+ * @return List of {@link OpcNode}s related to {@link SpOpcUaConfig#getOriginNodeId()}
* @throws AdapterException
*/
private List<OpcNode> browseNode(NodeId browseRoot, boolean selectNodes) throws AdapterException {
@@ -463,12 +249,13 @@ public class OpcUa {
if (selectNodes) {
// filter for nodes that were selected by the user during configuration
- result = result.stream().filter(node -> this.getSelectedNodeNames().contains(node.getLabel()))
+// result = result.stream().filter(node -> this.getSelectedNodeNames().contains(node.getLabel()))
+// .collect(Collectors.toList());
+ result = result.stream().filter(node -> this.getSelectedNodeNames().contains(node.getNodeId().getIdentifier().toString()))
.collect(Collectors.toList());
}
return result;
-
}
@@ -484,7 +271,6 @@ public class OpcUa {
*/
UaSubscription subscription = this.client.getSubscriptionManager().createSubscription(1000.0).get();
-
List<CompletableFuture<DataValue>> values = new ArrayList<>();
for (NodeId node : nodes) {
@@ -540,33 +326,17 @@ public class OpcUa {
System.out.println("failed to create item for " + item.getReadValueId().getNodeId() + item.getStatusCode());
}
}
-
- }
-
- public static boolean isInteger(String s) {
- try {
- Integer.parseInt(s);
- } catch(NumberFormatException | NullPointerException e) {
- return false;
- }
- // only got here if we didn't return false
- return true;
- }
-
-
- public List<String> getSelectedNodeNames() {
- return selectedNodeNames;
}
- public String getOpcServerURL() {
- return opcServerURL;
+ private List<String> getSelectedNodeNames() {
+ return spOpcConfig.getSelectedNodeNames();
}
public boolean inPullMode() {
- return !(this.pullIntervalMilliSeconds == null);
+ return !(spOpcConfig.getPullIntervalMilliSeconds() == null);
}
public int getPullIntervalMilliSeconds() {
- return this.pullIntervalMilliSeconds;
+ return spOpcConfig.getPullIntervalMilliSeconds();
}
}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUaAdapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUaAdapter.java
index dee67e9..e885c87 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUaAdapter.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/OpcUaAdapter.java
@@ -19,17 +19,18 @@
package org.apache.streampipes.connect.iiot.adapters.opcua;
import org.apache.streampipes.connect.adapter.Adapter;
+import org.apache.streampipes.connect.adapter.util.PollingSettings;
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.connect.adapter.util.PollingSettings;
import org.apache.streampipes.connect.iiot.adapters.PullAdapter;
+import org.apache.streampipes.connect.iiot.adapters.opcua.configuration.SpOpcUaConfigBuilder;
import org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil;
import org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil.OpcUaLabels;
-import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
+import org.apache.streampipes.container.api.SupportsRuntimeConfig;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
-import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
@@ -47,7 +48,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-public class OpcUaAdapter extends PullAdapter implements ResolvesContainerProvidedOptions {
+public class OpcUaAdapter extends PullAdapter implements SupportsRuntimeConfig {
public static final String ID = "org.apache.streampipes.connect.iiot.adapters.opcua";
@@ -99,7 +100,7 @@ public class OpcUaAdapter extends PullAdapter implements ResolvesContainerProvid
@Override
public void startAdapter() throws AdapterException {
- this.opcUa = OpcUa.from(this.adapterDescription);
+ this.opcUa = new OpcUa(SpOpcUaConfigBuilder.from(this.adapterDescription));
if (this.opcUa.inPullMode()) {
super.startAdapter();
@@ -121,21 +122,20 @@ public class OpcUaAdapter extends PullAdapter implements ResolvesContainerProvid
@Override
protected void pullData() {
- CompletableFuture<List<DataValue>> response = this.opcUa.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
- try {
-
- List<DataValue> returnValues = response.get();
- for (int i = 0; i<returnValues.size(); i++) {
+ CompletableFuture<List<DataValue>> response = this.opcUa.getClient().readValues(0, TimestampsToReturn.Both, this.allNodeIds);
+ try {
+ List<DataValue> returnValues = response.get();
+ for (int i = 0; i<returnValues.size(); i++) {
- Object value = returnValues.get(i).getValue().getValue();
- this.event.put(this.allNodes.get(i).getLabel(), value);
+ Object value = returnValues.get(i).getValue().getValue();
+ this.event.put(this.allNodes.get(i).getLabel(), value);
- }
- } catch (InterruptedException | ExecutionException ie) {
- ie.printStackTrace();
- }
+ }
+ } catch (InterruptedException | ExecutionException ie) {
+ ie.printStackTrace();
+ }
- adapterPipeline.process(this.event);
+ adapterPipeline.process(this.event);
}
@@ -201,7 +201,7 @@ public class OpcUaAdapter extends PullAdapter implements ResolvesContainerProvid
)
.requiredTextParameter(Labels.withId(OpcUaLabels.NAMESPACE_INDEX.name()))
.requiredTextParameter(Labels.withId(OpcUaLabels.NODE_ID.name()))
- .requiredMultiValueSelectionFromContainer(
+ .requiredRuntimeResolvableTreeInput(
Labels.withId(OpcUaLabels.AVAILABLE_NODES.name()),
Arrays.asList(OpcUaLabels.NAMESPACE_INDEX.name(), OpcUaLabels.NODE_ID.name())
)
@@ -219,7 +219,6 @@ public class OpcUaAdapter extends PullAdapter implements ResolvesContainerProvid
@Override
public GuessSchema getSchema(SpecificAdapterStreamDescription adapterDescription) throws AdapterException, ParseException {
-
return OpcUaUtil.getSchema(adapterDescription);
}
@@ -229,10 +228,7 @@ public class OpcUaAdapter extends PullAdapter implements ResolvesContainerProvid
}
@Override
- public List<Option> resolveOptions(String requestId, StaticPropertyExtractor parameterExtractor) {
-
- return OpcUaUtil.resolveOptions(requestId, parameterExtractor);
-
+ public StaticProperty resolveConfiguration(String staticPropertyInternalName, StaticPropertyExtractor extractor) {
+ return OpcUaUtil.resolveConfiguration(staticPropertyInternalName, extractor);
}
-
}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/configuration/SpOpcUaConfig.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/configuration/SpOpcUaConfig.java
new file mode 100644
index 0000000..a2a5c1e
--- /dev/null
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/configuration/SpOpcUaConfig.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.iiot.adapters.opcua.configuration;
+
+import org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+
+import java.util.List;
+
+public class SpOpcUaConfig {
+
+ private String opcServerURL;
+ private int namespaceIndex;
+
+ private boolean unauthenticated;
+
+ private String username;
+ private String password;
+
+ private List<String> selectedNodeNames;
+ private Integer pullIntervalMilliSeconds;
+ private NodeId originNodeId;
+
+ public SpOpcUaConfig() {
+ }
+
+ /**
+ * Constructor for security level {@code None}, OPC server given by url and subscription-based
+ *
+ * @param opcServerURL complete OPC UA server url
+ * @param namespaceIndex namespace index of the given node
+ * @param nodeId node identifier
+ * @param pullIntervalMilliSeconds duration of pull interval in milliseconds, {@code null} if in subscription mode
+ * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveConfiguration(String, StaticPropertyExtractor)} (String, StaticPropertyExtractor)}
+ */
+ public SpOpcUaConfig(String opcServerURL, int namespaceIndex, String nodeId, int pullIntervalMilliSeconds, List<String> selectedNodeNames) {
+
+ this.opcServerURL = opcServerURL;
+ this.unauthenticated = true;
+ this.pullIntervalMilliSeconds = pullIntervalMilliSeconds;
+ this.selectedNodeNames = selectedNodeNames;
+
+ if (isInteger(nodeId)) {
+ int integerNodeId = Integer.parseInt(nodeId);
+ this.originNodeId = new NodeId(namespaceIndex, integerNodeId);
+ } else {
+ this.originNodeId = new NodeId(namespaceIndex, nodeId);
+ }
+ }
+
+ /**
+ * Constructor for security level {@code None} and OPC server given by hostname and port number
+ *
+ * @param opcServer OPC UA hostname
+ * @param opcServerPort OPC UA port number
+ * @param namespaceIndex namespace index of the given node
+ * @param nodeId node identifier
+ * @param pullIntervalMilliSeconds duration of pull interval in milliseconds, {@code null} if in subscription mode
+ * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveConfiguration(String, StaticPropertyExtractor)}
+ */
+ public SpOpcUaConfig(String opcServer,
+ int opcServerPort,
+ int namespaceIndex,
+ String nodeId,
+ int pullIntervalMilliSeconds,
+ List<String> selectedNodeNames) {
+ this( opcServer + ":" + opcServerPort, namespaceIndex, nodeId, pullIntervalMilliSeconds, selectedNodeNames);
+ }
+
+ /**
+ * Constructor for security level {@code Sign} and OPC server given by url
+ *
+ * @param opcServerURL complete OPC UA server url
+ * @param namespaceIndex namespace index of the given node
+ * @param nodeId node identifier
+ * @param username username to authenticate at the OPC UA server
+ * @param password corresponding password to given user name
+ * @param pullIntervalMilliSeconds duration of pull interval in milliseconds, {@code null} if in subscription mode
+ * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveConfiguration(String, StaticPropertyExtractor)}
+ */
+ public SpOpcUaConfig(String opcServerURL,
+ int namespaceIndex,
+ String nodeId,
+ String username,
+ String password,
+ int pullIntervalMilliSeconds,
+ List<String> selectedNodeNames) {
+ this(opcServerURL, namespaceIndex, nodeId, pullIntervalMilliSeconds, selectedNodeNames);
+ this.unauthenticated = false;
+ this.username = username;
+ this.password = password;
+ }
+
+ /**
+ * Constructor for OPC UA security level {@code Sign} and OPC server given by hostname and port number
+ *
+ * @param opcServer OPC UA hostname
+ * @param opcServerPort OPC UA port number
+ * @param namespaceIndex namespace index of the given node
+ * @param nodeId node identifier
+ * @param username username to authenticate at the OPC UA server
+ * @param password corresponding password to given user name
+ * @param pullIntervalMilliSeconds duration of pull interval in milliseconds, {@code null} if in subscription mode
+ * @param selectedNodeNames list of node names provided from {@link OpcUaUtil#resolveConfiguration(String, StaticPropertyExtractor)}
+ */
+ public SpOpcUaConfig(String opcServer,
+ int opcServerPort,
+ int namespaceIndex,
+ String nodeId,
+ String username,
+ String password,
+ int pullIntervalMilliSeconds,
+ List<String> selectedNodeNames) {
+ this (opcServer, opcServerPort, namespaceIndex, nodeId, pullIntervalMilliSeconds, selectedNodeNames);
+ this.unauthenticated = false;
+ this.username = username;
+ this.password = password;
+ }
+
+ public String getOpcServerURL() {
+ return opcServerURL;
+ }
+
+ public void setOpcServerURL(String opcServerURL) {
+ this.opcServerURL = opcServerURL;
+ }
+
+ public boolean isUnauthenticated() {
+ return unauthenticated;
+ }
+
+ public void setUnauthenticated(boolean unauthenticated) {
+ this.unauthenticated = unauthenticated;
+ }
+
+ public int getNamespaceIndex() {
+ return namespaceIndex;
+ }
+
+ public void setNamespaceIndex(int namespaceIndex) {
+ this.namespaceIndex = namespaceIndex;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public List<String> getSelectedNodeNames() {
+ return selectedNodeNames;
+ }
+
+ public void setSelectedNodeNames(List<String> selectedNodeNames) {
+ this.selectedNodeNames = selectedNodeNames;
+ }
+
+ public Integer getPullIntervalMilliSeconds() {
+ return pullIntervalMilliSeconds;
+ }
+
+ public void setPullIntervalMilliSeconds(Integer pullIntervalMilliSeconds) {
+ this.pullIntervalMilliSeconds = pullIntervalMilliSeconds;
+ }
+
+ public NodeId getOriginNodeId() {
+ return originNodeId;
+ }
+
+ public void setOriginNodeId(NodeId originNodeId) {
+ this.originNodeId = originNodeId;
+ }
+
+ public static boolean isInteger(String s) {
+ try {
+ Integer.parseInt(s);
+ } catch(NumberFormatException | NullPointerException e) {
+ return false;
+ }
+ // only got here if we didn't return false
+ return true;
+ }
+}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/configuration/SpOpcUaConfigBuilder.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/configuration/SpOpcUaConfigBuilder.java
new file mode 100644
index 0000000..f37d5a1
--- /dev/null
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/configuration/SpOpcUaConfigBuilder.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.iiot.adapters.opcua.configuration;
+
+import org.apache.streampipes.connect.iiot.adapters.opcua.utils.OpcUaUtil;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SpOpcUaConfigBuilder {
+
+ /**
+ * Creates {@link SpOpcUaConfig} instance in accordance with the given {@link org.apache.streampipes.sdk.extractor.StaticPropertyExtractor}.
+ * @param extractor extractor for user inputs
+ * @return {@link SpOpcUaConfig} instance based on information from {@code extractor}
+ */
+ public static SpOpcUaConfig from(StaticPropertyExtractor extractor) {
+
+ String selectedAlternativeConnection = extractor.selectedAlternativeInternalId(OpcUaUtil.OpcUaLabels.OPC_HOST_OR_URL.name());
+ String selectedAlternativeAuthentication = extractor.selectedAlternativeInternalId(OpcUaUtil.OpcUaLabels.ACCESS_MODE.name());
+
+ int namespaceIndex = extractor.singleValueParameter(OpcUaUtil.OpcUaLabels.NAMESPACE_INDEX.name(), int.class);
+ String nodeId = extractor.singleValueParameter(OpcUaUtil.OpcUaLabels.NODE_ID.name(), String.class);
+
+ boolean usePullMode = extractor.selectedAlternativeInternalId(OpcUaUtil.OpcUaLabels.ADAPTER_TYPE.name()).equals(OpcUaUtil.OpcUaLabels.PULL_MODE.name());
+ boolean useURL = selectedAlternativeConnection.equals(OpcUaUtil.OpcUaLabels.OPC_URL.name());
+ boolean unauthenticated = selectedAlternativeAuthentication.equals(OpcUaUtil.OpcUaLabels.UNAUTHENTICATED.name());
+
+ Integer pullIntervalSeconds = null;
+ if (usePullMode) {
+ pullIntervalSeconds = extractor.singleValueParameter(OpcUaUtil.OpcUaLabels.PULLING_INTERVAL.name(), Integer.class);
+ }
+
+ List<String> selectedNodeNames = extractor.selectedTreeNodesInternalNames(OpcUaUtil.OpcUaLabels.AVAILABLE_NODES.name(), String.class, true);
+
+ if (useURL && unauthenticated){
+
+ String serverAddress = extractor.singleValueParameter(OpcUaUtil.OpcUaLabels.OPC_SERVER_URL.name(), String.class);
+ serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
+
+ return new SpOpcUaConfig(serverAddress, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
+
+ } else if(!useURL && unauthenticated){
+ String serverAddress = extractor.singleValueParameter(OpcUaUtil.OpcUaLabels.OPC_SERVER_HOST.name(), String.class);
+ serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
+ int port = extractor.singleValueParameter(OpcUaUtil.OpcUaLabels.OPC_SERVER_PORT.name(), int.class);
+
+ return new SpOpcUaConfig(serverAddress, port, namespaceIndex, nodeId, pullIntervalSeconds, selectedNodeNames);
+ } else {
+
+ String username = extractor.singleValueParameter(OpcUaUtil.OpcUaLabels.USERNAME.name(), String.class);
+ String password = extractor.secretValue(OpcUaUtil.OpcUaLabels.PASSWORD.name());
+
+ if (useURL) {
+ String serverAddress = extractor.singleValueParameter(OpcUaUtil.OpcUaLabels.OPC_SERVER_URL.name(), String.class);
+ serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
+
+ return new SpOpcUaConfig(serverAddress, namespaceIndex, nodeId, username, password, pullIntervalSeconds, selectedNodeNames);
+ } else {
+ String serverAddress = extractor.singleValueParameter(OpcUaUtil.OpcUaLabels.OPC_SERVER_HOST.name(), String.class);
+ serverAddress = OpcUaUtil.formatServerAddress(serverAddress);
+ int port = extractor.singleValueParameter(OpcUaUtil.OpcUaLabels.OPC_SERVER_PORT.name(), int.class);
+
+ return new SpOpcUaConfig(serverAddress, port, namespaceIndex, nodeId, username, password, pullIntervalSeconds, selectedNodeNames);
+ }
+ }
+ }
+
+ /***
+ * Creates {@link SpOpcUaConfig} instance in accordance with the given {@link org.apache.streampipes.model.connect.adapter.AdapterDescription}
+ * @param adapterDescription description of current adapter
+ * @return {@link SpOpcUaConfig} instance based on information from {@code adapterDescription}
+ */
+ public static SpOpcUaConfig from(AdapterDescription adapterDescription){
+
+ StaticPropertyExtractor extractor =
+ StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
+
+ return from(extractor);
+ }
+}
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/utils/OpcUaNodeVariants.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/utils/OpcUaNodeVariants.java
index abf0b87..51700c4 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/utils/OpcUaNodeVariants.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/utils/OpcUaNodeVariants.java
@@ -29,7 +29,7 @@ public enum OpcUaNodeVariants {
Property(68),
EUInformation(887);
- // ID as specific in OPC UA standard
+ // ID as specified in OPC UA standard
private final int id;
private OpcUaNodeVariants(int id){
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/utils/OpcUaUtil.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/utils/OpcUaUtil.java
index ef164ab..a882d74 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/utils/OpcUaUtil.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/opcua/utils/OpcUaUtil.java
@@ -22,11 +22,13 @@ import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.api.exception.ParseException;
import org.apache.streampipes.connect.iiot.adapters.opcua.OpcNode;
import org.apache.streampipes.connect.iiot.adapters.opcua.OpcUa;
+import org.apache.streampipes.connect.iiot.adapters.opcua.configuration.SpOpcUaConfigBuilder;
import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.RuntimeResolvableTreeInputStaticProperty;
+import org.apache.streampipes.model.staticproperty.TreeInputNode;
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
@@ -35,7 +37,8 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import java.net.URI;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
/***
* Collection of several utility functions in context of OPC UA
@@ -68,7 +71,7 @@ public class OpcUaUtil {
EventSchema eventSchema = new EventSchema();
List<EventProperty> allProperties = new ArrayList<>();
- OpcUa opcUa = OpcUa.from(adapterStreamDescription);
+ OpcUa opcUa = new OpcUa(SpOpcUaConfigBuilder.from(adapterStreamDescription));
try {
opcUa.connect();
@@ -108,28 +111,32 @@ public class OpcUaUtil {
/***
* OPC UA specific implementation of {@link org.apache.streampipes.container.api.ResolvesContainerProvidedOptions#resolveOptions(String, StaticPropertyExtractor)}. }
- * @param requestId
+ * @param internalName The internal name of the Static Property
* @param parameterExtractor
* @return {@code List<Option>} with available node names for the given OPC UA configuration
*/
- public static List<Option> resolveOptions (String requestId, StaticPropertyExtractor parameterExtractor) {
+ public static RuntimeResolvableTreeInputStaticProperty resolveConfiguration (String internalName,
+ StaticPropertyExtractor parameterExtractor) {
+ RuntimeResolvableTreeInputStaticProperty config = parameterExtractor
+ .getStaticPropertyByName(internalName, RuntimeResolvableTreeInputStaticProperty.class);
// access mode and host/url have to be selected
try {
parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.OPC_HOST_OR_URL.name());
parameterExtractor.selectedAlternativeInternalId(OpcUaLabels.ACCESS_MODE.name());
} catch (NullPointerException nullPointerException) {
- return new ArrayList<>();
+ return config;
}
- OpcUa opcUa = OpcUa.from(parameterExtractor);
+ OpcUa opcUa = new OpcUa(SpOpcUaConfigBuilder.from(parameterExtractor));
- List<Option> nodeOptions = new ArrayList<>();
+ List<TreeInputNode> nodeOptions = new ArrayList<>();
try{
opcUa.connect();
for(OpcNode opcNode: opcUa.browseNode(false)) {
- nodeOptions.add(new Option(opcNode.getLabel(), opcNode.getNodeId().getIdentifier().toString()));
+ TreeInputNode node = makeTreeInputNode(opcNode);
+ nodeOptions.add(node);
}
opcUa.disconnect();
@@ -137,7 +144,17 @@ public class OpcUaUtil {
e.printStackTrace();
}
- return nodeOptions;
+ config.setNodes(nodeOptions);
+
+ return config;
+ }
+
+ private static TreeInputNode makeTreeInputNode(OpcNode opcNode) {
+ TreeInputNode node = new TreeInputNode();
+ node.setNodeName(opcNode.getLabel());
+ node.setInternalNodeName(opcNode.getNodeId().getIdentifier().toString());
+ node.setDataNode(true);
+ return node;
}
public static String getRuntimeNameOfNode(NodeId nodeId) {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
index 8c785f6..27891d7 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/util/Cloner.java
@@ -117,6 +117,8 @@ public class Cloner {
return new ColorPickerStaticProperty((ColorPickerStaticProperty) o);
} else if (o instanceof SlideToggleStaticProperty) {
return new SlideToggleStaticProperty((SlideToggleStaticProperty) o);
+ } else if (o instanceof RuntimeResolvableTreeInputStaticProperty) {
+ return new RuntimeResolvableTreeInputStaticProperty((RuntimeResolvableTreeInputStaticProperty) o);
} else {
return new StaticPropertyAlternative((StaticPropertyAlternative) o);
}
diff --git a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java
index 4d50ce7..eedd0a8 100644
--- a/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java
+++ b/streampipes-sdk/src/main/java/org/apache/streampipes/sdk/extractor/AbstractParameterExtractor.java
@@ -102,7 +102,7 @@ public abstract class AbstractParameterExtractor<T extends InvocableStreamPipesE
}
public String codeblockValue(String internalName) {
- return getStaticPropertyByName(internalName,CodeInputStaticProperty.class).getValue();
+ return getStaticPropertyByName(internalName, CodeInputStaticProperty.class).getValue();
}
public String selectedColor(String internalName) {
@@ -110,8 +110,7 @@ public abstract class AbstractParameterExtractor<T extends InvocableStreamPipesE
}
/**
- * @deprecated
- * This won't work after release 0.69.0 as all API requests against the core need to be authenticated.
+ * @deprecated This won't work after release 0.69.0 as all API requests against the core need to be authenticated.
* Use the StreamPipes Client File API instead (e.g., StreamPipesClientResolver.makeStreamPipesClientInstance()).
**/
@Deprecated
@@ -120,8 +119,7 @@ public abstract class AbstractParameterExtractor<T extends InvocableStreamPipesE
}
/**
- * @deprecated
- * This won't work after release 0.69.0 as all API requests against the core need to be authenticated.
+ * @deprecated This won't work after release 0.69.0 as all API requests against the core need to be authenticated.
* Use the StreamPipes Client File API instead (e.g., StreamPipesClientResolver.makeStreamPipesClientInstance()).
**/
public byte[] fileContentsAsByteArray(String internalName) throws IOException {
@@ -129,8 +127,7 @@ public abstract class AbstractParameterExtractor<T extends InvocableStreamPipesE
}
/**
- * @deprecated
- * This won't work after release 0.69.0 as all API requests against the core need to be authenticated.
+ * @deprecated This won't work after release 0.69.0 as all API requests against the core need to be authenticated.
* Use the StreamPipes Client File API instead (e.g., StreamPipesClientResolver.makeStreamPipesClientInstance()).
**/
public InputStream fileContentsAsStream(String internalName) throws IOException {
@@ -142,8 +139,7 @@ public abstract class AbstractParameterExtractor<T extends InvocableStreamPipesE
}
/**
- * @deprecated
- * This won't work after release 0.69.0 as all API requests against the core need to be authenticated.
+ * @deprecated This won't work after release 0.69.0 as all API requests against the core need to be authenticated.
* Use the StreamPipes Client File API instead (e.g., StreamPipesClientResolver.makeStreamPipesClientInstance()).
**/
public String selectedFileFetchUrl(String internalName) {
@@ -198,8 +194,8 @@ public abstract class AbstractParameterExtractor<T extends InvocableStreamPipesE
}
public Boolean comparePropertyRuntimeType(EventProperty eventProperty,
- Datatypes datatype,
- boolean ignoreListElements) {
+ Datatypes datatype,
+ boolean ignoreListElements) {
EventPropertyPrimitive testProperty = null;
if (eventProperty instanceof EventPropertyList && !ignoreListElements) {
testProperty = (EventPropertyPrimitive) ((EventPropertyList) eventProperty).getEventProperty();
@@ -240,6 +236,40 @@ public abstract class AbstractParameterExtractor<T extends InvocableStreamPipesE
.collect(Collectors.toList());
}
+ public <V> List<V> selectedTreeNodesInternalNames(String internalName,
+ Class<V> targetClass,
+ boolean onlyDataNodes) {
+ List<TreeInputNode> allNodes = new ArrayList<>();
+ RuntimeResolvableTreeInputStaticProperty sp = getStaticPropertyByName(internalName, RuntimeResolvableTreeInputStaticProperty.class);
+ if (sp.getNodes().size() > 0) {
+ sp.getNodes().forEach(node -> buildFlatTree(node, allNodes));
+ }
+
+ if (allNodes.size() > 0) {
+ return allNodes
+ .stream()
+ .filter(node -> {
+ if (!onlyDataNodes) {
+ return true;
+ } else {
+ return node.isDataNode();
+ }
+ })
+ .filter(TreeInputNode::isSelected)
+ .map(node -> typeParser.parse(node.getInternalNodeName(), targetClass))
+ .collect(Collectors.toList());
+ } else {
+ return new ArrayList<>();
+ }
+ }
+
+ private void buildFlatTree(TreeInputNode parent, List<TreeInputNode> collector) {
+ collector.add(parent);
+ if (parent.hasChildren()) {
+ parent.getChildren().forEach(child -> buildFlatTree(child, collector));
+ }
+ }
+
public <S extends StaticProperty> S getStaticPropertyByName(String internalName, Class<S>
spType) {
return spType.cast(getStaticPropertyByName(internalName));