You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/08/29 14:08:54 UTC

svn commit: r690200 - in /servicemix/components/bindings/servicemix-http/trunk/src: main/java/org/apache/servicemix/http/endpoints/ main/java/org/apache/servicemix/http/processors/ test/java/org/apache/servicemix/http/

Author: gnodet
Date: Fri Aug 29 05:08:53 2008
New Revision: 690200

URL: http://svn.apache.org/viewvc?rev=690200&view=rev
Log:
SM-1179: fix the exchange not found which lead to the http component being blocked

Modified:
    servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
    servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
    servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java

Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java?rev=690200&r1=690199&r2=690200&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java Fri Aug 29 05:08:53 2008
@@ -219,15 +219,25 @@
     }
 
     public void process(MessageExchange exchange) throws Exception {
-        Continuation cont = locks.remove(exchange.getExchangeId());
+        // Receive the exchange response
+        // First, check if the continuation has not been removed from the map,
+        // which would mean it has timed out.  If this is the case, throw an exception
+        // that will set the exchange status to ERROR.
+        Continuation cont = locks.get(exchange.getExchangeId());
         if (cont == null) {
             throw new Exception("HTTP request has timed out");
         }
+        // synchronized block
         synchronized (cont) {
+            if (locks.remove(exchange.getExchangeId()) == null) {
+                throw new Exception("HTTP request has timed out");
+            }
             if (logger.isDebugEnabled()) {
                 logger.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
             }
+            // Put the new exchange
             exchanges.put(exchange.getExchangeId(), exchange);
+            // Resume continuation
             cont.resume();
         }
     }
@@ -247,42 +257,71 @@
             Continuation cont = ContinuationSupport.getContinuation(request, null);
             // If the continuation is not a retry
             if (!cont.isPending()) {
+                // Create the exchange
                 exchange = createExchange(request);
-                locks.put(exchange.getExchangeId(), cont);
+                // Put the exchange in a map so that we can later retrieve it
+                // We don't put the exchange on the request directly in case the JMS flow is involved
+                // because the exchange coming back may not be the same object as the one send.
+                exchanges.put(exchange.getExchangeId(), exchange);
+                // Put the exchange id on the request to be able to retrieve the exchange later
                 request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId());
+                // Put the continuation in a map under the exchange id key
+                locks.put(exchange.getExchangeId(), cont);
                 synchronized (cont) {
+                    // Send the exchange
                     send(exchange);
                     if (logger.isDebugEnabled()) {
                         logger.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
                     }
+                    // Suspend the continuation for the configured timeout
+                    // If a SelectConnector is used, the call to suspend will throw a RetryRequest exception
+                    // else, the call will block until the continuation is resumed
                     long to = this.timeout;
                     if (to == 0) {
-                        to = ((HttpComponent)getServiceUnit().getComponent()).getConfiguration()
-                            .getConsumerProcessorSuspendTime();
+                        to = ((HttpComponent) getServiceUnit().getComponent()).getConfiguration().getConsumerProcessorSuspendTime();
                     }
-                    exchanges.put(exchange.getExchangeId(), exchange);
                     boolean result = cont.suspend(to);
+                    // The call has not thrown a RetryRequest, which means we don't use a SelectConnector
+                    // and we must handle the exchange in this very method call.
+                    // If result is false, the continuation has timed out.
+                    // So get the exchange (in case the object has changed) and remove it from the map
                     exchange = exchanges.remove(exchange.getExchangeId());
+                    // remove the exchange id from the request as we don't need it anymore
+                    request.removeAttribute(MessageExchange.class.getName());
+                    // If a timeout occurred, throw an exception that will be sent back to the HTTP client
+                    // Whenever the exchange comes back, the process(MessageExchange) method will thrown an
+                    // exception and the exchange will be set in an ERROR status
                     if (!result) {
+                        // Remove the continuation from the map.
+                        // This indicates the continuation has been fully processed
                         locks.remove(exchange.getExchangeId());
                         throw new Exception("Exchange timed out");
                     }
-                    request.removeAttribute(MessageExchange.class.getName());
                 }
