You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/09/11 12:26:34 UTC
svn commit: r694206 - in
/servicemix/components/bindings/servicemix-http/trunk/src:
main/java/org/apache/servicemix/http/endpoints/
main/java/org/apache/servicemix/http/processors/
test/java/org/apache/servicemix/http/
Author: gnodet
Date: Thu Sep 11 03:26:32 2008
New Revision: 694206
URL: http://svn.apache.org/viewvc?rev=694206&view=rev
Log:
SM-1407: Fix problem with some exchanges not sent back in ERROR when a timeout occur
Modified:
servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java
Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java?rev=694206&r1=694205&r2=694206&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java Thu Sep 11 03:26:32 2008
@@ -55,8 +55,8 @@
import org.mortbay.util.ajax.ContinuationSupport;
/**
- * a plain HTTP consumer endpoint. This endpoint can be used to handle plain HTTP requests or to be able to process the request in a
- * non standard way. For HTTP requests, a WSDL2 HTTP binding can be used.
+ * Plain HTTP consumer endpoint. This endpoint can be used to handle plain HTTP request (without SOAP) or to be able to
+ * process the request in a non standard way. For HTTP requests, a WSDL2 HTTP binding can be used.
*
* @author gnodet
* @since 3.2
@@ -122,8 +122,8 @@
* Specifies the timeout value for an HTTP consumer endpoint. The timeout is specified in milliseconds. The default value is 0
* which means that the endpoint will never timeout.
*
- * @org.apache.xbean.Property description=
- * "the timeout is specified in milliseconds. The default value is 0 which means that the endpoint will never timeout."
+ * @org.apache.xbean.Property description="the timeout is specified in milliseconds. The default value is 0 which
+ * means that the endpoint will never timeout."
* @param timeout the length time, in milliseconds, to wait before timing out
*/
public void setTimeout(long timeout) {
@@ -225,12 +225,12 @@
// that will set the exchange status to ERROR.
Continuation cont = locks.get(exchange.getExchangeId());
if (cont == null) {
- throw new Exception("HTTP request has timed out");
+ throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
}
// synchronized block
synchronized (cont) {
if (locks.remove(exchange.getExchangeId()) == null) {
- throw new Exception("HTTP request has timed out");
+ throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
}
if (logger.isDebugEnabled()) {
logger.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
@@ -311,13 +311,14 @@
exchange = exchanges.remove(id);
request.removeAttribute(MessageExchange.class.getName());
// Check if this is a timeout
- if (!cont.isResumed()) {
- throw new Exception("Exchange timed out");
- }
- // This should never happen, but we never knows
if (exchange == null) {
throw new IllegalStateException("Exchange not found");
}
+ if (!cont.isResumed()) {
+ Exception e = new Exception("Exchange timed out: " + exchange.getExchangeId());
+ fail(exchange, e);
+ throw e;
+ }
}
}
// At this point, we have received the exchange response,
@@ -395,8 +396,8 @@
response.setStatus(200);
response.setContentType("text/xml");
try {
- new SourceTransformer().toResult(new DOMSource((Node)res), new StreamResult(response
- .getOutputStream()));
+ new SourceTransformer().toResult(new DOMSource((Node)res),
+ new StreamResult(response.getOutputStream()));
} catch (TransformerException e) {
throw new ServletException("Error while sending xml resource", e);
}
@@ -418,7 +419,7 @@
}
protected ContextManager getServerManager() {
- HttpComponent comp = (HttpComponent)getServiceUnit().getComponent();
+ HttpComponent comp = (HttpComponent) getServiceUnit().getComponent();
return comp.getServer();
}
@@ -428,7 +429,7 @@
// If the user has been authenticated, put these informations on
// the in NormalizedMessage.
if (request.getUserPrincipal() instanceof JaasJettyPrincipal) {
- Subject subject = ((JaasJettyPrincipal)request.getUserPrincipal()).getSubject();
+ Subject subject = ((JaasJettyPrincipal) request.getUserPrincipal()).getSubject();
me.getMessage("in").setSecuritySubject(subject);
}
return me;
@@ -460,7 +461,7 @@
marshaler = new DefaultHttpConsumerMarshaler();
}
if (marshaler instanceof DefaultHttpConsumerMarshaler) {
- ((DefaultHttpConsumerMarshaler)marshaler).setDefaultMep(getDefaultMep());
+ ((DefaultHttpConsumerMarshaler) marshaler).setDefaultMep(getDefaultMep());
}
}
}
Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=694206&r1=694205&r2=694206&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Thu Sep 11 03:26:32 2008
@@ -135,18 +135,7 @@
// If the continuation is not a retry
if (!cont.isPending()) {
try {
- SoapMessage message = soapHelper.getSoapMarshaler().createReader().read(
- request.getInputStream(),
- request.getHeader(HEADER_CONTENT_TYPE));
- Context ctx = soapHelper.createContext(message);
- if (request.getUserPrincipal() != null) {
- if (request.getUserPrincipal() instanceof JaasJettyPrincipal) {
- Subject subject = ((JaasJettyPrincipal) request.getUserPrincipal()).getSubject();
- ctx.getInMessage().setSubject(subject);
- } else {
- ctx.getInMessage().addPrincipal(request.getUserPrincipal());
- }
- }
+ Context ctx = createContext(request);
request.setAttribute(Context.class.getName(), ctx);
exchange = soapHelper.onReceive(ctx);
exchanges.put(exchange.getExchangeId(), exchange);
@@ -175,8 +164,7 @@
sendFault(fault, request, response);
return;
} catch (Exception e) {
- SoapFault fault = new SoapFault(e);
- sendFault(fault, request, response);
+ sendFault(new SoapFault(e), request, response);
return;
}
} else {
@@ -186,12 +174,16 @@
exchange = exchanges.remove(id);
request.removeAttribute(MessageExchange.class.getName());
// Check if this is a timeout
- if (!cont.isResumed()) {
- throw new Exception("Exchange timed out");
- }
if (exchange == null) {
throw new IllegalStateException("Exchange not found");
}
+ if (!cont.isResumed()) {
+ Exception e = new Exception("Exchange timed out: " + exchange.getExchangeId());
+ exchange.setError(e);
+ channel.send(exchange);
+ sendFault(new SoapFault(e), request, response);
+ return;
+ }
}
}
if (exchange.getStatus() == ExchangeStatus.ERROR) {
@@ -217,6 +209,22 @@
}
}
+ private Context createContext(HttpServletRequest request) throws Exception {
+ SoapMessage message = soapHelper.getSoapMarshaler().createReader().read(
+ request.getInputStream(),
+ request.getHeader(HEADER_CONTENT_TYPE));
+ Context ctx = soapHelper.createContext(message);
+ if (request.getUserPrincipal() != null) {
+ if (request.getUserPrincipal() instanceof JaasJettyPrincipal) {
+ Subject subject = ((JaasJettyPrincipal) request.getUserPrincipal()).getSubject();
+ ctx.getInMessage().setSubject(subject);
+ } else {
+ ctx.getInMessage().addPrincipal(request.getUserPrincipal());
+ }
+ }
+ return ctx;
+ }
+
private void processResponse(MessageExchange exchange, HttpServletRequest request, HttpServletResponse response) throws Exception {
NormalizedMessage outMsg = exchange.getMessage("out");
if (outMsg != null) {
Modified: servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java?rev=694206&r1=694205&r2=694206&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java Thu Sep 11 03:26:32 2008
@@ -57,9 +57,11 @@
import org.apache.servicemix.soap.interceptors.jbi.JbiConstants;
import org.apache.servicemix.soap.util.DomUtil;
import org.apache.servicemix.tck.ReceiverComponent;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
import org.apache.servicemix.executors.impl.ExecutorFactoryImpl;
import org.apache.xpath.CachedXPathAPI;
import org.springframework.core.io.ClassPathResource;
+import org.mortbay.jetty.bio.SocketConnector;
public class ConsumerEndpointTest extends TestCase {
private static transient Log log = LogFactory.getLog(ConsumerEndpointTest.class);
@@ -76,6 +78,9 @@
container.setUseMBeanServer(false);
container.setCreateMBeanServer(false);
container.setEmbedded(true);
+ ExecutorFactoryImpl factory = new ExecutorFactoryImpl();
+ factory.getDefaultConfig().setQueueSize(0);
+ container.setExecutorFactory(factory);
container.init();
}
@@ -439,17 +444,40 @@
* Load testing test
*/
public void testHttpInOutUnderLoad() throws Exception {
+ final int nbThreads = 16;
+ final int nbRequests = 8;
+ final int endpointTimeout = 100;
+ final int echoSleepTime = 90;
+ final int soTimeout = 60 * 1000 * 1000;
+ final int listenerTimeout = 5000;
+
+ ExchangeCompletedListener listener = new ExchangeCompletedListener(listenerTimeout);
+ container.addListener(listener);
+
HttpComponent http = new HttpComponent();
+ //http.getConfiguration().setJettyConnectorClassName(SocketConnector.class.getName());
HttpConsumerEndpoint ep = new HttpConsumerEndpoint();
ep.setService(new QName("urn:test", "svc"));
ep.setEndpoint("ep");
ep.setTargetService(new QName("urn:test", "echo"));
ep.setLocationURI("http://localhost:8192/ep1/");
- //ep.setTimeout(500);
+ ep.setTimeout(endpointTimeout);
http.setEndpoints(new HttpEndpointType[] {ep});
container.activateComponent(http, "http");
- EchoComponent echo = new EchoComponent();
+ final CountDownLatch latchRecv = new CountDownLatch(nbThreads * nbRequests);
+ EchoComponent echo = new EchoComponent() {
+ protected boolean transform(MessageExchange exchange, NormalizedMessage in, NormalizedMessage out) throws MessagingException {
+ latchRecv.countDown();
+ try {
+ Thread.sleep(echoSleepTime);
+ } catch (InterruptedException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ out.setContent(in.getContent());
+ return true;
+ }
+ };
echo.setService(new QName("urn:test", "echo"));
echo.setEndpoint("endpoint");
container.activateComponent(echo, "echo");
@@ -458,37 +486,38 @@
container.start();
- final int nbThreads = 32;
- final int nbRequests = 64;
final List<Throwable> throwables = new CopyOnWriteArrayList<Throwable>();
- final CountDownLatch latch = new CountDownLatch(nbThreads * nbRequests);
+ final CountDownLatch latchSent = new CountDownLatch(nbThreads * nbRequests);
for (int t = 0; t < nbThreads; t++) {
new Thread() {
public void run() {
+ final SourceTransformer transformer = new SourceTransformer();
final HttpClient client = new HttpClient();
- //client.getParams().setSoTimeout(60000);
+ client.getParams().setSoTimeout(soTimeout);
for (int i = 0; i < nbRequests; i++) {
try {
PostMethod post = new PostMethod("http://localhost:8192/ep1/");
post.setRequestEntity(new StringRequestEntity("<hello>world</hello>"));
client.executeMethod(post);
- Node node = transformer.toDOMNode(new StreamSource(post.getResponseBodyAsStream()));
- log.info(transformer.toString(node));
- assertEquals("world", textValueOfXPath(node, "/hello/text()"));
if (post.getStatusCode() != 200) {
throw new InvalidStatusResponseException(post.getStatusCode());
}
+ Node node = transformer.toDOMNode(new StreamSource(post.getResponseBodyAsStream()));
+ log.info(transformer.toString(node));
+ assertEquals("world", textValueOfXPath(node, "/hello/text()"));
} catch (Throwable t) {
throwables.add(t);
} finally {
- latch.countDown();
+ latchSent.countDown();
//System.out.println("[" + System.currentTimeMillis() + "] Request " + latch.getCount() + " processed");
}
}
}
}.start();
}
- latch.await();
+ latchSent.await();
+ latchRecv.await();
+ listener.assertExchangeCompleted();
for (Throwable t : throwables) {
t.printStackTrace();
}