You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/30 16:02:46 UTC

[camel] 01/02: camel-platform-http-vertx: handle requests using a thread from the worker pool

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f8027157fcb4bae1891a38c629abc88aea1daa4d
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Sep 30 15:26:03 2020 +0200

    camel-platform-http-vertx: handle requests using a thread from the worker pool
---
 .../http/vertx/VertxPlatformHttpConsumer.java      | 101 ++++++++++++++------
 .../http/vertx/VertxPlatformHttpSupport.java       |  23 ++---
 .../http/vertx/VertxPlatformHttpEngineTest.java    | 105 ++++++++++++++++++++-
 3 files changed, 182 insertions(+), 47 deletions(-)

diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
index f816ea6..125a4ed 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
@@ -26,6 +26,7 @@ import java.util.regex.Pattern;
 
 import io.vertx.core.Handler;
 import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpMethod;
 import io.vertx.ext.web.FileUpload;
@@ -63,15 +64,16 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
     private Route route;
 
     public VertxPlatformHttpConsumer(
-            PlatformHttpEndpoint endpoint,
-            Processor processor,
-            List<Handler<RoutingContext>> handlers,
-            UploadAttacher uploadAttacher) {
+                                     PlatformHttpEndpoint endpoint,
+                                     Processor processor,
+                                     List<Handler<RoutingContext>> handlers,
+                                     UploadAttacher uploadAttacher) {
         super(endpoint, processor);
 
         this.handlers = handlers;
         this.uploadAttacher = uploadAttacher;
-        this.fileNameExtWhitelist = endpoint.getFileNameExtWhitelist() == null ? null : endpoint.getFileNameExtWhitelist().toLowerCase(Locale.US);
+        this.fileNameExtWhitelist
+                = endpoint.getFileNameExtWhitelist() == null ? null : endpoint.getFileNameExtWhitelist().toLowerCase(Locale.US);
     }
 
     @Override
@@ -101,28 +103,11 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
         }
 
         newRoute.handler(router.bodyHandler());
-        for (Handler<RoutingContext> handler: handlers) {
+        for (Handler<RoutingContext> handler : handlers) {
             newRoute.handler(handler);
         }
 
-        newRoute.handler(
-            ctx -> {
-                Exchange exchg = null;
-                try {
-                    final Exchange exchange = exchg = toExchange(ctx);
-                    createUoW(exchange);
-                    getAsyncProcessor().process(
-                        exchange,
-                        doneSync -> writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy()));
-                } catch (Exception e) {
-                    ctx.fail(e);
-                    getExceptionHandler().handleException("Failed handling platform-http endpoint " + endpoint.getPath(), exchg, e);
-                } finally {
-                    if (exchg != null) {
-                        doneUoW(exchg);
-                    }
-                }
-            });
+        newRoute.handler(this::handleRequest);
 
         this.route = newRoute;
     }
@@ -161,6 +146,67 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
         return PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1");
     }
 
+    private void handleRequest(RoutingContext ctx) {
+        final Vertx vertx = ctx.vertx();
+        final Exchange exchange = toExchange(ctx);
+
+        //
+        // We do not know whether any of the processing logic of the route is synchronous, so we
+        // need to process the request on a thread from the Vert.x worker pool.
+        //
+        // For example, assuming the platform-http component is configured as the transport provider
+        // for the REST DSL, the following code may result in a blocking operation that could
+        // block the Vert.x event loop for too long if the target service takes long to respond, for
+        // example when the service is a Knative service scaled to zero that could take some time
+        // to become available:
+        //
+        //     rest("/results")
+        //         .get("/{id}")
+        //         .route()
+        //             .removeHeaders("*", "CamelHttpPath")
+        //             .to("rest:get:?bridgeEndpoint=true");
+        //
+        vertx.executeBlocking(
+                promise -> {
+                    try {
+                        createUoW(exchange);
+                    } catch (Exception e) {
+                        promise.fail(e);
+                        return;
+                    }
+
+                    getAsyncProcessor().process(exchange, c -> {
+                        if (!exchange.isFailed()) {
+                            promise.complete();
+                        } else {
+                            promise.fail(exchange.getException());
+                        }
+                    });
+                },
+                false,
+                result -> {
+                    try {
+                        if (result.succeeded()) {
+                            try {
+                                writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy());
+                            } catch (Exception e) {
+                                getExceptionHandler().handleException(
+                                        "Failed handling platform-http endpoint " + getEndpoint().getPath(),
+                                        e);
+                            }
+                        } else {
+                            getExceptionHandler().handleException(
+                                    "Failed handling platform-http endpoint " + getEndpoint().getPath(),
+                                    result.cause());
+
+                            ctx.fail(result.cause());
+                        }
+                    } finally {
+                        doneUoW(exchange);
+                    }
+                });
+    }
+
     private Exchange toExchange(RoutingContext ctx) {
         final Exchange exchange = getEndpoint().createExchange();
         final Message in = toCamelMessage(ctx, exchange);
@@ -187,7 +233,8 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
             final Map<String, Object> body = new HashMap<>();
             for (String key : formData.names()) {
                 for (String value : formData.getAll(key)) {
-                    if (headerFilterStrategy != null && !headerFilterStrategy.applyFilterToExternalHeaders(key, value, exchange)) {
+                    if (headerFilterStrategy != null
+                            && !headerFilterStrategy.applyFilterToExternalHeaders(key, value, exchange)) {
                         appendHeader(result.getHeaders(), key, value);
                         appendHeader(body, key, value);
                     }
@@ -239,8 +286,8 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
                 uploadAttacher.attachUpload(localFile, fileName, message);
             } else {
                 LOGGER.debug(
-                    "Cannot add file as attachment: {} because the file is not accepted according to fileNameExtWhitelist: {}",
-                    fileName, fileNameExtWhitelist);
+                        "Cannot add file as attachment: {} because the file is not accepted according to fileNameExtWhitelist: {}",
+                        fileName, fileNameExtWhitelist);
             }
         }
     }
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
index 61277a7..b428d87 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.platform.http.vertx;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -34,8 +33,6 @@ import io.vertx.core.http.HttpServerResponse;
 import io.vertx.ext.web.RoutingContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.apache.camel.NoTypeConversionAvailableException;
