You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/08/29 14:08:54 UTC
svn commit: r690200 - in
/servicemix/components/bindings/servicemix-http/trunk/src:
main/java/org/apache/servicemix/http/endpoints/
main/java/org/apache/servicemix/http/processors/
test/java/org/apache/servicemix/http/
Author: gnodet
Date: Fri Aug 29 05:08:53 2008
New Revision: 690200
URL: http://svn.apache.org/viewvc?rev=690200&view=rev
Log:
SM-1179: fix the "exchange not found" error which led to the http component being blocked
Modified:
servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java
Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java?rev=690200&r1=690199&r2=690200&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java Fri Aug 29 05:08:53 2008
@@ -219,15 +219,25 @@
}
public void process(MessageExchange exchange) throws Exception {
- Continuation cont = locks.remove(exchange.getExchangeId());
+ // Receive the exchange response
+ // First, check whether the continuation is still in the map.
+ // If it has been removed, the HTTP request has timed out, so throw an exception
+ // that will set the exchange status to ERROR.
+ Continuation cont = locks.get(exchange.getExchangeId());
if (cont == null) {
throw new Exception("HTTP request has timed out");
}
+ // synchronized block
synchronized (cont) {
+ if (locks.remove(exchange.getExchangeId()) == null) {
+ throw new Exception("HTTP request has timed out");
+ }
if (logger.isDebugEnabled()) {
logger.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
}
+ // Put the new exchange
exchanges.put(exchange.getExchangeId(), exchange);
+ // Resume continuation
cont.resume();
}
}
@@ -247,42 +257,71 @@
Continuation cont = ContinuationSupport.getContinuation(request, null);
// If the continuation is not a retry
if (!cont.isPending()) {
+ // Create the exchange
exchange = createExchange(request);
- locks.put(exchange.getExchangeId(), cont);
+ // Put the exchange in a map so that we can later retrieve it
+ // We don't put the exchange on the request directly in case the JMS flow is involved
+ // because the exchange coming back may not be the same object as the one sent.
+ exchanges.put(exchange.getExchangeId(), exchange);
+ // Put the exchange id on the request to be able to retrieve the exchange later
request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId());
+ // Put the continuation in a map under the exchange id key
+ locks.put(exchange.getExchangeId(), cont);
synchronized (cont) {
+ // Send the exchange
send(exchange);
if (logger.isDebugEnabled()) {
logger.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
}
+ // Suspend the continuation for the configured timeout
+ // If a SelectConnector is used, the call to suspend will throw a RetryRequest exception
+ // else, the call will block until the continuation is resumed
long to = this.timeout;
if (to == 0) {
- to = ((HttpComponent)getServiceUnit().getComponent()).getConfiguration()
- .getConsumerProcessorSuspendTime();
+ to = ((HttpComponent) getServiceUnit().getComponent()).getConfiguration().getConsumerProcessorSuspendTime();
}
- exchanges.put(exchange.getExchangeId(), exchange);
boolean result = cont.suspend(to);
+ // The call has not thrown a RetryRequest, which means we don't use a SelectConnector
+ // and we must handle the exchange in this very method call.
+ // If result is false, the continuation has timed out.
+ // So get the exchange (in case the object has changed) and remove it from the map
exchange = exchanges.remove(exchange.getExchangeId());
+ // remove the exchange id from the request as we don't need it anymore
+ request.removeAttribute(MessageExchange.class.getName());
+ // If a timeout occurred, throw an exception that will be sent back to the HTTP client
+ // Whenever the exchange comes back, the process(MessageExchange) method will throw an
+ // exception and the exchange will be set to an ERROR status
if (!result) {
+ // Remove the continuation from the map.
+ // This indicates the continuation has been fully processed
locks.remove(exchange.getExchangeId());
throw new Exception("Exchange timed out");
}
- request.removeAttribute(MessageExchange.class.getName());
}
+ // The continuation is a retry.
+ // This happens when the SelectConnector is used and in two cases:
+ // * the continuation has been resumed because the exchange has been received
+ // * the continuation has timed out
} else {
- String id = (String)request.getAttribute(MessageExchange.class.getName());
- locks.remove(id);
- exchange = exchanges.remove(id);
- request.removeAttribute(MessageExchange.class.getName());
- boolean result = cont.suspend(0);
- // Check if this is a timeout
- if (exchange == null) {
- throw new IllegalStateException("Exchange not found");
- }
- if (!result) {
- throw new Exception("Timeout");
+ synchronized (cont) {
+ // Get the exchange id from the request
+ String id = (String) request.getAttribute(MessageExchange.class.getName());
+ // Remove the continuation from the map, indicating it has been processed or timed out
+ locks.remove(id);
+ exchange = exchanges.remove(id);
+ request.removeAttribute(MessageExchange.class.getName());
+ // Check if this is a timeout
+ if (!cont.isResumed()) {
+ throw new Exception("Exchange timed out");
+ }
+ // This should never happen, but you never know
+ if (exchange == null) {
+ throw new IllegalStateException("Exchange not found");
+ }
}
}
+ // At this point, we have received the exchange response,
+ // so process it and send back the HTTP response
if (exchange.getStatus() == ExchangeStatus.ERROR) {
Exception e = exchange.getError();
if (e == null) {
@@ -300,11 +339,9 @@
sendOut(exchange, outMsg, request, response);
}
}
- exchange.setStatus(ExchangeStatus.DONE);
- send(exchange);
+ done(exchange);
} catch (Exception e) {
- exchange.setError(e);
- send(exchange);
+ fail(exchange, e);
throw e;
}
} else if (exchange.getStatus() == ExchangeStatus.DONE) {
Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=690200&r1=690199&r2=690200&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Fri Aug 29 05:08:53 2008
@@ -90,17 +90,19 @@
}
public void process(MessageExchange exchange) throws Exception {
- Continuation cont = locks.remove(exchange.getExchangeId());
- if (cont != null) {
- synchronized (cont) {
- if (log.isDebugEnabled()) {
- log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
- }
- exchanges.put(exchange.getExchangeId(), exchange);
- cont.resume();
+ Continuation cont = locks.get(exchange.getExchangeId());
+ if (cont == null) {
+ throw new Exception("HTTP request has timed out");
+ }
+ synchronized (cont) {
+ if (locks.remove(exchange.getExchangeId()) == null) {
+ throw new Exception("HTTP request has timed out");
}
- } else {
- throw new IllegalStateException("Exchange not found");
+ if (log.isDebugEnabled()) {
+ log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
+ }
+ exchanges.put(exchange.getExchangeId(), exchange);
+ cont.resume();
}
}
@@ -147,6 +149,7 @@
}
request.setAttribute(Context.class.getName(), ctx);
exchange = soapHelper.onReceive(ctx);
+ exchanges.put(exchange.getExchangeId(), exchange);
NormalizedMessage inMessage = exchange.getMessage("in");
if (getConfiguration().isWantHeadersFromHttpIntoExchange()) {
inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(request));
@@ -160,11 +163,11 @@
}
boolean result = cont.suspend(suspentionTime);
exchange = exchanges.remove(exchange.getExchangeId());
+ request.removeAttribute(MessageExchange.class.getName());
if (!result) {
locks.remove(exchange.getExchangeId());
- throw new Exception("Error sending exchange: aborted");
+ throw new Exception("Exchange timed out");
}
- request.removeAttribute(MessageExchange.class.getName());
}
} catch (RetryRequest retry) {
throw retry;
@@ -177,16 +180,18 @@
return;
}
} else {
- String id = (String) request.getAttribute(MessageExchange.class.getName());
- exchange = exchanges.remove(id);
- request.removeAttribute(MessageExchange.class.getName());
- boolean result = cont.suspend(0);
- // Check if this is a timeout
- if (exchange == null) {
- throw new IllegalStateException("Exchange not found");
- }
- if (!result) {
- throw new Exception("Timeout");
+ synchronized (cont) {
+ String id = (String) request.getAttribute(MessageExchange.class.getName());
+ locks.remove(id);
+ exchange = exchanges.remove(id);
+ request.removeAttribute(MessageExchange.class.getName());
+ // Check if this is a timeout
+ if (!cont.isResumed()) {
+ throw new Exception("Exchange timed out");
+ }
+ if (exchange == null) {
+ throw new IllegalStateException("Exchange not found");
+ }
}
}
if (exchange.getStatus() == ExchangeStatus.ERROR) {
Modified: servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java?rev=690200&r1=690199&r2=690200&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java Fri Aug 29 05:08:53 2008
@@ -16,11 +16,17 @@
*/
package org.apache.servicemix.http;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.ExchangeStatus;
import javax.xml.namespace.QName;
import javax.xml.transform.TransformerException;
+import javax.xml.transform.stream.StreamSource;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@@ -43,6 +49,7 @@
import org.apache.servicemix.jbi.jaxp.SourceTransformer;
import org.apache.servicemix.jbi.jaxp.StringSource;
import org.apache.servicemix.jbi.messaging.MessageExchangeSupport;
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
import org.apache.servicemix.jbi.util.DOMUtil;
import org.apache.servicemix.soap.bindings.soap.Soap11;
import org.apache.servicemix.soap.bindings.soap.Soap12;
@@ -50,6 +57,7 @@
import org.apache.servicemix.soap.interceptors.jbi.JbiConstants;
import org.apache.servicemix.soap.util.DomUtil;
import org.apache.servicemix.tck.ReceiverComponent;
+import org.apache.servicemix.executors.impl.ExecutorFactoryImpl;
import org.apache.xpath.CachedXPathAPI;
import org.springframework.core.io.ClassPathResource;
@@ -59,6 +67,10 @@
protected JBIContainer container;
protected SourceTransformer transformer = new SourceTransformer();
+ static {
+ System.setProperty("org.apache.servicemix.preserveContent", "true");
+ }
+
protected void setUp() throws Exception {
container = new JBIContainer();
container.setUseMBeanServer(false);
@@ -120,6 +132,48 @@
recv.getMessageList().assertMessagesReceived(1);
}
+ public void testHttpInOutWithTimeout() throws Exception {
+ HttpComponent http = new HttpComponent();
+ HttpConsumerEndpoint ep = new HttpConsumerEndpoint();
+ ep.setService(new QName("urn:test", "svc"));
+ ep.setEndpoint("ep");
+ ep.setTargetService(new QName("urn:test", "echo"));
+ ep.setLocationURI("http://localhost:8192/ep1/");
+ ep.setDefaultMep(MessageExchangeSupport.IN_OUT);
+ ep.setTimeout(1000);
+ http.setEndpoints(new HttpEndpointType[] {ep});
+ container.activateComponent(http, "http");
+
+ EchoComponent echo = new EchoComponent() {
+ public void onMessageExchange(MessageExchange exchange) {
+ super.onMessageExchange(exchange);
+ }
+ protected boolean transform(MessageExchange exchange, NormalizedMessage in, NormalizedMessage out) throws MessagingException {
+ try {
+ Thread.sleep(1500);
+ } catch (InterruptedException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ return super.transform(exchange, in, out);
+ }
+ };
+ echo.setService(new QName("urn:test", "echo"));
+ echo.setEndpoint("endpoint");
+ container.activateComponent(echo, "echo");
+
+ container.start();
+
+ PostMethod post = new PostMethod("http://localhost:8192/ep1/");
+ post.setRequestEntity(new StringRequestEntity("<hello>world</hello>"));
+ new HttpClient().executeMethod(post);
+ String res = post.getResponseBodyAsString();
+ log.info(res);
+ if (post.getStatusCode() != 500) {
+ throw new InvalidStatusResponseException(post.getStatusCode());
+ }
+ Thread.sleep(1000);
+ }
+
public void testHttpInOut() throws Exception {
HttpComponent http = new HttpComponent();
HttpConsumerEndpoint ep = new HttpConsumerEndpoint();
@@ -381,4 +435,63 @@
assertEquals(200, post.getStatusCode());
}
+ /*
+ * Load testing test
+ */
+ public void testHttpInOutUnderLoad() throws Exception {
+ HttpComponent http = new HttpComponent();
+ HttpConsumerEndpoint ep = new HttpConsumerEndpoint();
+ ep.setService(new QName("urn:test", "svc"));
+ ep.setEndpoint("ep");
+ ep.setTargetService(new QName("urn:test", "echo"));
+ ep.setLocationURI("http://localhost:8192/ep1/");
+ //ep.setTimeout(500);
+ http.setEndpoints(new HttpEndpointType[] {ep});
+ container.activateComponent(http, "http");
+
+ EchoComponent echo = new EchoComponent();
+ echo.setService(new QName("urn:test", "echo"));
+ echo.setEndpoint("endpoint");
+ container.activateComponent(echo, "echo");
+
+ ((ExecutorFactoryImpl) container.getExecutorFactory()).getDefaultConfig().setMaximumPoolSize(16);
+
+ container.start();
+
+ final int nbThreads = 32;
+ final int nbRequests = 64;
+ final List<Throwable> throwables = new CopyOnWriteArrayList<Throwable>();
+ final CountDownLatch latch = new CountDownLatch(nbThreads * nbRequests);
+ for (int t = 0; t < nbThreads; t++) {
+ new Thread() {
+ public void run() {
+ final HttpClient client = new HttpClient();
+ //client.getParams().setSoTimeout(60000);
+ for (int i = 0; i < nbRequests; i++) {
+ try {
+ PostMethod post = new PostMethod("http://localhost:8192/ep1/");
+ post.setRequestEntity(new StringRequestEntity("<hello>world</hello>"));
+ client.executeMethod(post);
+ Node node = transformer.toDOMNode(new StreamSource(post.getResponseBodyAsStream()));
+ log.info(transformer.toString(node));
+ assertEquals("world", textValueOfXPath(node, "/hello/text()"));
+ if (post.getStatusCode() != 200) {
+ throw new InvalidStatusResponseException(post.getStatusCode());
+ }
+ } catch (Throwable t) {
+ throwables.add(t);
+ } finally {
+ latch.countDown();
+ //System.out.println("[" + System.currentTimeMillis() + "] Request " + latch.getCount() + " processed");
+ }
+ }
+ }
+ }.start();
+ }
+ latch.await();
+ for (Throwable t : throwables) {
+ t.printStackTrace();
+ }
+ }
+
}