You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2023/08/02 09:03:09 UTC

[camel] branch main updated: CAMEL-19692: Improve producer sendToAll handling of parameterized server paths

This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e539c3056a CAMEL-19692: Improve producer sendToAll handling of parameterized server paths
8e539c3056a is described below

commit 8e539c3056a1cd5105e5bd12efb2f0b3ffdbc16f
Author: James Netherton <ja...@gmail.com>
AuthorDate: Tue Aug 1 10:36:01 2023 +0100

    CAMEL-19692: Improve producer sendToAll handling of parameterized server paths
---
 .../vertx/websocket/VertxWebsocketEndpoint.java    |  22 +-
 .../vertx/websocket/VertxWebsocketHelper.java      |  20 +-
 .../vertx/websocket/VertxWebsocketHost.java        |  40 +--
 .../vertx/websocket/VertxWebsocketPeer.java        |  70 ++++++
 .../vertx/websocket/VertxWebsocketHelperTest.java  |  21 --
 .../vertx/websocket/VertxWebsocketTest.java        | 279 ++++++++++++++++++++-
 6 files changed, 392 insertions(+), 60 deletions(-)

diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java
index 3a345624138..e160b4204b4 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java
@@ -202,10 +202,10 @@ public class VertxWebsocketEndpoint extends DefaultEndpoint {
     protected ServerWebSocket findPeerForConnectionKey(String connectionKey) {
         Map<VertxWebsocketHostKey, VertxWebsocketHost> registry = getVertxHostRegistry();
         for (VertxWebsocketHost host : registry.values()) {
-            Map<String, ServerWebSocket> hostPeers = host.getConnectedPeers();
-            if (hostPeers.containsKey(connectionKey) && host.isManagedHost(getConfiguration().getWebsocketURI().getHost())
+            VertxWebsocketPeer peer = host.getConnectedPeer(connectionKey);
+            if (peer != null && host.isManagedHost(getConfiguration().getWebsocketURI().getHost())
                     && host.isManagedPort(getConfiguration().getWebsocketURI().getPort())) {
-                return hostPeers.get(connectionKey);
+                return peer.getWebSocket();
             }
         }
         return null;
@@ -220,9 +220,17 @@ public class VertxWebsocketEndpoint extends DefaultEndpoint {
                 .stream()
                 .filter(host -> host.isManagedHost(getConfiguration().getWebsocketURI().getHost()))
                 .filter(host -> host.isManagedPort(getConfiguration().getWebsocketURI().getPort()))
-                .flatMap(host -> host.getConnectedPeers().entrySet().stream())
-                .filter(entry -> VertxWebsocketHelper.webSocketHostPathMatches(entry.getValue().path(),
-                        getConfiguration().getWebsocketURI().getPath()))
-                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+                .flatMap(host -> host.getConnectedPeers().stream())
+                .filter(connectedPeer -> {
+                    String producerPath = getConfiguration().getWebsocketURI().getPath();
+                    String peerConnectedPath;
+                    if (producerPath.contains("{") || producerPath.contains("*")) {
+                        peerConnectedPath = connectedPeer.getRawPath();
+                    } else {
+                        peerConnectedPath = connectedPeer.getPath();
+                    }
+                    return VertxWebsocketHelper.webSocketHostPathMatches(peerConnectedPath, producerPath);
+                })
+                .collect(Collectors.toMap(VertxWebsocketPeer::getConnectionKey, VertxWebsocketPeer::getWebSocket));
     }
 }
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelper.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelper.java
index 27f61c5b483..3e580902b83 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelper.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelper.java
@@ -84,24 +84,10 @@ public final class VertxWebsocketHelper {
             return false;
         }
 
-        if (normalizedHostPath.contains("{")) {
-            // For a parameterized paths verify the non-parameterized elements match
-            for (int i = 0; i < hostPathElements.length; i++) {
-                String hostPathElement = hostPathElements[i];
-                String targetPathElement = targetPathElements[i];
-                if (!hostPathElement.startsWith("{") && !hostPathElement.endsWith("}")
-                        && !hostPathElement.equals(targetPathElement)) {
-                    return false;
-                }
-            }
+        if (exactPathMatch) {
+            return normalizedHostPath.equals(normalizedTargetPath);
         } else {
-            if (exactPathMatch) {
-                return normalizedHostPath.equals(normalizedTargetPath);
-            } else {
-                return normalizedTargetPath.startsWith(normalizedHostPath);
-            }
+            return normalizedTargetPath.startsWith(normalizedHostPath);
         }
-
-        return true;
     }
 }
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java
index 3386bc7d1b5..c473e3325c8 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java
@@ -17,11 +17,12 @@
 package org.apache.camel.component.vertx.websocket;
 
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 
@@ -52,7 +53,7 @@ public class VertxWebsocketHost {
     private final VertxWebsocketHostConfiguration hostConfiguration;
     private final VertxWebsocketHostKey hostKey;
     private final Map<String, Route> routeRegistry = new HashMap<>();
-    private final Map<String, ServerWebSocket> connectedPeers = new ConcurrentHashMap<>();
+    private final List<VertxWebsocketPeer> connectedPeers = Collections.synchronizedList(new ArrayList<>());
     private final CamelContext camelContext;
     private HttpServer server;
     private int port = VertxWebsocketConstants.DEFAULT_VERTX_SERVER_PORT;
@@ -110,39 +111,40 @@ public class VertxWebsocketHost {
                         SocketAddress socketAddress = webSocket.localAddress();
                         SocketAddress remote = webSocket.remoteAddress();
 
-                        String connectionKey = UUID.randomUUID().toString();
-                        connectedPeers.put(connectionKey, webSocket);
+                        VertxWebsocketPeer peer = new VertxWebsocketPeer(webSocket, websocketURI.getPath());
+                        connectedPeers.add(peer);
 
                         if (LOG.isDebugEnabled()) {
                             if (socketAddress != null) {
-                                LOG.debug("WebSocket peer {} connected from {}", connectionKey, socketAddress.host());
+                                LOG.debug("WebSocket peer {} connected from {}", peer.getConnectionKey(), socketAddress.host());
                             }
                         }
 
                         webSocket.textMessageHandler(
-                                message -> consumer.onMessage(connectionKey, message, remote, routingContext));
+                                message -> consumer.onMessage(peer.getConnectionKey(), message, remote, routingContext));
                         webSocket
                                 .binaryMessageHandler(
-                                        message -> consumer.onMessage(connectionKey, message.getBytes(), remote,
+                                        message -> consumer.onMessage(peer.getConnectionKey(), message.getBytes(), remote,
                                                 routingContext));
                         webSocket.exceptionHandler(
-                                exception -> consumer.onException(connectionKey, exception, remote, routingContext));
+                                exception -> consumer.onException(peer.getConnectionKey(), exception, remote, routingContext));
                         webSocket.closeHandler(closeEvent -> {
                             if (LOG.isDebugEnabled()) {
                                 if (socketAddress != null) {
-                                    LOG.debug("WebSocket peer {} disconnected from {}", connectionKey, socketAddress.host());
+                                    LOG.debug("WebSocket peer {} disconnected from {}", peer.getConnectionKey(),
+                                            socketAddress.host());
                                 }
                             }
 
                             if (configuration.isFireWebSocketConnectionEvents()) {
-                                consumer.onClose(connectionKey, remote, routingContext);
+                                consumer.onClose(peer.getConnectionKey(), remote, routingContext);
                             }
 
-                            connectedPeers.remove(connectionKey);
+                            connectedPeers.remove(peer);
                         });
 
                         if (configuration.isFireWebSocketConnectionEvents()) {
-                            consumer.onOpen(connectionKey, remote, routingContext, webSocket);
+                            consumer.onOpen(peer.getConnectionKey(), remote, routingContext, webSocket);
                         }
                     } else {
                         // the upgrade failed
@@ -238,10 +240,20 @@ public class VertxWebsocketHost {
     /**
      * Gets all WebSocket peers connected to the Vert.x HTTP sever together with their associated connection key
      */
-    public Map<String, ServerWebSocket> getConnectedPeers() {
+    public List<VertxWebsocketPeer> getConnectedPeers() {
         return connectedPeers;
     }
 
+    /**
+     * Gets a connected peer for the given connection key
+     */
+    public VertxWebsocketPeer getConnectedPeer(String connectionKey) {
+        return getConnectedPeers().stream()
+                .filter(peer -> peer.getConnectionKey().equals(connectionKey))
+                .findFirst()
+                .orElse(null);
+    }
+
     /**
      * Gets the port that the server is bound to. This could be a random value if 0 was specified as the initial port
      * number
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketPeer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketPeer.java
new file mode 100644
index 00000000000..5c401854ce7
--- /dev/null
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketPeer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.websocket;
+
+import java.util.Objects;
+import java.util.UUID;
+
+import io.vertx.core.http.ServerWebSocket;
+
+/**
+ * Represents a WebSocket peer connection
+ */
+public class VertxWebsocketPeer {
+    private final ServerWebSocket webSocket;
+    private final String rawPath;
+    private final String path;
+    private final String connectionKey;
+
+    public VertxWebsocketPeer(ServerWebSocket webSocket, String rawPath) {
+        this.webSocket = Objects.requireNonNull(webSocket);
+        this.rawPath = Objects.requireNonNull(rawPath);
+        this.path = webSocket.path();
+        this.connectionKey = UUID.randomUUID().toString();
+    }
+
+    public ServerWebSocket getWebSocket() {
+        return webSocket;
+    }
+
+    public String getRawPath() {
+        return rawPath;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    public String getConnectionKey() {
+        return connectionKey;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        VertxWebsocketPeer that = (VertxWebsocketPeer) o;
+        return Objects.equals(connectionKey, that.connectionKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(connectionKey);
+    }
+}
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelperTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelperTest.java
index 4af16a6f68a..2d2f1d3d312 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelperTest.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelperTest.java
@@ -44,13 +44,6 @@ public class VertxWebsocketHelperTest {
         assertFalse(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath));
     }
 
-    @Test
-    void webSocketHostExactPathWithParamsMatches() {
-        String hostPath = "/foo/{bar}/cheese/{wine}";
-        String targetPath = "/foo/bar/cheese/wine";
-        assertTrue(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath));
-    }
-
     @Test
     void webSocketHostExactPathWithParamsNotMatches() {
         String hostPath = "/foo/{bar}/cheese/{wine}";
@@ -72,13 +65,6 @@ public class VertxWebsocketHelperTest {
         assertFalse(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath));
     }
 
-    @Test
-    void webSocketHostWildcardPathWithParamsMatches() {
-        String hostPath = "/foo/{bar}/cheese/{wine}*";
-        String targetPath = "/foo/bar/cheese/wine/beer/additional/path";
-        assertTrue(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath));
-    }
-
     @Test
     void webSocketHostWildcardPathWithParamsNotMatches() {
         String hostPath = "/foo/{bar}/cheese/{wine}*";
@@ -100,13 +86,6 @@ public class VertxWebsocketHelperTest {
         assertTrue(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath));
     }
 
-    @Test
-    void webSocketHostWildcardPathWithTrailingSlashStarMatches() {
-        String hostPath = "/foo/{bar}/cheese/{wine}/*";
-        String targetPath = "/foo/bar/cheese/wine/beer/additional/path";
-        assertTrue(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath));
-    }
-
     @Test
     void webSocketHostDefaultPathMatches() {
         String hostPath = "/";
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
index fd055af6263..fbae556d594 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
@@ -119,7 +119,147 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport {
     }
 
     @Test
-    public void testSendWithInvalidConnectionKey() throws Exception {
+    void testSendWithConnectionKeyForParameterizedPath() throws Exception {
+        int expectedResultCount = 1;
+        CountDownLatch latch = new CountDownLatch(expectedResultCount);
+        List<String> results = new ArrayList<>();
+
+        for (int i = 0; i < 2; i++) {
+            openWebSocketConnection("localhost", port, "/test/paramA/other/paramB", message -> {
+                synchronized (latch) {
+                    results.add(message);
+                    latch.countDown();
+                }
+            });
+        }
+
+        VertxWebsocketEndpoint endpoint
+                = context.getEndpoint("vertx-websocket:localhost:" + port + "/test/paramA/other/paramB",
+                        VertxWebsocketEndpoint.class);
+        Map<String, ServerWebSocket> connectedPeers = endpoint.findPeersForHostPort();
+        assertEquals(2, connectedPeers.size());
+
+        String connectionKey = connectedPeers.keySet().iterator().next();
+
+        template.sendBodyAndHeader("vertx-websocket:localhost:" + port + "/test/paramA/other/paramB",
+                "Hello World",
+                VertxWebsocketConstants.CONNECTION_KEY, connectionKey);
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertEquals(expectedResultCount, results.size());
+        assertTrue(results.contains("Hello World"));
+    }
+
+    @Test
+    void testSendWithConnectionKeyForRawParameterizedPath() throws Exception {
+        int expectedResultCount = 1;
+        CountDownLatch latch = new CountDownLatch(expectedResultCount);
+        List<String> results = new ArrayList<>();
+
+        for (int i = 0; i < 2; i++) {
+            openWebSocketConnection("localhost", port, "/test/paramA/other/paramB", message -> {
+                synchronized (latch) {
+                    results.add(message);
+                    latch.countDown();
+                }
+            });
+        }
+
+        VertxWebsocketEndpoint endpoint
+                = context.getEndpoint("vertx-websocket:localhost:" + port + "/test/paramA/other/paramB",
+                        VertxWebsocketEndpoint.class);
+        Map<String, ServerWebSocket> connectedPeers = endpoint.findPeersForHostPort();
+        assertEquals(2, connectedPeers.size());
+
+        String connectionKey = connectedPeers.keySet().iterator().next();
+
+        template.sendBodyAndHeader("vertx-websocket:localhost:" + port + "/test/{paramA}/other/{paramB}",
+                "Hello World",
+                VertxWebsocketConstants.CONNECTION_KEY, connectionKey);
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertEquals(expectedResultCount, results.size());
+        assertTrue(results.contains("Hello World"));
+    }
+
+    @Test
+    void testSendWithConnectionKeyForWildcardPath() throws Exception {
+        int expectedResultCount = 1;
+        CountDownLatch latch = new CountDownLatch(expectedResultCount);
+        List<String> results = new ArrayList<>();
+
+        for (int i = 0; i < 2; i++) {
+            openWebSocketConnection("localhost", port, "/test/wildcarded/path", message -> {
+                synchronized (latch) {
+                    results.add(message);
+                    latch.countDown();
+                }
+            });
+        }
+
+        openWebSocketConnection("localhost", port, "/test/wildcarded/otherpath", message -> {
+            synchronized (latch) {
+                results.add(message);
+                latch.countDown();
+            }
+        });
+
+        VertxWebsocketEndpoint endpoint
+                = context.getEndpoint("vertx-websocket:localhost:" + port + "/test/wildcarded/path",
+                        VertxWebsocketEndpoint.class);
+        Map<String, ServerWebSocket> connectedPeers = endpoint.findPeersForHostPort();
+        assertEquals(2, connectedPeers.size());
+
+        String connectionKey = connectedPeers.keySet().iterator().next();
+
+        template.sendBodyAndHeader("vertx-websocket:localhost:" + port + "/test/wildcarded/path", "Hello World",
+                VertxWebsocketConstants.CONNECTION_KEY, connectionKey);
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertEquals(expectedResultCount, results.size());
+        assertTrue(results.contains("Hello World"));
+    }
+
+    @Test
+    void testSendWithConnectionKeyForRawPath() throws Exception {
+        int expectedResultCount = 1;
+        CountDownLatch latch = new CountDownLatch(expectedResultCount);
+        List<String> results = new ArrayList<>();
+
+        for (int i = 0; i < 2; i++) {
+            openWebSocketConnection("localhost", port, "/test/wildcarded/path", message -> {
+                synchronized (latch) {
+                    results.add(message);
+                    latch.countDown();
+                }
+            });
+        }
+
+        openWebSocketConnection("localhost", port, "/test/wildcarded/otherpath", message -> {
+            synchronized (latch) {
+                results.add(message);
+                latch.countDown();
+            }
+        });
+
+        VertxWebsocketEndpoint endpoint
+                = context.getEndpoint("vertx-websocket:localhost:" + port + "/test/wildcarded/path",
+                        VertxWebsocketEndpoint.class);
+        Map<String, ServerWebSocket> connectedPeers = endpoint.findPeersForHostPort();
+        assertEquals(2, connectedPeers.size());
+
+        String connectionKey = connectedPeers.keySet().iterator().next();
+
+        template.sendBodyAndHeader("vertx-websocket:localhost:" + port + "/test/wildcarded/*", "Hello World",
+                VertxWebsocketConstants.CONNECTION_KEY, connectionKey);
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertEquals(expectedResultCount, results.size());
+        assertTrue(results.contains("Hello World"));
+    }
+
+    @Test
+    void testSendWithInvalidConnectionKey() throws Exception {
         MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
         mockEndpoint.expectedBodiesReceived("Hello world");
         mockEndpoint.setResultWaitTime(500);
@@ -249,6 +389,137 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport {
         mockEndpoint.assertIsSatisfied(TimeUnit.SECONDS.toMillis(1));
     }
 
+    @Test
+    void testSendToAllForExactParameterizedPath() throws Exception {
+        int expectedResultCount = 5;
+        CountDownLatch latch = new CountDownLatch(expectedResultCount);
+        List<String> results = new ArrayList<>();
+
+        for (int i = 0; i < expectedResultCount; i++) {
+            openWebSocketConnection("localhost", port, "/test/firstParam/other/secondParam", message -> {
+                synchronized (latch) {
+                    results.add(message + " " + latch.getCount());
+                    latch.countDown();
+                }
+            });
+
+            // Below we produce to an explicit path so this peer should be ignored
+            openWebSocketConnection("localhost", port, "/test/otherFirstParam/other/otherSecondParam", message -> {
+                synchronized (latch) {
+                    results.add(message + " " + latch.getCount());
+                    latch.countDown();
+                }
+            });
+        }
+
+        template.sendBody("vertx-websocket:localhost:" + port + "/test/firstParam/other/secondParam?sendToAll=true",
+                "Hello World");
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertEquals(expectedResultCount, results.size());
+
+        for (int i = 1; i <= expectedResultCount; i++) {
+            assertTrue(results.contains("Hello World " + i));
+        }
+    }
+
+    @Test
+    void testSendToAllForRawParameterizedPath() throws Exception {
+        int expectedResultCount = 10;
+        CountDownLatch latch = new CountDownLatch(expectedResultCount);
+        List<String> results = new ArrayList<>();
+
+        for (int i = 0; i < 5; i++) {
+            openWebSocketConnection("localhost", port, "/test/firstParam/other/secondParam", message -> {
+                synchronized (latch) {
+                    results.add(message + " " + latch.getCount());
+                    latch.countDown();
+                }
+            });
+
+            openWebSocketConnection("localhost", port, "/test/otherFirstParam/other/otherSecondParam", message -> {
+                synchronized (latch) {
+                    results.add(message + " " + latch.getCount());
+                    latch.countDown();
+                }
+            });
+        }
+
+        template.sendBody("vertx-websocket:localhost:" + port + "/test/{paramA}/other/{paramB}?sendToAll=true", "Hello World");
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertEquals(expectedResultCount, results.size());
+
+        for (int i = 1; i <= expectedResultCount; i++) {
+            assertTrue(results.contains("Hello World " + i));
+        }
+    }
+
+    @Test
+    void testSendToAllForWildcardPath() throws Exception {
+        int expectedResultCount = 5;
+        CountDownLatch latch = new CountDownLatch(expectedResultCount);
+        List<String> results = new ArrayList<>();
+
+        for (int i = 0; i < expectedResultCount; i++) {
+            openWebSocketConnection("localhost", port, "/test/wildcarded/path", message -> {
+                synchronized (latch) {
+                    results.add(message + " " + latch.getCount());
+                    latch.countDown();
+                }
+            });
+
+            // Below we produce to an explicit path so this peer should be ignored
+            openWebSocketConnection("localhost", port, "/test/wildcarded/other", message -> {
+                synchronized (latch) {
+                    results.add(message + " " + latch.getCount());
+                    latch.countDown();
+                }
+            });
+        }
+
+        template.sendBody("vertx-websocket:localhost:" + port + "/test/wildcarded/path?sendToAll=true", "Hello World");
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertEquals(expectedResultCount, results.size());
+
+        for (int i = 1; i <= expectedResultCount; i++) {
+            assertTrue(results.contains("Hello World " + i));
+        }
+    }
+
+    @Test
+    void testSendToAllForRawWildcardPath() throws Exception {
+        int expectedResultCount = 10;
+        CountDownLatch latch = new CountDownLatch(expectedResultCount);
+        List<String> results = new ArrayList<>();
+
+        for (int i = 0; i < 5; i++) {
+            openWebSocketConnection("localhost", port, "/test/wildcarded/path", message -> {
+                synchronized (latch) {
+                    results.add(message + " " + latch.getCount());
+                    latch.countDown();
+                }
+            });
+
+            openWebSocketConnection("localhost", port, "/test/wildcarded/other", message -> {
+                synchronized (latch) {
+                    results.add(message + " " + latch.getCount());
+                    latch.countDown();
+                }
+            });
+        }
+
+        template.sendBody("vertx-websocket:localhost:" + port + "/test/wildcarded/*?sendToAll=true", "Hello World");
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertEquals(expectedResultCount, results.size());
+
+        for (int i = 1; i <= expectedResultCount; i++) {
+            assertTrue(results.contains("Hello World " + i));
+        }
+    }
+
     @Test
     public void testEchoRoute() throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
@@ -422,6 +693,9 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport {
                         .setBody(simple("${body} ${header.firstParam} ${header.secondParam}"))
                         .toF("vertx-websocket:localhost:%d/testA/echo/testB", port);
 
+                fromF("vertx-websocket:localhost:%d/test/{paramA}/other/{paramB}", port)
+                        .setBody(simple("${header.firstParam} ${header.secondParam}"));
+
                 fromF("vertx-websocket:localhost:%d/query/params", port)
                         .setBody(simple("${header.firstParam} ${header.secondParam}"))
                         .to("mock:queryParamResult");
@@ -448,6 +722,9 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport {
                 fromF("vertx-websocket:localhost:%d/wildcard/echo*", port)
                         .setBody().simple("${body} World")
                         .toF("vertx-websocket:localhost:%d/wildcard/echo/foo/bar", port);
+
+                fromF("vertx-websocket:localhost:%d/test/wildcarded*", port)
+                        .setBody().simple("Hello ${body} from the wildcard path");
             }
         };
     }