-import org.apache.camel.TypeConversionException;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.ExchangeHelper;
@@ -159,10 +156,11 @@ public final class VertxPlatformHttpSupport {
         return codeToUse;
     }
 
-    static void writeResponse(RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy) {
+    static void writeResponse(RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy)
+            throws Exception {
         final Object body = toHttpResponse(ctx.response(), camelExchange.getMessage(), headerFilterStrategy);
-
         final HttpServerResponse response = ctx.response();
+
         if (body == null) {
             LOGGER.trace("No payload to send as reply for exchange: {}", camelExchange);
             response.end();
@@ -177,20 +175,15 @@ public final class VertxPlatformHttpSupport {
                     b.appendBytes(bytes, 0, len);
                     response.write(b);
                 }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
             }
             response.end();
         } else {
             final TypeConverter tc = camelExchange.getContext().getTypeConverter();
-            try {
-                final ByteBuffer bb = tc.mandatoryConvertTo(ByteBuffer.class, body);
-                final Buffer b = Buffer.buffer(bb.capacity());
-                b.setBytes(0, bb);
-                response.end(b);
-            } catch (TypeConversionException | NoTypeConversionAvailableException e) {
-                throw new RuntimeException(e);
-            }
+            final ByteBuffer bb = tc.mandatoryConvertTo(ByteBuffer.class, body);
+            final Buffer b = Buffer.buffer(bb.capacity());
+
+            b.setBytes(0, bb);
+            response.end(b);
         }
 
     }
diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
index e993860..ee54a1c 100644
--- a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
+++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
@@ -17,7 +17,9 @@
 package org.apache.camel.component.platform.http.vertx;
 
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
+import io.vertx.core.VertxOptions;
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.platform.http.PlatformHttpComponent;
@@ -76,6 +78,28 @@ public class VertxPlatformHttpEngineTest {
     }
 
     @Test
+    public void testEngineSetup() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        final CamelContext context = new DefaultCamelContext();
+
+        try {
+            VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
+            conf.setBindPort(port);
+
+            context.addService(new VertxPlatformHttpServer(conf));
+            context.start();
+
+            assertThat(VertxPlatformHttpRouter.lookup(context)).isNotNull();
+            assertThat(context.getComponent("platform-http")).isInstanceOfSatisfying(PlatformHttpComponent.class, component -> {
+                assertThat(component.getEngine()).isInstanceOf(VertxPlatformHttpEngine.class);
+            });
+
+        } finally {
+            context.stop();
+        }
+    }
+
+    @Test
     public void testEngine() throws Exception {
         final int port = AvailablePortFinder.getNextAvailable();
         final CamelContext context = new DefaultCamelContext();
@@ -99,11 +123,6 @@ public class VertxPlatformHttpEngineTest {
 
             context.start();
 
-            assertThat(VertxPlatformHttpRouter.lookup(context)).isNotNull();
-            assertThat(context.getComponent("platform-http")).isInstanceOfSatisfying(PlatformHttpComponent.class, component -> {
-                assertThat(component.getEngine()).isInstanceOf(VertxPlatformHttpEngine.class);
-            });
-
             given()
                 .port(conf.getBindPort())
             .when()
@@ -127,6 +146,82 @@ public class VertxPlatformHttpEngineTest {
     }
 
     @Test
+    public void testSlowConsumer() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        final CamelContext context = new DefaultCamelContext();
+
+        try {
+            VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
+            conf.setBindPort(port);
+
+            context.getRegistry().bind(
+                    "vertx-options",
+                    new VertxOptions()
+                            .setMaxEventLoopExecuteTime(2)
+                            .setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS));
+
+            context.addService(new VertxPlatformHttpServer(conf));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("platform-http:/get")
+                            .routeId("get")
+                            .process(e -> Thread.sleep(TimeUnit.SECONDS.toMillis(3)))
+                            .setBody().constant("get");
+                }
+            });
+
+            context.start();
+
+            given()
+                    .port(conf.getBindPort())
+                    .when()
+                    .get("/get")
+                    .then()
+                    .statusCode(200)
+                    .body(equalTo("get"));
+
+        } finally {
+            context.stop();
+        }
+    }
+
+    @Test
+    public void testFailingConsumer() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        final CamelContext context = new DefaultCamelContext();
+
+        try {
+            VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
+            conf.setBindPort(port);
+
+            context.addService(new VertxPlatformHttpServer(conf));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("platform-http:/get")
+                            .routeId("get")
+                            .process(exchange -> {
+                                throw new RuntimeException();
+                            });
+                }
+            });
+
+            context.start();
+
+            given()
+                    .port(conf.getBindPort())
+                    .when()
+                    .get("/get")
+                    .then()
+                    .statusCode(500);
+
+        } finally {
+            context.stop();
+        }
+    }
+
+    @Test
     public void testEngineSSL() throws Exception {
         VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
         conf.setSslContextParameters(serverSSLParameters);