You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/30 20:34:54 UTC

[camel-quarkus] branch master updated: platform-http: handle requests using a thread from the worker pool

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/master by this push:
     new 00cd740  platform-http: handle requests using a thread from the worker pool
00cd740 is described below

commit 00cd740abcc7e39f16ea21fca682651eebe74518
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Sep 30 18:22:01 2020 +0200

    platform-http: handle requests using a thread from the worker pool
---
 .../http/runtime/QuarkusPlatformHttpConsumer.java  | 261 ++++++++++++---------
 1 file changed, 153 insertions(+), 108 deletions(-)

diff --git a/extensions/platform-http/runtime/src/main/java/org/apache/camel/quarkus/component/platform/http/runtime/QuarkusPlatformHttpConsumer.java b/extensions/platform-http/runtime/src/main/java/org/apache/camel/quarkus/component/platform/http/runtime/QuarkusPlatformHttpConsumer.java
index 087ed58..cf9fc15 100644
--- a/extensions/platform-http/runtime/src/main/java/org/apache/camel/quarkus/component/platform/http/runtime/QuarkusPlatformHttpConsumer.java
+++ b/extensions/platform-http/runtime/src/main/java/org/apache/camel/quarkus/component/platform/http/runtime/QuarkusPlatformHttpConsumer.java
@@ -35,6 +35,7 @@ import java.util.regex.Pattern;
 
 import io.vertx.core.Handler;
 import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpMethod;
 import io.vertx.core.http.HttpServerRequest;
