You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/30 16:02:45 UTC

[camel] branch camel-3.4.x updated (926640b -> 2c70250)

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a change to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 926640b  Add XML schema references for release 3.4.4
     new f802715  camel-platform-http-vertx: handle requests using a thread from the worker pool
     new 2c70250  camel-platform-http-vertx: reformat

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../services/org/apache/camel/other.properties     |   2 +-
 .../generated/resources/platform-http-vertx.json   |   2 +-
 .../http/vertx/VertxPlatformHttpConsumer.java      |  86 +++++++++++++----
 .../http/vertx/VertxPlatformHttpSupport.java       |  23 ++---
 .../http/vertx/VertxPlatformHttpEngineTest.java    | 105 ++++++++++++++++++++-
 5 files changed, 176 insertions(+), 42 deletions(-)


[camel] 02/02: camel-platform-http-vertx: reformat

Posted by lb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2c70250efd1e25cc8f96f528ba50b9fd701687fc
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Sep 30 17:52:51 2020 +0200

    camel-platform-http-vertx: reformat
---
 .../services/org/apache/camel/other.properties     |  2 +-
 .../generated/resources/platform-http-vertx.json   |  2 +-
 .../http/vertx/VertxPlatformHttpConsumer.java      | 81 +++++++++++-----------
 3 files changed, 42 insertions(+), 43 deletions(-)

diff --git a/components/camel-platform-http-vertx/src/generated/resources/META-INF/services/org/apache/camel/other.properties b/components/camel-platform-http-vertx/src/generated/resources/META-INF/services/org/apache/camel/other.properties
index 64045fc..7829961 100644
--- a/components/camel-platform-http-vertx/src/generated/resources/META-INF/services/org/apache/camel/other.properties
+++ b/components/camel-platform-http-vertx/src/generated/resources/META-INF/services/org/apache/camel/other.properties
@@ -2,6 +2,6 @@
 name=platform-http-vertx
 groupId=org.apache.camel
 artifactId=camel-platform-http-vertx
-version=3.4.4-SNAPSHOT
+version=3.4.5-SNAPSHOT
 projectName=Camel :: Platform HTTP :: Vert.x
 projectDescription=Implementation of the Platform HTTP Engine based on Vert.x Web
diff --git a/components/camel-platform-http-vertx/src/generated/resources/platform-http-vertx.json b/components/camel-platform-http-vertx/src/generated/resources/platform-http-vertx.json
index 45d67f7..f41dbe9 100644
--- a/components/camel-platform-http-vertx/src/generated/resources/platform-http-vertx.json
+++ b/components/camel-platform-http-vertx/src/generated/resources/platform-http-vertx.json
@@ -10,6 +10,6 @@
     "supportLevel": "Stable",
     "groupId": "org.apache.camel",
     "artifactId": "camel-platform-http-vertx",
-    "version": "3.4.4-SNAPSHOT"
+    "version": "3.4.5-SNAPSHOT"
   }
 }
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
index 125a4ed..fe37c13 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
@@ -64,16 +64,15 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
     private Route route;
 
     public VertxPlatformHttpConsumer(
-                                     PlatformHttpEndpoint endpoint,
-                                     Processor processor,
-                                     List<Handler<RoutingContext>> handlers,
-                                     UploadAttacher uploadAttacher) {
+            PlatformHttpEndpoint endpoint,
+            Processor processor,
+            List<Handler<RoutingContext>> handlers,
+            UploadAttacher uploadAttacher) {
         super(endpoint, processor);
 
         this.handlers = handlers;
         this.uploadAttacher = uploadAttacher;
-        this.fileNameExtWhitelist
-                = endpoint.getFileNameExtWhitelist() == null ? null : endpoint.getFileNameExtWhitelist().toLowerCase(Locale.US);
+        this.fileNameExtWhitelist = endpoint.getFileNameExtWhitelist() == null ? null : endpoint.getFileNameExtWhitelist().toLowerCase(Locale.US);
     }
 
     @Override
