You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/01/03 14:36:00 UTC
svn commit: r1054613 -
/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
Author: davsclaus
Date: Mon Jan 3 13:36:00 2011
New Revision: 1054613
URL: http://svn.apache.org/viewvc?rev=1054613&view=rev
Log:
CAMEL-3479: Fixed CamelContinuationServlet may throw NPE under heavy load. Refactored logic to be similar to example from Jetty Continuation guide at their website. Also added check for continuation expired.
Modified:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java?rev=1054613&r1=1054612&r2=1054613&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java (original)
+++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java Mon Jan 3 13:36:00 2011
@@ -27,32 +27,34 @@ import org.apache.camel.ExchangePattern;
import org.apache.camel.component.http.CamelServlet;
import org.apache.camel.component.http.HttpConsumer;
import org.apache.camel.component.http.HttpMessage;
-import org.apache.camel.component.http.helper.HttpHelper;
import org.apache.camel.impl.DefaultExchange;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;
/**
- * Currently not in use.
+ * Servlet which leverage <a href="http://wiki.eclipse.org/Jetty/Feature/Continuations">Jetty Continuations</a>.
*
* @version $Revision$
*/
public class CamelContinuationServlet extends CamelServlet {
static final String EXCHANGE_ATTRIBUTE_NAME = "CamelExchange";
+ static final String EXCHANGE_ATTRIBUTE_ID = "CamelExchangeId";
private static final long serialVersionUID = 1L;
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
- try {
- // Is there a consumer registered for the request.
- HttpConsumer consumer = resolve(request);
- if (consumer == null) {
- response.sendError(HttpServletResponse.SC_NOT_FOUND);
- return;
- }
+ // is there a consumer registered for the request.
+ HttpConsumer consumer = resolve(request);
+ if (consumer == null) {
+ response.sendError(HttpServletResponse.SC_NOT_FOUND);
+ return;
+ }
+ final Exchange result = (Exchange) request.getAttribute(EXCHANGE_ATTRIBUTE_NAME);
+ if (result == null) {
+ // no asynchronous result so leverage continuation
final Continuation continuation = ContinuationSupport.getContinuation(request);
// are we suspended and a request is dispatched initially?
@@ -61,56 +63,56 @@ public class CamelContinuationServlet ex
return;
}
- if (continuation.isInitial()) {
- // a new request so create an exchange
- final Exchange exchange = new DefaultExchange(consumer.getEndpoint(), ExchangePattern.InOut);
- if (consumer.getEndpoint().isBridgeEndpoint()) {
- exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE);
- }
- if (consumer.getEndpoint().isDisableStreamCache()) {
- exchange.setProperty(Exchange.DISABLE_HTTP_STREAM_CACHE, Boolean.TRUE);
- }
- HttpHelper.setCharsetFromContentType(request.getContentType(), exchange);
- exchange.setIn(new HttpMessage(exchange, request, response));
-
- if (log.isTraceEnabled()) {
- log.trace("Suspending continuation of exchangeId: " + exchange.getExchangeId());
- }
- continuation.suspend();
-
- // use the asynchronous API to process the exchange
- consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
- public void done(boolean doneSync) {
- if (log.isTraceEnabled()) {
- log.trace("Resuming continuation of exchangeId: " + exchange.getExchangeId());
- }
- // resume processing after both, sync and async callbacks
- continuation.setAttribute(EXCHANGE_ATTRIBUTE_NAME, exchange);
- continuation.resume();
- }
- });
+ if (continuation.isExpired()) {
+ String id = (String) continuation.getAttribute(EXCHANGE_ATTRIBUTE_ID);
+ log.warn("Continuation expired of exchangeId: " + id);
+ response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return;
}
- if (continuation.isResumed()) {
- // a re-dispatched request containing the processing result
- Exchange exchange = (Exchange) continuation.getAttribute(EXCHANGE_ATTRIBUTE_NAME);
- if (log.isTraceEnabled()) {
- log.trace("Resuming continuation of exchangeId: " + exchange.getExchangeId());
- }
+ // a new request so create an exchange
+ final Exchange exchange = new DefaultExchange(consumer.getEndpoint(), ExchangePattern.InOut);
+ if (consumer.getEndpoint().isBridgeEndpoint()) {
+ exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE);
+ }
+ if (consumer.getEndpoint().isDisableStreamCache()) {
+ exchange.setProperty(Exchange.DISABLE_HTTP_STREAM_CACHE, Boolean.TRUE);
+ }
+ exchange.setIn(new HttpMessage(exchange, request, response));
- // now lets output to the response
- if (log.isTraceEnabled()) {
- log.trace("Writing response of exchangeId: " + exchange.getExchangeId());
+ if (log.isTraceEnabled()) {
+ log.trace("Suspending continuation of exchangeId: " + exchange.getExchangeId());
+ }
+ continuation.setAttribute(EXCHANGE_ATTRIBUTE_ID, exchange.getExchangeId());
+ // must suspend before we process the exchange
+ continuation.suspend();
+
+ // use the asynchronous API to process the exchange
+ consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ if (log.isTraceEnabled()) {
+ log.trace("Resuming continuation of exchangeId: " + exchange.getExchangeId());
+ }
+ // resume processing after both, sync and async callbacks
+ continuation.setAttribute(EXCHANGE_ATTRIBUTE_NAME, exchange);
+ continuation.resume();
}
- consumer.getBinding().writeResponse(exchange, response);
+ });
+
+ // return to let Jetty continuation to work as it will resubmit and invoke the service
+ // method again when its resumed
+ return;
+ }
+
+ try {
+ if (log.isTraceEnabled()) {
+ log.trace("Resuming continuation of exchangeId: " + result.getExchangeId());
}
+ // now lets output to the response
+ consumer.getBinding().writeResponse(result, response);
} catch (IOException e) {
log.error("Error processing request", e);
throw e;
- } catch (Exception e) {
- log.error("Error processing request", e);
- throw new ServletException(e);
}
}