@@ -71,10 +72,10 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer {
 
     private final Router router;
     private final List<Handler<RoutingContext>> handlers;
-    private Route route;
     private final String fileNameExtWhitelist;
     private final UploadAttacher uploadAttacher;
     private final Pattern PATH_PARAMETER_PATTERN = Pattern.compile("\\{([^/}]+)\\}");
+    private Route route;
 
     public QuarkusPlatformHttpConsumer(PlatformHttpEndpoint endpoint, Processor processor, Router router,
             List<Handler<RoutingContext>> handlers, UploadAttacher uploadAttacher) {
@@ -86,81 +87,6 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer {
         this.uploadAttacher = uploadAttacher;
     }
 
-    @Override
-    public PlatformHttpEndpoint getEndpoint() {
-        return (PlatformHttpEndpoint) super.getEndpoint();
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        super.doStart();
-
-        final PlatformHttpEndpoint endpoint = getEndpoint();
-        final String path = endpoint.getPath();
-        /* Transform from the Camel path param syntax /path/{key} to vert.x web's /path/:key */
-        final String vertxPathParamPath = PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1");
-        final Route newRoute = router.route(vertxPathParamPath);
-
-        final Set<Method> methods = Method.parseList(endpoint.getHttpMethodRestrict());
-        if (!methods.equals(Method.getAll())) {
-            methods.stream().forEach(m -> newRoute.method(HttpMethod.valueOf(m.name())));
-        }
-        if (endpoint.getConsumes() != null) {
-            newRoute.consumes(endpoint.getConsumes());
-        }
-        if (endpoint.getProduces() != null) {
-            newRoute.produces(endpoint.getProduces());
-        }
-
-        handlers.forEach(newRoute::handler);
-
-        newRoute.handler(
-                ctx -> {
-                    Exchange exchg = null;
-                    try {
-                        final Exchange exchange = exchg = toExchange(ctx);
-                        createUoW(exchange);
-                        getAsyncProcessor().process(
-                                exchange,
-                                doneSync -> writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy()));
-                    } catch (Exception e) {
-                        ctx.fail(e);
-                        getExceptionHandler().handleException("Failed handling platform-http endpoint " + path, exchg, e);
-                    } finally {
-                        if (exchg != null) {
-                            doneUoW(exchg);
-                        }
-                    }
-                });
-
-        this.route = newRoute;
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        if (route != null) {
-            route.remove();
-            route = null;
-        }
-        super.doStop();
-    }
-
-    @Override
-    protected void doSuspend() throws Exception {
-        if (route != null) {
-            route.disable();
-        }
-        super.doSuspend();
-    }
-
-    @Override
-    protected void doResume() throws Exception {
-        if (route != null) {
-            route.enable();
-        }
-        super.doResume();
-    }
-
     static Object toHttpResponse(HttpServerResponse response, Message message, HeaderFilterStrategy headerFilterStrategy) {
         final Exchange exchange = message.getExchange();
 
@@ -307,20 +233,6 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer {
 
     }
 
-    Exchange toExchange(RoutingContext ctx) {
-        final Exchange exchange = getEndpoint().createExchange();
-        Message in = toCamelMessage(ctx, exchange);
-
-        final String charset = ctx.parsedHeaders().contentType().parameter("charset");
-        if (charset != null) {
-            exchange.setProperty(Exchange.CHARSET_NAME, charset);
-            in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, charset);
-        }
-
-        exchange.setIn(in);
-        return exchange;
-    }
-
     static void populateCamelHeaders(
             RoutingContext ctx,
             Map<String, Object> headersMap,
@@ -377,6 +289,157 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer {
         headersMap.put(Exchange.HTTP_RAW_QUERY, request.query());
     }
 
+    @SuppressWarnings("unchecked")
+    static void appendHeader(Map<String, Object> headers, String key, Object value) {
+        if (headers.containsKey(key)) {
+            Object existing = headers.get(key);
+            List<Object> list;
+            if (existing instanceof List) {
+                list = (List<Object>) existing;
+            } else {
+                list = new ArrayList<>();
+                list.add(existing);
+            }
+            list.add(value);
+            value = list;
+        }
+
+        headers.put(key, value);
+    }
+
+    @Override
+    public PlatformHttpEndpoint getEndpoint() {
+        return (PlatformHttpEndpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        final PlatformHttpEndpoint endpoint = getEndpoint();
+        final String path = endpoint.getPath();
+        /* Transform from the Camel path param syntax /path/{key} to vert.x web's /path/:key */
+        final String vertxPathParamPath = PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1");
+        final Route newRoute = router.route(vertxPathParamPath);
+
+        final Set<Method> methods = Method.parseList(endpoint.getHttpMethodRestrict());
+        if (!methods.equals(Method.getAll())) {
+            methods.stream().forEach(m -> newRoute.method(HttpMethod.valueOf(m.name())));
+        }
+        if (endpoint.getConsumes() != null) {
+            newRoute.consumes(endpoint.getConsumes());
+        }
+        if (endpoint.getProduces() != null) {
+            newRoute.produces(endpoint.getProduces());
+        }
+
+        handlers.forEach(newRoute::handler);
+
+        newRoute.handler(this::handleRequest);
+
+        this.route = newRoute;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (route != null) {
+            route.remove();
+            route = null;
+        }
+        super.doStop();
+    }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        if (route != null) {
+            route.disable();
+        }
+        super.doSuspend();
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        if (route != null) {
+            route.enable();
+        }
+        super.doResume();
+    }
+
+    private void handleRequest(RoutingContext ctx) {
+        final Vertx vertx = ctx.vertx();
+        final Exchange exchange = toExchange(ctx);
+
+        //
+        // We do not know if any of the processing logic of the route is synchronous or not so we
+        // need to process the request on a thread on the Vert.x worker pool.
+        //
+        // As example, assuming the platform-http component is configured as the transport provider
+        // for the rest dsl, then the following code may result in a blocking operation that could
+        // block Vert.x event-loop for too long if the target service takes long to respond, as
+        // example in case the service is a knative service scaled to zero that could take some time
+        // to be come available:
+        //
+        //     rest("/results")
+        //         .get("/{id}")
+        //         .route()
+        //             .removeHeaders("*", "CamelHttpPath")
+        //             .to("rest:get:?bridgeEndpoint=true");
+        //
+        vertx.executeBlocking(
+                promise -> {
+                    try {
+                        createUoW(exchange);
+                    } catch (Exception e) {
+                        promise.fail(e);
+                        return;
+                    }
+
+                    getAsyncProcessor().process(exchange, c -> {
+                        if (!exchange.isFailed()) {
+                            promise.complete();
+                        } else {
+                            promise.fail(exchange.getException());
+                        }
+                    });
+                },
+                false,
+                result -> {
+                    try {
+                        if (result.succeeded()) {
+                            try {
+                                writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy());
+                            } catch (Exception e) {
+                                getExceptionHandler().handleException(
+                                        "Failed handling platform-http endpoint " + getEndpoint().getPath(),
+                                        e);
+                            }
+                        } else {
+                            getExceptionHandler().handleException(
+                                    "Failed handling platform-http endpoint " + getEndpoint().getPath(),
+                                    result.cause());
+
+                            ctx.fail(result.cause());
+                        }
+                    } finally {
+                        doneUoW(exchange);
+                    }
+                });
+    }
+
+    Exchange toExchange(RoutingContext ctx) {
+        final Exchange exchange = getEndpoint().createExchange();
+        Message in = toCamelMessage(ctx, exchange);
+
+        final String charset = ctx.parsedHeaders().contentType().parameter("charset");
+        if (charset != null) {
+            exchange.setProperty(Exchange.CHARSET_NAME, charset);
+            in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, charset);
+        }
+
+        exchange.setIn(in);
+        return exchange;
+    }
+
     Message toCamelMessage(RoutingContext ctx, Exchange exchange) {
         final Message result = new DefaultMessage(exchange);
 
@@ -414,24 +477,6 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer {
         return result;
     }
 
-    @SuppressWarnings("unchecked")
-    static void appendHeader(Map<String, Object> headers, String key, Object value) {
-        if (headers.containsKey(key)) {
-            Object existing = headers.get(key);
-            List<Object> list;
-            if (existing instanceof List) {
-                list = (List<Object>) existing;
-            } else {
-                list = new ArrayList<>();
-                list.add(existing);
-            }
-            list.add(value);
-            value = list;
-        }
-
-        headers.put(key, value);
-    }
-
     void populateAttachments(Set<FileUpload> uploads, Message message) {
         for (FileUpload upload : uploads) {
             final String name = upload.name();