Posted to commits@camel.apache.org by da...@apache.org on 2021/01/24 07:54:00 UTC
[camel] branch master updated: camel-milo: added possibility to read node values via producer. (#4909)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 811d93c camel-milo: added possibility to read node values via producer. (#4909)
811d93c is described below
commit 811d93c1e4199612aac81ab25a2ee51aceb3474c
Author: attrobit <gi...@attrobit.de>
AuthorDate: Sun Jan 24 08:53:23 2021 +0100
camel-milo: added possibility to read node values via producer. (#4909)
* camel-milo: added possibility to read node values via producer.
* camel-milo: added possibility to read node values via producer.
* Fixed Producer construction after rebase.
* documentation CamelMiloNodeIds
Co-authored-by: attrobit <gi...@attrobit.com>
---
.../camel/catalog/docs/milo-client-component.adoc | 19 +++++
.../src/main/docs/milo-client-component.adoc | 19 +++++
.../milo/client/MiloClientConnection.java | 7 ++
.../component/milo/client/MiloClientEndpoint.java | 2 +-
.../component/milo/client/MiloClientProducer.java | 16 ++++-
.../milo/client/internal/SubscriptionManager.java | 39 ++++++++--
.../camel/component/milo/ReadValuesClientTest.java | 83 ++++++++++++++++++++++
.../modules/ROOT/pages/milo-client-component.adoc | 19 +++++
8 files changed, 197 insertions(+), 7 deletions(-)
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/milo-client-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/milo-client-component.adoc
index 985979f..fdec769 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/milo-client-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/milo-client-component.adoc
@@ -185,6 +185,7 @@ might be different, and wrong from the point of view of the connecting client (e
In this case it is possible to set the parameter `overrideHost` to `true`, which will take the discovered endpoint information,
but override the host information with the value of the original URI.
+[[nodeid]]
=== Node ID
@@ -228,6 +229,24 @@ Input parameters are taken from the body:
* If the body is a `Variant`, then it will be wrapped in a `Variant[]` array
* Otherwise the body will be converted into a `Variant` and wrapped in an array of `Variant[]`
+=== Read Values from Nodes
+
+The component provides a producer to read values from multiple OPC UA nodes. The node IDs are defined in the header `CamelMiloNodeIds` as a list of strings (see <<nodeid, Node-ID>> for the ID format).
+
+Example:
+```java
+from("direct:start")
+ .setHeader("CamelMiloNodeIds", constant(Arrays.asList("nsu=urn:org:apache:camel;s=myitem1")))
+ .setHeader("await", constant(true)) // await: parameter "defaultAwaitWrites"
+ .enrich("milo-client:opc.tcp://localhost:4334", new AggregationStrategy() {
+
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ return newExchange;
+ }
+ }).to("mock:test1");
+```
+
=== Security policies
When setting the allowing security policies is it possible to use the well known OPC UA URIs (e.g. `\http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15`)
diff --git a/components/camel-milo/src/main/docs/milo-client-component.adoc b/components/camel-milo/src/main/docs/milo-client-component.adoc
index 985979f..fdec769 100644
--- a/components/camel-milo/src/main/docs/milo-client-component.adoc
+++ b/components/camel-milo/src/main/docs/milo-client-component.adoc
@@ -185,6 +185,7 @@ might be different, and wrong from the point of view of the connecting client (e
In this case it is possible to set the parameter `overrideHost` to `true`, which will take the discovered endpoint information,
but override the host information with the value of the original URI.
+[[nodeid]]
=== Node ID
@@ -228,6 +229,24 @@ Input parameters are taken from the body:
* If the body is a `Variant`, then it will be wrapped in a `Variant[]` array
* Otherwise the body will be converted into a `Variant` and wrapped in an array of `Variant[]`
+=== Read Values from Nodes
+
+The component provides a producer to read values from multiple OPC UA nodes. The node IDs are defined in the header `CamelMiloNodeIds` as a list of strings (see <<nodeid, Node-ID>> for the ID format).
+
+Example:
+```java
+from("direct:start")
+ .setHeader("CamelMiloNodeIds", constant(Arrays.asList("nsu=urn:org:apache:camel;s=myitem1")))
+ .setHeader("await", constant(true)) // await: parameter "defaultAwaitWrites"
+ .enrich("milo-client:opc.tcp://localhost:4334", new AggregationStrategy() {
+
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ return newExchange;
+ }
+ }).to("mock:test1");
+```
+
=== Security policies
When setting the allowing security policies is it possible to use the well known OPC UA URIs (e.g. `\http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15`)
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java
index 7af45a6..b282a2d 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.milo.client;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -107,6 +108,12 @@ public class MiloClientConnection implements AutoCloseable {
return this.manager.write(nodeId, mapWriteValue(value));
}
+ public CompletableFuture<?> readValues(final List<ExpandedNodeId> nodeIds) {
+ checkInit();
+
+ return this.manager.readValues(nodeIds);
+ }
+
public CompletableFuture<CallMethodResult> call(
final ExpandedNodeId nodeId, final ExpandedNodeId methodId, final Object value) {
checkInit();
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java
index f689e62..75bc2c6 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java
@@ -110,7 +110,7 @@ public class MiloClientEndpoint extends DefaultEndpoint {
@Override
public Producer createProducer() throws Exception {
- return new MiloClientProducer(this, this.defaultAwaitWrites);
+ return new MiloClientProducer(this, this.createConnection(), this.defaultAwaitWrites);
}
@Override
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java
index d244236..8de1e59 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java
@@ -16,7 +16,9 @@
*/
package org.apache.camel.component.milo.client;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
@@ -35,7 +37,9 @@ public class MiloClientProducer extends DefaultAsyncProducer {
private final boolean defaultAwaitWrites;
- public MiloClientProducer(final MiloClientEndpoint endpoint,
+ private static final String HEADER_NODE_IDS = "CamelMiloNodeIds";
+
+ public MiloClientProducer(final MiloClientEndpoint endpoint, final MiloClientConnection connection,
final boolean defaultAwaitWrites) {
super(endpoint);
@@ -74,7 +78,15 @@ public class MiloClientProducer extends DefaultAsyncProducer {
final CompletableFuture<?> future;
- if (this.methodId == null) {
+ if (msg.getHeaders().containsKey(HEADER_NODE_IDS)) {
+ final List<String> nodeIds = msg.getHeader(HEADER_NODE_IDS, List.class);
+ final List<ExpandedNodeId> expandedNodeIds
+ = nodeIds.stream().map(String.class::cast).map(ExpandedNodeId::parse).collect(Collectors.toList());
+ future = this.connection.readValues(expandedNodeIds).thenApply(nodes -> {
+ exchange.getIn().setBody(nodes);
+ return nodes;
+ });
+ } else if (this.methodId == null) {
future = this.connection.writeValue(this.nodeId, value);
} else {
future = this.connection.call(this.nodeId, this.methodId, value);
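Note on the hunk above: the producer now picks one of three operations per exchange. If the `CamelMiloNodeIds` header is present it reads; otherwise it writes when no method ID is configured and calls the method when one is. The following is a minimal, stdlib-only sketch of that branch order and of the `String.class::cast` conversion. It is not the component's code: the class name `ProducerDispatchSketch`, the `Operation` enum, and the use of a plain `Map` and a `String` method ID (instead of Camel's message headers and Milo's `ExpandedNodeId`) are assumptions made so the example runs on its own.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ProducerDispatchSketch {

    public static final String HEADER_NODE_IDS = "CamelMiloNodeIds";

    // Hypothetical labels for the three branches in the diff.
    public enum Operation { READ, WRITE, CALL }

    // Same order as the diff: the header wins, then the method ID decides.
    public static Operation choose(Map<String, Object> headers, String methodId) {
        if (headers.containsKey(HEADER_NODE_IDS)) {
            return Operation.READ;
        }
        return methodId == null ? Operation.WRITE : Operation.CALL;
    }

    // Casts every element to String; a non-String element raises ClassCastException.
    public static List<String> nodeIds(Map<String, Object> headers) {
        final List<?> raw = (List<?>) headers.get(HEADER_NODE_IDS);
        return raw.stream().map(String.class::cast).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        final Map<String, Object> headers = new HashMap<>();
        System.out.println(choose(headers, null));         // WRITE
        System.out.println(choose(headers, "someMethod")); // CALL

        headers.put(HEADER_NODE_IDS, Arrays.asList("nsu=urn:org:apache:camel;s=myitem1"));
        System.out.println(choose(headers, "someMethod")); // READ
        System.out.println(nodeIds(headers));
    }
}
```

One consequence of this order is that the header takes precedence even on an endpoint configured for method calls, so a route that sets `CamelMiloNodeIds` will not write the body or invoke the method.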
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
index 95410d0..2b49043 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
@@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import com.google.common.base.Strings;
import org.apache.camel.component.milo.client.MiloClientConfiguration;
@@ -188,9 +190,8 @@ public class SubscriptionManager {
final ReadValueId itemId = new ReadValueId(node, AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE);
Double samplingInterval = s.getSamplingInterval();
- final MonitoringParameters parameters
- = new MonitoringParameters(
- entry.getKey(), samplingInterval, s.createMonitoringFilter(client), null, null);
+ final MonitoringParameters parameters = new MonitoringParameters(
+ entry.getKey(), samplingInterval, s.createMonitoringFilter(client), null, null);
items.add(new MonitoredItemCreateRequest(itemId, MonitoringMode.Reporting, parameters));
}
}
@@ -355,6 +356,17 @@ public class SubscriptionManager {
});
}
+
+ public CompletableFuture<List<DataValue>> readValues(List<ExpandedNodeId> expandedNodeIds) {
+
+ final CompletableFuture<NodeId>[] nodeIdFutures
+ = expandedNodeIds.stream().map(this::lookupNamespace).toArray(CompletableFuture[]::new);
+
+ return CompletableFuture.allOf(nodeIdFutures).thenCompose(param -> {
+ List<NodeId> nodeIds = Stream.of(nodeIdFutures).map(CompletableFuture::join).collect(Collectors.toList());
+ return this.client.readValues(0, TimestampsToReturn.Server, nodeIds);
+ });
+ }
}
private final MiloClientConfiguration configuration;
@@ -449,7 +461,9 @@ public class SubscriptionManager {
final URI uri = URI.create(getEndpointDiscoveryUri());
- //milo library doesn't allow user info as a part of the uri, it has to be removed before sending to milo
+ // milo library doesn't allow user info as a part of the uri, it has to
+ // be
+ // removed before sending to milo
final String user = uri.getUserInfo();
if (user != null && !user.isEmpty()) {
discoveryUri = discoveryUri.replaceFirst(user + "@", "");
@@ -706,4 +720,21 @@ public class SubscriptionManager {
}
}
+ public CompletableFuture<?> readValues(final List<ExpandedNodeId> nodeIds) {
+ synchronized (this) {
+ if (this.connected == null) {
+ return newNotConnectedResult();
+ }
+
+ return this.connected.readValues(nodeIds).handleAsync((nodes, e) -> {
+ // handle outside the lock, running using
+ // handleAsync
+ if (e != null) {
+ handleConnectionFailue(e);
+ }
+ return nodes;
+ }, this.executor);
+ }
+ }
+
}
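Note on the hunks above: the inner `readValues` starts one namespace lookup per node ID, waits for all of them with `CompletableFuture.allOf`, and only then collects the results with `join` before issuing a single read. The sketch below shows that pattern with the JDK alone. The class name `AllOfSketch` and the `lookup` function (which just returns the string length) are hypothetical stand-ins for `lookupNamespace`; nothing here touches Milo.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AllOfSketch {

    // Hypothetical stand-in for lookupNamespace: resolves an ID asynchronously.
    public static CompletableFuture<Integer> lookup(String id) {
        return CompletableFuture.supplyAsync(id::length);
    }

    // Starts every lookup, waits for all of them, then collects results in input order.
    public static CompletableFuture<List<Integer>> lookupAll(List<String> ids) {
        @SuppressWarnings("unchecked")
        final CompletableFuture<Integer>[] futures
                = ids.stream().map(AllOfSketch::lookup).toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(futures).thenApply(ignored ->
                // join() returns immediately here: allOf completes only once every future is done
                Stream.of(futures).map(CompletableFuture::join).collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(lookupAll(Arrays.asList("a", "bb", "ccc")).join()); // [1, 2, 3]
    }
}
```

Because the results are collected from the array rather than in completion order, the output list lines up with the input list. If any single lookup fails, the combined future completes exceptionally, which in the diff is what reaches the `handleAsync` callback of the outer `readValues`.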
diff --git a/components/camel-milo/src/test/java/org/apache/camel/component/milo/ReadValuesClientTest.java b/components/camel-milo/src/test/java/org/apache/camel/component/milo/ReadValuesClientTest.java
new file mode 100644
index 0000000..7b082fd
--- /dev/null
+++ b/components/camel-milo/src/test/java/org/apache/camel/component/milo/ReadValuesClientTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.milo;
+
+import java.util.Arrays;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+
+public class ReadValuesClientTest extends AbstractMiloServerTest {
+
+ private static final String DIRECT_START_1 = "direct:start1";
+ private static final String DIRECT_START_2 = "direct:start2";
+
+ private static final String MILO_SERVER_ITEM_1 = "milo-server:myitem1";
+
+ private static final String MILO_CLIENT_ITEM_C1_1
+ = "milo-client:opc.tcp://foo:bar@localhost:@@port@@?allowedSecurityPolicies=None&overrideHost=true";
+
+ private static final String MOCK_TEST_1 = "mock:test1";
+
+ @EndpointInject(MOCK_TEST_1)
+ protected MockEndpoint test1Endpoint;
+
+ @Produce(DIRECT_START_1)
+ protected ProducerTemplate producer1;
+
+ @Produce(DIRECT_START_2)
+ protected ProducerTemplate producer2;
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(DIRECT_START_1).to(MILO_SERVER_ITEM_1);
+
+ from(DIRECT_START_2)
+ .setHeader("CamelMiloNodeIds", constant(Arrays.asList("nsu=urn:org:apache:camel;s=myitem1")))
+ .enrich(resolve(MILO_CLIENT_ITEM_C1_1), new AggregationStrategy() {
+
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ return newExchange;
+ }
+ }).to(MOCK_TEST_1);
+ }
+ };
+ }
+
+ @Test
+ void testReadValues_Successful() throws Exception {
+ this.test1Endpoint.expectedMinimumMessageCount(1);
+
+ producer1.sendBody("Foo");
+ producer2.sendBody("Bar");
+
+ this.test1Endpoint.await();
+
+ testBody(this.test1Endpoint.message(0), assertGoodValue("Foo"));
+ }
+}
diff --git a/docs/components/modules/ROOT/pages/milo-client-component.adoc b/docs/components/modules/ROOT/pages/milo-client-component.adoc
index a0f4c43..06b739b 100644
--- a/docs/components/modules/ROOT/pages/milo-client-component.adoc
+++ b/docs/components/modules/ROOT/pages/milo-client-component.adoc
@@ -187,6 +187,7 @@ might be different, and wrong from the point of view of the connecting client (e
In this case it is possible to set the parameter `overrideHost` to `true`, which will take the discovered endpoint information,
but override the host information with the value of the original URI.
+[[nodeid]]
=== Node ID
@@ -230,6 +231,24 @@ Input parameters are taken from the body:
* If the body is a `Variant`, then it will be wrapped in a `Variant[]` array
* Otherwise the body will be converted into a `Variant` and wrapped in an array of `Variant[]`
+=== Read Values from Nodes
+
+The component provides a producer to read values from multiple OPC UA nodes. The node IDs are defined in the header `CamelMiloNodeIds` as a list of strings (see <<nodeid, Node-ID>> for the ID format).
+
+Example:
+```java
+from("direct:start")
+ .setHeader("CamelMiloNodeIds", constant(Arrays.asList("nsu=urn:org:apache:camel;s=myitem1")))
+ .setHeader("await", constant(true)) // await: parameter "defaultAwaitWrites"
+ .enrich("milo-client:opc.tcp://localhost:4334", new AggregationStrategy() {
+
+ @Override
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ return newExchange;
+ }
+ }).to("mock:test1");
+```
+
=== Security policies
When setting the allowing security policies is it possible to use the well known OPC UA URIs (e.g. `\http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15`)