You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/12 06:00:03 UTC

[camel] branch master updated: CAMEL-16366: camel-jetty - Jetty consumer supports exchange pooling

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new c0a070e  CAMEL-16366: camel-jetty - Jetty consumer supports exchange pooling
c0a070e is described below

commit c0a070e8bd0810d7b2713991ddb7263b38dbcad0
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 12 07:59:28 2021 +0200

    CAMEL-16366: camel-jetty - Jetty consumer supports exchange pooling
---
 .../org/apache/camel/http/common/HttpMessage.java  | 38 +++++++++++++++-------
 .../component/jetty/CamelContinuationServlet.java  | 33 +++++++++++++++----
 2 files changed, 53 insertions(+), 18 deletions(-)

diff --git a/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpMessage.java b/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpMessage.java
index bd9d848..f0a21ba 100644
--- a/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpMessage.java
+++ b/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpMessage.java
@@ -28,14 +28,31 @@ import org.apache.camel.util.ObjectHelper;
 
 public class HttpMessage extends DefaultMessage {
 
-    private final HttpServletRequest request;
-    private final HttpServletResponse response;
-    private final HttpCommonEndpoint endpoint;
+    private HttpServletRequest request;
+    private HttpServletResponse response;
+    private HttpCommonEndpoint endpoint;
     private boolean requestRead;
 
     public HttpMessage(Exchange exchange, HttpCommonEndpoint endpoint, HttpServletRequest request,
                        HttpServletResponse response) {
         super(exchange);
+        init(exchange, endpoint, request, response);
+    }
+
+    private HttpMessage(HttpServletRequest request, HttpServletResponse response, Exchange exchange,
+                        HttpCommonEndpoint endpoint,
+                        boolean requestRead) {
+        super(exchange);
+        this.request = request;
+        this.response = response;
+        this.endpoint = endpoint;
+        this.requestRead = requestRead;
+    }
+
+    public void init(
+            Exchange exchange, HttpCommonEndpoint endpoint, HttpServletRequest request,
+            HttpServletResponse response) {
+        setExchange(exchange);
         this.requestRead = false;
         this.endpoint = endpoint;
 
@@ -56,14 +73,13 @@ public class HttpMessage extends DefaultMessage {
         endpoint.getHttpBinding().readRequest(request, this);
     }
 
-    private HttpMessage(HttpServletRequest request, HttpServletResponse response, Exchange exchange,
-                        HttpCommonEndpoint endpoint,
-                        boolean requestRead) {
-        super(exchange);
-        this.request = request;
-        this.response = response;
-        this.endpoint = endpoint;
-        this.requestRead = requestRead;
+    @Override
+    public void reset() {
+        super.reset();
+        request = null;
+        response = null;
+        endpoint = null;
+        requestRead = false;
     }
 
     public HttpServletRequest getRequest() {
diff --git a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
index 65db856..a1b9a4a 100644
--- a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
+++ b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
@@ -30,12 +30,15 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
 import org.apache.camel.http.common.CamelServlet;
 import org.apache.camel.http.common.HttpCommonEndpoint;
 import org.apache.camel.http.common.HttpConstants;
 import org.apache.camel.http.common.HttpConsumer;
 import org.apache.camel.http.common.HttpHelper;
 import org.apache.camel.http.common.HttpMessage;
+import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.ObjectHelper;
 import org.apache.camel.util.UnsafeUriCharactersEncoder;
 import org.eclipse.jetty.continuation.Continuation;
@@ -182,7 +185,7 @@ public class CamelContinuationServlet extends CamelServlet {
 
             // a new request so create an exchange
             // must be prototype scoped (not pooled) so we create the exchange via endpoint
-            final Exchange exchange = endpoint.createExchange();
+            final Exchange exchange = consumer.createExchange(false);
             exchange.setPattern(ExchangePattern.InOut);
 
             if (consumer.getEndpoint().isBridgeEndpoint()) {
@@ -195,7 +198,14 @@ public class CamelContinuationServlet extends CamelServlet {
 
             HttpHelper.setCharsetFromContentType(request.getContentType(), exchange);
 
-            exchange.setIn(new HttpMessage(exchange, consumer.getEndpoint(), request, response));
+            // reuse existing http message if pooled
+            Message msg = exchange.getIn();
+            if (msg instanceof HttpMessage) {
+                HttpMessage hm = (HttpMessage) msg;
+                hm.init(exchange, endpoint, request, response);
+            } else {
+                exchange.setIn(new HttpMessage(exchange, endpoint, request, response));
+            }
             // set context path as header
             String contextPath = consumer.getEndpoint().getPath();
             exchange.getIn().setHeader("CamelServletContextPath", contextPath);
@@ -208,11 +218,18 @@ public class CamelContinuationServlet extends CamelServlet {
             continuation.setAttribute(EXCHANGE_ATTRIBUTE_ID, exchange.getExchangeId());
 
             // we want to handle the UoW
-            try {
-                consumer.createUoW(exchange);
-            } catch (Exception e) {
-                log.error("Error processing request", e);
-                throw new ServletException(e);
+            UnitOfWork uow = exchange.getUnitOfWork();
+            if (uow == null) {
+                try {
+                    consumer.createUoW(exchange);
+                } catch (Exception e) {
+                    log.error("Error processing request", e);
+                    throw new ServletException(e);
+                }
+            } else if (uow.onPrepare(exchange)) {
+                // need to re-attach uow
+                ExtendedExchange ee = (ExtendedExchange) exchange;
+                ee.setUnitOfWork(uow);
             }
 
             // must suspend before we process the exchange
@@ -238,6 +255,7 @@ public class CamelContinuationServlet extends CamelServlet {
                         continuation.resume();
                     } else {
                         log.warn("Cannot resume expired continuation of exchangeId: {}", exchange.getExchangeId());
+                        consumer.releaseExchange(exchange, false);
                     }
                 }
             });
@@ -270,6 +288,7 @@ public class CamelContinuationServlet extends CamelServlet {
             throw new ServletException(e);
         } finally {
             consumer.doneUoW(result);
+            consumer.releaseExchange(result, false);
         }
     }