You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/01/03 14:36:00 UTC

svn commit: r1054613 - /camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java

Author: davsclaus
Date: Mon Jan  3 13:36:00 2011
New Revision: 1054613

URL: http://svn.apache.org/viewvc?rev=1054613&view=rev
Log:
CAMEL-3479: Fixed CamelContinuationServlet, which could throw an NPE under heavy load. Refactored the logic to follow the example in the Jetty Continuations guide on the Jetty website. Also added a check for an expired continuation.

Modified:
    camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java

Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java?rev=1054613&r1=1054612&r2=1054613&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java (original)
+++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java Mon Jan  3 13:36:00 2011
@@ -27,32 +27,34 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.http.CamelServlet;
 import org.apache.camel.component.http.HttpConsumer;
 import org.apache.camel.component.http.HttpMessage;
-import org.apache.camel.component.http.helper.HttpHelper;
 import org.apache.camel.impl.DefaultExchange;
 import org.eclipse.jetty.continuation.Continuation;
 import org.eclipse.jetty.continuation.ContinuationSupport;
 
 /**
- * Currently not in use.
+ * Servlet which leverages <a href="http://wiki.eclipse.org/Jetty/Feature/Continuations">Jetty Continuations</a>.
  *
  * @version $Revision$
  */
 public class CamelContinuationServlet extends CamelServlet {
 
     static final String EXCHANGE_ATTRIBUTE_NAME = "CamelExchange";
+    static final String EXCHANGE_ATTRIBUTE_ID = "CamelExchangeId";
 
     private static final long serialVersionUID = 1L;
 
     @Override
     protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
-        try {
-            // Is there a consumer registered for the request.
-            HttpConsumer consumer = resolve(request);
-            if (consumer == null) {
-                response.sendError(HttpServletResponse.SC_NOT_FOUND);
-                return;
-            }
+        // is there a consumer registered for the request.
+        HttpConsumer consumer = resolve(request);
+        if (consumer == null) {
+            response.sendError(HttpServletResponse.SC_NOT_FOUND);
+            return;
+        }
 
+        final Exchange result = (Exchange) request.getAttribute(EXCHANGE_ATTRIBUTE_NAME);
+        if (result == null) {
+            // no asynchronous result so leverage continuation
             final Continuation continuation = ContinuationSupport.getContinuation(request);
 
             // are we suspended and a request is dispatched initially?
@@ -61,56 +63,56 @@ public class CamelContinuationServlet ex
                 return;
             }
 
-            if (continuation.isInitial()) {
-                // a new request so create an exchange
-                final Exchange exchange = new DefaultExchange(consumer.getEndpoint(), ExchangePattern.InOut);
-                if (consumer.getEndpoint().isBridgeEndpoint()) {
-                    exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE);
-                }
-                if (consumer.getEndpoint().isDisableStreamCache()) {
-                    exchange.setProperty(Exchange.DISABLE_HTTP_STREAM_CACHE, Boolean.TRUE);
-                }
-                HttpHelper.setCharsetFromContentType(request.getContentType(), exchange);
-                exchange.setIn(new HttpMessage(exchange, request, response));
-
-                if (log.isTraceEnabled()) {
-                    log.trace("Suspending continuation of exchangeId: " + exchange.getExchangeId());
-                }
-                continuation.suspend();
-
-                // use the asynchronous API to process the exchange
-                consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
-                    public void done(boolean doneSync) {
-                        if (log.isTraceEnabled()) {
-                            log.trace("Resuming continuation of exchangeId: " + exchange.getExchangeId());
-                        }
-                        // resume processing after both, sync and async callbacks
-                        continuation.setAttribute(EXCHANGE_ATTRIBUTE_NAME, exchange);
-                        continuation.resume();
-                    }
-                });
+            if (continuation.isExpired()) {
+                String id = (String) continuation.getAttribute(EXCHANGE_ATTRIBUTE_ID);
+                log.warn("Continuation expired of exchangeId: " + id);
+                response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
                 return;
             }
 
-            if (continuation.isResumed()) {
-                // a re-dispatched request containing the processing result
-                Exchange exchange = (Exchange) continuation.getAttribute(EXCHANGE_ATTRIBUTE_NAME);
-                if (log.isTraceEnabled()) {
-                    log.trace("Resuming continuation of exchangeId: " + exchange.getExchangeId());
-                }
+            // a new request so create an exchange
+            final Exchange exchange = new DefaultExchange(consumer.getEndpoint(), ExchangePattern.InOut);
+            if (consumer.getEndpoint().isBridgeEndpoint()) {
+                exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE);
+            }
+            if (consumer.getEndpoint().isDisableStreamCache()) {
+                exchange.setProperty(Exchange.DISABLE_HTTP_STREAM_CACHE, Boolean.TRUE);
+            }
+            exchange.setIn(new HttpMessage(exchange, request, response));
 
-                // now lets output to the response
-                if (log.isTraceEnabled()) {
-                    log.trace("Writing response of exchangeId: " + exchange.getExchangeId());
+            if (log.isTraceEnabled()) {
+                log.trace("Suspending continuation of exchangeId: " + exchange.getExchangeId());
+            }
+            continuation.setAttribute(EXCHANGE_ATTRIBUTE_ID, exchange.getExchangeId());
+            // must suspend before we process the exchange
+            continuation.suspend();
+
+            // use the asynchronous API to process the exchange
+            consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
+                public void done(boolean doneSync) {
+                    if (log.isTraceEnabled()) {
+                        log.trace("Resuming continuation of exchangeId: " + exchange.getExchangeId());
+                    }
+                    // resume processing after both, sync and async callbacks
+                    continuation.setAttribute(EXCHANGE_ATTRIBUTE_NAME, exchange);
+                    continuation.resume();
                 }
-                consumer.getBinding().writeResponse(exchange, response);
+            });
+
+            // return to let the Jetty continuation do its work, as it will resubmit the request and
+            // invoke the service method again when it is resumed
+            return;
+        }
+
+        try {
+            if (log.isTraceEnabled()) {
+                log.trace("Resuming continuation of exchangeId: " + result.getExchangeId());
             }
+            // now lets output to the response
+            consumer.getBinding().writeResponse(result, response);
         } catch (IOException e) {
             log.error("Error processing request", e);
             throw e;
-        } catch (Exception e) {
-            log.error("Error processing request", e);
-            throw new ServletException(e);
         }
     }