You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2009/03/31 04:13:01 UTC

svn commit: r760291 - /servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java

Author: ffang
Date: Tue Mar 31 02:13:00 2009
New Revision: 760291

URL: http://svn.apache.org/viewvc?rev=760291&view=rev
Log:
[SMXCOMP-493]STFlow doesn't work with servicemix-http/servicemix-cxf-bc

Modified:
    servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java

Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=760291&r1=760290&r2=760291&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Tue Mar 31 02:13:00 2009
@@ -71,6 +71,7 @@
     protected Map<String, MessageExchange> exchanges;
     protected int suspentionTime = 60000;
     protected boolean started = false;
+    private boolean isSTFlow;
         
     public ConsumerProcessor(HttpEndpoint endpoint) {
         super(endpoint);
@@ -96,21 +97,27 @@
         if (cont == null) {
             throw new Exception("HTTP request has timed out");
         }
-        synchronized (cont) {
-            if (locks.remove(exchange.getExchangeId()) == null) {
-                throw new Exception("HTTP request has timed out");
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
-            }
-            exchanges.put(exchange.getExchangeId(), exchange);
-            cont.resume();
-            if (!cont.isResumed()) {
+
+        if (!cont.isPending()) {
+            isSTFlow = true;
+        } else {
+            isSTFlow = false;
+            synchronized (cont) {
+                if (locks.remove(exchange.getExchangeId()) == null) {
+                    throw new Exception("HTTP request has timed out");
+                }
                 if (log.isDebugEnabled()) {
-                    log.debug("Could not resume continuation for exchange: " + exchange.getExchangeId());
+                    log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
+                }
+                exchanges.put(exchange.getExchangeId(), exchange);
+                cont.resume();
+                if (!cont.isResumed()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Could not resume continuation for exchange: " + exchange.getExchangeId());
+                    }
+                    exchanges.remove(exchange.getExchangeId());
+                    throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
                 }
-                exchanges.remove(exchange.getExchangeId());
-                throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
             }
         }
     }
@@ -168,15 +175,22 @@
                 request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId());
                 synchronized (cont) {
                     channel.send(exchange);
-                    if (log.isDebugEnabled()) {
-                        log.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
-                    }
-                    boolean result = cont.suspend(suspentionTime);
-                    exchange = exchanges.remove(exchange.getExchangeId());
-                    request.removeAttribute(MessageExchange.class.getName());
-                    if (!result) {
-                        locks.remove(exchange.getExchangeId());
-                        throw new Exception("Exchange timed out");
+                    if (!isSTFlow) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
+                        }
+                        boolean result = cont.suspend(suspentionTime);
+                        exchange = exchanges.remove(exchange.getExchangeId());
+                        request.removeAttribute(MessageExchange.class.getName());
+                        if (!result) {
+                            locks.remove(exchange.getExchangeId());
+                            throw new Exception("Exchange timed out");
+                        }
+                    } else {
+                        String id = (String) request.getAttribute(MessageExchange.class.getName());
+                        locks.remove(id);
+                        exchange = exchanges.remove(id);
+                        request.removeAttribute(MessageExchange.class.getName());
                     }
                 }
             } catch (RetryRequest retry) {



Re: svn commit: r760291 - /servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java

Posted by Freeman Fang <fr...@gmail.com>.
Sure, I will commit that change soon.
Thanks
Freeman
Guillaume Nodet wrote:
> Freeman, I think this modification should also be applied to the
> ConsumerEndpoint in servicemix-http.
>
> On Tue, Mar 31, 2009 at 04:13,  <ff...@apache.org> wrote:
>   
>> Author: ffang
>> Date: Tue Mar 31 02:13:00 2009
>> New Revision: 760291
>>
>> URL: http://svn.apache.org/viewvc?rev=760291&view=rev
>> Log:
>> [SMXCOMP-493]STFlow doesn't work with servicemix-http/servicemix-cxf-bc
>>
>> Modified:
>>    servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
>>
>> Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
>> URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=760291&r1=760290&r2=760291&view=diff
>> ==============================================================================
>> --- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
>> +++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Tue Mar 31 02:13:00 2009
>> @@ -71,6 +71,7 @@
>>     protected Map<String, MessageExchange> exchanges;
>>     protected int suspentionTime = 60000;
>>     protected boolean started = false;
>> +    private boolean isSTFlow;
>>
>>     public ConsumerProcessor(HttpEndpoint endpoint) {
>>         super(endpoint);
>> @@ -96,21 +97,27 @@
>>         if (cont == null) {
>>             throw new Exception("HTTP request has timed out");
>>         }
>> -        synchronized (cont) {
>> -            if (locks.remove(exchange.getExchangeId()) == null) {
>> -                throw new Exception("HTTP request has timed out");
>> -            }
>> -            if (log.isDebugEnabled()) {
>> -                log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
>> -            }
>> -            exchanges.put(exchange.getExchangeId(), exchange);
>> -            cont.resume();
>> -            if (!cont.isResumed()) {
>> +
>> +        if (!cont.isPending()) {
>> +            isSTFlow = true;
>> +        } else {
>> +            isSTFlow = false;
>> +            synchronized (cont) {
>> +                if (locks.remove(exchange.getExchangeId()) == null) {
>> +                    throw new Exception("HTTP request has timed out");
>> +                }
>>                 if (log.isDebugEnabled()) {
>> -                    log.debug("Could not resume continuation for exchange: " + exchange.getExchangeId());
>> +                    log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
>> +                }
>> +                exchanges.put(exchange.getExchangeId(), exchange);
>> +                cont.resume();
>> +                if (!cont.isResumed()) {
>> +                    if (log.isDebugEnabled()) {
>> +                        log.debug("Could not resume continuation for exchange: " + exchange.getExchangeId());
>> +                    }
>> +                    exchanges.remove(exchange.getExchangeId());
>> +                    throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
>>                 }
>> -                exchanges.remove(exchange.getExchangeId());
>> -                throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
>>             }
>>         }
>>     }
>> @@ -168,15 +175,22 @@
>>                 request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId());
>>                 synchronized (cont) {
>>                     channel.send(exchange);
>> -                    if (log.isDebugEnabled()) {
>> -                        log.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
>> -                    }
>> -                    boolean result = cont.suspend(suspentionTime);
>> -                    exchange = exchanges.remove(exchange.getExchangeId());
>> -                    request.removeAttribute(MessageExchange.class.getName());
>> -                    if (!result) {
>> -                        locks.remove(exchange.getExchangeId());
>> -                        throw new Exception("Exchange timed out");
>> +                    if (!isSTFlow) {
>> +                        if (log.isDebugEnabled()) {
>> +                            log.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
>> +                        }
>> +                        boolean result = cont.suspend(suspentionTime);
>> +                        exchange = exchanges.remove(exchange.getExchangeId());
>> +                        request.removeAttribute(MessageExchange.class.getName());
>> +                        if (!result) {
>> +                            locks.remove(exchange.getExchangeId());
>> +                            throw new Exception("Exchange timed out");
>> +                        }
>> +                    } else {
>> +                        String id = (String) request.getAttribute(MessageExchange.class.getName());
>> +                        locks.remove(id);
>> +                        exchange = exchanges.remove(id);
>> +                        request.removeAttribute(MessageExchange.class.getName());
>>                     }
>>                 }
>>             } catch (RetryRequest retry) {
>>
>>
>>
>>     
>
>
>
>   


Re: svn commit: r760291 - /servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java

Posted by Guillaume Nodet <gn...@gmail.com>.
Freeman, I think this modification should also be applied to the
ConsumerEndpoint in servicemix-http.

On Tue, Mar 31, 2009 at 04:13,  <ff...@apache.org> wrote:
> Author: ffang
> Date: Tue Mar 31 02:13:00 2009
> New Revision: 760291
>
> URL: http://svn.apache.org/viewvc?rev=760291&view=rev
> Log:
> [SMXCOMP-493]STFlow doesn't work with servicemix-http/servicemix-cxf-bc
>
> Modified:
>    servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
>
> Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
> URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=760291&r1=760290&r2=760291&view=diff
> ==============================================================================
> --- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
> +++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Tue Mar 31 02:13:00 2009
> @@ -71,6 +71,7 @@
>     protected Map<String, MessageExchange> exchanges;
>     protected int suspentionTime = 60000;
>     protected boolean started = false;
> +    private boolean isSTFlow;
>
>     public ConsumerProcessor(HttpEndpoint endpoint) {
>         super(endpoint);
> @@ -96,21 +97,27 @@
>         if (cont == null) {
>             throw new Exception("HTTP request has timed out");
>         }
> -        synchronized (cont) {
> -            if (locks.remove(exchange.getExchangeId()) == null) {
> -                throw new Exception("HTTP request has timed out");
> -            }
> -            if (log.isDebugEnabled()) {
> -                log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
> -            }
> -            exchanges.put(exchange.getExchangeId(), exchange);
> -            cont.resume();
> -            if (!cont.isResumed()) {
> +
> +        if (!cont.isPending()) {
> +            isSTFlow = true;
> +        } else {
> +            isSTFlow = false;
> +            synchronized (cont) {
> +                if (locks.remove(exchange.getExchangeId()) == null) {
> +                    throw new Exception("HTTP request has timed out");
> +                }
>                 if (log.isDebugEnabled()) {
> -                    log.debug("Could not resume continuation for exchange: " + exchange.getExchangeId());
> +                    log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
> +                }
> +                exchanges.put(exchange.getExchangeId(), exchange);
> +                cont.resume();
> +                if (!cont.isResumed()) {
> +                    if (log.isDebugEnabled()) {
> +                        log.debug("Could not resume continuation for exchange: " + exchange.getExchangeId());
> +                    }
> +                    exchanges.remove(exchange.getExchangeId());
> +                    throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
>                 }
> -                exchanges.remove(exchange.getExchangeId());
> -                throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
>             }
>         }
>     }
> @@ -168,15 +175,22 @@
>                 request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId());
>                 synchronized (cont) {
>                     channel.send(exchange);
> -                    if (log.isDebugEnabled()) {
> -                        log.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
> -                    }
> -                    boolean result = cont.suspend(suspentionTime);
> -                    exchange = exchanges.remove(exchange.getExchangeId());
> -                    request.removeAttribute(MessageExchange.class.getName());
> -                    if (!result) {
> -                        locks.remove(exchange.getExchangeId());
> -                        throw new Exception("Exchange timed out");
> +                    if (!isSTFlow) {
> +                        if (log.isDebugEnabled()) {
> +                            log.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
> +                        }
> +                        boolean result = cont.suspend(suspentionTime);
> +                        exchange = exchanges.remove(exchange.getExchangeId());
> +                        request.removeAttribute(MessageExchange.class.getName());
> +                        if (!result) {
> +                            locks.remove(exchange.getExchangeId());
> +                            throw new Exception("Exchange timed out");
> +                        }
> +                    } else {
> +                        String id = (String) request.getAttribute(MessageExchange.class.getName());
> +                        locks.remove(id);
> +                        exchange = exchanges.remove(id);
> +                        request.removeAttribute(MessageExchange.class.getName());
>                     }
>                 }
>             } catch (RetryRequest retry) {
>
>
>



-- 
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
------------------------
Open Source SOA
http://fusesource.com