You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2023/10/19 11:56:33 UTC

[camel] branch main updated: CAMEL-20011: Fix usage of deprecated Vert.x APIs

This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 4275cfdb978 CAMEL-20011: Fix usage of deprecated Vert.x APIs
4275cfdb978 is described below

commit 4275cfdb978ab71dc4a0bfa7edae6d37417f5b4e
Author: James Netherton <ja...@gmail.com>
AuthorDate: Thu Oct 19 08:03:14 2023 +0100

    CAMEL-20011: Fix usage of deprecated Vert.x APIs
---
 .../knative/http/KnativeHttpConsumer.java          | 26 +++------
 .../component/knative/http/KnativeHttpTest.java    | 21 ++++---
 .../http/vertx/VertxPlatformHttpConsumer.java      | 20 ++-----
 .../http/vertx/VertxPlatformHttpServer.java        |  2 +-
 .../reactive/vertx/VertXReactiveExecutor.java      |  5 +-
 .../reactive/vertx/VertXThreadPoolFactory.java     |  6 +-
 .../websocket/VertxWebsocketClientConsumer.java    | 64 +++++++++-------------
 .../vertx/websocket/VertxWebsocketConsumer.java    | 21 ++-----
 8 files changed, 62 insertions(+), 103 deletions(-)

