You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/09/11 12:26:34 UTC

svn commit: r694206 - in /servicemix/components/bindings/servicemix-http/trunk/src: main/java/org/apache/servicemix/http/endpoints/ main/java/org/apache/servicemix/http/processors/ test/java/org/apache/servicemix/http/

Author: gnodet
Date: Thu Sep 11 03:26:32 2008
New Revision: 694206

URL: http://svn.apache.org/viewvc?rev=694206&view=rev
Log:
SM-1407: Fix problem with some exchanges not being sent back in ERROR when a timeout occurs

Modified:
    servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
    servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
    servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java

Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java?rev=694206&r1=694205&r2=694206&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java Thu Sep 11 03:26:32 2008
@@ -55,8 +55,8 @@
 import org.mortbay.util.ajax.ContinuationSupport;
 
 /**
- * a plain HTTP consumer endpoint. This endpoint can be used to handle plain HTTP requests or to be able to process the request in a
- * non standard way. For HTTP requests, a WSDL2 HTTP binding can be used.
+ * Plain HTTP consumer endpoint. This endpoint can be used to handle plain HTTP request (without SOAP) or to be able to
+ * process the request in a non standard way. For HTTP requests, a WSDL2 HTTP binding can be used.
  * 
  * @author gnodet
  * @since 3.2
@@ -122,8 +122,8 @@
      * Specifies the timeout value for an HTTP consumer endpoint. The timeout is specified in milliseconds. The default value is 0
      * which means that the endpoint will never timeout.
      * 
-     * @org.apache.xbean.Property description=
-     *                            "the timeout is specified in milliseconds. The default value is 0 which means that the endpoint will never timeout."
+     * @org.apache.xbean.Property description="the timeout is specified in milliseconds. The default value is 0 which
+     *       means that the endpoint will never timeout."
      * @param timeout the length time, in milliseconds, to wait before timing out
      */
     public void setTimeout(long timeout) {
@@ -225,12 +225,12 @@
         // that will set the exchange status to ERROR.
         Continuation cont = locks.get(exchange.getExchangeId());
         if (cont == null) {
-            throw new Exception("HTTP request has timed out");
+            throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
         }
         // synchronized block
         synchronized (cont) {
             if (locks.remove(exchange.getExchangeId()) == null) {
-                throw new Exception("HTTP request has timed out");
+                throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
             }
             if (logger.isDebugEnabled()) {
                 logger.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
@@ -311,13 +311,14 @@
                     exchange = exchanges.remove(id);
                     request.removeAttribute(MessageExchange.class.getName());
                     // Check if this is a timeout
-                    if (!cont.isResumed()) {
-                        throw new Exception("Exchange timed out");
-                    }
-                    // This should never happen, but we never knows
                     if (exchange == null) {
                         throw new IllegalStateException("Exchange not found");
                     }
+                    if (!cont.isResumed()) {
+                        Exception e = new Exception("Exchange timed out: " + exchange.getExchangeId());
+                        fail(exchange, e);
+                        throw e;
+                    }
                 }
             }
             // At this point, we have received the exchange response,
@@ -395,8 +396,8 @@
             response.setStatus(200);
             response.setContentType("text/xml");
             try {
-                new SourceTransformer().toResult(new DOMSource((Node)res), new StreamResult(response
-                    .getOutputStream()));
+                new SourceTransformer().toResult(new DOMSource((Node)res),
+                                                 new StreamResult(response.getOutputStream()));
             } catch (TransformerException e) {
                 throw new ServletException("Error while sending xml resource", e);
             }
@@ -418,7 +419,7 @@
     }
 
     protected ContextManager getServerManager() {
-        HttpComponent comp = (HttpComponent)getServiceUnit().getComponent();
+        HttpComponent comp = (HttpComponent) getServiceUnit().getComponent();
         return comp.getServer();
     }
 
@@ -428,7 +429,7 @@
         // If the user has been authenticated, put these informations on
         // the in NormalizedMessage.
         if (request.getUserPrincipal() instanceof JaasJettyPrincipal) {
-            Subject subject = ((JaasJettyPrincipal)request.getUserPrincipal()).getSubject();
+            Subject subject = ((JaasJettyPrincipal) request.getUserPrincipal()).getSubject();
             me.getMessage("in").setSecuritySubject(subject);
         }
         return me;
