You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/30 16:02:46 UTC
[camel] 01/02: camel-platform-http-vertx: handle requests using a
thread from the worker pool
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit f8027157fcb4bae1891a38c629abc88aea1daa4d
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Sep 30 15:26:03 2020 +0200
camel-platform-http-vertx: handle requests using a thread from the worker pool
---
.../http/vertx/VertxPlatformHttpConsumer.java | 101 ++++++++++++++------
.../http/vertx/VertxPlatformHttpSupport.java | 23 ++---
.../http/vertx/VertxPlatformHttpEngineTest.java | 105 ++++++++++++++++++++-
3 files changed, 182 insertions(+), 47 deletions(-)
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
index f816ea6..125a4ed 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
@@ -26,6 +26,7 @@ import java.util.regex.Pattern;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.FileUpload;
@@ -63,15 +64,16 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
private Route route;
public VertxPlatformHttpConsumer(
- PlatformHttpEndpoint endpoint,
- Processor processor,
- List<Handler<RoutingContext>> handlers,
- UploadAttacher uploadAttacher) {
+ PlatformHttpEndpoint endpoint,
+ Processor processor,
+ List<Handler<RoutingContext>> handlers,
+ UploadAttacher uploadAttacher) {
super(endpoint, processor);
this.handlers = handlers;
this.uploadAttacher = uploadAttacher;
- this.fileNameExtWhitelist = endpoint.getFileNameExtWhitelist() == null ? null : endpoint.getFileNameExtWhitelist().toLowerCase(Locale.US);
+ this.fileNameExtWhitelist
+ = endpoint.getFileNameExtWhitelist() == null ? null : endpoint.getFileNameExtWhitelist().toLowerCase(Locale.US);
}
@Override
@@ -101,28 +103,11 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
}
newRoute.handler(router.bodyHandler());
- for (Handler<RoutingContext> handler: handlers) {
+ for (Handler<RoutingContext> handler : handlers) {
newRoute.handler(handler);
}
- newRoute.handler(
- ctx -> {
- Exchange exchg = null;
- try {
- final Exchange exchange = exchg = toExchange(ctx);
- createUoW(exchange);
- getAsyncProcessor().process(
- exchange,
- doneSync -> writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy()));
- } catch (Exception e) {
- ctx.fail(e);
- getExceptionHandler().handleException("Failed handling platform-http endpoint " + endpoint.getPath(), exchg, e);
- } finally {
- if (exchg != null) {
- doneUoW(exchg);
- }
- }
- });
+ newRoute.handler(this::handleRequest);
this.route = newRoute;
}
@@ -161,6 +146,67 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
return PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1");
}
+ private void handleRequest(RoutingContext ctx) {
+ final Vertx vertx = ctx.vertx();
+ final Exchange exchange = toExchange(ctx);
+
+ //
+ // We do not know if any of the processing logic of the route is synchronous or not so we
+ // need to process the request on a thread on the Vert.x worker pool.
+ //
+ // As example, assuming the platform-http component is configured as the transport provider
+ // for the rest dsl, then the following code may result in a blocking operation that could
+ // block Vert.x event-loop for too long if the target service takes long to respond, as
+ // example in case the service is a knative service scaled to zero that could take some time
+ // to be come available:
+ //
+ // rest("/results")
+ // .get("/{id}")
+ // .route()
+ // .removeHeaders("*", "CamelHttpPath")
+ // .to("rest:get:?bridgeEndpoint=true");
+ //
+ vertx.executeBlocking(
+ promise -> {
+ try {
+ createUoW(exchange);
+ } catch (Exception e) {
+ promise.fail(e);
+ return;
+ }
+
+ getAsyncProcessor().process(exchange, c -> {
+ if (!exchange.isFailed()) {
+ promise.complete();
+ } else {
+ promise.fail(exchange.getException());
+ }
+ });
+ },
+ false,
+ result -> {
+ try {
+ if (result.succeeded()) {
+ try {
+ writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy());
+ } catch (Exception e) {
+ getExceptionHandler().handleException(
+ "Failed handling platform-http endpoint " + getEndpoint().getPath(),
+ e);
+ }
+ } else {
+ getExceptionHandler().handleException(
+ "Failed handling platform-http endpoint " + getEndpoint().getPath(),
+ result.cause());
+
+ ctx.fail(result.cause());
+ }
+ } finally {
+ doneUoW(exchange);
+ }
+ });
+ }
+
private Exchange toExchange(RoutingContext ctx) {
final Exchange exchange = getEndpoint().createExchange();
final Message in = toCamelMessage(ctx, exchange);
@@ -187,7 +233,8 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
final Map<String, Object> body = new HashMap<>();
for (String key : formData.names()) {
for (String value : formData.getAll(key)) {
- if (headerFilterStrategy != null && !headerFilterStrategy.applyFilterToExternalHeaders(key, value, exchange)) {
+ if (headerFilterStrategy != null
+ && !headerFilterStrategy.applyFilterToExternalHeaders(key, value, exchange)) {
appendHeader(result.getHeaders(), key, value);
appendHeader(body, key, value);
}
@@ -239,8 +286,8 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
uploadAttacher.attachUpload(localFile, fileName, message);
} else {
LOGGER.debug(
- "Cannot add file as attachment: {} because the file is not accepted according to fileNameExtWhitelist: {}",
- fileName, fileNameExtWhitelist);
+ "Cannot add file as attachment: {} because the file is not accepted according to fileNameExtWhitelist: {}",
+ fileName, fileNameExtWhitelist);
}
}
}
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
index 61277a7..b428d87 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpSupport.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.component.platform.http.vertx;
-import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -34,8 +33,6 @@ import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.RoutingContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
-import org.apache.camel.NoTypeConversionAvailableException;
-import org.apache.camel.TypeConversionException;
import org.apache.camel.TypeConverter;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.ExchangeHelper;
@@ -159,10 +156,11 @@ public final class VertxPlatformHttpSupport {
return codeToUse;
}
- static void writeResponse(RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy) {
+ static void writeResponse(RoutingContext ctx, Exchange camelExchange, HeaderFilterStrategy headerFilterStrategy)
+ throws Exception {
final Object body = toHttpResponse(ctx.response(), camelExchange.getMessage(), headerFilterStrategy);
-
final HttpServerResponse response = ctx.response();
+
if (body == null) {
LOGGER.trace("No payload to send as reply for exchange: {}", camelExchange);
response.end();
@@ -177,20 +175,15 @@ public final class VertxPlatformHttpSupport {
b.appendBytes(bytes, 0, len);
response.write(b);
}
- } catch (IOException e) {
- throw new RuntimeException(e);
}
response.end();
} else {
final TypeConverter tc = camelExchange.getContext().getTypeConverter();
- try {
- final ByteBuffer bb = tc.mandatoryConvertTo(ByteBuffer.class, body);
- final Buffer b = Buffer.buffer(bb.capacity());
- b.setBytes(0, bb);
- response.end(b);
- } catch (TypeConversionException | NoTypeConversionAvailableException e) {
- throw new RuntimeException(e);
- }
+ final ByteBuffer bb = tc.mandatoryConvertTo(ByteBuffer.class, body);
+ final Buffer b = Buffer.buffer(bb.capacity());
+
+ b.setBytes(0, bb);
+ response.end(b);
}
}
diff --git a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
index e993860..ee54a1c 100644
--- a/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
+++ b/components/camel-platform-http-vertx/src/test/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpEngineTest.java
@@ -17,7 +17,9 @@
package org.apache.camel.component.platform.http.vertx;
import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import io.vertx.core.VertxOptions;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.platform.http.PlatformHttpComponent;
@@ -76,6 +78,28 @@ public class VertxPlatformHttpEngineTest {
}
@Test
+ public void testEngineSetup() throws Exception {
+ final int port = AvailablePortFinder.getNextAvailable();
+ final CamelContext context = new DefaultCamelContext();
+
+ try {
+ VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
+ conf.setBindPort(port);
+
+ context.addService(new VertxPlatformHttpServer(conf));
+ context.start();
+
+ assertThat(VertxPlatformHttpRouter.lookup(context)).isNotNull();
+ assertThat(context.getComponent("platform-http")).isInstanceOfSatisfying(PlatformHttpComponent.class, component -> {
+ assertThat(component.getEngine()).isInstanceOf(VertxPlatformHttpEngine.class);
+ });
+
+ } finally {
+ context.stop();
+ }
+ }
+
+ @Test
public void testEngine() throws Exception {
final int port = AvailablePortFinder.getNextAvailable();
final CamelContext context = new DefaultCamelContext();
@@ -99,11 +123,6 @@ public class VertxPlatformHttpEngineTest {
context.start();
- assertThat(VertxPlatformHttpRouter.lookup(context)).isNotNull();
- assertThat(context.getComponent("platform-http")).isInstanceOfSatisfying(PlatformHttpComponent.class, component -> {
- assertThat(component.getEngine()).isInstanceOf(VertxPlatformHttpEngine.class);
- });
-
given()
.port(conf.getBindPort())
.when()
@@ -127,6 +146,82 @@ public class VertxPlatformHttpEngineTest {
}
@Test
+ public void testSlowConsumer() throws Exception {
+ final int port = AvailablePortFinder.getNextAvailable();
+ final CamelContext context = new DefaultCamelContext();
+
+ try {
+ VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
+ conf.setBindPort(port);
+
+ context.getRegistry().bind(
+ "vertx-options",
+ new VertxOptions()
+ .setMaxEventLoopExecuteTime(2)
+ .setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS));
+
+ context.addService(new VertxPlatformHttpServer(conf));
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("platform-http:/get")
+ .routeId("get")
+ .process(e -> Thread.sleep(TimeUnit.SECONDS.toMillis(3)))
+ .setBody().constant("get");
+ }
+ });
+
+ context.start();
+
+ given()
+ .port(conf.getBindPort())
+ .when()
+ .get("/get")
+ .then()
+ .statusCode(200)
+ .body(equalTo("get"));
+
+ } finally {
+ context.stop();
+ }
+ }
+
+ @Test
+ public void testFailingConsumer() throws Exception {
+ final int port = AvailablePortFinder.getNextAvailable();
+ final CamelContext context = new DefaultCamelContext();
+
+ try {
+ VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
+ conf.setBindPort(port);
+
+ context.addService(new VertxPlatformHttpServer(conf));
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("platform-http:/get")
+ .routeId("get")
+ .process(exchange -> {
+ throw new RuntimeException();
+ });
+ }
+ });
+
+ context.start();
+
+ given()
+ .port(conf.getBindPort())
+ .when()
+ .get("/get")
+ .then()
+ .statusCode(500);
+
+ } finally {
+ context.stop();
+ }
+ }
+
+ @Test
public void testEngineSSL() throws Exception {
VertxPlatformHttpServerConfiguration conf = new VertxPlatformHttpServerConfiguration();
conf.setSslContextParameters(serverSSLParameters);