You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/30 15:41:38 UTC

[camel] branch master updated: camel-platform-http-vertx: handle requests using a thread from the worker pool

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 57fb955  camel-platform-http-vertx: handle requests using a thread from the worker pool
57fb955 is described below

commit 57fb9554c7eeb54d6d707f038fc1febf8cab9cb8
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Sep 30 15:26:03 2020 +0200

    camel-platform-http-vertx: handle requests using a thread from the worker pool
---
 .../http/vertx/VertxPlatformHttpConsumer.java      |  82 ++++++++++++----
 .../http/vertx/VertxPlatformHttpSupport.java       |  23 ++---
 .../http/vertx/VertxPlatformHttpEngineTest.java    | 105 ++++++++++++++++++++-
 3 files changed, 171 insertions(+), 39 deletions(-)

diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
index 8c0942d7..125a4ed 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
@@ -26,6 +26,7 @@ import java.util.regex.Pattern;
 
 import io.vertx.core.Handler;
 import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpMethod;
 import io.vertx.ext.web.FileUpload;
@@ -106,25 +107,7 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
             newRoute.handler(handler);
         }
 
-        newRoute.handler(
-                ctx -> {
-                    Exchange exchg = null;
-                    try {
-                        final Exchange exchange = exchg = toExchange(ctx);
-                        createUoW(exchange);
-                        getAsyncProcessor().process(
-                                exchange,
-                                doneSync -> writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy()));
-                    } catch (Exception e) {
-                        ctx.fail(e);
-                        getExceptionHandler().handleException("Failed handling platform-http endpoint " + endpoint.getPath(),
-                                exchg, e);
-                    } finally {
-                        if (exchg != null) {
-                            doneUoW(exchg);
-                        }
-                    }
-                });
+        newRoute.handler(this::handleRequest);
 
         this.route = newRoute;
     }
@@ -163,6 +146,67 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
         return PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1");
     }
 
