You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/30 16:24:42 UTC

[camel-k-runtime] branch master updated: knative(http): use async processor to process http requests

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git


The following commit(s) were added to refs/heads/master by this push:
     new 92d7ca4  knative(http): use async processor to process http requests
92d7ca4 is described below

commit 92d7ca4bd71a15716641b49a5aa50c7dd84251af
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Sep 30 17:16:15 2020 +0200

    knative(http): use async processor to process http requests
---
 .../knative/http/KnativeHttpConsumer.java          | 70 ++++++++++------------
 1 file changed, 32 insertions(+), 38 deletions(-)

diff --git a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
index ecd1dc7..bb1b932 100644
--- a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
+++ b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java
@@ -158,32 +158,36 @@ public class KnativeHttpConsumer extends DefaultConsumer {
             message.setBody(null);
         }
 
-        try {
-            createUoW(exchange);
-
-            // We do not know if any of the processing logic of the route is synchronous or not so we
-            // need to process the request on a thread on the Vert.x worker pool.
-            //
-            // As example the following route may block the Vert.x event loop as the camel-http component
-            // is not async so if the service is scaled-down, the it may take a while to become ready and
-            // the camel-http component blocks until the service becomes available.
-            //
-            // from("knative:event/my.event")
-            //        .to("http://{{env:PROJECT}}.{{env:NAMESPACE}}.svc.cluster.local/service");
-            //
-            router.vertx().executeBlocking(
-                promise -> {
-                    try {
-                        // no need to use an async processor as the processing happen in
-                        // a dedicated thread ans it won't block the Vert.x event loop
-                        getProcessor().process(exchange);
+        // We do not know if any of the processing logic of the route is synchronous or not so we
+        // need to process the request on a thread on the Vert.x worker pool.
+        //
+        // As example the following route may block the Vert.x event loop as the camel-http component
+        // is not async so if the service is scaled-down, then it may take a while to become ready and
+        // the camel-http component blocks until the service becomes available.
+        //
+        // from("knative:event/my.event")
+        //        .to("http://{{env:PROJECT}}.{{env:NAMESPACE}}.svc.cluster.local/service");
+        //
+        router.vertx().executeBlocking(
+            promise -> {
+                try {
+                    createUoW(exchange);
+                } catch (Exception e) {
+                    promise.fail(e);
+                    return;
+                }
+
+                getAsyncProcessor().process(exchange, c -> {
+                    if (!exchange.isFailed()) {
                         promise.complete();
-                    } catch (Exception e) {
-                        promise.fail(e);
+                    } else {
+                        promise.fail(exchange.getException());
                     }
-                },
-                false,
-                result -> {
+                });
+            },
+            false,
+            result -> {
+                try {
                     if (result.succeeded()) {
                         try {
                             HttpServerResponse response = toHttpResponse(request, exchange.getMessage());
@@ -212,22 +216,12 @@ public class KnativeHttpConsumer extends DefaultConsumer {
                     } else if (result.failed()) {
                         getExceptionHandler().handleException(result.cause());
 
-                        request.response().setStatusCode(500);
-                        request.response().putHeader(Exchange.CONTENT_TYPE, "text/plain");
-                        request.response().end(result.cause().getMessage());
+                        routingContext.fail(result.cause());
                     }
-
+                } finally {
                     doneUoW(exchange);
-                });
-        } catch (Exception e) {
-            getExceptionHandler().handleException(e);
-
-            request.response().setStatusCode(500);
-            request.response().putHeader(Exchange.CONTENT_TYPE, "text/plain");
-            request.response().end(e.getMessage());
-
-            doneUoW(exchange);
-        }
+                }
+            });
     }
 
     private Message toMessage(HttpServerRequest request, Exchange exchange) {