+            // The continuation is a retry.
+            // This happens when the SelectConnector is used and in two cases:
+            //  * the continuation has been resumed because the exchange has been received
+            //  * the continuation has timed out
             } else {
-                String id = (String)request.getAttribute(MessageExchange.class.getName());
-                locks.remove(id);
-                exchange = exchanges.remove(id);
-                request.removeAttribute(MessageExchange.class.getName());
-                boolean result = cont.suspend(0);
-                // Check if this is a timeout
-                if (exchange == null) {
-                    throw new IllegalStateException("Exchange not found");
-                }
-                if (!result) {
-                    throw new Exception("Timeout");
+                synchronized (cont) {
+                    // Get the exchange id from the request
+                    String id = (String) request.getAttribute(MessageExchange.class.getName());
+                    // Remove the continuation from the map, indicating it has been processed or timed out
+                    locks.remove(id);
+                    exchange = exchanges.remove(id);
+                    request.removeAttribute(MessageExchange.class.getName());
+                    // Check if this is a timeout
+                    if (!cont.isResumed()) {
+                        throw new Exception("Exchange timed out");
+                    }
+                    // This should never happen, but we never knows
+                    if (exchange == null) {
+                        throw new IllegalStateException("Exchange not found");
+                    }
                 }
             }
+            // At this point, we have received the exchange response,
+            // so process it and send back the HTTP response
             if (exchange.getStatus() == ExchangeStatus.ERROR) {
                 Exception e = exchange.getError();
                 if (e == null) {
@@ -300,11 +339,9 @@
                             sendOut(exchange, outMsg, request, response);
                         }
                     }
-                    exchange.setStatus(ExchangeStatus.DONE);
-                    send(exchange);
+                    done(exchange);
                 } catch (Exception e) {
-                    exchange.setError(e);
-                    send(exchange);
+                    fail(exchange, e);
                     throw e;
                 }
             } else if (exchange.getStatus() == ExchangeStatus.DONE) {

Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=690200&r1=690199&r2=690200&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Fri Aug 29 05:08:53 2008
@@ -90,17 +90,19 @@
     }
     
     public void process(MessageExchange exchange) throws Exception {
-        Continuation cont = locks.remove(exchange.getExchangeId());
-        if (cont != null) {
-            synchronized (cont) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
-                }
-                exchanges.put(exchange.getExchangeId(), exchange);
-                cont.resume();
+        Continuation cont = locks.get(exchange.getExchangeId());
+        if (cont == null) {
+            throw new Exception("HTTP request has timed out");
+        }
+        synchronized (cont) {
+            if (locks.remove(exchange.getExchangeId()) == null) {
+                throw new Exception("HTTP request has timed out");
             }
-        } else {
-            throw new IllegalStateException("Exchange not found");
+            if (log.isDebugEnabled()) {
+                log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
+            }
+            exchanges.put(exchange.getExchangeId(), exchange);
+            cont.resume();
         }
     }
 
@@ -147,6 +149,7 @@
                 }
                 request.setAttribute(Context.class.getName(), ctx);
                 exchange = soapHelper.onReceive(ctx);
+                exchanges.put(exchange.getExchangeId(), exchange);
                 NormalizedMessage inMessage = exchange.getMessage("in");
                 if (getConfiguration().isWantHeadersFromHttpIntoExchange()) {
                     inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(request));
@@ -160,11 +163,11 @@
                     }
                     boolean result = cont.suspend(suspentionTime);
                     exchange = exchanges.remove(exchange.getExchangeId());
+                    request.removeAttribute(MessageExchange.class.getName());
                     if (!result) {
                         locks.remove(exchange.getExchangeId());
-                        throw new Exception("Error sending exchange: aborted");
+                        throw new Exception("Exchange timed out");
                     }