@@ -460,7 +461,7 @@
             marshaler = new DefaultHttpConsumerMarshaler();
         }
         if (marshaler instanceof DefaultHttpConsumerMarshaler) {
-            ((DefaultHttpConsumerMarshaler)marshaler).setDefaultMep(getDefaultMep());
+            ((DefaultHttpConsumerMarshaler) marshaler).setDefaultMep(getDefaultMep());
         }
     }
 }

Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=694206&r1=694205&r2=694206&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Thu Sep 11 03:26:32 2008
@@ -135,18 +135,7 @@
         // If the continuation is not a retry
         if (!cont.isPending()) {
             try {
-                SoapMessage message = soapHelper.getSoapMarshaler().createReader().read(
-                                            request.getInputStream(), 
-                                            request.getHeader(HEADER_CONTENT_TYPE));
-                Context ctx = soapHelper.createContext(message);
-                if (request.getUserPrincipal() != null) {
-                    if (request.getUserPrincipal() instanceof JaasJettyPrincipal) {
-                        Subject subject = ((JaasJettyPrincipal) request.getUserPrincipal()).getSubject();
-                        ctx.getInMessage().setSubject(subject);
-                    } else {
-                        ctx.getInMessage().addPrincipal(request.getUserPrincipal());
-                    }
-                }
+                Context ctx = createContext(request);
                 request.setAttribute(Context.class.getName(), ctx);
                 exchange = soapHelper.onReceive(ctx);
                 exchanges.put(exchange.getExchangeId(), exchange);
@@ -175,8 +164,7 @@
                 sendFault(fault, request, response);
                 return;
             } catch (Exception e) {
-                SoapFault fault = new SoapFault(e);
-                sendFault(fault, request, response);
+                sendFault(new SoapFault(e), request, response);
                 return;
             }
         } else {
@@ -186,12 +174,16 @@
                 exchange = exchanges.remove(id);
                 request.removeAttribute(MessageExchange.class.getName());
                 // Check if this is a timeout
-                if (!cont.isResumed()) {
-                    throw new Exception("Exchange timed out");
-                }
                 if (exchange == null) {
                     throw new IllegalStateException("Exchange not found");
                 }
+                if (!cont.isResumed()) {
+                    Exception e = new Exception("Exchange timed out: " + exchange.getExchangeId());
+                    exchange.setError(e);
+                    channel.send(exchange);
+                    sendFault(new SoapFault(e), request, response);
+                    return;
+                }
             }
         }
         if (exchange.getStatus() == ExchangeStatus.ERROR) {
@@ -217,6 +209,22 @@
         }
     }
 