@@ -167,44 +166,44 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
         //             .to("rest:get:?bridgeEndpoint=true");
         //
         vertx.executeBlocking(
-                promise -> {
-                    try {
-                        createUoW(exchange);
-                    } catch (Exception e) {
-                        promise.fail(e);
-                        return;
-                    }
+            promise -> {
+                try {
+                    createUoW(exchange);
+                } catch (Exception e) {
+                    promise.fail(e);
+                    return;
+                }
 
-                    getAsyncProcessor().process(exchange, c -> {
-                        if (!exchange.isFailed()) {
-                            promise.complete();
-                        } else {
-                            promise.fail(exchange.getException());
-                        }
-                    });
-                },
-                false,
-                result -> {
-                    try {
-                        if (result.succeeded()) {
-                            try {
-                                writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy());
-                            } catch (Exception e) {
-                                getExceptionHandler().handleException(
-                                        "Failed handling platform-http endpoint " + getEndpoint().getPath(),
-                                        e);
-                            }
-                        } else {
+                getAsyncProcessor().process(exchange, c -> {
+                    if (!exchange.isFailed()) {
+                        promise.complete();
+                    } else {
+                        promise.fail(exchange.getException());
+                    }
+                });
+            },
+            false,
+            result -> {
+                try {
+                    if (result.succeeded()) {
+                        try {
+                            writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy());
+                        } catch (Exception e) {
                             getExceptionHandler().handleException(
                                     "Failed handling platform-http endpoint " + getEndpoint().getPath(),
-                                    result.cause());
-
-                            ctx.fail(result.cause());
+                                    e);
                         }
-                    } finally {
-                        doneUoW(exchange);
+                    } else {
+                        getExceptionHandler().handleException(
+                                "Failed handling platform-http endpoint " + getEndpoint().getPath(),
+                                result.cause());
+
+                        ctx.fail(result.cause());
                     }
-                });
+                } finally {
+                    doneUoW(exchange);
+                }
+            });
     }
 
     private Exchange toExchange(RoutingContext ctx) {
@@ -286,8 +285,8 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
                 uploadAttacher.attachUpload(localFile, fileName, message);
             } else {
                 LOGGER.debug(
-                        "Cannot add file as attachment: {} because the file is not accepted according to fileNameExtWhitelist: {}",
-                        fileName, fileNameExtWhitelist);
+                    "Cannot add file as attachment: {} because the file is not accepted according to fileNameExtWhitelist: {}",
+                    fileName, fileNameExtWhitelist);
             }
         }
     }


[camel] 01/02: camel-platform-http-vertx: handle requests using a thread from the worker pool

Posted by lb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f8027157fcb4bae1891a38c629abc88aea1daa4d
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Sep 30 15:26:03 2020 +0200

    camel-platform-http-vertx: handle requests using a thread from the worker pool
---
 .../http/vertx/VertxPlatformHttpConsumer.java      | 101 ++++++++++++++------
 .../http/vertx/VertxPlatformHttpSupport.java       |  23 ++---
 .../http/vertx/VertxPlatformHttpEngineTest.java    | 105 ++++++++++++++++++++-
 3 files changed, 182 insertions(+), 47 deletions(-)

diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
index f816ea6..125a4ed 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
@@ -26,6 +26,7 @@ import java.util.regex.Pattern;
 
 import io.vertx.core.Handler;
 import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpMethod;
 import io.vertx.ext.web.FileUpload;
@@ -63,15 +64,16 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
     private Route route;
 
     public VertxPlatformHttpConsumer(
-            PlatformHttpEndpoint endpoint,
-            Processor processor,
-            List<Handler<RoutingContext>> handlers,
-            UploadAttacher uploadAttacher) {
+                                     PlatformHttpEndpoint endpoint,
+                                     Processor processor,
+                                     List<Handler<RoutingContext>> handlers,
+                                     UploadAttacher uploadAttacher) {
         super(endpoint, processor);
 
         this.handlers = handlers;
         this.uploadAttacher = uploadAttacher;
-        this.fileNameExtWhitelist = endpoint.getFileNameExtWhitelist() == null ? null : endpoint.getFileNameExtWhitelist().toLowerCase(Locale.US);
+        this.fileNameExtWhitelist
+                = endpoint.getFileNameExtWhitelist() == null ? null : endpoint.getFileNameExtWhitelist().toLowerCase(Locale.US);
     }
 
     @Override
@@ -101,28 +103,11 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
         }
 
         newRoute.handler(router.bodyHandler());
