You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/03/12 19:46:22 UTC

svn commit: r1080971 - in /camel/trunk: components/camel-jetty/src/main/java/org/apache/camel/component/jetty/ components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/ tests/camel-itest/src/test/java/org/apache/camel/itest/async/

Author: davsclaus
Date: Sat Mar 12 18:46:21 2011
New Revision: 1080971

URL: http://svn.apache.org/viewvc?rev=1080971&view=rev
Log:
CAMEL-3768: Fixed issue with Jetty continuation resuming already expired requests from async callback.

Added:
    camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncDefaultContinuationTimeoutTest.java
      - copied, changed from r1080885, camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncContinuationTimeoutTest.java
    camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/async/HttpJmsAsyncTimeoutTest.java
Modified:
    camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java

Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java?rev=1080971&r1=1080970&r2=1080971&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java (original)
+++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java Sat Mar 12 18:46:21 2011
@@ -17,6 +17,8 @@
 package org.apache.camel.component.jetty;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -45,12 +47,14 @@ public class CamelContinuationServlet ex
     private static final long serialVersionUID = 1L;
     // jetty will by default use 30000 millis as default timeout
     private Long continuationTimeout;
+    // we must remember expired exchanges as Jetty will initiate a new continuation when we send
+    // back the error when timeout occurred, and thus in the async callback we cannot check the
+    // continuation if it was previously expired. So that's why we have our own map for that
+    private final Map<String, String> expiredExchanges = new ConcurrentHashMap<String, String>();
 
     @Override