-                    request.removeAttribute(MessageExchange.class.getName());
                 }
             } catch (RetryRequest retry) {
                 throw retry;
@@ -177,16 +180,18 @@
                 return;
             }
         } else {
-            String id = (String) request.getAttribute(MessageExchange.class.getName());
-            exchange = exchanges.remove(id);
-            request.removeAttribute(MessageExchange.class.getName());
-            boolean result = cont.suspend(0); 
-            // Check if this is a timeout
-            if (exchange == null) {
-                throw new IllegalStateException("Exchange not found");
-            }
-            if (!result) {
-                throw new Exception("Timeout");
+            synchronized (cont) {
+                String id = (String) request.getAttribute(MessageExchange.class.getName());
+                locks.remove(id);
+                exchange = exchanges.remove(id);
+                request.removeAttribute(MessageExchange.class.getName());
+                // Check if this is a timeout
+                if (!cont.isResumed()) {
+                    throw new Exception("Exchange timed out");
+                }
+                if (exchange == null) {
+                    throw new IllegalStateException("Exchange not found");
+                }
             }
         }
         if (exchange.getStatus() == ExchangeStatus.ERROR) {

Modified: servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java?rev=690200&r1=690199&r2=690200&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/test/java/org/apache/servicemix/http/ConsumerEndpointTest.java Fri Aug 29 05:08:53 2008
@@ -16,11 +16,17 @@
  */
 package org.apache.servicemix.http;
 
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.ExchangeStatus;
 import javax.xml.namespace.QName;
 import javax.xml.transform.TransformerException;
+import javax.xml.transform.stream.StreamSource;
 
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
@@ -43,6 +49,7 @@
 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.jbi.jaxp.StringSource;
 import org.apache.servicemix.jbi.messaging.MessageExchangeSupport;
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.apache.servicemix.jbi.util.DOMUtil;
 import org.apache.servicemix.soap.bindings.soap.Soap11;
 import org.apache.servicemix.soap.bindings.soap.Soap12;
@@ -50,6 +57,7 @@
 import org.apache.servicemix.soap.interceptors.jbi.JbiConstants;
 import org.apache.servicemix.soap.util.DomUtil;
 import org.apache.servicemix.tck.ReceiverComponent;
+import org.apache.servicemix.executors.impl.ExecutorFactoryImpl;
 import org.apache.xpath.CachedXPathAPI;
 import org.springframework.core.io.ClassPathResource;
 
@@ -59,6 +67,10 @@
     protected JBIContainer container;
     protected SourceTransformer transformer = new SourceTransformer();
 
+    static {
+        System.setProperty("org.apache.servicemix.preserveContent", "true");
+    }
+
     protected void setUp() throws Exception {
         container = new JBIContainer();
         container.setUseMBeanServer(false);
@@ -120,6 +132,48 @@
         recv.getMessageList().assertMessagesReceived(1);
     }
 
+    public void testHttpInOutWithTimeout() throws Exception {
+        HttpComponent http = new HttpComponent();
+        HttpConsumerEndpoint ep = new HttpConsumerEndpoint();
+        ep.setService(new QName("urn:test", "svc"));
+        ep.setEndpoint("ep");
+        ep.setTargetService(new QName("urn:test", "echo"));
+        ep.setLocationURI("http://localhost:8192/ep1/");
+        ep.setDefaultMep(MessageExchangeSupport.IN_OUT);
+        ep.setTimeout(1000);
+        http.setEndpoints(new HttpEndpointType[] {ep});
+        container.activateComponent(http, "http");
+
+        EchoComponent echo = new EchoComponent() {
+            public void onMessageExchange(MessageExchange exchange) {
+                super.onMessageExchange(exchange);
+            }
+            protected boolean transform(MessageExchange exchange, NormalizedMessage in, NormalizedMessage out) throws MessagingException {
+                try {
+                    Thread.sleep(1500);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                }
+                return super.transform(exchange, in, out);
+            }
+        };
+        echo.setService(new QName("urn:test", "echo"));
+        echo.setEndpoint("endpoint");
+        container.activateComponent(echo, "echo");
+
+        container.start();
+
+        PostMethod post = new PostMethod("http://localhost:8192/ep1/");
+        post.setRequestEntity(new StringRequestEntity("<hello>world</hello>"));
+        new HttpClient().executeMethod(post);
+        String res = post.getResponseBodyAsString();
+        log.info(res);
+        if (post.getStatusCode() != 500) {
+            throw new InvalidStatusResponseException(post.getStatusCode());
+        }
+        Thread.sleep(1000);
+    }
+
     public void testHttpInOut() throws Exception {
         HttpComponent http = new HttpComponent();
         HttpConsumerEndpoint ep = new HttpConsumerEndpoint();
@@ -381,4 +435,63 @@
         assertEquals(200, post.getStatusCode());
     }
 
+    /*
+     * Load testing test
+     */
+    public void testHttpInOutUnderLoad() throws Exception {
+        HttpComponent http = new HttpComponent();
+        HttpConsumerEndpoint ep = new HttpConsumerEndpoint();
+        ep.setService(new QName("urn:test", "svc"));
+        ep.setEndpoint("ep");
+        ep.setTargetService(new QName("urn:test", "echo"));
+        ep.setLocationURI("http://localhost:8192/ep1/");
+        //ep.setTimeout(500);
+        http.setEndpoints(new HttpEndpointType[] {ep});
+        container.activateComponent(http, "http");
+
+        EchoComponent echo = new EchoComponent();
+        echo.setService(new QName("urn:test", "echo"));
+        echo.setEndpoint("endpoint");
+        container.activateComponent(echo, "echo");
+
+        ((ExecutorFactoryImpl) container.getExecutorFactory()).getDefaultConfig().setMaximumPoolSize(16);
+
+        container.start();
+
+        final int nbThreads = 32;
+        final int nbRequests = 64;
+        final List<Throwable> throwables = new CopyOnWriteArrayList<Throwable>();
+        final CountDownLatch latch = new CountDownLatch(nbThreads * nbRequests);
+        for (int t = 0; t < nbThreads; t++) {
+            new Thread() {
+                public void run() {
+                    final HttpClient client = new HttpClient();
+                    //client.getParams().setSoTimeout(60000);
+                    for (int i = 0; i < nbRequests; i++) {
+                        try {
+                            PostMethod post = new PostMethod("http://localhost:8192/ep1/");
+                            post.setRequestEntity(new StringRequestEntity("<hello>world</hello>"));
+                            client.executeMethod(post);
+                            Node node = transformer.toDOMNode(new StreamSource(post.getResponseBodyAsStream()));
+                            log.info(transformer.toString(node));
+                            assertEquals("world", textValueOfXPath(node, "/hello/text()"));
+                            if (post.getStatusCode() != 200) {
+                                throw new InvalidStatusResponseException(post.getStatusCode());
+                            }
+                        } catch (Throwable t) {
+                            throwables.add(t);
+                        } finally {
+                            latch.countDown();
+                            //System.out.println("[" + System.currentTimeMillis() + "] Request " + latch.getCount() + " processed");
+                        }
+                    }
+                }
+            }.start();
+        }
+        latch.await();
+        for (Throwable t : throwables) {
+            t.printStackTrace();
+        }
+    }
+
 }