diff --git a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
index eec36a5045b..626d47997d1 100644
--- a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
+++ b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
@@ -191,25 +191,13 @@ public class KnativeHttpConsumer extends DefaultConsumer {
         // from("knative:event/my.event")
         //        .to("http://{{env:PROJECT}}.{{env:NAMESPACE}}.svc.cluster.local/service");
         //
-        routingContext.vertx().executeBlocking(
-                promise -> {
-                    try {
-                        createUoW(exchange);
-                    } catch (Exception e) {
-                        promise.fail(e);
-                        return;
-                    }
-
-                    getAsyncProcessor().process(exchange, c -> {
-                        if (!exchange.isFailed()) {
-                            promise.complete();
-                        } else {
-                            promise.fail(exchange.getException());
-                        }
-                    });
-                },
-                false,
-                result -> {
+        routingContext.vertx().executeBlocking(() -> {
+            createUoW(exchange);
+            getAsyncProcessor().process(exchange);
+            return null;
+        },
+                false)
+                .onComplete(result -> {
                     try {
                         Throwable failure = null;
 
diff --git a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index 89d153a5e11..68667d417ae 100644
--- a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -1801,17 +1801,16 @@ public class KnativeHttpTest {
     @EnumSource(CloudEvents.class)
     void testSlowConsumer(CloudEvent ce) throws Exception {
         final KnativeHttpServer server = new KnativeHttpServer(context, event -> {
-            event.vertx().executeBlocking(
-                    promise -> {
-                        try {
-                            Thread.sleep(5000);
-                            promise.complete();
-                        } catch (InterruptedException e) {
-                            promise.fail(e);
-                        }
-                    },
-                    false,
-                    result -> {
+            event.vertx().executeBlocking(() -> {
+                try {
+                    Thread.sleep(5000);
+                    return null;
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw e;
+                }
+            }, false)
+                    .onComplete(result -> {
                         event.response().setStatusCode(200);
                         event.response().end("");
                     });
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
index 4b96d86c504..2a3c91e03c2 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
@@ -186,21 +186,13 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
             exchange.getMessage().setHeader(Exchange.HTTP_HOST, httpHeaders.get("Host"));
             exchange.getMessage().removeHeader("Proxy-Connection");
         }
-        vertx.executeBlocking(
-                promise -> {
-                    try {
-                        createUoW(exchange);
-                    } catch (Exception e) {
-                        promise.fail(e);
-                        return;
-                    }
 
-                    getAsyncProcessor().process(exchange, c -> {
-                        promise.complete();
-                    });
-                },
-                false,
-                result -> {
+        vertx.executeBlocking(() -> {
+            createUoW(exchange);
+            getProcessor().process(exchange);
+            return null;
+        }, false)
+                .onComplete(result -> {
                     Throwable failure = null;
                     try {
                         if (result.succeeded()) {
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpServer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpServer.java
index a7abed73a53..c81e3e8d5b0 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpServer.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpServer.java
@@ -183,7 +183,7 @@ public class VertxPlatformHttpServer extends ServiceSupport implements CamelCont
             subRouter.route().handler(createCorsHandler(configuration));
         }
 
-        router.mountSubRouter(configuration.getPath(), subRouter);
+        router.route(configuration.getPath() + "*").subRouter(subRouter);
 
         context.getRegistry().bind(
                 VertxPlatformHttpRouter.PLATFORM_HTTP_ROUTER_NAME,
diff --git a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
index c778bcaf65d..cf74cd5c549 100644
--- a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
+++ b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
@@ -99,10 +99,9 @@ public class VertXReactiveExecutor extends ServiceSupport implements CamelContex
     public void scheduleSync(Runnable runnable) {
         LOG.trace("scheduleSync: {}", runnable);
         final Runnable task = runnable;
-        vertx.executeBlocking(future -> {
+        vertx.executeBlocking(() -> {
             task.run();
-            future.complete();
-        }, res -> {
+            return null;
         });
     }
 
diff --git a/components/camel-threadpoolfactory-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java b/components/camel-threadpoolfactory-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java
index 5dd384c150e..288ea317fca 100644
--- a/components/camel-threadpoolfactory-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java
+++ b/components/camel-threadpoolfactory-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java
@@ -133,10 +133,10 @@ public class VertXThreadPoolFactory extends DefaultThreadPoolFactory implements
             LOG.trace("submit: {}", task);
             final CompletableFuture<?> answer = new CompletableFuture<>();
             // used by vertx
-            vertx.executeBlocking(future -> {
+            vertx.executeBlocking(() -> {
                 task.run();
-                future.complete();
-            }, res -> answer.complete(null));
+                return null;
+            }).onComplete(res -> answer.complete(null));
             return answer;
         }
 
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java
index 42aece3170d..0cb287314e8 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java
@@ -57,30 +57,29 @@ public class VertxWebsocketClientConsumer extends DefaultConsumer {
 
                 Vertx vertx = getEndpoint().getVertx();
                 vertx.setPeriodic(configuration.getReconnectInitialDelay(), configuration.getReconnectInterval(), timerId -> {
-                    vertx.executeBlocking(promise -> {
-                        try {
-                            configureWebSocketHandlers(getEndpoint().getWebSocket());
-                            vertx.cancelTimer(timerId);
-                            promise.complete();
-                        } catch (Exception e) {
-                            promise.fail(e);
-                        }
-                    }, false, result -> {
-                        if (result.failed()) {
-                            Throwable cause = result.cause();
-                            if (cause != null) {
-                                LOG.debug("WebSocket reconnect to {} failed due to {}", webSocket.remoteAddress(), cause);
-                            }
+                    vertx.executeBlocking(() -> {
+                        configureWebSocketHandlers(getEndpoint().getWebSocket());
+                        vertx.cancelTimer(timerId);
+                        return null;
+                    }, false)
+                            .onComplete(result -> {
+                                if (result.failed()) {
+                                    Throwable cause = result.cause();
+                                    if (cause != null) {
+                                        LOG.debug("WebSocket reconnect to {} failed due to {}", webSocket.remoteAddress(),
+                                                cause);
+                                    }
 
-                            if (configuration.getMaxReconnectAttempts() > 0) {
-                                if (reconnectAttempts.incrementAndGet() == configuration.getMaxReconnectAttempts()) {
-                                    LOG.warn("Reconnect max attempts ({}) exhausted. Giving up trying to reconnect to {}",
-                                            configuration.getMaxReconnectAttempts(), webSocket.remoteAddress());
-                                    vertx.cancelTimer(timerId);
+                                    if (configuration.getMaxReconnectAttempts() > 0) {
+                                        if (reconnectAttempts.incrementAndGet() == configuration.getMaxReconnectAttempts()) {
+                                            LOG.warn(
+                                                    "Reconnect max attempts ({}) exhausted. Giving up trying to reconnect to {}",
+                                                    configuration.getMaxReconnectAttempts(), webSocket.remoteAddress());
+                                            vertx.cancelTimer(timerId);
+                                        }
+                                    }
                                 }
-                            }
-                        }
-                    });
+                            });
                 });
             }
         });
@@ -105,21 +104,12 @@ public class VertxWebsocketClientConsumer extends DefaultConsumer {
 
     protected void processExchange(Exchange exchange) {
         Vertx vertx = getEndpoint().getVertx();
-        vertx.executeBlocking(
-                promise -> {
-                    try {
-                        createUoW(exchange);
-                    } catch (Exception e) {
-                        promise.fail(e);
-                        return;
-                    }
-
-                    getAsyncProcessor().process(exchange, c -> {
-                        promise.complete();
-                    });
-                },
-                false,
-                result -> {
+        vertx.executeBlocking(() -> {
+            createUoW(exchange);
+            getProcessor().process(exchange);
+            return null;
+        }, false)
+                .onComplete(result -> {
                     try {
                         if (result.failed()) {
                             Throwable cause = result.cause();
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
index d505029d671..538fada8a1c 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
@@ -108,21 +108,12 @@ public class VertxWebsocketConsumer extends DefaultConsumer {
     }
 
     protected void processExchange(Exchange exchange, RoutingContext routingContext) {
-        routingContext.vertx().executeBlocking(
-                promise -> {
-                    try {
-                        createUoW(exchange);
-                    } catch (Exception e) {
-                        promise.fail(e);
-                        return;
-                    }
-
-                    getAsyncProcessor().process(exchange, c -> {
-                        promise.complete();
-                    });
-                },
-                false,
-                result -> {
+        routingContext.vertx().executeBlocking(() -> {
+            createUoW(exchange);
+            getProcessor().process(exchange);
+            return null;
+        }, false)
+                .onComplete(result -> {
                     try {
                         if (result.failed()) {
                             Throwable cause = result.cause();