-    protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
-        if (log.isTraceEnabled()) {
-            log.trace("Service: " + request);
-        }
+    protected void service(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
+        log.trace("Service: {}", request);
 
         // is there a consumer registered for the request.
         HttpConsumer consumer = resolve(request);
@@ -63,7 +67,8 @@ public class CamelContinuationServlet ex
         if (result == null) {
             // no asynchronous result so leverage continuation
             final Continuation continuation = ContinuationSupport.getContinuation(request);
-            if (continuationTimeout != null) {
+            if (continuation.isInitial() && continuationTimeout != null) {
+                // set timeout on initial
                 continuation.setTimeout(continuationTimeout);
             }
 
@@ -75,7 +80,9 @@ public class CamelContinuationServlet ex
 
             if (continuation.isExpired()) {
                 String id = (String) continuation.getAttribute(EXCHANGE_ATTRIBUTE_ID);
-                log.warn("Continuation expired of exchangeId: " + id);
+                // remember this id as expired
+                expiredExchanges.put(id, id);
+                log.warn("Continuation expired of exchangeId: {}", id);
                 response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
                 return;
             }
@@ -92,25 +99,25 @@ public class CamelContinuationServlet ex
             HttpHelper.setCharsetFromContentType(request.getContentType(), exchange);
             exchange.setIn(new HttpMessage(exchange, request, response));
 
-            if (log.isTraceEnabled()) {
-                log.trace("Suspending continuation of exchangeId: " + exchange.getExchangeId());
-            }
+            log.trace("Suspending continuation of exchangeId: {}", exchange.getExchangeId());
             continuation.setAttribute(EXCHANGE_ATTRIBUTE_ID, exchange.getExchangeId());
             // must suspend before we process the exchange
             continuation.suspend();
 
-            if (log.isTraceEnabled()) {
-                log.trace("Processing request for exchangeId: " + exchange.getExchangeId());
-            }
+            log.trace("Processing request for exchangeId: {}", exchange.getExchangeId());
             // use the asynchronous API to process the exchange
             consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
                 public void done(boolean doneSync) {
-                    if (log.isTraceEnabled()) {
-                        log.trace("Resuming continuation of exchangeId: " + exchange.getExchangeId());
+                    // check if the exchange id is already expired
+                    boolean expired = expiredExchanges.remove(exchange.getExchangeId()) != null;
+                    if (!expired) {
+                        log.trace("Resuming continuation of exchangeId: {}", exchange.getExchangeId());
+                        // resume processing after both, sync and async callbacks
+                        continuation.setAttribute(EXCHANGE_ATTRIBUTE_NAME, exchange);
+                        continuation.resume();
+                    } else {
+                        log.warn("Cannot resume expired continuation of exchangeId: {}", exchange.getExchangeId());
                     }
-                    // resume processing after both, sync and async callbacks
-                    continuation.setAttribute(EXCHANGE_ATTRIBUTE_NAME, exchange);
-                    continuation.resume();
                 }
             });
 
@@ -120,9 +127,7 @@ public class CamelContinuationServlet ex
         }
 
         try {
-            if (log.isTraceEnabled()) {
-                log.trace("Resumed continuation and writing response for exchangeId: " + result.getExchangeId());
-            }
+            log.trace("Resumed continuation and writing response for exchangeId: {}", result.getExchangeId());
             // now lets output to the response
             consumer.getBinding().writeResponse(result, response);
         } catch (IOException e) {
@@ -141,4 +146,11 @@ public class CamelContinuationServlet ex
     public void setContinuationTimeout(Long continuationTimeout) {
         this.continuationTimeout = continuationTimeout;
     }
+
+    @Override
+    public void destroy() {
+        expiredExchanges.clear();
+        super.destroy();
+    }
+
 }

Copied: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncDefaultContinuationTimeoutTest.java (from r1080885, camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncContinuationTimeoutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncDefaultContinuationTimeoutTest.java?p2=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncDefaultContinuationTimeoutTest.java&p1=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncContinuationTimeoutTest.java&r1=1080885&r2=1080971&rev=1080971&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncContinuationTimeoutTest.java (original)
+++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncDefaultContinuationTimeoutTest.java Sat Mar 12 18:46:21 2011
@@ -16,17 +16,21 @@
  */
 package org.apache.camel.component.jetty.async;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.http.HttpOperationFailedException;
 import org.apache.camel.component.jetty.BaseJettyTest;
 import org.apache.camel.util.StopWatch;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
  * @version 
  */
-public class JettyAsyncContinuationTimeoutTest extends BaseJettyTest {
+@Ignore("This test takes a long time to run, so run it manually")
+public class JettyAsyncDefaultContinuationTimeoutTest extends BaseJettyTest {
 
     @Test
     public void testJettyAsyncTimeout() throws Exception {
@@ -44,11 +48,11 @@ public class JettyAsyncContinuationTimeo
             HttpOperationFailedException cause = assertIsInstanceOf(HttpOperationFailedException.class, e.getCause());
             assertEquals(503, cause.getStatusCode());
 
-            // should be approx 3-4 sec.
-            assertTrue("Timeout should occur faster than " + taken, taken < 4500);
+            // should be approx 30-34 sec.
+            assertTrue("Timeout should occur faster than " + taken, taken < 34000);
         }
 
-        assertMockEndpointsSatisfied();
+        assertMockEndpointsSatisfied(2, TimeUnit.MINUTES);
     }
 
     @Override
@@ -58,8 +62,8 @@ public class JettyAsyncContinuationTimeo
             public void configure() throws Exception {
                 context.addComponent("async", new MyAsyncComponent());
 
-                from("jetty:http://localhost:{{port}}/myservice?continuationTimeout=3000")
-                    .to("async:Bye World?delay=6000")
+                from("jetty:http://localhost:{{port}}/myservice")
+                    .to("async:Bye World?delay=45s")
                     .to("mock:result");
             }
         };

Added: camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/async/HttpJmsAsyncTimeoutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/async/HttpJmsAsyncTimeoutTest.java?rev=1080971&view=auto
==============================================================================
--- camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/async/HttpJmsAsyncTimeoutTest.java (added)
+++ camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/async/HttpJmsAsyncTimeoutTest.java Sat Mar 12 18:46:21 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.async;
+
+import javax.naming.Context;
+
+import org.apache.activemq.camel.component.ActiveMQComponent;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.http.HttpOperationFailedException;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.jndi.JndiContext;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class HttpJmsAsyncTimeoutTest extends CamelTestSupport {
+
+    @Test
+    public void testHttpJmsAsync() throws Exception {
+        try {
+            template.requestBody("http://0.0.0.0:9080/myservice", "Hello World", String.class);
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            HttpOperationFailedException cause = assertIsInstanceOf(HttpOperationFailedException.class, e.getCause());
+            assertEquals(503, cause.getStatusCode());
+        }
+    }
+
+    @Override
+    protected Context createJndiContext() throws Exception {
+        JndiContext answer = new JndiContext();
+
+        // add ActiveMQ with embedded broker
+        ActiveMQComponent amq = ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false");
+        amq.setCamelContext(context);
+        answer.bind("jms", amq);
+        return answer;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // a lot of timeouts in the play :)
+
+                // jetty will timeout after 2 seconds
+                from("jetty:http://0.0.0.0:9080/myservice?continuationTimeout=2000")
+                    // jms request/reply will timeout after 5 seconds
+                    .to("jms:queue:foo?requestTimeout=5000");
+
+                from("jms:queue:foo")
+                    // and this one is slow and will reply after 10 seconds
+                    .delayer(10000)
+                    .transform(constant("Bye World"));
+            }
+        };
+    }
+}