You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/30 20:34:54 UTC
[camel-quarkus] branch master updated: platform-http: handle
requests using a thread from the worker pool
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/master by this push:
new 00cd740 platform-http: handle requests using a thread from the worker pool
00cd740 is described below
commit 00cd740abcc7e39f16ea21fca682651eebe74518
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Sep 30 18:22:01 2020 +0200
platform-http: handle requests using a thread from the worker pool
---
.../http/runtime/QuarkusPlatformHttpConsumer.java | 261 ++++++++++++---------
1 file changed, 153 insertions(+), 108 deletions(-)
diff --git a/extensions/platform-http/runtime/src/main/java/org/apache/camel/quarkus/component/platform/http/runtime/QuarkusPlatformHttpConsumer.java b/extensions/platform-http/runtime/src/main/java/org/apache/camel/quarkus/component/platform/http/runtime/QuarkusPlatformHttpConsumer.java
index 087ed58..cf9fc15 100644
--- a/extensions/platform-http/runtime/src/main/java/org/apache/camel/quarkus/component/platform/http/runtime/QuarkusPlatformHttpConsumer.java
+++ b/extensions/platform-http/runtime/src/main/java/org/apache/camel/quarkus/component/platform/http/runtime/QuarkusPlatformHttpConsumer.java
@@ -35,6 +35,7 @@ import java.util.regex.Pattern;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
@@ -71,10 +72,10 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer {
private final Router router;
private final List<Handler<RoutingContext>> handlers;
- private Route route;
private final String fileNameExtWhitelist;
private final UploadAttacher uploadAttacher;
private final Pattern PATH_PARAMETER_PATTERN = Pattern.compile("\\{([^/}]+)\\}");
+ private Route route;
public QuarkusPlatformHttpConsumer(PlatformHttpEndpoint endpoint, Processor processor, Router router,
List<Handler<RoutingContext>> handlers, UploadAttacher uploadAttacher) {
@@ -86,81 +87,6 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer {
this.uploadAttacher = uploadAttacher;
}
- @Override
- public PlatformHttpEndpoint getEndpoint() {
- return (PlatformHttpEndpoint) super.getEndpoint();
- }
-
- @Override
- protected void doStart() throws Exception {
- super.doStart();
-
- final PlatformHttpEndpoint endpoint = getEndpoint();
- final String path = endpoint.getPath();
- /* Transform from the Camel path param syntax /path/{key} to vert.x web's /path/:key */
- final String vertxPathParamPath = PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1");
- final Route newRoute = router.route(vertxPathParamPath);
-
- final Set<Method> methods = Method.parseList(endpoint.getHttpMethodRestrict());
- if (!methods.equals(Method.getAll())) {
- methods.stream().forEach(m -> newRoute.method(HttpMethod.valueOf(m.name())));
- }
- if (endpoint.getConsumes() != null) {
- newRoute.consumes(endpoint.getConsumes());
- }
- if (endpoint.getProduces() != null) {
- newRoute.produces(endpoint.getProduces());
- }
-
- handlers.forEach(newRoute::handler);
-
- newRoute.handler(
- ctx -> {
- Exchange exchg = null;
- try {
- final Exchange exchange = exchg = toExchange(ctx);
- createUoW(exchange);
- getAsyncProcessor().process(
- exchange,
- doneSync -> writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy()));
- } catch (Exception e) {
- ctx.fail(e);
- getExceptionHandler().handleException("Failed handling platform-http endpoint " + path, exchg, e);
- } finally {
- if (exchg != null) {
- doneUoW(exchg);
- }
- }
- });
-
- this.route = newRoute;
- }
-
- @Override
- protected void doStop() throws Exception {
- if (route != null) {
- route.remove();
- route = null;
- }
- super.doStop();
- }
-
- @Override
- protected void doSuspend() throws Exception {
- if (route != null) {
- route.disable();
- }
- super.doSuspend();
- }
-
- @Override
- protected void doResume() throws Exception {
- if (route != null) {
- route.enable();
- }
- super.doResume();
- }
-
static Object toHttpResponse(HttpServerResponse response, Message message, HeaderFilterStrategy headerFilterStrategy) {
final Exchange exchange = message.getExchange();
@@ -307,20 +233,6 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer {
}
- Exchange toExchange(RoutingContext ctx) {
- final Exchange exchange = getEndpoint().createExchange();
- Message in = toCamelMessage(ctx, exchange);
-
- final String charset = ctx.parsedHeaders().contentType().parameter("charset");
- if (charset != null) {
- exchange.setProperty(Exchange.CHARSET_NAME, charset);
- in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, charset);
- }
-
- exchange.setIn(in);
- return exchange;
- }
-
static void populateCamelHeaders(
RoutingContext ctx,
Map<String, Object> headersMap,
@@ -377,6 +289,157 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer {
headersMap.put(Exchange.HTTP_RAW_QUERY, request.query());
}
+ @SuppressWarnings("unchecked")
+ static void appendHeader(Map<String, Object> headers, String key, Object value) {
+ if (headers.containsKey(key)) {
+ Object existing = headers.get(key);
+ List<Object> list;
+ if (existing instanceof List) {
+ list = (List<Object>) existing;
+ } else {
+ list = new ArrayList<>();
+ list.add(existing);
+ }
+ list.add(value);
+ value = list;
+ }
+
+ headers.put(key, value);
+ }
+
+ @Override
+ public PlatformHttpEndpoint getEndpoint() {
+ return (PlatformHttpEndpoint) super.getEndpoint();
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ final PlatformHttpEndpoint endpoint = getEndpoint();
+ final String path = endpoint.getPath();
+ /* Transform from the Camel path param syntax /path/{key} to vert.x web's /path/:key */
+ final String vertxPathParamPath = PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1");
+ final Route newRoute = router.route(vertxPathParamPath);
+
+ final Set<Method> methods = Method.parseList(endpoint.getHttpMethodRestrict());
+ if (!methods.equals(Method.getAll())) {
+ methods.stream().forEach(m -> newRoute.method(HttpMethod.valueOf(m.name())));
+ }
+ if (endpoint.getConsumes() != null) {
+ newRoute.consumes(endpoint.getConsumes());
+ }
+ if (endpoint.getProduces() != null) {
+ newRoute.produces(endpoint.getProduces());
+ }
+
+ handlers.forEach(newRoute::handler);
+
+ newRoute.handler(this::handleRequest);
+
+ this.route = newRoute;
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (route != null) {
+ route.remove();
+ route = null;
+ }
+ super.doStop();
+ }
+
+ @Override
+ protected void doSuspend() throws Exception {
+ if (route != null) {
+ route.disable();
+ }
+ super.doSuspend();
+ }
+
+ @Override
+ protected void doResume() throws Exception {
+ if (route != null) {
+ route.enable();
+ }
+ super.doResume();
+ }
+
+ private void handleRequest(RoutingContext ctx) {
+ final Vertx vertx = ctx.vertx();
+ final Exchange exchange = toExchange(ctx);
+
+ //
+ // We do not know if any of the processing logic of the route is synchronous or not so we
+ // need to process the request on a thread on the Vert.x worker pool.
+ //
+ // As example, assuming the platform-http component is configured as the transport provider
+ // for the rest dsl, then the following code may result in a blocking operation that could
+ // block Vert.x event-loop for too long if the target service takes long to respond, as
+ // example in case the service is a knative service scaled to zero that could take some time
+ // to be come available:
+ //
+ // rest("/results")
+ // .get("/{id}")
+ // .route()
+ // .removeHeaders("*", "CamelHttpPath")
+ // .to("rest:get:?bridgeEndpoint=true");
+ //
+ vertx.executeBlocking(
+ promise -> {
+ try {
+ createUoW(exchange);
+ } catch (Exception e) {
+ promise.fail(e);
+ return;
+ }
+
+ getAsyncProcessor().process(exchange, c -> {
+ if (!exchange.isFailed()) {
+ promise.complete();
+ } else {
+ promise.fail(exchange.getException());
+ }
+ });
+ },
+ false,
+ result -> {
+ try {
+ if (result.succeeded()) {
+ try {
+ writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy());
+ } catch (Exception e) {
+ getExceptionHandler().handleException(
+ "Failed handling platform-http endpoint " + getEndpoint().getPath(),
+ e);
+ }
+ } else {
+ getExceptionHandler().handleException(
+ "Failed handling platform-http endpoint " + getEndpoint().getPath(),
+ result.cause());
+
+ ctx.fail(result.cause());
+ }
+ } finally {
+ doneUoW(exchange);
+ }
+ });
+ }
+
+ Exchange toExchange(RoutingContext ctx) {
+ final Exchange exchange = getEndpoint().createExchange();
+ Message in = toCamelMessage(ctx, exchange);
+
+ final String charset = ctx.parsedHeaders().contentType().parameter("charset");
+ if (charset != null) {
+ exchange.setProperty(Exchange.CHARSET_NAME, charset);
+ in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, charset);
+ }
+
+ exchange.setIn(in);
+ return exchange;
+ }
+
Message toCamelMessage(RoutingContext ctx, Exchange exchange) {
final Message result = new DefaultMessage(exchange);
@@ -414,24 +477,6 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer {
return result;
}
- @SuppressWarnings("unchecked")
- static void appendHeader(Map<String, Object> headers, String key, Object value) {
- if (headers.containsKey(key)) {
- Object existing = headers.get(key);
- List<Object> list;
- if (existing instanceof List) {
- list = (List<Object>) existing;
- } else {
- list = new ArrayList<>();
- list.add(existing);
- }
- list.add(value);
- value = list;
- }
-
- headers.put(key, value);
- }
-
void populateAttachments(Set<FileUpload> uploads, Message message) {
for (FileUpload upload : uploads) {
final String name = upload.name();