+    private Context createContext(HttpServletRequest request) throws Exception {
+        SoapMessage message = soapHelper.getSoapMarshaler().createReader().read(
+                                    request.getInputStream(),
+                                    request.getHeader(HEADER_CONTENT_TYPE));
+        Context ctx = soapHelper.createContext(message);
+        if (request.getUserPrincipal() != null) {
+            if (request.getUserPrincipal() instanceof JaasJettyPrincipal) {
+                Subject subject = ((JaasJettyPrincipal) request.getUserPrincipal()).getSubject();
+                ctx.getInMessage().setSubject(subject);
+            } else {
+                ctx.getInMessage().addPrincipal(request.getUserPrincipal());
+            }
+        }
+        return ctx;
+    }
+
     private void processResponse(MessageExchange exchange, HttpServletRequest request, HttpServletResponse response) throws Exception {
         NormalizedMessage outMsg = exchange.getMessage("out");
         if (outMsg != null) {

Modified: servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java?rev=694206&r1=694205&r2=694206&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java Thu Sep 11 03:26:32 2008
@@ -57,9 +57,11 @@
 import org.apache.servicemix.soap.interceptors.jbi.JbiConstants;
 import org.apache.servicemix.soap.util.DomUtil;
 import org.apache.servicemix.tck.ReceiverComponent;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
 import org.apache.servicemix.executors.impl.ExecutorFactoryImpl;
 import org.apache.xpath.CachedXPathAPI;
 import org.springframework.core.io.ClassPathResource;
+import org.mortbay.jetty.bio.SocketConnector;
 
 public class ConsumerEndpointTest extends TestCase {
     private static transient Log log = LogFactory.getLog(ConsumerEndpointTest.class);
@@ -76,6 +78,9 @@
         container.setUseMBeanServer(false);
         container.setCreateMBeanServer(false);
         container.setEmbedded(true);
+        ExecutorFactoryImpl factory = new ExecutorFactoryImpl();
+        factory.getDefaultConfig().setQueueSize(0);
+        container.setExecutorFactory(factory);
         container.init();
     }
 
@@ -439,17 +444,40 @@
      * Load testing test
      */
     public void testHttpInOutUnderLoad() throws Exception {
+        final int nbThreads = 16;
+        final int nbRequests = 8;
+        final int endpointTimeout = 100;
+        final int echoSleepTime = 90;
+        final int soTimeout = 60 * 1000 * 1000;
+        final int listenerTimeout = 5000;
+
+        ExchangeCompletedListener listener = new ExchangeCompletedListener(listenerTimeout);
+        container.addListener(listener);
+
         HttpComponent http = new HttpComponent();
+        //http.getConfiguration().setJettyConnectorClassName(SocketConnector.class.getName());
         HttpConsumerEndpoint ep = new HttpConsumerEndpoint();
         ep.setService(new QName("urn:test", "svc"));
         ep.setEndpoint("ep");
         ep.setTargetService(new QName("urn:test", "echo"));
         ep.setLocationURI("http://localhost:8192/ep1/");
-        //ep.setTimeout(500);
+        ep.setTimeout(endpointTimeout);
         http.setEndpoints(new HttpEndpointType[] {ep});
         container.activateComponent(http, "http");
 
-        EchoComponent echo = new EchoComponent();
+        final CountDownLatch latchRecv = new CountDownLatch(nbThreads * nbRequests);
+        EchoComponent echo = new EchoComponent() {
+            protected boolean transform(MessageExchange exchange, NormalizedMessage in, NormalizedMessage out) throws MessagingException {
+                latchRecv.countDown();
+                try {
+                    Thread.sleep(echoSleepTime);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
+                out.setContent(in.getContent());
+                return true;
+            }
+        };
         echo.setService(new QName("urn:test", "echo"));
         echo.setEndpoint("endpoint");
         container.activateComponent(echo, "echo");
@@ -458,37 +486,38 @@
 
         container.start();
 
-        final int nbThreads = 32;
-        final int nbRequests = 64;
         final List<Throwable> throwables = new CopyOnWriteArrayList<Throwable>();
-        final CountDownLatch latch = new CountDownLatch(nbThreads * nbRequests);
+        final CountDownLatch latchSent = new CountDownLatch(nbThreads * nbRequests);
         for (int t = 0; t < nbThreads; t++) {
             new Thread() {
                 public void run() {
+                    final SourceTransformer transformer = new SourceTransformer(); 
                     final HttpClient client = new HttpClient();
-                    //client.getParams().setSoTimeout(60000);
+                    client.getParams().setSoTimeout(soTimeout);
                     for (int i = 0; i < nbRequests; i++) {
                         try {
                             PostMethod post = new PostMethod("http://localhost:8192/ep1/");
                             post.setRequestEntity(new StringRequestEntity("<hello>world</hello>"));
                             client.executeMethod(post);
-                            Node node = transformer.toDOMNode(new StreamSource(post.getResponseBodyAsStream()));
-                            log.info(transformer.toString(node));
-                            assertEquals("world", textValueOfXPath(node, "/hello/text()"));
                             if (post.getStatusCode() != 200) {
                                 throw new InvalidStatusResponseException(post.getStatusCode());
                             }
+                            Node node = transformer.toDOMNode(new StreamSource(post.getResponseBodyAsStream()));
+                            log.info(transformer.toString(node));
+                            assertEquals("world", textValueOfXPath(node, "/hello/text()"));
                         } catch (Throwable t) {
                             throwables.add(t);
                         } finally {
-                            latch.countDown();
+                            latchSent.countDown();
                             //System.out.println("[" + System.currentTimeMillis() + "] Request " + latch.getCount() + " processed");
                         }
                     }
                 }
             }.start();
         }
-        latch.await();
+        latchSent.await();
+        latchRecv.await();
+        listener.assertExchangeCompleted();
         for (Throwable t : throwables) {
             t.printStackTrace();
         }