You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/12 06:00:03 UTC
[camel] branch master updated: CAMEL-16366: camel-jetty - Jetty consumer supports exchange pooling
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new c0a070e CAMEL-16366: camel-jetty - Jetty consumer supports exchange pooling
c0a070e is described below
commit c0a070e8bd0810d7b2713991ddb7263b38dbcad0
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 12 07:59:28 2021 +0200
CAMEL-16366: camel-jetty - Jetty consumer supports exchange pooling
---
.../org/apache/camel/http/common/HttpMessage.java | 38 +++++++++++++++-------
.../component/jetty/CamelContinuationServlet.java | 33 +++++++++++++++----
2 files changed, 53 insertions(+), 18 deletions(-)
diff --git a/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpMessage.java b/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpMessage.java
index bd9d848..f0a21ba 100644
--- a/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpMessage.java
+++ b/components/camel-http-common/src/main/java/org/apache/camel/http/common/HttpMessage.java
@@ -28,14 +28,31 @@ import org.apache.camel.util.ObjectHelper;
public class HttpMessage extends DefaultMessage {
- private final HttpServletRequest request;
- private final HttpServletResponse response;
- private final HttpCommonEndpoint endpoint;
+ private HttpServletRequest request;
+ private HttpServletResponse response;
+ private HttpCommonEndpoint endpoint;
private boolean requestRead;
public HttpMessage(Exchange exchange, HttpCommonEndpoint endpoint, HttpServletRequest request,
HttpServletResponse response) {
super(exchange);
+ init(exchange, endpoint, request, response);
+ }
+
+ private HttpMessage(HttpServletRequest request, HttpServletResponse response, Exchange exchange,
+ HttpCommonEndpoint endpoint,
+ boolean requestRead) {
+ super(exchange);
+ this.request = request;
+ this.response = response;
+ this.endpoint = endpoint;
+ this.requestRead = requestRead;
+ }
+
+ public void init(
+ Exchange exchange, HttpCommonEndpoint endpoint, HttpServletRequest request,
+ HttpServletResponse response) {
+ setExchange(exchange);
this.requestRead = false;
this.endpoint = endpoint;
@@ -56,14 +73,13 @@ public class HttpMessage extends DefaultMessage {
endpoint.getHttpBinding().readRequest(request, this);
}
- private HttpMessage(HttpServletRequest request, HttpServletResponse response, Exchange exchange,
- HttpCommonEndpoint endpoint,
- boolean requestRead) {
- super(exchange);
- this.request = request;
- this.response = response;
- this.endpoint = endpoint;
- this.requestRead = requestRead;
+ @Override
+ public void reset() {
+ super.reset();
+ request = null;
+ response = null;
+ endpoint = null;
+ requestRead = false;
}
public HttpServletRequest getRequest() {
diff --git a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
index 65db856..a1b9a4a 100644
--- a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
+++ b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
@@ -30,12 +30,15 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
import org.apache.camel.http.common.CamelServlet;
import org.apache.camel.http.common.HttpCommonEndpoint;
import org.apache.camel.http.common.HttpConstants;
import org.apache.camel.http.common.HttpConsumer;
import org.apache.camel.http.common.HttpHelper;
import org.apache.camel.http.common.HttpMessage;
+import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.ObjectHelper;
import org.apache.camel.util.UnsafeUriCharactersEncoder;
import org.eclipse.jetty.continuation.Continuation;
@@ -182,7 +185,7 @@ public class CamelContinuationServlet extends CamelServlet {
// a new request so create an exchange
// must be prototype scoped (not pooled) so we create the exchange via endpoint
- final Exchange exchange = endpoint.createExchange();
+ final Exchange exchange = consumer.createExchange(false);
exchange.setPattern(ExchangePattern.InOut);
if (consumer.getEndpoint().isBridgeEndpoint()) {
@@ -195,7 +198,14 @@ public class CamelContinuationServlet extends CamelServlet {
HttpHelper.setCharsetFromContentType(request.getContentType(), exchange);
- exchange.setIn(new HttpMessage(exchange, consumer.getEndpoint(), request, response));
+ // reuse existing http message if pooled
+ Message msg = exchange.getIn();
+ if (msg instanceof HttpMessage) {
+ HttpMessage hm = (HttpMessage) msg;
+ hm.init(exchange, endpoint, request, response);
+ } else {
+ exchange.setIn(new HttpMessage(exchange, endpoint, request, response));
+ }
// set context path as header
String contextPath = consumer.getEndpoint().getPath();
exchange.getIn().setHeader("CamelServletContextPath", contextPath);
@@ -208,11 +218,18 @@ public class CamelContinuationServlet extends CamelServlet {
continuation.setAttribute(EXCHANGE_ATTRIBUTE_ID, exchange.getExchangeId());
// we want to handle the UoW
- try {
- consumer.createUoW(exchange);
- } catch (Exception e) {
- log.error("Error processing request", e);
- throw new ServletException(e);
+ UnitOfWork uow = exchange.getUnitOfWork();
+ if (uow == null) {
+ try {
+ consumer.createUoW(exchange);
+ } catch (Exception e) {
+ log.error("Error processing request", e);
+ throw new ServletException(e);
+ }
+ } else if (uow.onPrepare(exchange)) {
+ // need to re-attach uow
+ ExtendedExchange ee = (ExtendedExchange) exchange;
+ ee.setUnitOfWork(uow);
}
// must suspend before we process the exchange
@@ -238,6 +255,7 @@ public class CamelContinuationServlet extends CamelServlet {
continuation.resume();
} else {
log.warn("Cannot resume expired continuation of exchangeId: {}", exchange.getExchangeId());
+ consumer.releaseExchange(exchange, false);
}
}
});
@@ -270,6 +288,7 @@ public class CamelContinuationServlet extends CamelServlet {
throw new ServletException(e);
} finally {
consumer.doneUoW(result);
+ consumer.releaseExchange(result, false);
}
}