You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2023/08/02 09:03:09 UTC
[camel] branch main updated: CAMEL-19692: Improve producer sendToAll handling of parameterized server paths
This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 8e539c3056a CAMEL-19692: Improve producer sendToAll handling of parameterized server paths
8e539c3056a is described below
commit 8e539c3056a1cd5105e5bd12efb2f0b3ffdbc16f
Author: James Netherton <ja...@gmail.com>
AuthorDate: Tue Aug 1 10:36:01 2023 +0100
CAMEL-19692: Improve producer sendToAll handling of parameterized server paths
---
.../vertx/websocket/VertxWebsocketEndpoint.java | 22 +-
.../vertx/websocket/VertxWebsocketHelper.java | 20 +-
.../vertx/websocket/VertxWebsocketHost.java | 40 +--
.../vertx/websocket/VertxWebsocketPeer.java | 70 ++++++
.../vertx/websocket/VertxWebsocketHelperTest.java | 21 --
.../vertx/websocket/VertxWebsocketTest.java | 279 ++++++++++++++++++++-
6 files changed, 392 insertions(+), 60 deletions(-)
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java
index 3a345624138..e160b4204b4 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketEndpoint.java
@@ -202,10 +202,10 @@ public class VertxWebsocketEndpoint extends DefaultEndpoint {
protected ServerWebSocket findPeerForConnectionKey(String connectionKey) {
Map<VertxWebsocketHostKey, VertxWebsocketHost> registry = getVertxHostRegistry();
for (VertxWebsocketHost host : registry.values()) {
- Map<String, ServerWebSocket> hostPeers = host.getConnectedPeers();
- if (hostPeers.containsKey(connectionKey) && host.isManagedHost(getConfiguration().getWebsocketURI().getHost())
+ VertxWebsocketPeer peer = host.getConnectedPeer(connectionKey);
+ if (peer != null && host.isManagedHost(getConfiguration().getWebsocketURI().getHost())
&& host.isManagedPort(getConfiguration().getWebsocketURI().getPort())) {
- return hostPeers.get(connectionKey);
+ return peer.getWebSocket();
}
}
return null;
@@ -220,9 +220,17 @@ public class VertxWebsocketEndpoint extends DefaultEndpoint {
.stream()
.filter(host -> host.isManagedHost(getConfiguration().getWebsocketURI().getHost()))
.filter(host -> host.isManagedPort(getConfiguration().getWebsocketURI().getPort()))
- .flatMap(host -> host.getConnectedPeers().entrySet().stream())
- .filter(entry -> VertxWebsocketHelper.webSocketHostPathMatches(entry.getValue().path(),
- getConfiguration().getWebsocketURI().getPath()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ .flatMap(host -> host.getConnectedPeers().stream())
+ .filter(connectedPeer -> {
+ String producerPath = getConfiguration().getWebsocketURI().getPath();
+ String peerConnectedPath;
+ if (producerPath.contains("{") || producerPath.contains("*")) {
+ peerConnectedPath = connectedPeer.getRawPath();
+ } else {
+ peerConnectedPath = connectedPeer.getPath();
+ }
+ return VertxWebsocketHelper.webSocketHostPathMatches(peerConnectedPath, producerPath);
+ })
+ .collect(Collectors.toMap(VertxWebsocketPeer::getConnectionKey, VertxWebsocketPeer::getWebSocket));
}
}
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelper.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelper.java
index 27f61c5b483..3e580902b83 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelper.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelper.java
@@ -84,24 +84,10 @@ public final class VertxWebsocketHelper {
return false;
}
- if (normalizedHostPath.contains("{")) {
- // For a parameterized paths verify the non-parameterized elements match
- for (int i = 0; i < hostPathElements.length; i++) {
- String hostPathElement = hostPathElements[i];
- String targetPathElement = targetPathElements[i];
- if (!hostPathElement.startsWith("{") && !hostPathElement.endsWith("}")
- && !hostPathElement.equals(targetPathElement)) {
- return false;
- }
- }
+ if (exactPathMatch) {
+ return normalizedHostPath.equals(normalizedTargetPath);
} else {
- if (exactPathMatch) {
- return normalizedHostPath.equals(normalizedTargetPath);
- } else {
- return normalizedTargetPath.startsWith(normalizedHostPath);
- }
+ return normalizedTargetPath.startsWith(normalizedHostPath);
}
-
- return true;
}
}
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java
index 3386bc7d1b5..c473e3325c8 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java
@@ -17,11 +17,12 @@
package org.apache.camel.component.vertx.websocket;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
@@ -52,7 +53,7 @@ public class VertxWebsocketHost {
private final VertxWebsocketHostConfiguration hostConfiguration;
private final VertxWebsocketHostKey hostKey;
private final Map<String, Route> routeRegistry = new HashMap<>();
- private final Map<String, ServerWebSocket> connectedPeers = new ConcurrentHashMap<>();
+ private final List<VertxWebsocketPeer> connectedPeers = Collections.synchronizedList(new ArrayList<>());
private final CamelContext camelContext;
private HttpServer server;
private int port = VertxWebsocketConstants.DEFAULT_VERTX_SERVER_PORT;
@@ -110,39 +111,40 @@ public class VertxWebsocketHost {
SocketAddress socketAddress = webSocket.localAddress();
SocketAddress remote = webSocket.remoteAddress();
- String connectionKey = UUID.randomUUID().toString();
- connectedPeers.put(connectionKey, webSocket);
+ VertxWebsocketPeer peer = new VertxWebsocketPeer(webSocket, websocketURI.getPath());
+ connectedPeers.add(peer);
if (LOG.isDebugEnabled()) {
if (socketAddress != null) {
- LOG.debug("WebSocket peer {} connected from {}", connectionKey, socketAddress.host());
+ LOG.debug("WebSocket peer {} connected from {}", peer.getConnectionKey(), socketAddress.host());
}
}
webSocket.textMessageHandler(
- message -> consumer.onMessage(connectionKey, message, remote, routingContext));
+ message -> consumer.onMessage(peer.getConnectionKey(), message, remote, routingContext));
webSocket
.binaryMessageHandler(
- message -> consumer.onMessage(connectionKey, message.getBytes(), remote,
+ message -> consumer.onMessage(peer.getConnectionKey(), message.getBytes(), remote,
routingContext));
webSocket.exceptionHandler(
- exception -> consumer.onException(connectionKey, exception, remote, routingContext));
+ exception -> consumer.onException(peer.getConnectionKey(), exception, remote, routingContext));
webSocket.closeHandler(closeEvent -> {
if (LOG.isDebugEnabled()) {
if (socketAddress != null) {
- LOG.debug("WebSocket peer {} disconnected from {}", connectionKey, socketAddress.host());
+ LOG.debug("WebSocket peer {} disconnected from {}", peer.getConnectionKey(),
+ socketAddress.host());
}
}
if (configuration.isFireWebSocketConnectionEvents()) {
- consumer.onClose(connectionKey, remote, routingContext);
+ consumer.onClose(peer.getConnectionKey(), remote, routingContext);
}
- connectedPeers.remove(connectionKey);
+ connectedPeers.remove(peer);
});
if (configuration.isFireWebSocketConnectionEvents()) {
- consumer.onOpen(connectionKey, remote, routingContext, webSocket);
+ consumer.onOpen(peer.getConnectionKey(), remote, routingContext, webSocket);
}
} else {
// the upgrade failed
@@ -238,10 +240,20 @@ public class VertxWebsocketHost {
/**
* Gets all WebSocket peers connected to the Vert.x HTTP server together with their associated connection key
*/
- public Map<String, ServerWebSocket> getConnectedPeers() {
+ public List<VertxWebsocketPeer> getConnectedPeers() {
return connectedPeers;
}
+ /**
+ * Gets the connected peer for the given connection key, or null if no connected peer has that key
+ */
+ public VertxWebsocketPeer getConnectedPeer(String connectionKey) {
+ List<VertxWebsocketPeer> peers = getConnectedPeers();
+ synchronized (peers) { // a Collections.synchronizedList must be locked manually while it is streamed
+ return peers.stream().filter(peer -> peer.getConnectionKey().equals(connectionKey)).findFirst().orElse(null);
+ }
+ }
+
/**
* Gets the port that the server is bound to. This could be a random value if 0 was specified as the initial port
* number
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketPeer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketPeer.java
new file mode 100644
index 00000000000..5c401854ce7
--- /dev/null
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketPeer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.websocket;
+
+import java.util.Objects;
+import java.util.UUID;
+
+import io.vertx.core.http.ServerWebSocket;
+
+/**
+ * Represents a WebSocket peer connection: a ServerWebSocket, the paths used to match it, and a unique connection key
+ */
+public class VertxWebsocketPeer {
+ private final ServerWebSocket webSocket;
+ private final String rawPath; // supplied by the creator; presumably the host route path (may hold {param} or *) - confirm
+ private final String path; // webSocket.path() captured at construction time
+ private final String connectionKey; // random UUID string generated once per instance
+
+ public VertxWebsocketPeer(ServerWebSocket webSocket, String rawPath) {
+ this.webSocket = Objects.requireNonNull(webSocket);
+ this.rawPath = Objects.requireNonNull(rawPath);
+ this.path = webSocket.path();
+ this.connectionKey = UUID.randomUUID().toString();
+ }
+
+ public ServerWebSocket getWebSocket() {
+ return webSocket;
+ }
+
+ public String getRawPath() {
+ return rawPath;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getConnectionKey() {
+ return connectionKey;
+ }
+
+ @Override
+ public boolean equals(Object o) { // equality is decided by connectionKey alone
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ VertxWebsocketPeer that = (VertxWebsocketPeer) o;
+ return Objects.equals(connectionKey, that.connectionKey);
+ }
+
+ @Override
+ public int hashCode() { // hashes only connectionKey, matching equals
+ return Objects.hash(connectionKey);
+ }
+}
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelperTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelperTest.java
index 4af16a6f68a..2d2f1d3d312 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelperTest.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHelperTest.java
@@ -44,13 +44,6 @@ public class VertxWebsocketHelperTest {
assertFalse(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath));
}
- @Test
- void webSocketHostExactPathWithParamsMatches() {
- String hostPath = "/foo/{bar}/cheese/{wine}";
- String targetPath = "/foo/bar/cheese/wine";
- assertTrue(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath));
- }
-
@Test
void webSocketHostExactPathWithParamsNotMatches() {
String hostPath = "/foo/{bar}/cheese/{wine}";
@@ -72,13 +65,6 @@ public class VertxWebsocketHelperTest {
assertFalse(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath));
}
- @Test
- void webSocketHostWildcardPathWithParamsMatches() {
- String hostPath = "/foo/{bar}/cheese/{wine}*";
- String targetPath = "/foo/bar/cheese/wine/beer/additional/path";
- assertTrue(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath));
- }
-
@Test
void webSocketHostWildcardPathWithParamsNotMatches() {
String hostPath = "/foo/{bar}/cheese/{wine}*";
@@ -100,13 +86,6 @@ public class VertxWebsocketHelperTest {
assertTrue(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath));
}
- @Test
- void webSocketHostWildcardPathWithTrailingSlashStarMatches() {
- String hostPath = "/foo/{bar}/cheese/{wine}/*";
- String targetPath = "/foo/bar/cheese/wine/beer/additional/path";
- assertTrue(VertxWebsocketHelper.webSocketHostPathMatches(hostPath, targetPath));
- }
-
@Test
void webSocketHostDefaultPathMatches() {
String hostPath = "/";
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
index fd055af6263..fbae556d594 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketTest.java
@@ -119,7 +119,147 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport {
}
@Test
- public void testSendWithInvalidConnectionKey() throws Exception {
+ void testSendWithConnectionKeyForParameterizedPath() throws Exception { // keyed send on a concrete path: one of two peers should get the message
+ int expectedResultCount = 1;
+ CountDownLatch latch = new CountDownLatch(expectedResultCount);
+ List<String> results = new ArrayList<>();
+
+ for (int i = 0; i < 2; i++) {
+ openWebSocketConnection("localhost", port, "/test/paramA/other/paramB", message -> {
+ synchronized (latch) { // the latch doubles as the lock guarding the plain ArrayList of results
+ results.add(message);
+ latch.countDown();
+ }
+ });
+ }
+
+ VertxWebsocketEndpoint endpoint
+ = context.getEndpoint("vertx-websocket:localhost:" + port + "/test/paramA/other/paramB",
+ VertxWebsocketEndpoint.class);
+ Map<String, ServerWebSocket> connectedPeers = endpoint.findPeersForHostPort();
+ assertEquals(2, connectedPeers.size());
+
+ String connectionKey = connectedPeers.keySet().iterator().next(); // any one of the two connected peers
+
+ template.sendBodyAndHeader("vertx-websocket:localhost:" + port + "/test/paramA/other/paramB",
+ "Hello World",
+ VertxWebsocketConstants.CONNECTION_KEY, connectionKey);
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertEquals(expectedResultCount, results.size());
+ assertTrue(results.contains("Hello World"));
+ }
+
+ @Test
+ void testSendWithConnectionKeyForRawParameterizedPath() throws Exception { // keyed send where the producer URI uses the {param} template form
+ int expectedResultCount = 1; // two peers connect, only the one addressed by the connection key should receive
+ CountDownLatch latch = new CountDownLatch(expectedResultCount);
+ List<String> results = new ArrayList<>();
+
+ for (int i = 0; i < 2; i++) {
+ openWebSocketConnection("localhost", port, "/test/paramA/other/paramB", message -> {
+ synchronized (latch) { // the latch doubles as the lock guarding the plain ArrayList of results
+ results.add(message);
+ latch.countDown();
+ }
+ });
+ }
+
+ VertxWebsocketEndpoint endpoint
+ = context.getEndpoint("vertx-websocket:localhost:" + port + "/test/paramA/other/paramB",
+ VertxWebsocketEndpoint.class);
+ Map<String, ServerWebSocket> connectedPeers = endpoint.findPeersForHostPort();
+ assertEquals(2, connectedPeers.size());
+
+ String connectionKey = connectedPeers.keySet().iterator().next(); // any one of the two connected peers
+
+ template.sendBodyAndHeader("vertx-websocket:localhost:" + port + "/test/{paramA}/other/{paramB}", // templated, not the concrete connected path
+ "Hello World",
+ VertxWebsocketConstants.CONNECTION_KEY, connectionKey);
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertEquals(expectedResultCount, results.size());
+ assertTrue(results.contains("Hello World"));
+ }
+
+ @Test
+ void testSendWithConnectionKeyForWildcardPath() throws Exception { // keyed send on a concrete path with peers on two different paths
+ int expectedResultCount = 1; // only the peer addressed by the connection key should receive the message
+ CountDownLatch latch = new CountDownLatch(expectedResultCount);
+ List<String> results = new ArrayList<>();
+
+ for (int i = 0; i < 2; i++) {
+ openWebSocketConnection("localhost", port, "/test/wildcarded/path", message -> {
+ synchronized (latch) { // the latch doubles as the lock guarding the plain ArrayList of results
+ results.add(message);
+ latch.countDown();
+ }
+ });
+ }
+
+ openWebSocketConnection("localhost", port, "/test/wildcarded/otherpath", message -> { // peer on a different path
+ synchronized (latch) {
+ results.add(message);
+ latch.countDown();
+ }
+ });
+
+ VertxWebsocketEndpoint endpoint
+ = context.getEndpoint("vertx-websocket:localhost:" + port + "/test/wildcarded/path",
+ VertxWebsocketEndpoint.class);
+ Map<String, ServerWebSocket> connectedPeers = endpoint.findPeersForHostPort();
+ assertEquals(2, connectedPeers.size()); // the otherpath peer is expected to be filtered out
+
+ String connectionKey = connectedPeers.keySet().iterator().next(); // any one of the two matching peers
+
+ template.sendBodyAndHeader("vertx-websocket:localhost:" + port + "/test/wildcarded/path", "Hello World",
+ VertxWebsocketConstants.CONNECTION_KEY, connectionKey);
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertEquals(expectedResultCount, results.size());
+ assertTrue(results.contains("Hello World"));
+ }
+
+ @Test
+ void testSendWithConnectionKeyForRawPath() throws Exception { // keyed send where the producer URI ends in /*
+ int expectedResultCount = 1; // only the peer addressed by the connection key should receive the message
+ CountDownLatch latch = new CountDownLatch(expectedResultCount);
+ List<String> results = new ArrayList<>();
+
+ for (int i = 0; i < 2; i++) {
+ openWebSocketConnection("localhost", port, "/test/wildcarded/path", message -> {
+ synchronized (latch) { // the latch doubles as the lock guarding the plain ArrayList of results
+ results.add(message);
+ latch.countDown();
+ }
+ });
+ }
+
+ openWebSocketConnection("localhost", port, "/test/wildcarded/otherpath", message -> { // peer on a different path
+ synchronized (latch) {
+ results.add(message);
+ latch.countDown();
+ }
+ });
+
+ VertxWebsocketEndpoint endpoint
+ = context.getEndpoint("vertx-websocket:localhost:" + port + "/test/wildcarded/path",
+ VertxWebsocketEndpoint.class);
+ Map<String, ServerWebSocket> connectedPeers = endpoint.findPeersForHostPort();
+ assertEquals(2, connectedPeers.size()); // the otherpath peer is expected to be filtered out for the concrete path
+
+ String connectionKey = connectedPeers.keySet().iterator().next(); // any one of the two matching peers
+
+ template.sendBodyAndHeader("vertx-websocket:localhost:" + port + "/test/wildcarded/*", "Hello World", // wildcard producer path
+ VertxWebsocketConstants.CONNECTION_KEY, connectionKey);
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertEquals(expectedResultCount, results.size());
+ assertTrue(results.contains("Hello World"));
+ }
+
+ @Test
+ void testSendWithInvalidConnectionKey() throws Exception {
MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
mockEndpoint.expectedBodiesReceived("Hello world");
mockEndpoint.setResultWaitTime(500);
@@ -249,6 +389,137 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport {
mockEndpoint.assertIsSatisfied(TimeUnit.SECONDS.toMillis(1));
}
+ @Test
+ void testSendToAllForExactParameterizedPath() throws Exception { // sendToAll on a concrete path should reach only peers on exactly that path
+ int expectedResultCount = 5;
+ CountDownLatch latch = new CountDownLatch(expectedResultCount);
+ List<String> results = new ArrayList<>();
+
+ for (int i = 0; i < expectedResultCount; i++) {
+ openWebSocketConnection("localhost", port, "/test/firstParam/other/secondParam", message -> {
+ synchronized (latch) { // the latch doubles as the lock guarding the plain ArrayList of results
+ results.add(message + " " + latch.getCount()); // remaining latch count makes each recorded message distinct
+ latch.countDown();
+ }
+ });
+
+ // Below we produce to an explicit path so this peer should be ignored
+ openWebSocketConnection("localhost", port, "/test/otherFirstParam/other/otherSecondParam", message -> {
+ synchronized (latch) {
+ results.add(message + " " + latch.getCount());
+ latch.countDown();
+ }
+ });
+ }
+
+ template.sendBody("vertx-websocket:localhost:" + port + "/test/firstParam/other/secondParam?sendToAll=true",
+ "Hello World");
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertEquals(expectedResultCount, results.size());
+
+ for (int i = 1; i <= expectedResultCount; i++) { // one entry per latch count from 1 to expectedResultCount
+ assertTrue(results.contains("Hello World " + i));
+ }
+ }
+
+ @Test
+ void testSendToAllForRawParameterizedPath() throws Exception { // sendToAll on the {param} template path should reach every peer of that route
+ int expectedResultCount = 10; // 5 loop iterations x 2 peers, each connecting with different parameter values
+ CountDownLatch latch = new CountDownLatch(expectedResultCount);
+ List<String> results = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ openWebSocketConnection("localhost", port, "/test/firstParam/other/secondParam", message -> {
+ synchronized (latch) { // the latch doubles as the lock guarding the plain ArrayList of results
+ results.add(message + " " + latch.getCount()); // remaining latch count makes each recorded message distinct
+ latch.countDown();
+ }
+ });
+
+ openWebSocketConnection("localhost", port, "/test/otherFirstParam/other/otherSecondParam", message -> {
+ synchronized (latch) {
+ results.add(message + " " + latch.getCount());
+ latch.countDown();
+ }
+ });
+ }
+
+ template.sendBody("vertx-websocket:localhost:" + port + "/test/{paramA}/other/{paramB}?sendToAll=true", "Hello World");
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertEquals(expectedResultCount, results.size());
+
+ for (int i = 1; i <= expectedResultCount; i++) { // one entry per latch count from 1 to expectedResultCount
+ assertTrue(results.contains("Hello World " + i));
+ }
+ }
+
+ @Test
+ void testSendToAllForWildcardPath() throws Exception { // sendToAll on a concrete path should reach only peers on exactly that path
+ int expectedResultCount = 5;
+ CountDownLatch latch = new CountDownLatch(expectedResultCount);
+ List<String> results = new ArrayList<>();
+
+ for (int i = 0; i < expectedResultCount; i++) {
+ openWebSocketConnection("localhost", port, "/test/wildcarded/path", message -> {
+ synchronized (latch) { // the latch doubles as the lock guarding the plain ArrayList of results
+ results.add(message + " " + latch.getCount()); // remaining latch count makes each recorded message distinct
+ latch.countDown();
+ }
+ });
+
+ // Below we produce to an explicit path so this peer should be ignored
+ openWebSocketConnection("localhost", port, "/test/wildcarded/other", message -> {
+ synchronized (latch) {
+ results.add(message + " " + latch.getCount());
+ latch.countDown();
+ }
+ });
+ }
+
+ template.sendBody("vertx-websocket:localhost:" + port + "/test/wildcarded/path?sendToAll=true", "Hello World");
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertEquals(expectedResultCount, results.size());
+
+ for (int i = 1; i <= expectedResultCount; i++) { // one entry per latch count from 1 to expectedResultCount
+ assertTrue(results.contains("Hello World " + i));
+ }
+ }
+
+ @Test
+ void testSendToAllForRawWildcardPath() throws Exception { // sendToAll on /test/wildcarded/* should reach every peer beneath that prefix
+ int expectedResultCount = 10; // 5 loop iterations x 2 peers, connected on two different sub-paths
+ CountDownLatch latch = new CountDownLatch(expectedResultCount);
+ List<String> results = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ openWebSocketConnection("localhost", port, "/test/wildcarded/path", message -> {
+ synchronized (latch) { // the latch doubles as the lock guarding the plain ArrayList of results
+ results.add(message + " " + latch.getCount()); // remaining latch count makes each recorded message distinct
+ latch.countDown();
+ }
+ });
+
+ openWebSocketConnection("localhost", port, "/test/wildcarded/other", message -> {
+ synchronized (latch) {
+ results.add(message + " " + latch.getCount());
+ latch.countDown();
+ }
+ });
+ }
+
+ template.sendBody("vertx-websocket:localhost:" + port + "/test/wildcarded/*?sendToAll=true", "Hello World");
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertEquals(expectedResultCount, results.size());
+
+ for (int i = 1; i <= expectedResultCount; i++) { // one entry per latch count from 1 to expectedResultCount
+ assertTrue(results.contains("Hello World " + i));
+ }
+ }
+
@Test
public void testEchoRoute() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
@@ -422,6 +693,9 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport {
.setBody(simple("${body} ${header.firstParam} ${header.secondParam}"))
.toF("vertx-websocket:localhost:%d/testA/echo/testB", port);
+ fromF("vertx-websocket:localhost:%d/test/{paramA}/other/{paramB}", port)
+ .setBody(simple("${header.firstParam} ${header.secondParam}"));
+
fromF("vertx-websocket:localhost:%d/query/params", port)
.setBody(simple("${header.firstParam} ${header.secondParam}"))
.to("mock:queryParamResult");
@@ -448,6 +722,9 @@ public class VertxWebsocketTest extends VertxWebSocketTestSupport {
fromF("vertx-websocket:localhost:%d/wildcard/echo*", port)
.setBody().simple("${body} World")
.toF("vertx-websocket:localhost:%d/wildcard/echo/foo/bar", port);
+
+ fromF("vertx-websocket:localhost:%d/test/wildcarded*", port)
+ .setBody().simple("Hello ${body} from the wildcard path");
}
};
}