+    private void handleRequest(RoutingContext ctx) {
+        final Vertx vertx = ctx.vertx();
+        final Exchange exchange = toExchange(ctx);
+
+        // We do not know if any of the processing logic of the route is synchronous or not so we
+        // need to process the request on a thread on the Vert.x worker pool.
+        //
+        // As an example, assuming the platform-http component is configured as the transport provider
+        // for the rest dsl, then the following code may result in a blocking operation that could
+        // block Vert.x event-loop for too long if the target service takes long to respond, for
+        // example in case the service is a knative service scaled to zero that could take some time
+        // to become available:
+        //
+        //     rest("/results")
+        //         .get("/{id}")
+        //         .route()
+        //             .removeHeaders("*", "CamelHttpPath")
+        //             .to("rest:get:?bridgeEndpoint=true");
+        vertx.executeBlocking(
+                promise -> {
+                    try {
+                        createUoW(exchange);
+                    } catch (Exception e) {
+                        promise.fail(e);
+                        return;
+                    }
+
+                    getAsyncProcessor().process(exchange, c -> {
+                        if (!exchange.isFailed()) {
+                            promise.complete();
+                        } else {
+                            promise.fail(exchange.getException());
+                        }
+                    });
+                },
+                false, // not ordered: see Vertx#executeBlocking for the exact semantics
+                result -> {
+                    Throwable failure = null;
+                    try {
+                        if (result.succeeded()) {
+                            try {
+                                writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy());
+                            } catch (Exception e) {
+                                failure = e;
+                            }
+                        } else {
+                            failure = result.cause();
+                        }
+                        // Routing errors and errors raised while writing the reply both fail the request.
+                        if (failure != null) {
+                            getExceptionHandler().handleException(
+                                    "Failed handling platform-http endpoint " + getEndpoint().getPath(),
+                                    failure);
+                            ctx.fail(failure);
+                        }
+                    } finally {
+                        doneUoW(exchange);
+                    }
+                });
+    }
+
     private Exchange toExchange(RoutingContext ctx) {
         final Exchange exchange = getEndpoint().createExchange();
         final Message in = toCamelMessage(ctx, exchange);
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
index d5c4cbc..728b5e4 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.platform.http.vertx;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -34,8 +33,6 @@ import io.vertx.core.http.HttpServerResponse;
 import io.vertx.ext.web.RoutingContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.apache.camel.NoTypeConversionAvailableException;
-import org.apache.camel.TypeConversionException;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.ExchangeHelper;
@@ -158,10 +155,11 @@ public final class VertxPlatformHttpSupport {
         return codeToUse;
     }
 
-    static void writeResponse(RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy) {
+    static void writeResponse(RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy)
+            throws Exception {
         final Object body = toHttpResponse(ctx.response(), camelExchange.getMessage(), headerFilterStrategy);
-
         final HttpServerResponse response = ctx.response();
+
         if (body == null) {
             LOGGER.trace("No payload to send as reply for exchange: {}", camelExchange);
             response.end();
@@ -176,20 +174,15 @@ public final class VertxPlatformHttpSupport {
                     b.appendBytes(bytes, 0, len);
                     response.write(b);
                 }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
             }
             response.end();
         } else {
             final TypeConverter tc = camelExchange.getContext().getTypeConverter();
-            try {
-                final ByteBuffer bb = tc.mandatoryConvertTo(ByteBuffer.class, body);
-                final Buffer b = Buffer.buffer(bb.capacity());
-                b.setBytes(0, bb);
-                response.end(b);
-            } catch (TypeConversionException | NoTypeConversionAvailableException e) {
-                throw new RuntimeException(e);
-            }
+            final ByteBuffer bb = tc.mandatoryConvertTo(ByteBuffer.class, body);
+            final Buffer b = Buffer.buffer(bb.capacity());
+
+            b.setBytes(0, bb);
+            response.end(b);
         }
 
     }
diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
index 0648f1d..06b07e7 100644
--- a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
+++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
@@ -17,7 +17,9 @@
 package org.apache.camel.component.platform.http.vertx;
 
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
+import io.vertx.core.VertxOptions;
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.platform.http.PlatformHttpComponent;
@@ -76,6 +78,28 @@ public class VertxPlatformHttpEngineTest {
     }
 
     @Test
+    public void testEngineSetup() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        final CamelContext context = new DefaultCamelContext();
+
+        try {
+            VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
+            conf.setBindPort(port);
+
+            context.addService(new VertxPlatformHttpServer(conf)); // no routes are added: only the engine wiring is checked
+            context.start();
+
+            assertThat(VertxPlatformHttpRouter.lookup(context)).isNotNull();
+            assertThat(context.getComponent("platform-http")).isInstanceOfSatisfying(PlatformHttpComponent.class, component -> {
+                assertThat(component.getEngine()).isInstanceOf(VertxPlatformHttpEngine.class);
+            });
+
+        } finally {
+            context.stop(); // always stop the context, even when an assertion above fails
+        }
+    }
+
+    @Test
     public void testEngine() throws Exception {
         final int port = AvailablePortFinder.getNextAvailable();
         final CamelContext context = new DefaultCamelContext();
@@ -99,11 +123,6 @@ public class VertxPlatformHttpEngineTest {
 
             context.start();
 
-            assertThat(VertxPlatformHttpRouter.lookup(context)).isNotNull();
-            assertThat(context.getComponent("platform-http")).isInstanceOfSatisfying(PlatformHttpComponent.class, component -> {
-                assertThat(component.getEngine()).isInstanceOf(VertxPlatformHttpEngine.class);
-            });
-
             given()
                     .port(conf.getBindPort())
                     .when()
@@ -127,6 +146,82 @@ public class VertxPlatformHttpEngineTest {
     }
 
     @Test
+    public void testSlowConsumer() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        final CamelContext context = new DefaultCamelContext();
+
+        try {
+            VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
+            conf.setBindPort(port);
+
+            context.getRegistry().bind(
+                    "vertx-options",
+                    new VertxOptions()
+                            .setMaxEventLoopExecuteTime(2)
+                            .setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS));
+
+            context.addService(new VertxPlatformHttpServer(conf));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("platform-http:/get")
+                            .routeId("get")
+                            .process(e -> Thread.sleep(TimeUnit.SECONDS.toMillis(3))) // sleeps 3s, longer than the 2s limit set in the options above
+                            .setBody().constant("get");
+                }
+            });
+
+            context.start();
+
+            given()
+                    .port(conf.getBindPort())
+                    .when()
+                    .get("/get")
+                    .then()
+                    .statusCode(200) // the slow route is still expected to answer successfully
+                    .body(equalTo("get"));
+
+        } finally {
+            context.stop(); // always stop the context, even when an assertion above fails
+        }
+    }
+
+    @Test
+    public void testFailingConsumer() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        final CamelContext context = new DefaultCamelContext();
+
+        try {
+            VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
+            conf.setBindPort(port);
+
+            context.addService(new VertxPlatformHttpServer(conf));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("platform-http:/get")
+                            .routeId("get")
+                            .process(exchange -> {
+                                throw new RuntimeException(); // every request to this route fails
+                            });
+                }
+            });
+
+            context.start();
+
+            given()
+                    .port(conf.getBindPort())
+                    .when()
+                    .get("/get")
+                    .then()
+                    .statusCode(500); // the failed exchange is expected to surface as HTTP 500
+
+        } finally {
+            context.stop(); // always stop the context, even when an assertion above fails
+        }
+    }
+
+    @Test
     public void testEngineSSL() throws Exception {
         VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
         conf.setSslContextParameters(serverSSLParameters);