You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2021/12/14 06:05:14 UTC

[GitHub] [camel] davsclaus commented on a change in pull request #6526: CAMEL-17319: Camel Milo: Browsing functionality

davsclaus commented on a change in pull request #6526:
URL: https://github.com/apache/camel/pull/6526#discussion_r768338741



##########
File path: components/camel-milo/src/main/java/org/apache/camel/component/milo/browse/MiloBrowseEndpoint.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.milo.browse;
+
+import java.util.Objects;
+
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.milo.client.MiloClientConfiguration;
+import org.apache.camel.component.milo.client.MiloClientConnection;
+import org.apache.camel.component.milo.client.MiloClientConnectionManager;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connect to OPC UA servers using the binary protocol for browsing the node tree.
+ */
+@UriEndpoint(firstVersion = "3.13.0", scheme = "milo-browse", syntax = "milo-browse:endpointUri", title = "OPC UA Browser",
+             category = { Category.IOT })
+public class MiloBrowseEndpoint extends DefaultEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MiloBrowseEndpoint.class);
+
+    private final MiloClientConnectionManager connectionManager;
+
+    /**
+     * The OPC UA server endpoint
+     */
+    @UriPath
+    @Metadata(required = true)
+    private final String endpointUri;
+
+    /**
+     * The node definition (see Node ID)
+     */
+    @UriParam(defaultValue = "ns=0;id=84", defaultValueNote = "Root folder as per OPC-UA spec")
+    private String node = Identifiers.RootFolder.toParseableString();
+
+    /**
+     * The direction to browse (forward, inverse, ...)
+     */
+    @UriParam(defaultValue = "Forward",
+              defaultValueNote = "The direction to browse; See org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection")
+    private BrowseDirection direction = BrowseDirection.Forward;
+
+    /**
+     * Whether to include sub-types for browsing; only applicable for non-recursive browsing
+     */
+    @UriParam(defaultValue = "true")
+    private boolean includeSubTypes = true;
+
+    /**
+     * The mask indicating the node classes of interest in browsing
+     */
+    @UriParam(defaultValue = "Variable,Object,DataType",
+              defaultValueNote = "Comma-separated node class list; see org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass")
+    private String nodeClasses = NodeClass.Variable + "," + NodeClass.Object + "," + NodeClass.DataType;
+
+    private int nodeClassMask = NodeClass.Variable.getValue() | NodeClass.Object.getValue() | NodeClass.DataType.getValue();
+
+    /**
+     * Whether to browse recursively into sub-types, ignores includeSubTypes setting as it's implied to be set to true
+     */
+    @UriParam(defaultValue = "false",
+              defaultValueNote = "Whether to recursively browse sub-types: true|false")
+    private boolean recursive;
+
+    /**
+     * When browsing recursively into sub-types, what's the maximum search depth for diving into the tree
+     */
+    @UriParam(defaultValue = "3", defaultValueNote = "Maximum depth for browsing recursively (only if recursive = true)")
+    private int depth = 3;
+
+    /**
+     * Filter out node ids to limit browsing
+     */
+    @UriParam(defaultValue = "None", defaultValueNote = "Regular filter expression matching node ids")
+    private String filter;
+
+    /**
+     * The maximum number node ids requested per server call
+     */
+    @UriParam(defaultValue = "10",
+              defaultValueNote = "Maximum number of node ids requested per browse call (applies to browsing sub-types only; only if recursive = true)")
+    private int maxNodeIdsPerRequest = 10;
+
+    /**
+     * The client configuration
+     */
+    @UriParam
+    private MiloClientConfiguration configuration;
+
+    /**
+     * Default "await" setting for writes
+     */
+    @UriParam
+    private boolean defaultAwaitWrites;
+
+    public MiloBrowseEndpoint(final String uri, final MiloBrowseComponent component, final String endpointUri,
+                              final MiloClientConnectionManager connectionManager) {
+        super(uri, component);
+
+        Objects.requireNonNull(component);
+        Objects.requireNonNull(endpointUri);
+        Objects.requireNonNull(connectionManager);
+
+        this.endpointUri = endpointUri;
+        this.connectionManager = connectionManager;
+    }
+
+    public void setConfiguration(MiloClientConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public MiloClientConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new MiloBrowseProducer(this, defaultAwaitWrites);
+    }
+
+    @Override
+    public Consumer createConsumer(final Processor processor) throws Exception {
+        throw new UnsupportedOperationException(MiloBrowseEndpoint.class.getName() + " doesn't support a consumer");
+    }
+
+    public MiloClientConnection createConnection() {
+        return this.connectionManager.createConnection(configuration, null);
+    }
+
+    public void releaseConnection(MiloClientConnection connection) {
+        this.connectionManager.releaseConnection(connection);
+    }
+
+    public void setNode(final String node) {
+        this.node = node;
+    }
+
+    public String getNode() {
+        return node;
+    }
+
+    NodeId getNodeId() {
+        return getNodeId(this.node);
+    }
+
+    NodeId getNodeId(String nodeId) {
+        if (nodeId != null) {
+            return NodeId.parse(nodeId);
+        } else {
+            return null;
+        }
+    }
+
+    public BrowseDirection getDirection() {
+        return direction;
+    }
+
+    public void setDirection(String direction) {
+        try {
+            this.direction = BrowseDirection.valueOf(direction);
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Browsing direction '" + direction + "' not supported", e);
+        }
+    }
+
+    public boolean isIncludeSubTypes() {
+        return includeSubTypes;
+    }
+
+    public void setIncludeSubTypes(boolean includeSubTypes) {
+        this.includeSubTypes = includeSubTypes;
+    }
+
+    public String getNodeClasses() {
+        return nodeClasses;
+    }
+
+    public void setNodeClasses(String nodeClasses) {
+        this.nodeClasses = nodeClasses;
+        final String[] nodeClassArray = nodeClasses.split(",");
+        int mask = 0;
+        try {
+            for (String nodeClass : nodeClassArray) {
+                mask |= NodeClass.valueOf(nodeClass).getValue();
+            }
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Invalid node class specified: " + nodeClasses, e);
+        }
+        LOG.debug("Node class list conversion {} -> {}", nodeClasses, mask);
+        nodeClassMask = mask;
+    }
+
+    public int getNodeClassMask() {
+        return nodeClassMask;
+    }
+
+    public void setDirection(BrowseDirection direction) {
+        this.direction = direction;
+    }
+
+    public boolean isDefaultAwaitWrites() {
+        return defaultAwaitWrites;
+    }
+
+    public void setDefaultAwaitWrites(boolean defaultAwaitWrites) {
+        this.defaultAwaitWrites = defaultAwaitWrites;
+    }
+
+    public boolean isRecursive() {
+        return recursive;
+    }
+
+    public void setRecursive(boolean recursive) {
+        this.recursive = recursive;
+    }
+
+    public int getDepth() {
+        return depth;
+    }
+
+    public void setDepth(int depth) {
+        this.depth = depth;
+    }
+
+    public String getFilter() {
+        return filter;
+    }
+
+    public void setFilter(String filter) {
+        this.filter = filter;
+    }
+
+    public int getMaxNodeIdsPerRequest() {
+        return maxNodeIdsPerRequest;
+    }
+
+    public void setMaxNodeIdsPerRequest(int maxNodeIdsPerRequest) {
+        this.maxNodeIdsPerRequest = maxNodeIdsPerRequest;
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review comment:
       Remove equals

##########
File path: components/camel-milo/src/main/java/org/apache/camel/component/milo/browse/MiloBrowseEndpoint.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.milo.browse;
+
+import java.util.Objects;
+
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.milo.client.MiloClientConfiguration;
+import org.apache.camel.component.milo.client.MiloClientConnection;
+import org.apache.camel.component.milo.client.MiloClientConnectionManager;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connect to OPC UA servers using the binary protocol for browsing the node tree.
+ */
+@UriEndpoint(firstVersion = "3.13.0", scheme = "milo-browse", syntax = "milo-browse:endpointUri", title = "OPC UA Browser",
+             category = { Category.IOT })
+public class MiloBrowseEndpoint extends DefaultEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MiloBrowseEndpoint.class);
+
+    private final MiloClientConnectionManager connectionManager;
+
+    /**
+     * The OPC UA server endpoint
+     */
+    @UriPath
+    @Metadata(required = true)
+    private final String endpointUri;
+
+    /**
+     * The node definition (see Node ID)
+     */
+    @UriParam(defaultValue = "ns=0;id=84", defaultValueNote = "Root folder as per OPC-UA spec")
+    private String node = Identifiers.RootFolder.toParseableString();
+
+    /**
+     * The direction to browse (forward, inverse, ...)
+     */
+    @UriParam(defaultValue = "Forward",
+              defaultValueNote = "The direction to browse; See org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection")
+    private BrowseDirection direction = BrowseDirection.Forward;
+
+    /**
+     * Whether to include sub-types for browsing; only applicable for non-recursive browsing
+     */
+    @UriParam(defaultValue = "true")
+    private boolean includeSubTypes = true;
+
+    /**
+     * The mask indicating the node classes of interest in browsing
+     */
+    @UriParam(defaultValue = "Variable,Object,DataType",
+              defaultValueNote = "Comma-separated node class list; see org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass")
+    private String nodeClasses = NodeClass.Variable + "," + NodeClass.Object + "," + NodeClass.DataType;
+
+    private int nodeClassMask = NodeClass.Variable.getValue() | NodeClass.Object.getValue() | NodeClass.DataType.getValue();
+
+    /**
+     * Whether to browse recursively into sub-types, ignores includeSubTypes setting as it's implied to be set to true
+     */
+    @UriParam(defaultValue = "false",
+              defaultValueNote = "Whether to recursively browse sub-types: true|false")
+    private boolean recursive;
+
+    /**
+     * When browsing recursively into sub-types, what's the maximum search depth for diving into the tree
+     */
+    @UriParam(defaultValue = "3", defaultValueNote = "Maximum depth for browsing recursively (only if recursive = true)")
+    private int depth = 3;
+
+    /**
+     * Filter out node ids to limit browsing
+     */
+    @UriParam(defaultValue = "None", defaultValueNote = "Regular filter expression matching node ids")
+    private String filter;
+
+    /**
+     * The maximum number node ids requested per server call
+     */
+    @UriParam(defaultValue = "10",
+              defaultValueNote = "Maximum number of node ids requested per browse call (applies to browsing sub-types only; only if recursive = true)")
+    private int maxNodeIdsPerRequest = 10;
+
+    /**
+     * The client configuration
+     */
+    @UriParam
+    private MiloClientConfiguration configuration;
+
+    /**
+     * Default "await" setting for writes
+     */
+    @UriParam
+    private boolean defaultAwaitWrites;
+
+    public MiloBrowseEndpoint(final String uri, final MiloBrowseComponent component, final String endpointUri,
+                              final MiloClientConnectionManager connectionManager) {
+        super(uri, component);
+
+        Objects.requireNonNull(component);
+        Objects.requireNonNull(endpointUri);
+        Objects.requireNonNull(connectionManager);
+
+        this.endpointUri = endpointUri;
+        this.connectionManager = connectionManager;
+    }
+
+    public void setConfiguration(MiloClientConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public MiloClientConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new MiloBrowseProducer(this, defaultAwaitWrites);
+    }
+
+    @Override
+    public Consumer createConsumer(final Processor processor) throws Exception {
+        throw new UnsupportedOperationException(MiloBrowseEndpoint.class.getName() + " doesn't support a consumer");
+    }
+
+    public MiloClientConnection createConnection() {
+        return this.connectionManager.createConnection(configuration, null);
+    }
+
+    public void releaseConnection(MiloClientConnection connection) {
+        this.connectionManager.releaseConnection(connection);
+    }
+
+    public void setNode(final String node) {
+        this.node = node;
+    }
+
+    public String getNode() {
+        return node;
+    }
+
+    NodeId getNodeId() {
+        return getNodeId(this.node);
+    }
+
+    NodeId getNodeId(String nodeId) {
+        if (nodeId != null) {
+            return NodeId.parse(nodeId);
+        } else {
+            return null;
+        }
+    }
+
+    public BrowseDirection getDirection() {
+        return direction;
+    }
+
+    public void setDirection(String direction) {
+        try {
+            this.direction = BrowseDirection.valueOf(direction);
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Browsing direction '" + direction + "' not supported", e);
+        }
+    }
+
+    public boolean isIncludeSubTypes() {
+        return includeSubTypes;
+    }
+
+    public void setIncludeSubTypes(boolean includeSubTypes) {
+        this.includeSubTypes = includeSubTypes;
+    }
+
+    public String getNodeClasses() {
+        return nodeClasses;
+    }
+
+    public void setNodeClasses(String nodeClasses) {
+        this.nodeClasses = nodeClasses;
+        final String[] nodeClassArray = nodeClasses.split(",");
+        int mask = 0;
+        try {
+            for (String nodeClass : nodeClassArray) {
+                mask |= NodeClass.valueOf(nodeClass).getValue();
+            }
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Invalid node class specified: " + nodeClasses, e);
+        }
+        LOG.debug("Node class list conversion {} -> {}", nodeClasses, mask);
+        nodeClassMask = mask;
+    }
+
+    public int getNodeClassMask() {
+        return nodeClassMask;
+    }
+
+    public void setDirection(BrowseDirection direction) {
+        this.direction = direction;
+    }
+
+    public boolean isDefaultAwaitWrites() {
+        return defaultAwaitWrites;
+    }
+
+    public void setDefaultAwaitWrites(boolean defaultAwaitWrites) {
+        this.defaultAwaitWrites = defaultAwaitWrites;
+    }
+
+    public boolean isRecursive() {
+        return recursive;
+    }
+
+    public void setRecursive(boolean recursive) {
+        this.recursive = recursive;
+    }
+
+    public int getDepth() {
+        return depth;
+    }
+
+    public void setDepth(int depth) {
+        this.depth = depth;
+    }
+
+    public String getFilter() {
+        return filter;
+    }
+
+    public void setFilter(String filter) {
+        this.filter = filter;
+    }
+
+    public int getMaxNodeIdsPerRequest() {
+        return maxNodeIdsPerRequest;
+    }
+
+    public void setMaxNodeIdsPerRequest(int maxNodeIdsPerRequest) {
+        this.maxNodeIdsPerRequest = maxNodeIdsPerRequest;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        MiloBrowseEndpoint that = (MiloBrowseEndpoint) o;
+        boolean nodeClassEquality = nodeClassMask == that.nodeClassMask && Objects.equals(nodeClasses, that.nodeClasses);
+        return includeSubTypes == that.includeSubTypes && recursive == that.recursive
+                && depth == that.depth && maxNodeIdsPerRequest == that.maxNodeIdsPerRequest
+                && defaultAwaitWrites == that.defaultAwaitWrites && Objects.equals(endpointUri, that.endpointUri)
+                && Objects.equals(node, that.node) && direction == that.direction && nodeClassEquality
+                && Objects.equals(filter, that.filter);
+    }
+
+    @Override
+    public int hashCode() {

Review comment:
       Remove

##########
File path: components/camel-milo/src/test/java/org/apache/camel/component/milo/AbstractMiloServerTest.java
##########
@@ -153,4 +156,18 @@ boolean isJavaVersionSatisfied(int requiredVersion) {
         return false;
     }
 
+    protected Predicate assertPredicate(Consumer<Exchange> consumer) {
+
+        return exchange -> {
+            try {
+                consumer.accept(exchange);
+                return true;
+            } catch (AssertionFailedError error) {
+                System.err.println("Assertion error: " + error.getMessage());
+                error.printStackTrace(System.err);

Review comment:
       use logger instead of system out

##########
File path: components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
##########
@@ -368,6 +379,197 @@ public void dispose() {
                 return this.client.readValues(0, TimestampsToReturn.Server, nodeIds);
             });
         }
+
+        private BrowseResult filter(final BrowseResult browseResult, final Pattern pattern) {
+
+            final ReferenceDescription[] references = browseResult.getReferences();
+
+            if (null == references || null == pattern) {
+                return browseResult;
+            }
+
+            final List<ReferenceDescription> filteredReferences = new ArrayList<>();
+            for (final ReferenceDescription reference : references) {
+                final String id = reference.getNodeId().toParseableString();
+                if (!(pattern.matcher(id).matches())) {
+                    LOG.trace("Node {} excluded by filter", id);
+                    continue;
+                }
+                filteredReferences.add(reference);
+            }
+
+            return new BrowseResult(
+                    browseResult.getStatusCode(), browseResult.getContinuationPoint(),
+                    filteredReferences.toArray(new ReferenceDescription[0]));
+        }
+
+        private CompletableFuture<Map<ExpandedNodeId, BrowseResult>> flatten(
+                List<CompletableFuture<Map<ExpandedNodeId, BrowseResult>>> browseResults) {
+            return CompletableFuture.allOf(browseResults.toArray(new CompletableFuture[0]))
+                    .thenApply(__ -> browseResults
+                            .stream()
+                            .map(CompletableFuture::join)
+                            .map(Map::entrySet)
+                            .flatMap(Set::stream)
+                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e2)));
+        }
+
+        // Browse at continuation point if any
+        public CompletableFuture<BrowseResult> browse(BrowseResult previousBrowseResult) {
+
+            final ByteString continuationPoint = previousBrowseResult.getContinuationPoint();
+
+            if (previousBrowseResult.getStatusCode().isGood() && continuationPoint.isNotNull()) {
+
+                return this.client.browseNext(false, continuationPoint)
+
+                        .thenCompose(browseResult -> {
+
+                            final ReferenceDescription[] previousReferences = previousBrowseResult.getReferences();
+                            final ReferenceDescription[] references = browseResult.getReferences();
+
+                            if (null == references) {
+
+                                LOG.info("Browse continuation point -> no references");
+                                return completedFuture(previousBrowseResult);
+                            } else if (null == previousReferences) {
+
+                                LOG.info("Browse continuation point -> previous references not obtained");
+                                return completedFuture(browseResult);
+                            }
+
+                            final ReferenceDescription[] combined
+                                    = Arrays.copyOf(previousReferences, previousReferences.length + references.length);
+                            System.arraycopy(references, 0, combined, previousReferences.length, references.length);
+
+                            LOG.debug("Browse continuation point -> {}: {} reference(s); total: {} reference(s)",
+                                    browseResult.getStatusCode(), references.length, combined.length);
+
+                            return browse(new BrowseResult(
+                                    browseResult.getStatusCode(), browseResult.getContinuationPoint(), combined));
+                        });
+
+            } else {
+
+                return completedFuture(previousBrowseResult);
+            }
+        }
+
+        // Browse a single node, retrieve additional results, filter node ids and eventually browse deeper into the tree
+        public CompletableFuture<Map<ExpandedNodeId, BrowseResult>> browse(
+                BrowseDescription browseDescription, BrowseResult browseResult, int depth, int maxDepth, Pattern pattern,
+                int maxNodesPerRequest) {
+
+            return browse(browseResult)
+
+                    .thenCompose(preliminary -> completedFuture(filter(preliminary, pattern)))
+
+                    .thenCompose(filtered -> {
+
+                        final ExpandedNodeId expandedNodeId = browseDescription.getNodeId().expanded();
+                        final Map<ExpandedNodeId, BrowseResult> root = Collections.singletonMap(expandedNodeId, filtered);
+                        final CompletableFuture<Map<ExpandedNodeId, BrowseResult>> finalFuture = completedFuture(root);
+                        final ReferenceDescription[] references = filtered.getReferences();
+
+                        if (depth >= maxDepth || null == references) {
+                            return finalFuture;
+                        }
+
+                        final List<CompletableFuture<Map<ExpandedNodeId, BrowseResult>>> futures = new ArrayList<>();
+
+                        // Save current node
+                        futures.add(finalFuture);
+
+                        final List<ExpandedNodeId> nodeIds = Stream.of(references)
+                                .map(ReferenceDescription::getNodeId).collect(Collectors.toList());
+
+                        final List<List<ExpandedNodeId>> lists = Lists.partition(nodeIds, maxNodesPerRequest);
+                        for (final List<ExpandedNodeId> list : lists) {
+                            futures.add(browse(list, browseDescription.getBrowseDirection(),
+                                    browseDescription.getNodeClassMask().intValue(), depth + 1, maxDepth, pattern,
+                                    browseDescription.getIncludeSubtypes(), maxNodesPerRequest));
+                        }
+
+                        return flatten(futures);
+                    });
+        }
+
+        // Browse according to a list of browse descriptions
+        public CompletableFuture<Map<ExpandedNodeId, BrowseResult>> browse(
+                List<BrowseDescription> browseDescriptions,
+                int depth, int maxDepth, Pattern pattern, int maxNodesPerRequest) {
+
+            return this.client.browse(browseDescriptions)
+
+                    .thenCompose(partials -> {
+
+                        // Fail a bit more gracefully in case of missing results
+                        if (partials.size() != browseDescriptions.size()) {
+
+                            // @TODO: Replace with Java 9 functionality
+                            final CompletableFuture<Map<ExpandedNodeId, BrowseResult>> failedFuture = new CompletableFuture<>();
+                            failedFuture.completeExceptionally(new IllegalArgumentException(
+                                    format(
+                                            "Invalid number of browse results: %s, expected %s", partials.size(),
+                                            browseDescriptions.size())));
+                            return failedFuture;
+                        }
+
+                        final List<CompletableFuture<Map<ExpandedNodeId, BrowseResult>>> futures = new ArrayList<>();
+
+                        for (int i = 0; i < partials.size(); i++) {
+
+                            futures.add(browse(browseDescriptions.get(i), partials.get(i), depth, maxDepth, pattern,
+                                    maxNodesPerRequest));
+                        }
+
+                        return flatten(futures);
+                    });
+        }
+
+        // Wrapper for looking up nodes and instantiating initial browse descriptions according to the configuration provided
+        @SuppressWarnings("unchecked")
+        public CompletableFuture<Map<ExpandedNodeId, BrowseResult>> browse(
+                List<ExpandedNodeId> expandedNodeIds, BrowseDirection direction, int nodeClasses, int depth, int maxDepth,
+                Pattern pattern, boolean includeSubTypes, int maxNodesPerRequest) {
+
+            final CompletableFuture<NodeId>[] futures = expandedNodeIds.stream().map(this::lookupNamespace)
+                    .toArray(CompletableFuture[]::new);
+
+            return CompletableFuture.allOf(futures)
+
+                    .thenCompose(__ -> {
+
+                        List<NodeId> nodeIds = Stream.of(futures).map(CompletableFuture::join)
+                                .collect(Collectors.toList());
+
+                        return completedFuture(nodeIds.stream().map(nodeId -> new BrowseDescription(
+                                nodeId, direction, Identifiers.References, includeSubTypes, uint(nodeClasses),
+                                uint(BrowseResultMask.All.getValue()))).collect(Collectors.toList()));
+                    })
+
+                    .thenCompose(descriptions -> browse(descriptions, depth, maxDepth, pattern, maxNodesPerRequest))
+
+                    .whenComplete((actual, error) -> {
+
+                        if (!LOG.isErrorEnabled()) {
+
+                            return;
+                        }
+
+                        final String expandedNodeIdsString = expandedNodeIds.stream()
+                                .map(ExpandedNodeId::toParseableString)
+                                .collect(Collectors.joining(", "));
+
+                        if (actual != null) {
+                            LOG.debug("Browse node(s) {} -> {} result(s)", expandedNodeIdsString, actual.size());
+
+                        } else {
+                            LOG.error("Browse node(s) {} -> failed: {}", expandedNodeIdsString, error);

Review comment:
       Can we do something else than just log an error? Set an exception on the exchange as a result that this failed

##########
File path: components/camel-milo/src/main/java/org/apache/camel/component/milo/browse/MiloBrowseEndpoint.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.milo.browse;
+
+import java.util.Objects;
+
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.milo.client.MiloClientConfiguration;
+import org.apache.camel.component.milo.client.MiloClientConnection;
+import org.apache.camel.component.milo.client.MiloClientConnectionManager;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Connect to OPC UA servers using the binary protocol for browsing the node tree.
+ */
+@UriEndpoint(firstVersion = "3.13.0", scheme = "milo-browse", syntax = "milo-browse:endpointUri", title = "OPC UA Browser",
+             category = { Category.IOT })
+public class MiloBrowseEndpoint extends DefaultEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MiloBrowseEndpoint.class);
+
+    private final MiloClientConnectionManager connectionManager;
+
+    /**
+     * The OPC UA server endpoint
+     */
+    @UriPath
+    @Metadata(required = true)
+    private final String endpointUri;
+
+    /**
+     * The node definition (see Node ID)
+     */
+    @UriParam(defaultValue = "ns=0;id=84", defaultValueNote = "Root folder as per OPC-UA spec")
+    private String node = Identifiers.RootFolder.toParseableString();
+
+    /**
+     * The direction to browse (forward, inverse, ...)
+     */
+    @UriParam(defaultValue = "Forward",
+              defaultValueNote = "The direction to browse; See org.eclipse.milo.opcua.stack.core.types.enumerated.BrowseDirection")
+    private BrowseDirection direction = BrowseDirection.Forward;
+
+    /**
+     * Whether to include sub-types for browsing; only applicable for non-recursive browsing
+     */
+    @UriParam(defaultValue = "true")
+    private boolean includeSubTypes = true;
+
+    /**
+     * The mask indicating the node classes of interest in browsing
+     */
+    @UriParam(defaultValue = "Variable,Object,DataType",
+              defaultValueNote = "Comma-separated node class list; see org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass")
+    private String nodeClasses = NodeClass.Variable + "," + NodeClass.Object + "," + NodeClass.DataType;
+
+    private int nodeClassMask = NodeClass.Variable.getValue() | NodeClass.Object.getValue() | NodeClass.DataType.getValue();
+
+    /**
+     * Whether to browse recursively into sub-types, ignores includeSubTypes setting as it's implied to be set to true
+     */
+    @UriParam(defaultValue = "false",
+              defaultValueNote = "Whether to recursively browse sub-types: true|false")
+    private boolean recursive;
+
+    /**
+     * When browsing recursively into sub-types, what's the maximum search depth for diving into the tree
+     */
+    @UriParam(defaultValue = "3", defaultValueNote = "Maximum depth for browsing recursively (only if recursive = true)")
+    private int depth = 3;
+
+    /**
+     * Filter out node ids to limit browsing
+     */
+    @UriParam(defaultValue = "None", defaultValueNote = "Regular filter expression matching node ids")
+    private String filter;
+
+    /**
+     * The maximum number node ids requested per server call
+     */
+    @UriParam(defaultValue = "10",
+              defaultValueNote = "Maximum number of node ids requested per browse call (applies to browsing sub-types only; only if recursive = true)")
+    private int maxNodeIdsPerRequest = 10;
+
+    /**
+     * The client configuration
+     */
+    @UriParam
+    private MiloClientConfiguration configuration;
+
+    /**
+     * Default "await" setting for writes
+     */
+    @UriParam
+    private boolean defaultAwaitWrites;
+
+    public MiloBrowseEndpoint(final String uri, final MiloBrowseComponent component, final String endpointUri,
+                              final MiloClientConnectionManager connectionManager) {
+        super(uri, component);
+
+        Objects.requireNonNull(component);
+        Objects.requireNonNull(endpointUri);
+        Objects.requireNonNull(connectionManager);
+
+        this.endpointUri = endpointUri;
+        this.connectionManager = connectionManager;
+    }
+
+    public void setConfiguration(MiloClientConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public MiloClientConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new MiloBrowseProducer(this, defaultAwaitWrites);
+    }
+
+    @Override
+    public Consumer createConsumer(final Processor processor) throws Exception {
+        throw new UnsupportedOperationException(MiloBrowseEndpoint.class.getName() + " doesn't support a consumer");
+    }
+
+    public MiloClientConnection createConnection() {
+        return this.connectionManager.createConnection(configuration, null);
+    }
+
+    public void releaseConnection(MiloClientConnection connection) {
+        this.connectionManager.releaseConnection(connection);
+    }
+
+    public void setNode(final String node) {
+        this.node = node;
+    }
+
+    public String getNode() {
+        return node;
+    }
+
+    NodeId getNodeId() {
+        return getNodeId(this.node);
+    }
+
+    NodeId getNodeId(String nodeId) {
+        if (nodeId != null) {
+            return NodeId.parse(nodeId);
+        } else {
+            return null;
+        }
+    }
+
+    public BrowseDirection getDirection() {
+        return direction;
+    }
+
+    public void setDirection(String direction) {

Review comment:
       The getter/setter type should be the same - Camel knows how to convert string to enum

##########
File path: components/camel-milo/src/main/java/org/apache/camel/component/milo/browse/MiloBrowseProducer.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.milo.browse;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.milo.client.MiloClientConnection;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.structured.BrowseResult;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReferenceDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.Boolean.TRUE;
+import static org.apache.camel.component.milo.NodeIds.HEADER_NODE_IDS;
+
+public class MiloBrowseProducer extends DefaultAsyncProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MiloBrowseProducer.class);
+
+    private MiloClientConnection connection;
+
+    private final boolean defaultAwaitWrites;
+
+    public MiloBrowseProducer(final MiloBrowseEndpoint endpoint, final boolean defaultAwaitWrites) {
+        super(endpoint);
+
+        this.defaultAwaitWrites = defaultAwaitWrites;
+    }
+
+    @Override
+    public MiloBrowseEndpoint getEndpoint() {
+        return (MiloBrowseEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        this.connection = getEndpoint().createConnection();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (null != this.connection) {
+            getEndpoint().releaseConnection(connection);
+        }
+        super.doStop();
+    }
+
+    private ExpandedNodeId tryParse(String nodeString) {
+        final Optional<NodeId> nodeId = NodeId.parseSafe(nodeString);
+        return nodeId.map(NodeId::expanded).orElseGet(() -> ExpandedNodeId.parse(nodeString));
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback async) {
+
+        final Message message = exchange.getMessage();
+        final List<ExpandedNodeId> expandedNodeIds = new ArrayList<>();
+
+        if (message.getHeaders().containsKey(HEADER_NODE_IDS)) {
+
+            final List<?> nodes
+                    = message.getHeader(HEADER_NODE_IDS, Collections.singletonList(getEndpoint().getNode()), List.class);
+            message.removeHeader(HEADER_NODE_IDS);
+            if (null == nodes) {
+
+                return false;

Review comment:
       this is wrong, you should
   
   async.done(true)
   return true

##########
File path: components/camel-milo/src/main/java/org/apache/camel/component/milo/browse/MiloBrowseProducer.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.milo.browse;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.milo.client.MiloClientConnection;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.structured.BrowseResult;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReferenceDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.Boolean.TRUE;
+import static org.apache.camel.component.milo.NodeIds.HEADER_NODE_IDS;
+
+public class MiloBrowseProducer extends DefaultAsyncProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MiloBrowseProducer.class);
+
+    private MiloClientConnection connection;
+
+    private final boolean defaultAwaitWrites;
+
+    public MiloBrowseProducer(final MiloBrowseEndpoint endpoint, final boolean defaultAwaitWrites) {
+        super(endpoint);
+
+        this.defaultAwaitWrites = defaultAwaitWrites;
+    }
+
+    @Override
+    public MiloBrowseEndpoint getEndpoint() {
+        return (MiloBrowseEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        this.connection = getEndpoint().createConnection();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (null != this.connection) {
+            getEndpoint().releaseConnection(connection);
+        }
+        super.doStop();
+    }
+
+    private ExpandedNodeId tryParse(String nodeString) {
+        final Optional<NodeId> nodeId = NodeId.parseSafe(nodeString);
+        return nodeId.map(NodeId::expanded).orElseGet(() -> ExpandedNodeId.parse(nodeString));
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback async) {
+
+        final Message message = exchange.getMessage();
+        final List<ExpandedNodeId> expandedNodeIds = new ArrayList<>();
+
+        if (message.getHeaders().containsKey(HEADER_NODE_IDS)) {
+
+            final List<?> nodes
+                    = message.getHeader(HEADER_NODE_IDS, Collections.singletonList(getEndpoint().getNode()), List.class);
+            message.removeHeader(HEADER_NODE_IDS);
+            if (null == nodes) {
+
+                return false;
+            }
+
+            for (final Object node : nodes) {
+                expandedNodeIds.add(tryParse(node.toString()));
+            }
+        } else {
+
+            expandedNodeIds.add(tryParse(this.getEndpoint().getNode()));
+        }
+
+        final MiloBrowseEndpoint endpoint = this.getEndpoint();
+        final int depth = endpoint.isRecursive() ? endpoint.getDepth() : -1;
+        final boolean subTypes = endpoint.isIncludeSubTypes() || endpoint.isRecursive();
+
+        final CompletableFuture<?> future = this.connection
+                .browse(expandedNodeIds, endpoint.getDirection(), endpoint.getNodeClassMask(), depth, endpoint.getFilter(),
+                        subTypes, endpoint.getMaxNodeIdsPerRequest())
+                .thenApply(browseResults -> {
+
+                    final List<String> expandedNodes = browseResults.values().stream()
+                            .map(BrowseResult::getReferences)
+                            .flatMap(Stream::of)
+                            .map(ReferenceDescription::getNodeId)
+                            .map(ExpandedNodeId::toParseableString)
+                            .collect(Collectors.toList());
+
+                    // For convenience, to be used with the milo-client producer
+                    exchange.getMessage().setHeader(HEADER_NODE_IDS, expandedNodes);
+
+                    exchange.getMessage().setBody(browseResults);
+
+                    return browseResults;
+                });
+
+        final Boolean await = message.getHeader("await", this.defaultAwaitWrites, Boolean.class);

Review comment:
       This seems wrong that a header determines to wait for the future - what is the special use-case. We do not do this in other components, so please remove.

##########
File path: components/camel-milo/src/main/java/org/apache/camel/component/milo/browse/MiloBrowseProducer.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.milo.browse;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.milo.client.MiloClientConnection;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.structured.BrowseResult;
+import org.eclipse.milo.opcua.stack.core.types.structured.ReferenceDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.Boolean.TRUE;
+import static org.apache.camel.component.milo.NodeIds.HEADER_NODE_IDS;
+
+public class MiloBrowseProducer extends DefaultAsyncProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MiloBrowseProducer.class);
+
+    private MiloClientConnection connection;
+
+    private final boolean defaultAwaitWrites;
+
+    public MiloBrowseProducer(final MiloBrowseEndpoint endpoint, final boolean defaultAwaitWrites) {
+        super(endpoint);
+
+        this.defaultAwaitWrites = defaultAwaitWrites;
+    }
+
+    @Override
+    public MiloBrowseEndpoint getEndpoint() {
+        return (MiloBrowseEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        this.connection = getEndpoint().createConnection();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (null != this.connection) {
+            getEndpoint().releaseConnection(connection);
+        }
+        super.doStop();
+    }
+
+    private ExpandedNodeId tryParse(String nodeString) {
+        final Optional<NodeId> nodeId = NodeId.parseSafe(nodeString);
+        return nodeId.map(NodeId::expanded).orElseGet(() -> ExpandedNodeId.parse(nodeString));
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback async) {
+
+        final Message message = exchange.getMessage();
+        final List<ExpandedNodeId> expandedNodeIds = new ArrayList<>();
+
+        if (message.getHeaders().containsKey(HEADER_NODE_IDS)) {
+
+            final List<?> nodes
+                    = message.getHeader(HEADER_NODE_IDS, Collections.singletonList(getEndpoint().getNode()), List.class);
+            message.removeHeader(HEADER_NODE_IDS);
+            if (null == nodes) {
+
+                return false;
+            }
+
+            for (final Object node : nodes) {
+                expandedNodeIds.add(tryParse(node.toString()));
+            }
+        } else {
+
+            expandedNodeIds.add(tryParse(this.getEndpoint().getNode()));
+        }
+
+        final MiloBrowseEndpoint endpoint = this.getEndpoint();
+        final int depth = endpoint.isRecursive() ? endpoint.getDepth() : -1;
+        final boolean subTypes = endpoint.isIncludeSubTypes() || endpoint.isRecursive();
+
+        final CompletableFuture<?> future = this.connection
+                .browse(expandedNodeIds, endpoint.getDirection(), endpoint.getNodeClassMask(), depth, endpoint.getFilter(),
+                        subTypes, endpoint.getMaxNodeIdsPerRequest())
+                .thenApply(browseResults -> {
+
+                    final List<String> expandedNodes = browseResults.values().stream()
+                            .map(BrowseResult::getReferences)
+                            .flatMap(Stream::of)
+                            .map(ReferenceDescription::getNodeId)
+                            .map(ExpandedNodeId::toParseableString)
+                            .collect(Collectors.toList());
+
+                    // For convenience, to be used with the milo-client producer
+                    exchange.getMessage().setHeader(HEADER_NODE_IDS, expandedNodes);
+
+                    exchange.getMessage().setBody(browseResults);
+
+                    return browseResults;
+                });
+
+        final Boolean await = message.getHeader("await", this.defaultAwaitWrites, Boolean.class);
+
+        if (TRUE.equals(await)) {
+            future.whenComplete((v, ex) -> async.done(false));
+            return false;
+        } else {
+            return true;

Review comment:
       async.done(true)
   return true

##########
File path: components/camel-milo/src/main/java/org/apache/camel/component/milo/client/internal/SubscriptionManager.java
##########
@@ -368,6 +379,197 @@ public void dispose() {
                 return this.client.readValues(0, TimestampsToReturn.Server, nodeIds);
             });
         }
+
+        private BrowseResult filter(final BrowseResult browseResult, final Pattern pattern) {
+
+            final ReferenceDescription[] references = browseResult.getReferences();
+
+            if (null == references || null == pattern) {
+                return browseResult;
+            }
+
+            final List<ReferenceDescription> filteredReferences = new ArrayList<>();
+            for (final ReferenceDescription reference : references) {
+                final String id = reference.getNodeId().toParseableString();
+                if (!(pattern.matcher(id).matches())) {
+                    LOG.trace("Node {} excluded by filter", id);
+                    continue;
+                }
+                filteredReferences.add(reference);
+            }
+
+            return new BrowseResult(
+                    browseResult.getStatusCode(), browseResult.getContinuationPoint(),
+                    filteredReferences.toArray(new ReferenceDescription[0]));
+        }
+
+        private CompletableFuture<Map<ExpandedNodeId, BrowseResult>> flatten(
+                List<CompletableFuture<Map<ExpandedNodeId, BrowseResult>>> browseResults) {
+            return CompletableFuture.allOf(browseResults.toArray(new CompletableFuture[0]))
+                    .thenApply(__ -> browseResults
+                            .stream()
+                            .map(CompletableFuture::join)
+                            .map(Map::entrySet)
+                            .flatMap(Set::stream)
+                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e2)));
+        }
+
+        // Browse at continuation point if any
+        public CompletableFuture<BrowseResult> browse(BrowseResult previousBrowseResult) {
+
+            final ByteString continuationPoint = previousBrowseResult.getContinuationPoint();
+
+            if (previousBrowseResult.getStatusCode().isGood() && continuationPoint.isNotNull()) {
+
+                return this.client.browseNext(false, continuationPoint)
+
+                        .thenCompose(browseResult -> {
+
+                            final ReferenceDescription[] previousReferences = previousBrowseResult.getReferences();
+                            final ReferenceDescription[] references = browseResult.getReferences();
+
+                            if (null == references) {
+
+                                LOG.info("Browse continuation point -> no references");
+                                return completedFuture(previousBrowseResult);
+                            } else if (null == previousReferences) {
+
+                                LOG.info("Browse continuation point -> previous references not obtained");
+                                return completedFuture(browseResult);
+                            }
+
+                            final ReferenceDescription[] combined
+                                    = Arrays.copyOf(previousReferences, previousReferences.length + references.length);
+                            System.arraycopy(references, 0, combined, previousReferences.length, references.length);
+
+                            LOG.debug("Browse continuation point -> {}: {} reference(s); total: {} reference(s)",
+                                    browseResult.getStatusCode(), references.length, combined.length);
+
+                            return browse(new BrowseResult(
+                                    browseResult.getStatusCode(), browseResult.getContinuationPoint(), combined));
+                        });
+
+            } else {
+
+                return completedFuture(previousBrowseResult);
+            }
+        }
+
+        // Browse a single node, retrieve additional results, filter node ids and eventually browse deeper into the tree
+        public CompletableFuture<Map<ExpandedNodeId, BrowseResult>> browse(
+                BrowseDescription browseDescription, BrowseResult browseResult, int depth, int maxDepth, Pattern pattern,
+                int maxNodesPerRequest) {
+
+            return browse(browseResult)
+
+                    .thenCompose(preliminary -> completedFuture(filter(preliminary, pattern)))
+
+                    .thenCompose(filtered -> {
+
+                        final ExpandedNodeId expandedNodeId = browseDescription.getNodeId().expanded();
+                        final Map<ExpandedNodeId, BrowseResult> root = Collections.singletonMap(expandedNodeId, filtered);
+                        final CompletableFuture<Map<ExpandedNodeId, BrowseResult>> finalFuture = completedFuture(root);
+                        final ReferenceDescription[] references = filtered.getReferences();
+
+                        if (depth >= maxDepth || null == references) {
+                            return finalFuture;
+                        }
+
+                        final List<CompletableFuture<Map<ExpandedNodeId, BrowseResult>>> futures = new ArrayList<>();
+
+                        // Save current node
+                        futures.add(finalFuture);
+
+                        final List<ExpandedNodeId> nodeIds = Stream.of(references)
+                                .map(ReferenceDescription::getNodeId).collect(Collectors.toList());
+
+                        final List<List<ExpandedNodeId>> lists = Lists.partition(nodeIds, maxNodesPerRequest);
+                        for (final List<ExpandedNodeId> list : lists) {
+                            futures.add(browse(list, browseDescription.getBrowseDirection(),
+                                    browseDescription.getNodeClassMask().intValue(), depth + 1, maxDepth, pattern,
+                                    browseDescription.getIncludeSubtypes(), maxNodesPerRequest));
+                        }
+
+                        return flatten(futures);
+                    });
+        }
+
+        // Browse according to a list of browse descriptions
+        public CompletableFuture<Map<ExpandedNodeId, BrowseResult>> browse(
+                List<BrowseDescription> browseDescriptions,
+                int depth, int maxDepth, Pattern pattern, int maxNodesPerRequest) {
+
+            return this.client.browse(browseDescriptions)
+
+                    .thenCompose(partials -> {
+
+                        // Fail a bit more gracefully in case of missing results
+                        if (partials.size() != browseDescriptions.size()) {
+
+                            // @TODO: Replace with Java 9 functionality

Review comment:
       Camel 3.15 will drop java 8, so when we are ready then do this




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org