You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/03/12 19:46:22 UTC
svn commit: r1080971 - in /camel/trunk:
components/camel-jetty/src/main/java/org/apache/camel/component/jetty/
components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/
tests/camel-itest/src/test/java/org/apache/camel/itest/async/
Author: davsclaus
Date: Sat Mar 12 18:46:21 2011
New Revision: 1080971
URL: http://svn.apache.org/viewvc?rev=1080971&view=rev
Log:
CAMEL-3768: Fixed issue with Jetty continuation resuming already expired requests from async callback.
Added:
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncDefaultContinuationTimeoutTest.java
- copied, changed from r1080885, camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncContinuationTimeoutTest.java
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/async/HttpJmsAsyncTimeoutTest.java
Modified:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java?rev=1080971&r1=1080970&r2=1080971&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java (original)
+++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java Sat Mar 12 18:46:21 2011
@@ -17,6 +17,8 @@
package org.apache.camel.component.jetty;
import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -45,12 +47,14 @@ public class CamelContinuationServlet ex
private static final long serialVersionUID = 1L;
// jetty will by default use 30000 millis as default timeout
private Long continuationTimeout;
+ // we must remember expired exchanges ourselves: Jetty initiates a new continuation when we
+ // send back the error after a timeout, so in the async callback we cannot ask the continuation
+ // whether it previously expired. That is why we keep our own map of expired exchange ids
+ private final Map<String, String> expiredExchanges = new ConcurrentHashMap<String, String>();
@Override
- protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
- if (log.isTraceEnabled()) {
- log.trace("Service: " + request);
- }
+ protected void service(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
+ log.trace("Service: {}", request);
// is there a consumer registered for the request.
HttpConsumer consumer = resolve(request);
@@ -63,7 +67,8 @@ public class CamelContinuationServlet ex
if (result == null) {
// no asynchronous result so leverage continuation
final Continuation continuation = ContinuationSupport.getContinuation(request);
- if (continuationTimeout != null) {
+ if (continuation.isInitial() && continuationTimeout != null) {
+ // set timeout on initial
continuation.setTimeout(continuationTimeout);
}
@@ -75,7 +80,9 @@ public class CamelContinuationServlet ex
if (continuation.isExpired()) {
String id = (String) continuation.getAttribute(EXCHANGE_ATTRIBUTE_ID);
- log.warn("Continuation expired of exchangeId: " + id);
+ // remember this id as expired
+ expiredExchanges.put(id, id);
+ log.warn("Continuation expired of exchangeId: {}", id);
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return;
}
@@ -92,25 +99,25 @@ public class CamelContinuationServlet ex
HttpHelper.setCharsetFromContentType(request.getContentType(), exchange);
exchange.setIn(new HttpMessage(exchange, request, response));
- if (log.isTraceEnabled()) {
- log.trace("Suspending continuation of exchangeId: " + exchange.getExchangeId());
- }
+ log.trace("Suspending continuation of exchangeId: {}", exchange.getExchangeId());
continuation.setAttribute(EXCHANGE_ATTRIBUTE_ID, exchange.getExchangeId());
// must suspend before we process the exchange
continuation.suspend();
- if (log.isTraceEnabled()) {
- log.trace("Processing request for exchangeId: " + exchange.getExchangeId());
- }
+ log.trace("Processing request for exchangeId: {}", exchange.getExchangeId());
// use the asynchronous API to process the exchange
consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
- if (log.isTraceEnabled()) {
- log.trace("Resuming continuation of exchangeId: " + exchange.getExchangeId());
+ // check if the exchange id is already expired
+ boolean expired = expiredExchanges.remove(exchange.getExchangeId()) != null;
+ if (!expired) {
+ log.trace("Resuming continuation of exchangeId: {}", exchange.getExchangeId());
+ // resume processing after both, sync and async callbacks
+ continuation.setAttribute(EXCHANGE_ATTRIBUTE_NAME, exchange);
+ continuation.resume();
+ } else {
+ log.warn("Cannot resume expired continuation of exchangeId: {}", exchange.getExchangeId());
}
- // resume processing after both, sync and async callbacks
- continuation.setAttribute(EXCHANGE_ATTRIBUTE_NAME, exchange);
- continuation.resume();
}
});
@@ -120,9 +127,7 @@ public class CamelContinuationServlet ex
}
try {
- if (log.isTraceEnabled()) {
- log.trace("Resumed continuation and writing response for exchangeId: " + result.getExchangeId());
- }
+ log.trace("Resumed continuation and writing response for exchangeId: {}", result.getExchangeId());
// now lets output to the response
consumer.getBinding().writeResponse(result, response);
} catch (IOException e) {
@@ -141,4 +146,11 @@ public class CamelContinuationServlet ex
public void setContinuationTimeout(Long continuationTimeout) {
this.continuationTimeout = continuationTimeout;
}
+
+ @Override
+ public void destroy() {
+ expiredExchanges.clear();
+ super.destroy();
+ }
+
}
Copied: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncDefaultContinuationTimeoutTest.java (from r1080885, camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncContinuationTimeoutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncDefaultContinuationTimeoutTest.java?p2=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncDefaultContinuationTimeoutTest.java&p1=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncContinuationTimeoutTest.java&r1=1080885&r2=1080971&rev=1080971&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncContinuationTimeoutTest.java (original)
+++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/async/JettyAsyncDefaultContinuationTimeoutTest.java Sat Mar 12 18:46:21 2011
@@ -16,17 +16,21 @@
*/
package org.apache.camel.component.jetty.async;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.CamelExecutionException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.http.HttpOperationFailedException;
import org.apache.camel.component.jetty.BaseJettyTest;
import org.apache.camel.util.StopWatch;
+import org.junit.Ignore;
import org.junit.Test;
/**
* @version
*/
-public class JettyAsyncContinuationTimeoutTest extends BaseJettyTest {
+@Ignore("This test takes a long time to run, so run it manually")
+public class JettyAsyncDefaultContinuationTimeoutTest extends BaseJettyTest {
@Test
public void testJettyAsyncTimeout() throws Exception {
@@ -44,11 +48,11 @@ public class JettyAsyncContinuationTimeo
HttpOperationFailedException cause = assertIsInstanceOf(HttpOperationFailedException.class, e.getCause());
assertEquals(503, cause.getStatusCode());
- // should be approx 3-4 sec.
- assertTrue("Timeout should occur faster than " + taken, taken < 4500);
+ // should be approx 30-34 sec.
+ assertTrue("Timeout should occur faster than " + taken, taken < 34000);
}
- assertMockEndpointsSatisfied();
+ assertMockEndpointsSatisfied(2, TimeUnit.MINUTES);
}
@Override
@@ -58,8 +62,8 @@ public class JettyAsyncContinuationTimeo
public void configure() throws Exception {
context.addComponent("async", new MyAsyncComponent());
- from("jetty:http://localhost:{{port}}/myservice?continuationTimeout=3000")
- .to("async:Bye World?delay=6000")
+ from("jetty:http://localhost:{{port}}/myservice")
+ .to("async:Bye World?delay=45s")
.to("mock:result");
}
};
Added: camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/async/HttpJmsAsyncTimeoutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/async/HttpJmsAsyncTimeoutTest.java?rev=1080971&view=auto
==============================================================================
--- camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/async/HttpJmsAsyncTimeoutTest.java (added)
+++ camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/async/HttpJmsAsyncTimeoutTest.java Sat Mar 12 18:46:21 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.async;
+
+import javax.naming.Context;
+
+import org.apache.activemq.camel.component.ActiveMQComponent;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.http.HttpOperationFailedException;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.jndi.JndiContext;
+import org.junit.Test;
+
+/**
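+ * Tests that a Jetty consumer whose continuation times out before a slow JMS request/reply completes returns HTTP 503.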
+ */
+public class HttpJmsAsyncTimeoutTest extends CamelTestSupport {
+
+ @Test
+ public void testHttpJmsAsync() throws Exception {
+ try {
+ template.requestBody("http://0.0.0.0:9080/myservice", "Hello World", String.class);
+ fail("Should have thrown exception");
+ } catch (CamelExecutionException e) {
+ HttpOperationFailedException cause = assertIsInstanceOf(HttpOperationFailedException.class, e.getCause());
+ assertEquals(503, cause.getStatusCode());
+ }
+ }
+
+ @Override
+ protected Context createJndiContext() throws Exception {
+ JndiContext answer = new JndiContext();
+
+ // add ActiveMQ with embedded broker
+ ActiveMQComponent amq = ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false");
+ amq.setCamelContext(context);
+ answer.bind("jms", amq);
+ return answer;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // there are several timeouts in play here :)
+
+ // jetty will timeout after 2 seconds
+ from("jetty:http://0.0.0.0:9080/myservice?continuationTimeout=2000")
+ // jms request/reply will timeout after 5 seconds
+ .to("jms:queue:foo?requestTimeout=5000");
+
+ from("jms:queue:foo")
+ // and this one is slow and will reply after 10 seconds
+ .delayer(10000)
+ .transform(constant("Bye World"));
+ }
+ };
+ }
+}