You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2023/03/02 21:09:16 UTC

[camel] branch main updated (33b2c6c8f52 -> 0e1faf0d33f)

This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from 33b2c6c8f52 Upgrade to spring 6.0.6
     new 1c5e254ab31 CAMEL-19109: Avoid blocking Vert.x event loop in vertx-websocket consumers
     new 0e1faf0d33f camel-vertx-websocket: Fix flaky tests

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../websocket/VertxWebsocketClientConsumer.java    | 35 +++++++++--
 .../vertx/websocket/VertxWebsocketConsumer.java    | 45 +++++++++-----
 .../vertx/websocket/VertxWebSocketEventTest.java   |  9 ++-
 .../VertxWebSocketSlowClientConsumerTest.java      | 68 ++++++++++++++++++++++
 ...st.java => VertxWebSocketSlowConsumerTest.java} | 47 +++++++++------
 .../vertx/websocket/VertxWebSocketTestSupport.java | 35 +++++++++++
 ...WebsocketConsumerAsClientMaxReconnectTest.java} | 49 ++--------------
 ...ertxWebsocketConsumerAsClientReconnectTest.java | 42 -------------
 8 files changed, 204 insertions(+), 126 deletions(-)
 create mode 100644 components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowClientConsumerTest.java
 copy components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/{VertxWebsocketMultiConsumerTest.java => VertxWebSocketSlowConsumerTest.java} (51%)
 copy components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/{VertxWebsocketConsumerAsClientReconnectTest.java => VertxWebsocketConsumerAsClientMaxReconnectTest.java} (66%)


[camel] 02/02: camel-vertx-websocket: Fix flaky tests

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0e1faf0d33f70af5983d1e4b498df8587f653005
Author: James Netherton <ja...@gmail.com>
AuthorDate: Thu Mar 2 14:19:32 2023 +0000

    camel-vertx-websocket: Fix flaky tests
---
 .../vertx/websocket/VertxWebSocketEventTest.java   |  9 ++--
 ...WebsocketConsumerAsClientMaxReconnectTest.java} | 49 ++--------------------
 ...ertxWebsocketConsumerAsClientReconnectTest.java | 42 -------------------
 3 files changed, 10 insertions(+), 90 deletions(-)

diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketEventTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketEventTest.java
index ec581d33730..e980c651b6d 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketEventTest.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketEventTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.vertx.websocket;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -34,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class VertxWebSocketEventTest extends VertxWebSocketTestSupport {
 
     private static final String MESSAGE_BODY = "Hello World";
-    private ServerWebSocket webSocket;
+    private final CompletableFuture<ServerWebSocket> webSocketFuture = new CompletableFuture<>();
 
     @BindToRegistry("serverOptions")
     public HttpServerOptions serverOptions() {
@@ -46,10 +47,12 @@ public class VertxWebSocketEventTest extends VertxWebSocketTestSupport {
     @Test
     void webSocketEvents() throws Exception {
         MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
-        mockEndpoint.expectedBodiesReceived("WebSocket Open", "WebSocket Message", "WebSocket Error", "WebSocket Close");
+        mockEndpoint.expectedBodiesReceivedInAnyOrder("WebSocket Open", "WebSocket Message", "WebSocket Error",
+                "WebSocket Close");
 
         template.sendBody("vertx-websocket:localhost:" + port + "/test", MESSAGE_BODY);
 
+        ServerWebSocket webSocket = webSocketFuture.get(5000, TimeUnit.SECONDS);
         assertNotNull(webSocket);
 
         // Trigger error event (message length > max allowed)
@@ -82,7 +85,7 @@ public class VertxWebSocketEventTest extends VertxWebSocketTestSupport {
                         .when(simple("${header.CamelVertxWebsocket.event} == 'OPEN'"))
                         .process(exchange -> {
                             Message message = exchange.getMessage();
-                            webSocket = message.getBody(ServerWebSocket.class);
+                            webSocketFuture.complete(message.getBody(ServerWebSocket.class));
                         })
                         .setBody().constant("WebSocket Open")
                         .to("mock:result")
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumerAsClientReconnectTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumerAsClientMaxReconnectTest.java
similarity index 66%
copy from components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumerAsClientReconnectTest.java
copy to components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumerAsClientMaxReconnectTest.java
index 1805dae64f3..789dacbdf7c 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumerAsClientReconnectTest.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumerAsClientMaxReconnectTest.java
@@ -26,47 +26,10 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-public class VertxWebsocketConsumerAsClientReconnectTest extends VertxWebSocketTestSupport {
-    @Test
-    void testReconnect() throws Exception {
-        MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
-        mockEndpoint.expectedBodiesReceived("Hello World");
-
-        String uri = String.format("vertx-websocket:localhost:%d/echo", port);
-        template.sendBody(uri, "Hello World");
-        mockEndpoint.assertIsSatisfied();
-
-        // Stop server
-        mockEndpoint.reset();
-        mockEndpoint.expectedBodiesReceived("Hello World Again");
-
-        context.getRouteController().stopRoute("server");
-
-        // Verify that we cannot send messages
-        Exchange exchange = template.send(uri, new Processor() {
-            @Override
-            public void process(Exchange exchange) throws Exception {
-                exchange.getMessage().setBody("Hello World Again");
-            }
-        });
-        Exception exception = exchange.getException();
-        Assertions.assertNotNull(exception);
-        Assertions.assertInstanceOf(ConnectException.class, exception.getCause());
-
-        // Restart server
-        context.getRouteController().startRoute("server");
-
-        // Wait for client consumer reconnect
-        Thread.sleep(300);
-
-        // Verify that the client consumer reconnected
-        template.sendBody(uri, "Hello World Again");
-        mockEndpoint.assertIsSatisfied();
-    }
-
+public class VertxWebsocketConsumerAsClientMaxReconnectTest extends VertxWebSocketTestSupport {
     @Test
     void testMaxReconnect() throws Exception {
-        MockEndpoint mockEndpoint = getMockEndpoint("mock:result2");
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
         mockEndpoint.expectedBodiesReceived("Hello World");
 
         String uri = String.format("vertx-websocket:localhost:%d/echo", port);
@@ -110,14 +73,10 @@ public class VertxWebsocketConsumerAsClientReconnectTest extends VertxWebSocketT
                         .log("Server consumer: Received message: ${body}")
                         .toF("vertx-websocket:localhost:%d/echo?sendToAll=true", port);
 
-                fromF("vertx-websocket:localhost:%d/echo?consumeAsClient=true&reconnectInterval=10", port)
-                        .log("Client consumer 1: Received message: ${body}")
-                        .to("mock:result");
-
                 fromF("vertx-websocket:localhost:%d/echo?consumeAsClient=true&reconnectInterval=10&maxReconnectAttempts=1",
                         port)
-                        .log("Client consumer 2: Received message: ${body}")
-                        .to("mock:result2");
+                        .log("Client consumer 1: Received message: ${body}")
+                        .to("mock:result");
             }
         };
     }
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumerAsClientReconnectTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumerAsClientReconnectTest.java
index 1805dae64f3..e452f85e1a9 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumerAsClientReconnectTest.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumerAsClientReconnectTest.java
@@ -64,43 +64,6 @@ public class VertxWebsocketConsumerAsClientReconnectTest extends VertxWebSocketT
         mockEndpoint.assertIsSatisfied();
     }
 
-    @Test
-    void testMaxReconnect() throws Exception {
-        MockEndpoint mockEndpoint = getMockEndpoint("mock:result2");
-        mockEndpoint.expectedBodiesReceived("Hello World");
-
-        String uri = String.format("vertx-websocket:localhost:%d/echo", port);
-        template.sendBody(uri, "Hello World");
-        mockEndpoint.assertIsSatisfied();
-
-        // Stop server
-        mockEndpoint.reset();
-        mockEndpoint.expectedMessageCount(0);
-
-        context.getRouteController().stopRoute("server");
-
-        // Verify that we cannot send messages
-        Exchange exchange = template.send(uri, new Processor() {
-            @Override
-            public void process(Exchange exchange) throws Exception {
-                exchange.getMessage().setBody("Hello World Again");
-            }
-        });
-        Exception exception = exchange.getException();
-        Assertions.assertNotNull(exception);
-        Assertions.assertInstanceOf(ConnectException.class, exception.getCause());
-
-        // Wait for client consumer reconnect max attempts to be exhausted
-        Thread.sleep(300);
-
-        // Restart server
-        context.getRouteController().startRoute("server");
-
-        // Verify that the client consumer gave up reconnecting
-        template.sendBody(uri, "Hello World Again");
-        mockEndpoint.assertIsSatisfied();
-    }
-
     @Override
     protected RoutesBuilder createRouteBuilder() {
         return new RouteBuilder() {
@@ -113,11 +76,6 @@ public class VertxWebsocketConsumerAsClientReconnectTest extends VertxWebSocketT
                 fromF("vertx-websocket:localhost:%d/echo?consumeAsClient=true&reconnectInterval=10", port)
                         .log("Client consumer 1: Received message: ${body}")
                         .to("mock:result");
-
-                fromF("vertx-websocket:localhost:%d/echo?consumeAsClient=true&reconnectInterval=10&maxReconnectAttempts=1",
-                        port)
-                        .log("Client consumer 2: Received message: ${body}")
-                        .to("mock:result2");
             }
         };
     }


[camel] 01/02: CAMEL-19109: Avoid blocking Vert.x event loop in vertx-websocket consumers

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 1c5e254ab311e12576c36d5a0cd991d9ab7fb33a
Author: James Netherton <ja...@gmail.com>
AuthorDate: Thu Mar 2 11:38:37 2023 +0000

    CAMEL-19109: Avoid blocking Vert.x event loop in vertx-websocket consumers
---
 .../websocket/VertxWebsocketClientConsumer.java    | 35 +++++++++--
 .../vertx/websocket/VertxWebsocketConsumer.java    | 45 +++++++++-----
 .../VertxWebSocketSlowClientConsumerTest.java      | 68 ++++++++++++++++++++++
 .../websocket/VertxWebSocketSlowConsumerTest.java  | 68 ++++++++++++++++++++++
 .../vertx/websocket/VertxWebSocketTestSupport.java | 35 +++++++++++
 5 files changed, 234 insertions(+), 17 deletions(-)

diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java
index d00e5bd67c2..42aece3170d 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java
@@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import io.vertx.core.Vertx;
 import io.vertx.core.http.WebSocket;
 import io.vertx.core.net.impl.ConnectionBase;
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -98,10 +97,38 @@ public class VertxWebsocketClientConsumer extends DefaultConsumer {
     }
 
     protected void handleResult(Object result) {
-        Exchange exchange = createExchange(true);
-        AsyncCallback cb = defaultConsumerCallback(exchange, true);
+        Exchange exchange = createExchange(false);
         Message message = exchange.getMessage();
         message.setBody(result);
-        getAsyncProcessor().process(exchange, cb);
+        processExchange(exchange);
+    }
+
+    protected void processExchange(Exchange exchange) {
+        Vertx vertx = getEndpoint().getVertx();
+        vertx.executeBlocking(
+                promise -> {
+                    try {
+                        createUoW(exchange);
+                    } catch (Exception e) {
+                        promise.fail(e);
+                        return;
+                    }
+
+                    getAsyncProcessor().process(exchange, c -> {
+                        promise.complete();
+                    });
+                },
+                false,
+                result -> {
+                    try {
+                        if (result.failed()) {
+                            Throwable cause = result.cause();
+                            getExceptionHandler().handleException(cause);
+                        }
+                    } finally {
+                        doneUoW(exchange);
+                        releaseExchange(exchange, false);
+                    }
+                });
     }
 }
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
index 39cd8de9db0..d505029d671 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
@@ -22,7 +22,6 @@ import io.vertx.core.http.ServerWebSocket;
 import io.vertx.core.net.SocketAddress;
 import io.vertx.core.net.impl.ConnectionBase;
 import io.vertx.ext.web.RoutingContext;
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
@@ -65,10 +64,7 @@ public class VertxWebsocketConsumer extends DefaultConsumer {
         Exchange exchange = createExchange(true);
         exchange.getMessage().setBody(message);
         populateExchangeHeaders(exchange, connectionKey, remote, routingContext, VertxWebsocketEvent.MESSAGE);
-
-        // use default consumer callback
-        AsyncCallback cb = defaultConsumerCallback(exchange, true);
-        getAsyncProcessor().process(exchange, cb);
+        processExchange(exchange, routingContext);
     }
 
     public void onException(String connectionKey, Throwable cause, SocketAddress remote, RoutingContext routingContext) {
@@ -88,19 +84,13 @@ public class VertxWebsocketConsumer extends DefaultConsumer {
         Exchange exchange = createExchange(true);
         populateExchangeHeaders(exchange, connectionKey, remote, routingContext, VertxWebsocketEvent.OPEN);
         exchange.getMessage().setBody(webSocket);
-
-        // use default consumer callback
-        AsyncCallback cb = defaultConsumerCallback(exchange, true);
-        getAsyncProcessor().process(exchange, cb);
+        processExchange(exchange, routingContext);
     }
 
     public void onClose(String connectionKey, SocketAddress remote, RoutingContext routingContext) {
         Exchange exchange = createExchange(true);
         populateExchangeHeaders(exchange, connectionKey, remote, routingContext, VertxWebsocketEvent.CLOSE);
-
-        // use default consumer callback
-        AsyncCallback cb = defaultConsumerCallback(exchange, true);
-        getAsyncProcessor().process(exchange, cb);
+        processExchange(exchange, routingContext);
     }
 
     protected void populateExchangeHeaders(
@@ -116,4 +106,33 @@ public class VertxWebsocketConsumer extends DefaultConsumer {
         routingContext.pathParams()
                 .forEach((name, value) -> VertxWebsocketHelper.appendHeader(headers, name, value));
     }
+
+    protected void processExchange(Exchange exchange, RoutingContext routingContext) {
+        routingContext.vertx().executeBlocking(
+                promise -> {
+                    try {
+                        createUoW(exchange);
+                    } catch (Exception e) {
+                        promise.fail(e);
+                        return;
+                    }
+
+                    getAsyncProcessor().process(exchange, c -> {
+                        promise.complete();
+                    });
+                },
+                false,
+                result -> {
+                    try {
+                        if (result.failed()) {
+                            Throwable cause = result.cause();
+                            getExceptionHandler().handleException(cause);
+                            routingContext.fail(cause);
+                        }
+                    } finally {
+                        doneUoW(exchange);
+                        releaseExchange(exchange, false);
+                    }
+                });
+    }
 }
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowClientConsumerTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowClientConsumerTest.java
new file mode 100644
index 00000000000..fa6997a6ddf
--- /dev/null
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowClientConsumerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.websocket;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class VertxWebSocketSlowClientConsumerTest extends VertxWebSocketTestSupport {
+    private static final String MESSAGE_BODY = "Hello World";
+    private final BlockedThreadReporter reporter = new BlockedThreadReporter();
+
+    @AfterEach
+    public void afterEach() {
+        reporter.reset();
+    }
+
+    @BindToRegistry
+    public Vertx createVertx() {
+        return createVertxWithThreadBlockedHandler(reporter);
+    }
+
+    @Test
+    void slowClientConsumerDoesNotBlockEventLoop() throws Exception {
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:clientConsumerResult");
+        mockEndpoint.expectedBodiesReceived(MESSAGE_BODY);
+
+        template.sendBody("vertx-websocket:localhost:" + port + "/echo/slow", MESSAGE_BODY);
+
+        mockEndpoint.assertIsSatisfied();
+        assertFalse(reporter.isEventLoopBlocked(), "Expected Vert.x event loop to not be blocked");
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                fromF("vertx-websocket:localhost:%d/echo/slow", port)
+                        .toF("vertx-websocket:localhost:%d/echo/slow?sendToAll=true", port);
+
+                fromF("vertx-websocket:localhost:%d/echo/slow?consumeAsClient=true", port)
+                        .delay(600).syncDelayed()
+                        .to("mock:clientConsumerResult");
+            }
+        };
+    }
+}
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java
new file mode 100644
index 00000000000..de982c6d61b
--- /dev/null
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.websocket;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class VertxWebSocketSlowConsumerTest extends VertxWebSocketTestSupport {
+    private static final String MESSAGE_BODY = "Hello World";
+    private final BlockedThreadReporter reporter = new BlockedThreadReporter();
+
+    @AfterEach
+    public void afterEach() {
+        reporter.reset();
+    }
+
+    @BindToRegistry
+    public Vertx createVertx() {
+        return createVertxWithThreadBlockedHandler(reporter);
+    }
+
+    @Test
+    void slowConsumerDoesNotBlockEventLoop() throws Exception {
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
+        mockEndpoint.expectedBodiesReceived(MESSAGE_BODY);
+
+        template.requestBody("direct:start", MESSAGE_BODY);
+
+        mockEndpoint.assertIsSatisfied();
+        assertFalse(reporter.isEventLoopBlocked(), "Expected Vert.x event loop to not be blocked");
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .toF("vertx-websocket:localhost:%d/slow", port);
+
+                fromF("vertx-websocket:localhost:%d/slow", port)
+                        .delay(600).syncDelayed()
+                        .to("mock:result");
+            }
+        };
+    }
+}
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java
index ac5ff4e0882..f695197d6e5 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java
@@ -24,8 +24,12 @@ import java.util.function.Consumer;
 
 import io.vertx.core.Handler;
 import io.vertx.core.Vertx;
