You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2023/03/03 09:26:54 UTC
[camel] branch camel-3.20.x updated: CAMEL-19109: Avoid blocking Vert.x event loop in vertx-websocket consumers
This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch camel-3.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.20.x by this push:
new f7cc701c8ad CAMEL-19109: Avoid blocking Vert.x event loop in vertx-websocket consumers
f7cc701c8ad is described below
commit f7cc701c8ad219d86d17d5baed9e1e3e84ecf3e4
Author: James Netherton <ja...@gmail.com>
AuthorDate: Thu Mar 2 11:38:37 2023 +0000
CAMEL-19109: Avoid blocking Vert.x event loop in vertx-websocket consumers
---
.../vertx/websocket/VertxWebsocketConsumer.java | 37 ++++++++++--
.../vertx/websocket/VertxWebsocketHost.java | 6 +-
.../websocket/VertxWebSocketSlowConsumerTest.java | 68 ++++++++++++++++++++++
.../vertx/websocket/VertxWebSocketTestSupport.java | 35 +++++++++++
4 files changed, 139 insertions(+), 7 deletions(-)
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
index cf1991dfdf0..435bf658286 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.vertx.websocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.ConnectionBase;
-import org.apache.camel.AsyncCallback;
+import io.vertx.ext.web.RoutingContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
@@ -60,15 +60,13 @@ public class VertxWebsocketConsumer extends DefaultConsumer {
return endpoint.getComponent();
}
- public void onMessage(String connectionKey, Object message, SocketAddress remote) {
+ public void onMessage(String connectionKey, Object message, SocketAddress remote, RoutingContext routingContext) {
Exchange exchange = createExchange(true);
exchange.getMessage().setHeader(VertxWebsocketConstants.REMOTE_ADDRESS, remote);
exchange.getMessage().setHeader(VertxWebsocketConstants.CONNECTION_KEY, connectionKey);
exchange.getMessage().setBody(message);
- // use default consumer callback
- AsyncCallback cb = defaultConsumerCallback(exchange, true);
- getAsyncProcessor().process(exchange, cb);
+ processExchange(exchange, routingContext);
}
public void onException(String connectionKey, Throwable cause, SocketAddress remote) {
@@ -83,4 +81,33 @@ public class VertxWebsocketConsumer extends DefaultConsumer {
getExceptionHandler().handleException("Error processing exchange", exchange, cause);
releaseExchange(exchange, false);
}
+
+ /**
+  * Hands the exchange to Vert.x {@code executeBlocking} so that routing does not run on the
+  * thread that delivered the WebSocket message.
+  * <p>
+  * A unit of work is created before the exchange is processed. If the blocking task fails, the
+  * cause is passed to the exception handler and the routing context is failed with it. In all
+  * cases the unit of work is completed and the exchange is released afterwards.
+  *
+  * @param exchange       the exchange to route
+  * @param routingContext the routing context that supplies the Vert.x instance and receives failures
+  */
+ protected void processExchange(Exchange exchange, RoutingContext routingContext) {
+     routingContext.vertx().executeBlocking(
+             blockingTask -> {
+                 try {
+                     createUoW(exchange);
+                 } catch (Exception uowFailure) {
+                     // No unit of work could be set up, so skip routing and report the failure
+                     blockingTask.fail(uowFailure);
+                     return;
+                 }
+
+                 // Mark the blocking task as finished once the async processor calls back
+                 getAsyncProcessor().process(exchange, doneSync -> blockingTask.complete());
+             },
+             // ordered = false
+             false,
+             outcome -> {
+                 try {
+                     if (outcome.failed()) {
+                         Throwable failure = outcome.cause();
+                         getExceptionHandler().handleException(failure);
+                         routingContext.fail(failure);
+                     }
+                 } finally {
+                     // Always clean up, whether or not the task succeeded
+                     doneUoW(exchange);
+                     releaseExchange(exchange, false);
+                 }
+             });
+ }
}
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java
index 16dc26339b6..04d02da222d 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketHost.java
@@ -112,9 +112,11 @@ public class VertxWebsocketHost {
}
}
- webSocket.textMessageHandler(message -> consumer.onMessage(connectionKey, message, remote));
+ webSocket.textMessageHandler(
+ message -> consumer.onMessage(connectionKey, message, remote, routingContext));
webSocket
- .binaryMessageHandler(message -> consumer.onMessage(connectionKey, message.getBytes(), remote));
+ .binaryMessageHandler(message -> consumer.onMessage(connectionKey, message.getBytes(), remote,
+ routingContext));
webSocket.exceptionHandler(exception -> consumer.onException(connectionKey, exception, remote));
webSocket.closeHandler(closeEvent -> {
if (LOG.isDebugEnabled()) {
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java
new file mode 100644
index 00000000000..de982c6d61b
--- /dev/null
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketSlowConsumerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.websocket;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Verifies that a vertx-websocket consumer route containing a synchronous 600 ms delay does not
+ * cause the {@link BlockedThreadReporter} to be notified.
+ */
+public class VertxWebSocketSlowConsumerTest extends VertxWebSocketTestSupport {
+    private static final String MESSAGE_BODY = "Hello World";
+    // URI template shared by the producing and the consuming route; %d is the test port
+    private static final String SLOW_ENDPOINT_URI = "vertx-websocket:localhost:%d/slow";
+
+    private final BlockedThreadReporter reporter = new BlockedThreadReporter();
+
+    /** Clears the reporter flag after each test. */
+    @AfterEach
+    public void afterEach() {
+        reporter.reset();
+    }
+
+    /** Binds a Vert.x instance whose blocked thread checker notifies {@link #reporter}. */
+    @BindToRegistry
+    public Vertx createVertx() {
+        return createVertxWithThreadBlockedHandler(reporter);
+    }
+
+    @Test
+    void slowConsumerDoesNotBlockEventLoop() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedBodiesReceived(MESSAGE_BODY);
+
+        template.requestBody("direct:start", MESSAGE_BODY);
+
+        resultEndpoint.assertIsSatisfied();
+        assertFalse(reporter.isEventLoopBlocked(), "Expected Vert.x event loop to not be blocked");
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // Producer side: forwards the test message to the WebSocket endpoint
+                from("direct:start")
+                        .toF(SLOW_ENDPOINT_URI, port);
+
+                // Consumer side: the synchronous delay makes this route slow on purpose
+                fromF(SLOW_ENDPOINT_URI, port)
+                        .delay(600).syncDelayed()
+                        .to("mock:result");
+            }
+        };
+    }
+}
diff --git a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java
index ac5ff4e0882..f695197d6e5 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/test/java/org/apache/camel/component/vertx/websocket/VertxWebSocketTestSupport.java
@@ -24,8 +24,12 @@ import java.util.function.Consumer;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
+import io.vertx.core.VertxException;
+import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.WebSocket;
+import io.vertx.core.impl.VertxInternal;
+import io.vertx.core.impl.btc.BlockedThreadEvent;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
@@ -79,4 +83,35 @@ public class VertxWebSocketTestSupport extends CamelTestSupport {
route.handler(handler);
return router;
}
+
+ /**
+  * Creates a Vert.x instance whose blocked thread checker notifies the given handler.
+  * <p>
+  * The maximum event loop execute time is set to 500 ms and the blocked thread check interval
+  * to 10 ms.
+  *
+  * @param handler the handler to register on the blocked thread checker
+  * @return the newly created Vert.x instance
+  */
+ public Vertx createVertxWithThreadBlockedHandler(Handler<BlockedThreadEvent> handler) {
+     VertxOptions options = new VertxOptions()
+             .setMaxEventLoopExecuteTime(500)
+             .setMaxEventLoopExecuteTimeUnit(TimeUnit.MILLISECONDS)
+             .setBlockedThreadCheckInterval(10)
+             .setBlockedThreadCheckIntervalUnit(TimeUnit.MILLISECONDS);
+
+     Vertx vertx = Vertx.vertx(options);
+     // blockedThreadChecker() is reached through the VertxInternal view of the instance
+     VertxInternal internal = (VertxInternal) vertx;
+     internal.blockedThreadChecker().setThreadBlockedHandler(handler);
+     return vertx;
+ }
+
+ /**
+  * Blocked thread handler that remembers whether it has been notified and prints the stack
+  * trace of the reported thread.
+  */
+ static class BlockedThreadReporter implements Handler<BlockedThreadEvent> {
+     // volatile: handle() is called back by the Vert.x blocked thread checker (presumably on
+     // its own thread - confirm), while tests read and reset the flag from the test thread.
+     // Without it the test thread is not guaranteed to observe the update.
+     private volatile boolean eventLoopBlocked;
+
+     /**
+      * Prints the stack trace of the reported thread and records that a block was reported.
+      *
+      * @param event the blocked thread event supplying the reported thread
+      */
+     @Override
+     public void handle(BlockedThreadEvent event) {
+         // Carrier exception used only to print the blocked thread's current stack
+         VertxException stackTrace = new VertxException("Thread blocked");
+         stackTrace.setStackTrace(event.thread().getStackTrace());
+         stackTrace.printStackTrace();
+         eventLoopBlocked = true;
+     }
+
+     /**
+      * @return {@code true} if {@link #handle(BlockedThreadEvent)} has been called since the last reset
+      */
+     public boolean isEventLoopBlocked() {
+         return eventLoopBlocked;
+     }
+
+     /** Clears the recorded state so the reporter can be reused by another test. */
+     public void reset() {
+         eventLoopBlocked = false;
+     }
+ }
}