You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2023/03/03 09:26:54 UTC

[camel] branch camel-3.20.x updated: CAMEL-19109: Avoid blocking Vert.x event loop in vertx-websocket consumers

This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch camel-3.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.20.x by this push:
     new f7cc701c8ad CAMEL-19109: Avoid blocking Vert.x event loop in vertx-websocket consumers
f7cc701c8ad is described below

commit f7cc701c8ad219d86d17d5baed9e1e3e84ecf3e4
Author: James Netherton <ja...@gmail.com>
AuthorDate: Thu Mar 2 11:38:37 2023 +0000

    CAMEL-19109: Avoid blocking Vert.x event loop in vertx-websocket consumers
---
 .../vertx/websocket/VertxWebsocketConsumer.java    | 37 ++++++++++--
 .../vertx/websocket/VertxWebsocketHost.java        |  6 +-
 .../websocket/VertxWebSocketSlowConsumerTest.java  | 68 ++++++++++++++++++++++
 .../vertx/websocket/VertxWebSocketTestSupport.java | 35 +++++++++++
 4 files changed, 139 insertions(+), 7 deletions(-)

diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
index cf1991dfdf0..435bf658286 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.vertx.websocket;
 
 import io.vertx.core.net.SocketAddress;
 import io.vertx.core.net.impl.ConnectionBase;
-import org.apache.camel.AsyncCallback;
+import io.vertx.ext.web.RoutingContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
@@ -60,15 +60,13 @@ public class VertxWebsocketConsumer extends DefaultConsumer {
         return endpoint.getComponent();
     }
 
-    public void onMessage(String connectionKey, Object message, SocketAddress remote) {
+    public void onMessage(String connectionKey, Object message, SocketAddress remote, RoutingContext routingContext) {
         Exchange exchange = createExchange(true);
         exchange.getMessage().setHeader(VertxWebsocketConstants.REMOTE_ADDRESS, remote);
         exchange.getMessage().setHeader(VertxWebsocketConstants.CONNECTION_KEY, connectionKey);
         exchange.getMessage().setBody(message);
 
-        // use default consumer callback
-        AsyncCallback cb = defaultConsumerCallback(exchange, true);
-        getAsyncProcessor().process(exchange, cb);
+        processExchange(exchange, routingContext);
     }
 
     public void onException(String connectionKey, Throwable cause, SocketAddress remote) {
@@ -83,4 +81,33 @@ public class VertxWebsocketConsumer extends DefaultConsumer {
         getExceptionHandler().handleException("Error processing exchange", exchange, cause);
         releaseExchange(exchange, false);
     }
+
+    protected void processExchange(Exchange exchange, RoutingContext routingContext) {
+        routingContext.vertx().executeBlocking(
+                promise -> {
+                    try {
+                        createUoW(exchange);
+                    } catch (Exception e) {
+                        promise.fail(e);
+                        return;
+                    }
+
+                    getAsyncProcessor().process(exchange, c -> {
+                        promise.complete();
+                    });
+                },
+                false,
+                result -> {
+                    try {
+                        if (result.failed()) {
+                            Throwable cause = result.cause();
+                            getExceptionHandler().handleException(cause);
+                            routingContext.fail(cause);
+                        }
+                    } finally {
+                        doneUoW(exchange);
+                        releaseExchange(exchange, false);
+                    }
+                });
+    }
 }
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java
index 16dc26339b6..04d02da222d 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java
@@ -112,9 +112,11 @@ public class VertxWebsocketHost {
                             }
                         }
 
-                        webSocket.textMessageHandler(message -> consumer.onMessage(connectionKey, message, remote));
+                        webSocket.textMessageHandler(
+                                message -> consumer.onMessage(connectionKey, message, remote, routingContext));
                         webSocket
-                                .binaryMessageHandler(message -> consumer.onMessage(connectionKey, message.getBytes(), remote));
+                                .binaryMessageHandler(message -> consumer.onMessage(connectionKey, message.getBytes(), remote,
+                                        routingContext));
                         webSocket.exceptionHandler(exception -> consumer.onException(connectionKey, exception, remote));
                         webSocket.closeHandler(closeEvent -> {
                             if (LOG.isDebugEnabled()) {
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java
new file mode 100644
index 00000000000..de982c6d61b
--- /dev/null
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.websocket;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class VertxWebSocketSlowConsumerTest extends VertxWebSocketTestSupport {
+    private static final String MESSAGE_BODY = "Hello World";
+    private final BlockedThreadReporter reporter = new BlockedThreadReporter();
+
+    @AfterEach
+    public void afterEach() {
+        reporter.reset();
+    }
+
+    @BindToRegistry
+    public Vertx createVertx() {
+        return createVertxWithThreadBlockedHandler(reporter);
+    }
+
+    @Test
+    void slowConsumerDoesNotBlockEventLoop() throws Exception {
+        MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
+        mockEndpoint.expectedBodiesReceived(MESSAGE_BODY);
+
+        template.requestBody("direct:start", MESSAGE_BODY);
+
+        mockEndpoint.assertIsSatisfied();
+        assertFalse(reporter.isEventLoopBlocked(), "Expected Vert.x event loop to not be blocked");
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .toF("vertx-websocket:localhost:%d/slow", port);
+
+                fromF("vertx-websocket:localhost:%d/slow", port)
+                        .delay(600).syncDelayed()
+                        .to("mock:result");
+            }
+        };
+    }
+}
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java
index ac5ff4e0882..f695197d6e5 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java
@@ -24,8 +24,12 @@ import java.util.function.Consumer;
 
 import io.vertx.core.Handler;
 import io.vertx.core.Vertx;
+import io.vertx.core.VertxException;
+import io.vertx.core.VertxOptions;
 import io.vertx.core.http.HttpClient;
 import io.vertx.core.http.WebSocket;
+import io.vertx.core.impl.VertxInternal;
+import io.vertx.core.impl.btc.BlockedThreadEvent;
 import io.vertx.ext.web.Route;
 import io.vertx.ext.web.Router;
 import io.vertx.ext.web.RoutingContext;
@@ -79,4 +83,35 @@ public class VertxWebSocketTestSupport extends CamelTestSupport {
         route.handler(handler);
         return router;
     }
+
+    public Vertx createVertxWithThreadBlockedHandler(Handler<BlockedThreadEvent> handler) {
+        VertxOptions vertxOptions = new VertxOptions();
+        vertxOptions.setMaxEventLoopExecuteTime(500);
+        vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.MILLISECONDS);
+        vertxOptions.setBlockedThreadCheckInterval(10);
+        vertxOptions.setBlockedThreadCheckIntervalUnit(TimeUnit.MILLISECONDS);
+        Vertx vertx = Vertx.vertx(vertxOptions);
+        ((VertxInternal) vertx).blockedThreadChecker().setThreadBlockedHandler(handler);
+        return vertx;
+    }
+
+    static class BlockedThreadReporter implements Handler<BlockedThreadEvent> {
+        private boolean eventLoopBlocked;
+
+        @Override
+        public void handle(BlockedThreadEvent event) {
+            VertxException stackTrace = new VertxException("Thread blocked");
+            stackTrace.setStackTrace(event.thread().getStackTrace());
+            stackTrace.printStackTrace();
+            eventLoopBlocked = true;
+        }
+
+        public boolean isEventLoopBlocked() {
+            return eventLoopBlocked;
+        }
+
+        public void reset() {
+            eventLoopBlocked = false;
+        }
+    }
 }