You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2009/03/31 09:02:15 UTC

svn commit: r760346 - /servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java

Author: ffang
Date: Tue Mar 31 07:02:14 2009
New Revision: 760346

URL: http://svn.apache.org/viewvc?rev=760346&view=rev
Log:
[SMXCOMP-493]also apply fix for HttpConsumerEndpoint

Modified:
    servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java

Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java?rev=760346&r1=760345&r2=760346&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java Tue Mar 31 07:02:14 2009
@@ -80,6 +80,7 @@
     private Object httpContext;
 
     private boolean started = false;
+    private boolean isSTFlow;
 
     public HttpConsumerEndpoint() {
         super();
@@ -240,24 +241,32 @@
         if (cont == null) {
             throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
         }
-        // synchronized block
-        synchronized (cont) {
-            if (locks.remove(exchange.getExchangeId()) == null) {
-                throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
-            }
-            if (logger.isDebugEnabled()) {
-                logger.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
-            }
-            // Put the new exchange
-            exchanges.put(exchange.getExchangeId(), exchange);
-            // Resume continuation
-            cont.resume();
-            if (!cont.isResumed()) {
+        if (!cont.isPending()) {
+            isSTFlow = true;
+        } else {
+            isSTFlow = false;
+            // synchronized block
+            synchronized (cont) {
+                if (locks.remove(exchange.getExchangeId()) == null) {
+                    throw new Exception("HTTP request has timed out for exchange: "
+                                        + exchange.getExchangeId());
+                }
                 if (logger.isDebugEnabled()) {
-                    logger.debug("Could not resume continuation for exchange: " + exchange.getExchangeId());
+                    logger.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
+                }
+                // Put the new exchange
+                exchanges.put(exchange.getExchangeId(), exchange);
+                // Resume continuation
+                cont.resume();
+                if (!cont.isResumed()) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Could not resume continuation for exchange: "
+                                     + exchange.getExchangeId());
+                    }
+                    exchanges.remove(exchange.getExchangeId());
+                    throw new Exception("HTTP request has timed out for exchange: "
+                                        + exchange.getExchangeId());
                 }
-                exchanges.remove(exchange.getExchangeId());
-                throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
             }
         }
     }
@@ -293,32 +302,50 @@
                 synchronized (cont) {
                     // Send the exchange
                     send(exchange);
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
-                    }
-                    // Suspend the continuation for the configured timeout
-                    // If a SelectConnector is used, the call to suspend will throw a RetryRequest exception
-                    // else, the call will block until the continuation is resumed
-                    long to = this.timeout;
-                    if (to == 0) {
-                        to = ((HttpComponent) getServiceUnit().getComponent()).getConfiguration().getConsumerProcessorSuspendTime();
-                    }
-                    boolean result = cont.suspend(to);
-                    // The call has not thrown a RetryRequest, which means we don't use a SelectConnector
-                    // and we must handle the exchange in this very method call.
-                    // If result is false, the continuation has timed out.
-                    // So get the exchange (in case the object has changed) and remove it from the map
-                    exchange = exchanges.remove(exchange.getExchangeId());
-                    // remove the exchange id from the request as we don't need it anymore
-                    request.removeAttribute(MessageExchange.class.getName());
-                    // If a timeout occurred, throw an exception that will be sent back to the HTTP client
-                    // Whenever the exchange comes back, the process(MessageExchange) method will thrown an
-                    // exception and the exchange will be set in an ERROR status
-                    if (!result) {
-                        // Remove the continuation from the map.
-                        // This indicates the continuation has been fully processed
-                        locks.remove(exchange.getExchangeId());
-                        throw new Exception("Exchange timed out");
+                    if (!isSTFlow) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
+                        }
+                        // Suspend the continuation for the configured timeout
+                        // If a SelectConnector is used, the call to suspend
+                        // will throw a RetryRequest exception
+                        // else, the call will block until the continuation is
+                        // resumed
+                        long to = this.timeout;
+                        if (to == 0) {
+                            to = ((HttpComponent)getServiceUnit().getComponent()).getConfiguration()
+                                .getConsumerProcessorSuspendTime();
+                        }
+                        boolean result = cont.suspend(to);
+                        // The call has not thrown a RetryRequest, which means
+                        // we don't use a SelectConnector
+                        // and we must handle the exchange in this very method
+                        // call.
+                        // If result is false, the continuation has timed out.
+                        // So get the exchange (in case the object has changed)
+                        // and remove it from the map
+                        exchange = exchanges.remove(exchange.getExchangeId());
+                        // remove the exchange id from the request as we don't
+                        // need it anymore
+                        request.removeAttribute(MessageExchange.class.getName());
+                        // If a timeout occurred, throw an exception that will
+                        // be sent back to the HTTP client
+                        // Whenever the exchange comes back, the
+                        // process(MessageExchange) method will thrown an
+                        // exception and the exchange will be set in an ERROR
+                        // status
+                        if (!result) {
+                            // Remove the continuation from the map.
+                            // This indicates the continuation has been fully
+                            // processed
+                            locks.remove(exchange.getExchangeId());
+                            throw new Exception("Exchange timed out");
+                        }
+                    } else {
+                        String id = (String)request.getAttribute(MessageExchange.class.getName());
+                        locks.remove(id);
+                        exchange = exchanges.remove(id);
+                        request.removeAttribute(MessageExchange.class.getName());
                     }
                 }
             // The continuation is a retry.