Posted to commits@camel.apache.org by da...@apache.org on 2021/01/24 07:54:00 UTC

[camel] branch master updated: camel-milo: added possibility to read node values via producer. (#4909)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 811d93c  camel-milo: added possibility to read node values via producer. (#4909)
811d93c is described below

commit 811d93c1e4199612aac81ab25a2ee51aceb3474c
Author: attrobit <gi...@attrobit.de>
AuthorDate: Sun Jan 24 08:53:23 2021 +0100

    camel-milo: added possibility to read node values via producer. (#4909)
    
    * camel-milo: added possibility to read node values via producer.
    
    * camel-milo: added possibility to read node values via producer.
    
    * Fixed Producer construction after rebase.
    
    * documentation CamelMiloNodeIds
    
    Co-authored-by: attrobit <gi...@attrobit.com>
---
 .../camel/catalog/docs/milo-client-component.adoc  | 19 +++++
 .../src/main/docs/milo-client-component.adoc       | 19 +++++
 .../milo/client/MiloClientConnection.java          |  7 ++
 .../component/milo/client/MiloClientEndpoint.java  |  2 +-
 .../component/milo/client/MiloClientProducer.java  | 16 ++++-
 .../milo/client/internal/SubscriptionManager.java  | 39 ++++++++--
 .../camel/component/milo/ReadValuesClientTest.java | 83 ++++++++++++++++++++++
 .../modules/ROOT/pages/milo-client-component.adoc  | 19 +++++
 8 files changed, 197 insertions(+), 7 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/milo-client-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/milo-client-component.adoc
index 985979f..fdec769 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/milo-client-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/milo-client-component.adoc
@@ -185,6 +185,7 @@ might be different, and wrong from the point of view of the connecting client (e
 In this case it is possible to set the parameter `overrideHost` to `true`, which will take the discovered endpoint information,
 but override the host information with the value of the original URI.
 
+[[nodeid]]
 === Node ID
 
 
@@ -228,6 +229,24 @@ Input parameters are taken from the body:
 * If the body is a `Variant`, then it will be wrapped in a `Variant[]` array
 * Otherwise the body will be converted into a `Variant` and wrapped in an array of `Variant[]`
 
+=== Read Values from Nodes
+
+The component provides a producer to read values from multiple OPC UA nodes. The node IDs are defined in the header `CamelMiloNodeIds` as a list of strings (see <<nodeid, Node ID>> for the ID format).
+
+Example:
+```java
+from("direct:start")
+    .setHeader("CamelMiloNodeIds", constant(Arrays.asList("nsu=urn:org:apache:camel;s=myitem1")))
+    .setHeader("await", constant(true)) // "await" header: per-exchange counterpart of the "defaultAwaitWrites" parameter
+    .enrich("milo-client:opc.tcp://localhost:4334", new AggregationStrategy() {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            return newExchange;
+        }
+    }).to("mock:test1");
+```
+
 === Security policies
 
 When setting the allowing security policies is it possible to use the well known OPC UA URIs (e.g. `\http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15`)
diff --git a/components/camel-milo/src/main/docs/milo-client-component.adoc b/components/camel-milo/src/main/docs/milo-client-component.adoc
index 985979f..fdec769 100644
--- a/components/camel-milo/src/main/docs/milo-client-component.adoc
+++ b/components/camel-milo/src/main/docs/milo-client-component.adoc
@@ -185,6 +185,7 @@ might be different, and wrong from the point of view of the connecting client (e
 In this case it is possible to set the parameter `overrideHost` to `true`, which will take the discovered endpoint information,
 but override the host information with the value of the original URI.
 
+[[nodeid]]
 === Node ID
 
 
@@ -228,6 +229,24 @@ Input parameters are taken from the body:
 * If the body is a `Variant`, then it will be wrapped in a `Variant[]` array
 * Otherwise the body will be converted into a `Variant` and wrapped in an array of `Variant[]`
 
+=== Read Values from Nodes
+
+The component provides a producer to read values from multiple OPC UA nodes. The node IDs are defined in the header `CamelMiloNodeIds` as a list of strings (see <<nodeid, Node ID>> for the ID format).
+
+Example:
+```java
+from("direct:start")
+    .setHeader("CamelMiloNodeIds", constant(Arrays.asList("nsu=urn:org:apache:camel;s=myitem1")))
+    .setHeader("await", constant(true)) // "await" header: per-exchange counterpart of the "defaultAwaitWrites" parameter
+    .enrich("milo-client:opc.tcp://localhost:4334", new AggregationStrategy() {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            return newExchange;
+        }
+    }).to("mock:test1");
+```
+
 === Security policies
 
 When setting the allowing security policies is it possible to use the well known OPC UA URIs (e.g. `\http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15`)
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java
index 7af45a6..b282a2d 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConnection.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.milo.client;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
@@ -107,6 +108,12 @@ public class MiloClientConnection implements AutoCloseable {
         return this.manager.write(nodeId, mapWriteValue(value));
     }
 
+    public CompletableFuture<?> readValues(final List<ExpandedNodeId> nodeIds) {
+        checkInit();
+
+        return this.manager.readValues(nodeIds);
+    }
+
     public CompletableFuture<CallMethodResult> call(
             final ExpandedNodeId nodeId, final ExpandedNodeId methodId, final Object value) {
         checkInit();
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java
index f689e62..75bc2c6 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientEndpoint.java
@@ -110,7 +110,7 @@ public class MiloClientEndpoint extends DefaultEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return new MiloClientProducer(this, this.defaultAwaitWrites);
+        return new MiloClientProducer(this, this.createConnection(), this.defaultAwaitWrites);
     }
 
     @Override
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java
index d244236..8de1e59 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientProducer.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.component.milo.client;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
@@ -35,7 +37,9 @@ public class MiloClientProducer extends DefaultAsyncProducer {
 
     private final boolean defaultAwaitWrites;
 
-    public MiloClientProducer(final MiloClientEndpoint endpoint,
+    private static final String HEADER_NODE_IDS = "CamelMiloNodeIds";
+
+    public MiloClientProducer(final MiloClientEndpoint endpoint, final MiloClientConnection connection,
                               final boolean defaultAwaitWrites) {
         super(endpoint);
 
@@ -74,7 +78,15 @@ public class MiloClientProducer extends DefaultAsyncProducer {
 
         final CompletableFuture<?> future;
 
-        if (this.methodId == null) {
+        if (msg.getHeaders().containsKey(HEADER_NODE_IDS)) {
+            final List<String> nodeIds = msg.getHeader(HEADER_NODE_IDS, List.class);
+            final List<ExpandedNodeId> expandedNodeIds
+                    = nodeIds.stream().map(String.class::cast).map(ExpandedNodeId::parse).collect(Collectors.toList());
+            future = this.connection.readValues(expandedNodeIds).thenApply(nodes -> {
+                exchange.getIn().setBody(nodes);
+                return nodes;
+            });
+        } else if (this.methodId == null) {
             future = this.connection.writeValue(this.nodeId, value);
         } else {
             future = this.connection.call(this.nodeId, this.methodId, value);
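
Note on the header handling in the hunk above: the node IDs arrive as an untyped list, and each element is cast with `String.class::cast` before parsing. The following is a minimal, stdlib-only sketch of that casting step, assuming a plain `instanceof` check in place of Camel's type conversion. The class and method names (`NodeIdHeaderSketch`, `toNodeIdStrings`) are hypothetical and not part of the commit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration only; not code from the commit.
final class NodeIdHeaderSketch {

    // Turns an untyped header value into a list of node ID strings.
    // Assumption: the value must already be a List; Camel's own type
    // conversion (used by the producer) is not modelled here.
    static List<String> toNodeIdStrings(Object headerValue) {
        if (!(headerValue instanceof List)) {
            throw new IllegalArgumentException("expected a List but got: " + headerValue);
        }
        // String.class::cast throws ClassCastException for any non-String element
        return ((List<?>) headerValue).stream()
                .map(String.class::cast)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Object header = Arrays.asList("nsu=urn:org:apache:camel;s=myitem1");
        System.out.println(toNodeIdStrings(header));
    }
}
```

In this sketch a list containing a non-string element fails with a `ClassCastException` when the stream is collected, so a route that sets the header from untrusted input may want to validate it first.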
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
index 95410d0..2b49043 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
@@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import com.google.common.base.Strings;
 import org.apache.camel.component.milo.client.MiloClientConfiguration;
@@ -188,9 +190,8 @@ public class SubscriptionManager {
                     final ReadValueId itemId = new ReadValueId(node, AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE);
                     Double samplingInterval = s.getSamplingInterval();
 
-                    final MonitoringParameters parameters
-                            = new MonitoringParameters(
-                                    entry.getKey(), samplingInterval, s.createMonitoringFilter(client), null, null);
+                    final MonitoringParameters parameters = new MonitoringParameters(
+                            entry.getKey(), samplingInterval, s.createMonitoringFilter(client), null, null);
                     items.add(new MonitoredItemCreateRequest(itemId, MonitoringMode.Reporting, parameters));
                 }
             }
@@ -355,6 +356,17 @@ public class SubscriptionManager {
 
             });
         }
+
+        public CompletableFuture<List<DataValue>> readValues(List<ExpandedNodeId> expandedNodeIds) {
+
+            final CompletableFuture<NodeId>[] nodeIdFutures
+                    = expandedNodeIds.stream().map(this::lookupNamespace).toArray(CompletableFuture[]::new);
+
+            return CompletableFuture.allOf(nodeIdFutures).thenCompose(param -> {
+                List<NodeId> nodeIds = Stream.of(nodeIdFutures).map(CompletableFuture::join).collect(Collectors.toList());
+                return this.client.readValues(0, TimestampsToReturn.Server, nodeIds);
+            });
+        }
     }
 
     private final MiloClientConfiguration configuration;
@@ -449,7 +461,9 @@ public class SubscriptionManager {
 
         final URI uri = URI.create(getEndpointDiscoveryUri());
 
-        //milo library doesn't allow user info as a part of the uri, it has to be removed before sending to milo
+        // milo library doesn't allow user info as a part of the uri, it has to
+        // be
+        // removed before sending to milo
         final String user = uri.getUserInfo();
         if (user != null && !user.isEmpty()) {
             discoveryUri = discoveryUri.replaceFirst(user + "@", "");
@@ -706,4 +720,21 @@ public class SubscriptionManager {
         }
     }
 
+    public CompletableFuture<?> readValues(final List<ExpandedNodeId> nodeIds) {
+        synchronized (this) {
+            if (this.connected == null) {
+                return newNotConnectedResult();
+            }
+
+            return this.connected.readValues(nodeIds).handleAsync((nodes, e) -> {
+                // handle outside the lock, running using
+                // handleAsync
+                if (e != null) {
+                    handleConnectionFailue(e);
+                }
+                return nodes;
+            }, this.executor);
+        }
+    }
+
 }
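
Note on the inner `readValues` added above: it resolves every node ID asynchronously, waits for all lookups with `CompletableFuture.allOf`, and only then issues a single batch read via `thenCompose`. The following is a minimal sketch of that pattern using only the JDK. `lookup` and `readAll` are hypothetical stand-ins for the namespace lookup and the client read, and integers and strings replace the Milo types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration only; not code from the commit.
final class AllOfJoinSketch {

    // Stand-in for the asynchronous namespace lookup (assumption: maps an ID to its length)
    static CompletableFuture<Integer> lookup(String id) {
        return CompletableFuture.supplyAsync(id::length);
    }

    // Stand-in for the batch read against the server
    static CompletableFuture<List<String>> readAll(List<Integer> ids) {
        return CompletableFuture.completedFuture(
                ids.stream().map(i -> "value-" + i).collect(Collectors.toList()));
    }

    static CompletableFuture<List<String>> readValues(List<String> ids) {
        @SuppressWarnings("unchecked")
        final CompletableFuture<Integer>[] lookups
                = ids.stream().map(AllOfJoinSketch::lookup).toArray(CompletableFuture[]::new);

        // allOf completes once every lookup has completed, so join() below does not block
        return CompletableFuture.allOf(lookups).thenCompose(ignored -> {
            List<Integer> resolved
                    = Stream.of(lookups).map(CompletableFuture::join).collect(Collectors.toList());
            return readAll(resolved);
        });
    }

    public static void main(String[] args) {
        System.out.println(readValues(Arrays.asList("a", "bcd")).join());
    }
}
```

Because the futures are kept in an array and joined in array order, the results line up with the order of the requested IDs. If any lookup fails, `allOf` completes exceptionally and the batch read is skipped.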
diff --git a/components/camel-milo/src/test/java/org/apache/camel/component/milo/ReadValuesClientTest.java b/components/camel-milo/src/test/java/org/apache/camel/component/milo/ReadValuesClientTest.java
new file mode 100644
index 0000000..7b082fd
--- /dev/null
+++ b/components/camel-milo/src/test/java/org/apache/camel/component/milo/ReadValuesClientTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.milo;
+
+import java.util.Arrays;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+
+public class ReadValuesClientTest extends AbstractMiloServerTest {
+
+    private static final String DIRECT_START_1 = "direct:start1";
+    private static final String DIRECT_START_2 = "direct:start2";
+
+    private static final String MILO_SERVER_ITEM_1 = "milo-server:myitem1";
+
+    private static final String MILO_CLIENT_ITEM_C1_1
+            = "milo-client:opc.tcp://foo:bar@localhost:@@port@@?allowedSecurityPolicies=None&overrideHost=true";
+
+    private static final String MOCK_TEST_1 = "mock:test1";
+
+    @EndpointInject(MOCK_TEST_1)
+    protected MockEndpoint test1Endpoint;
+
+    @Produce(DIRECT_START_1)
+    protected ProducerTemplate producer1;
+
+    @Produce(DIRECT_START_2)
+    protected ProducerTemplate producer2;
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(DIRECT_START_1).to(MILO_SERVER_ITEM_1);
+
+                from(DIRECT_START_2)
+                        .setHeader("CamelMiloNodeIds", constant(Arrays.asList("nsu=urn:org:apache:camel;s=myitem1")))
+                        .enrich(resolve(MILO_CLIENT_ITEM_C1_1), new AggregationStrategy() {
+
+                            @Override
+                            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+                                return newExchange;
+                            }
+                        }).to(MOCK_TEST_1);
+            }
+        };
+    }
+
+    @Test
+    void testReadValues_Successful() throws Exception {
+        this.test1Endpoint.expectedMinimumMessageCount(1);
+
+        producer1.sendBody("Foo");
+        producer2.sendBody("Bar");
+
+        this.test1Endpoint.await();
+
+        testBody(this.test1Endpoint.message(0), assertGoodValue("Foo"));
+    }
+}
diff --git a/docs/components/modules/ROOT/pages/milo-client-component.adoc b/docs/components/modules/ROOT/pages/milo-client-component.adoc
index a0f4c43..06b739b 100644
--- a/docs/components/modules/ROOT/pages/milo-client-component.adoc
+++ b/docs/components/modules/ROOT/pages/milo-client-component.adoc
@@ -187,6 +187,7 @@ might be different, and wrong from the point of view of the connecting client (e
 In this case it is possible to set the parameter `overrideHost` to `true`, which will take the discovered endpoint information,
 but override the host information with the value of the original URI.
 
+[[nodeid]]
 === Node ID
 
 
@@ -230,6 +231,24 @@ Input parameters are taken from the body:
 * If the body is a `Variant`, then it will be wrapped in a `Variant[]` array
 * Otherwise the body will be converted into a `Variant` and wrapped in an array of `Variant[]`
 
+=== Read Values from Nodes
+
+The component provides a producer to read values from multiple OPC UA nodes. The node IDs are defined in the header `CamelMiloNodeIds` as a list of strings (see <<nodeid, Node ID>> for the ID format).
+
+Example:
+```java
+from("direct:start")
+    .setHeader("CamelMiloNodeIds", constant(Arrays.asList("nsu=urn:org:apache:camel;s=myitem1")))
+    .setHeader("await", constant(true)) // "await" header: per-exchange counterpart of the "defaultAwaitWrites" parameter
+    .enrich("milo-client:opc.tcp://localhost:4334", new AggregationStrategy() {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            return newExchange;
+        }
+    }).to("mock:test1");
+```
+
 === Security policies
 
 When setting the allowing security policies is it possible to use the well known OPC UA URIs (e.g. `\http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15`)