-        for (Handler<RoutingContext> handler: handlers) {
+        for (Handler<RoutingContext> handler : handlers) {
             newRoute.handler(handler);
         }
 
-        newRoute.handler(
-            ctx -> {
-                Exchange exchg = null;
-                try {
-                    final Exchange exchange = exchg = toExchange(ctx);
-                    createUoW(exchange);
-                    getAsyncProcessor().process(
-                        exchange,
-                        doneSync -> writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy()));
-                } catch (Exception e) {
-                    ctx.fail(e);
-                    getExceptionHandler().handleException("Failed handling platform-http endpoint " + endpoint.getPath(), exchg, e);
-                } finally {
-                    if (exchg != null) {
-                        doneUoW(exchg);
-                    }
-                }
-            });
+        newRoute.handler(this::handleRequest);
 
         this.route = newRoute;
     }
@@ -161,6 +146,67 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
         return PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1");
     }
 
+    private void handleRequest(RoutingContext ctx) {
+        final Vertx vertx = ctx.vertx();
+        final Exchange exchange = toExchange(ctx);
+
+        //
+        // We do not know if any of the processing logic of the route is synchronous or not so we
+        // need to process the request on a thread on the Vert.x worker pool.
+        //
+        // As an example, assuming the platform-http component is configured as the transport provider
+        // for the rest dsl, then the following code may result in a blocking operation that could
+        // block the Vert.x event-loop for too long if the target service takes long to respond, for
+        // example in case the service is a knative service scaled to zero that could take some time
+        // to become available:
+        //
+        //     rest("/results")
+        //         .get("/{id}")
+        //         .route()
+        //             .removeHeaders("*", "CamelHttpPath")
+        //             .to("rest:get:?bridgeEndpoint=true");
+        //
+        vertx.executeBlocking(
+                promise -> {
+                    try {
+                        createUoW(exchange);
+                    } catch (Exception e) {
+                        promise.fail(e);
+                        return;
+                    }
+
+                    getAsyncProcessor().process(exchange, c -> {
+                        if (!exchange.isFailed()) {
+                            promise.complete();
+                        } else {
+                            promise.fail(exchange.getException());
+                        }
+                    });
+                },
+                false,
+                result -> {
+                    try {
+                        if (result.succeeded()) {
+                            try {
+                                writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy());
+                            } catch (Exception e) {
+                                getExceptionHandler().handleException(
+                                        "Failed handling platform-http endpoint " + getEndpoint().getPath(),
+                                        e);
+                            }
+                        } else {
+                            getExceptionHandler().handleException(
+                                    "Failed handling platform-http endpoint " + getEndpoint().getPath(),
+                                    result.cause());
+
+                            ctx.fail(result.cause());
+                        }
+                    } finally {
+                        doneUoW(exchange);
+                    }
+                });
+    }
+
     private Exchange toExchange(RoutingContext ctx) {
         final Exchange exchange = getEndpoint().createExchange();
         final Message in = toCamelMessage(ctx, exchange);
@@ -187,7 +233,8 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
             final Map<String, Object> body = new HashMap<>();
             for (String key : formData.names()) {
                 for (String value : formData.getAll(key)) {
-                    if (headerFilterStrategy != null && !headerFilterStrategy.applyFilterToExternalHeaders(key, value, exchange)) {
+                    if (headerFilterStrategy != null
+                            && !headerFilterStrategy.applyFilterToExternalHeaders(key, value, exchange)) {
                         appendHeader(result.getHeaders(), key, value);
                         appendHeader(body, key, value);
                     }
@@ -239,8 +286,8 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
                 uploadAttacher.attachUpload(localFile, fileName, message);
             } else {
                 LOGGER.debug(
-                    "Cannot add file as attachment: {} because the file is not accepted according to fileNameExtWhitelist: {}",
-                    fileName, fileNameExtWhitelist);
+                        "Cannot add file as attachment: {} because the file is not accepted according to fileNameExtWhitelist: {}",
+                        fileName, fileNameExtWhitelist);
             }
         }
     }
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
index 61277a7..b428d87 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.platform.http.vertx;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -34,8 +33,6 @@ import io.vertx.core.http.HttpServerResponse;
 import io.vertx.ext.web.RoutingContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.apache.camel.NoTypeConversionAvailableException;
-import org.apache.camel.TypeConversionException;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.ExchangeHelper;
@@ -159,10 +156,11 @@ public final class VertxPlatformHttpSupport {
         return codeToUse;
     }
 
