You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2023/10/19 11:56:33 UTC
[camel] branch main updated: CAMEL-20011: Fix usage of deprecated Vert.x APIs
This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 4275cfdb978 CAMEL-20011: Fix usage of deprecated Vert.x APIs
4275cfdb978 is described below
commit 4275cfdb978ab71dc4a0bfa7edae6d37417f5b4e
Author: James Netherton <ja...@gmail.com>
AuthorDate: Thu Oct 19 08:03:14 2023 +0100
CAMEL-20011: Fix usage of deprecated Vert.x APIs
---
.../knative/http/KnativeHttpConsumer.java | 26 +++------
.../component/knative/http/KnativeHttpTest.java | 21 ++++---
.../http/vertx/VertxPlatformHttpConsumer.java | 20 ++-----
.../http/vertx/VertxPlatformHttpServer.java | 2 +-
.../reactive/vertx/VertXReactiveExecutor.java | 5 +-
.../reactive/vertx/VertXThreadPoolFactory.java | 6 +-
.../websocket/VertxWebsocketClientConsumer.java | 64 +++++++++-------------
.../vertx/websocket/VertxWebsocketConsumer.java | 21 ++-----
8 files changed, 62 insertions(+), 103 deletions(-)
diff --git a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
index eec36a5045b..626d47997d1 100644
--- a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
+++ b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
@@ -191,25 +191,13 @@ public class KnativeHttpConsumer extends DefaultConsumer {
// from("knative:event/my.event")
// .to("http://{{env:PROJECT}}.{{env:NAMESPACE}}.svc.cluster.local/service");
//
- routingContext.vertx().executeBlocking(
- promise -> {
- try {
- createUoW(exchange);
- } catch (Exception e) {
- promise.fail(e);
- return;
- }
-
- getAsyncProcessor().process(exchange, c -> {
- if (!exchange.isFailed()) {
- promise.complete();
- } else {
- promise.fail(exchange.getException());
- }
- });
- },
- false,
- result -> {
+ routingContext.vertx().executeBlocking(() -> {
+ createUoW(exchange);
+ getAsyncProcessor().process(exchange);
+ return null;
+ },
+ false)
+ .onComplete(result -> {
try {
Throwable failure = null;
diff --git a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index 89d153a5e11..68667d417ae 100644
--- a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -1801,17 +1801,16 @@ public class KnativeHttpTest {
@EnumSource(CloudEvents.class)
void testSlowConsumer(CloudEvent ce) throws Exception {
final KnativeHttpServer server = new KnativeHttpServer(context, event -> {
- event.vertx().executeBlocking(
- promise -> {
- try {
- Thread.sleep(5000);
- promise.complete();
- } catch (InterruptedException e) {
- promise.fail(e);
- }
- },
- false,
- result -> {
+ event.vertx().executeBlocking(() -> {
+ try {
+ Thread.sleep(5000);
+ return null;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw e;
+ }
+ }, false)
+ .onComplete(result -> {
event.response().setStatusCode(200);
event.response().end("");
});
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
index 4b96d86c504..2a3c91e03c2 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
@@ -186,21 +186,13 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer implements Suspen
exchange.getMessage().setHeader(Exchange.HTTP_HOST, httpHeaders.get("Host"));
exchange.getMessage().removeHeader("Proxy-Connection");
}
- vertx.executeBlocking(
- promise -> {
- try {
- createUoW(exchange);
- } catch (Exception e) {
- promise.fail(e);
- return;
- }
- getAsyncProcessor().process(exchange, c -> {
- promise.complete();
- });
- },
- false,
- result -> {
+ vertx.executeBlocking(() -> {
+ createUoW(exchange);
+ getProcessor().process(exchange);
+ return null;
+ }, false)
+ .onComplete(result -> {
Throwable failure = null;
try {
if (result.succeeded()) {
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpServer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpServer.java
index a7abed73a53..c81e3e8d5b0 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpServer.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpServer.java
@@ -183,7 +183,7 @@ public class VertxPlatformHttpServer extends ServiceSupport implements CamelCont
subRouter.route().handler(createCorsHandler(configuration));
}
- router.mountSubRouter(configuration.getPath(), subRouter);
+ router.route(configuration.getPath() + "*").subRouter(subRouter);
context.getRegistry().bind(
VertxPlatformHttpRouter.PLATFORM_HTTP_ROUTER_NAME,
diff --git a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
index c778bcaf65d..cf74cd5c549 100644
--- a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
+++ b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
@@ -99,10 +99,9 @@ public class VertXReactiveExecutor extends ServiceSupport implements CamelContex
public void scheduleSync(Runnable runnable) {
LOG.trace("scheduleSync: {}", runnable);
final Runnable task = runnable;
- vertx.executeBlocking(future -> {
+ vertx.executeBlocking(() -> {
task.run();
- future.complete();
- }, res -> {
+ return null;
});
}
diff --git a/components/camel-threadpoolfactory-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java b/components/camel-threadpoolfactory-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java
index 5dd384c150e..288ea317fca 100644
--- a/components/camel-threadpoolfactory-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java
+++ b/components/camel-threadpoolfactory-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXThreadPoolFactory.java
@@ -133,10 +133,10 @@ public class VertXThreadPoolFactory extends DefaultThreadPoolFactory implements
LOG.trace("submit: {}", task);
final CompletableFuture<?> answer = new CompletableFuture<>();
// used by vertx
- vertx.executeBlocking(future -> {
+ vertx.executeBlocking(() -> {
task.run();
- future.complete();
- }, res -> answer.complete(null));
+ return null;
+ }).onComplete(res -> answer.complete(null));
return answer;
}
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java
index 42aece3170d..0cb287314e8 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketClientConsumer.java
@@ -57,30 +57,29 @@ public class VertxWebsocketClientConsumer extends DefaultConsumer {
Vertx vertx = getEndpoint().getVertx();
vertx.setPeriodic(configuration.getReconnectInitialDelay(), configuration.getReconnectInterval(), timerId -> {
- vertx.executeBlocking(promise -> {
- try {
- configureWebSocketHandlers(getEndpoint().getWebSocket());
- vertx.cancelTimer(timerId);
- promise.complete();
- } catch (Exception e) {
- promise.fail(e);
- }
- }, false, result -> {
- if (result.failed()) {
- Throwable cause = result.cause();
- if (cause != null) {
- LOG.debug("WebSocket reconnect to {} failed due to {}", webSocket.remoteAddress(), cause);
- }
+ vertx.executeBlocking(() -> {
+ configureWebSocketHandlers(getEndpoint().getWebSocket());
+ vertx.cancelTimer(timerId);
+ return null;
+ }, false)
+ .onComplete(result -> {
+ if (result.failed()) {
+ Throwable cause = result.cause();
+ if (cause != null) {
+ LOG.debug("WebSocket reconnect to {} failed due to {}", webSocket.remoteAddress(),
+ cause);
+ }
- if (configuration.getMaxReconnectAttempts() > 0) {
- if (reconnectAttempts.incrementAndGet() == configuration.getMaxReconnectAttempts()) {
- LOG.warn("Reconnect max attempts ({}) exhausted. Giving up trying to reconnect to {}",
- configuration.getMaxReconnectAttempts(), webSocket.remoteAddress());
- vertx.cancelTimer(timerId);
+ if (configuration.getMaxReconnectAttempts() > 0) {
+ if (reconnectAttempts.incrementAndGet() == configuration.getMaxReconnectAttempts()) {
+ LOG.warn(
+ "Reconnect max attempts ({}) exhausted. Giving up trying to reconnect to {}",
+ configuration.getMaxReconnectAttempts(), webSocket.remoteAddress());
+ vertx.cancelTimer(timerId);
+ }
+ }
}
- }
- }
- });
+ });
});
}
});
@@ -105,21 +104,12 @@ public class VertxWebsocketClientConsumer extends DefaultConsumer {
protected void processExchange(Exchange exchange) {
Vertx vertx = getEndpoint().getVertx();
- vertx.executeBlocking(
- promise -> {
- try {
- createUoW(exchange);
- } catch (Exception e) {
- promise.fail(e);
- return;
- }
-
- getAsyncProcessor().process(exchange, c -> {
- promise.complete();
- });
- },
- false,
- result -> {
+ vertx.executeBlocking(() -> {
+ createUoW(exchange);
+ getProcessor().process(exchange);
+ return null;
+ }, false)
+ .onComplete(result -> {
try {
if (result.failed()) {
Throwable cause = result.cause();
diff --git a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
index d505029d671..538fada8a1c 100644
--- a/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
+++ b/components/camel-vertx/camel-vertx-websocket/src/main/java/org/apache/camel/component/vertx/websocket/VertxWebsocketConsumer.java
@@ -108,21 +108,12 @@ public class VertxWebsocketConsumer extends DefaultConsumer {
}
protected void processExchange(Exchange exchange, RoutingContext routingContext) {
- routingContext.vertx().executeBlocking(
- promise -> {
- try {
- createUoW(exchange);
- } catch (Exception e) {
- promise.fail(e);
- return;
- }
-
- getAsyncProcessor().process(exchange, c -> {
- promise.complete();
- });
- },
- false,
- result -> {
+ routingContext.vertx().executeBlocking(() -> {
+ createUoW(exchange);
+ getProcessor().process(exchange);
+ return null;
+ }, false)
+ .onComplete(result -> {
try {
if (result.failed()) {
Throwable cause = result.cause();