+import io.vertx.core.VertxException;
+import io.vertx.core.VertxOptions;
 import io.vertx.core.http.HttpClient;
 import io.vertx.core.http.WebSocket;
+import io.vertx.core.impl.VertxInternal;
+import io.vertx.core.impl.btc.BlockedThreadEvent;
 import io.vertx.ext.web.Route;
 import io.vertx.ext.web.Router;
 import io.vertx.ext.web.RoutingContext;
@@ -79,4 +83,35 @@ public class VertxWebSocketTestSupport extends CamelTestSupport {
         route.handler(handler);
         return router;
     }
+
+    public Vertx createVertxWithThreadBlockedHandler(Handler<BlockedThreadEvent> handler) {
+        VertxOptions vertxOptions = new VertxOptions();
+        vertxOptions.setMaxEventLoopExecuteTime(500);
+        vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.MILLISECONDS);
+        vertxOptions.setBlockedThreadCheckInterval(10);
+        vertxOptions.setBlockedThreadCheckIntervalUnit(TimeUnit.MILLISECONDS);
+        Vertx vertx = Vertx.vertx(vertxOptions);
+        ((VertxInternal) vertx).blockedThreadChecker().setThreadBlockedHandler(handler);
+        return vertx;
+    }
+
+    static class BlockedThreadReporter implements Handler<BlockedThreadEvent> {
+        private boolean eventLoopBlocked;
+
+        @Override
+        public void handle(BlockedThreadEvent event) {
+            VertxException stackTrace = new VertxException("Thread blocked");
+            stackTrace.setStackTrace(event.thread().getStackTrace());
+            stackTrace.printStackTrace();
+            eventLoopBlocked = true;
+        }
+
+        public boolean isEventLoopBlocked() {
+            return eventLoopBlocked;
+        }
+
+        public void reset() {
+            eventLoopBlocked = false;
+        }
+    }
 }