-    static void writeResponse(RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy) {
+    static void writeResponse(RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy)
+            throws Exception {
         final Object body = toHttpResponse(ctx.response(), camelExchange.getMessage(), headerFilterStrategy);
-
         final HttpServerResponse response = ctx.response();
+
         if (body == null) {
             LOGGER.trace("No payload to send as reply for exchange: {}", camelExchange);
             response.end();
@@ -177,20 +175,15 @@ public final class VertxPlatformHttpSupport {
                     b.appendBytes(bytes, 0, len);
                     response.write(b);
                 }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
             }
             response.end();
         } else {
             final TypeConverter tc = camelExchange.getContext().getTypeConverter();
-            try {
-                final ByteBuffer bb = tc.mandatoryConvertTo(ByteBuffer.class, body);
-                final Buffer b = Buffer.buffer(bb.capacity());
-                b.setBytes(0, bb);
-                response.end(b);
-            } catch (TypeConversionException | NoTypeConversionAvailableException e) {
-                throw new RuntimeException(e);
-            }
+            final ByteBuffer bb = tc.mandatoryConvertTo(ByteBuffer.class, body);
+            final Buffer b = Buffer.buffer(bb.capacity());
+
+            b.setBytes(0, bb);
+            response.end(b);
         }
 
     }
diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
index e993860..ee54a1c 100644
--- a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
+++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
@@ -17,7 +17,9 @@
 package org.apache.camel.component.platform.http.vertx;
 
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
+import io.vertx.core.VertxOptions;
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.platform.http.PlatformHttpComponent;
@@ -76,6 +78,28 @@ public class VertxPlatformHttpEngineTest {
     }
 
     @Test
+    public void testEngineSetup() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        final CamelContext context = new DefaultCamelContext();
+
+        try {
+            VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
+            conf.setBindPort(port);
+
+            context.addService(new VertxPlatformHttpServer(conf));
+            context.start();
+
+            assertThat(VertxPlatformHttpRouter.lookup(context)).isNotNull();
+            assertThat(context.getComponent("platform-http")).isInstanceOfSatisfying(PlatformHttpComponent.class, component -> {
+                assertThat(component.getEngine()).isInstanceOf(VertxPlatformHttpEngine.class);
+            });
+
+        } finally {
+            context.stop();
+        }
+    }
+
+    @Test
     public void testEngine() throws Exception {
         final int port = AvailablePortFinder.getNextAvailable();
         final CamelContext context = new DefaultCamelContext();
@@ -99,11 +123,6 @@ public class VertxPlatformHttpEngineTest {
 
             context.start();
 
-            assertThat(VertxPlatformHttpRouter.lookup(context)).isNotNull();
-            assertThat(context.getComponent("platform-http")).isInstanceOfSatisfying(PlatformHttpComponent.class, component -> {
-                assertThat(component.getEngine()).isInstanceOf(VertxPlatformHttpEngine.class);
-            });
-
             given()
                 .port(conf.getBindPort())
             .when()
@@ -127,6 +146,82 @@ public class VertxPlatformHttpEngineTest {
     }
 
     @Test
+    public void testSlowConsumer() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        final CamelContext context = new DefaultCamelContext();
+
+        try {
+            VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
+            conf.setBindPort(port);
+
+            context.getRegistry().bind(
+                    "vertx-options",
+                    new VertxOptions()
+                            .setMaxEventLoopExecuteTime(2)
+                            .setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS));
+
+            context.addService(new VertxPlatformHttpServer(conf));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("platform-http:/get")
+                            .routeId("get")
+                            .process(e -> Thread.sleep(TimeUnit.SECONDS.toMillis(3)))
+                            .setBody().constant("get");
+                }
+            });
+
+            context.start();
+
+            given()
+                    .port(conf.getBindPort())
+                    .when()
+                    .get("/get")
+                    .then()
+                    .statusCode(200)
+                    .body(equalTo("get"));
+
+        } finally {
+            context.stop();
+        }
+    }
+
+    @Test
+    public void testFailingConsumer() throws Exception {
+        final int port = AvailablePortFinder.getNextAvailable();
+        final CamelContext context = new DefaultCamelContext();
+
+        try {
+            VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
+            conf.setBindPort(port);
+
+            context.addService(new VertxPlatformHttpServer(conf));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("platform-http:/get")
+                            .routeId("get")
+                            .process(exchange -> {
+                                throw new RuntimeException();
+                            });
+                }
+            });
+
+            context.start();
+
+            given()
+                    .port(conf.getBindPort())
+                    .when()
+                    .get("/get")
+                    .then()
+                    .statusCode(500);
+
+        } finally {
+            context.stop();
+        }
+    }
+
+    @Test
     public void testEngineSSL() throws Exception {
         VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
         conf.setSslContextParameters(serverSSLParameters);