You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2009/03/31 04:13:01 UTC
svn commit: r760291 -
/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
Author: ffang
Date: Tue Mar 31 02:13:00 2009
New Revision: 760291
URL: http://svn.apache.org/viewvc?rev=760291&view=rev
Log:
[SMXCOMP-493] STFlow doesn't work with servicemix-http/servicemix-cxf-bc
Modified:
servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=760291&r1=760290&r2=760291&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
+++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Tue Mar 31 02:13:00 2009
@@ -71,6 +71,7 @@
protected Map<String, MessageExchange> exchanges;
protected int suspentionTime = 60000;
protected boolean started = false;
+ private boolean isSTFlow;
public ConsumerProcessor(HttpEndpoint endpoint) {
super(endpoint);
@@ -96,21 +97,27 @@
if (cont == null) {
throw new Exception("HTTP request has timed out");
}
- synchronized (cont) {
- if (locks.remove(exchange.getExchangeId()) == null) {
- throw new Exception("HTTP request has timed out");
- }
- if (log.isDebugEnabled()) {
- log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
- }
- exchanges.put(exchange.getExchangeId(), exchange);
- cont.resume();
- if (!cont.isResumed()) {
+
+ if (!cont.isPending()) {
+ isSTFlow = true;
+ } else {
+ isSTFlow = false;
+ synchronized (cont) {
+ if (locks.remove(exchange.getExchangeId()) == null) {
+ throw new Exception("HTTP request has timed out");
+ }
if (log.isDebugEnabled()) {
- log.debug("Could not resume continuation for exchange: " + exchange.getExchangeId());
+ log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
+ }
+ exchanges.put(exchange.getExchangeId(), exchange);
+ cont.resume();
+ if (!cont.isResumed()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Could not resume continuation for exchange: " + exchange.getExchangeId());
+ }
+ exchanges.remove(exchange.getExchangeId());
+ throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
}
- exchanges.remove(exchange.getExchangeId());
- throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
}
}
}
@@ -168,15 +175,22 @@
request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId());
synchronized (cont) {
channel.send(exchange);
- if (log.isDebugEnabled()) {
- log.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
- }
- boolean result = cont.suspend(suspentionTime);
- exchange = exchanges.remove(exchange.getExchangeId());
- request.removeAttribute(MessageExchange.class.getName());
- if (!result) {
- locks.remove(exchange.getExchangeId());
- throw new Exception("Exchange timed out");
+ if (!isSTFlow) {
+ if (log.isDebugEnabled()) {
+ log.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
+ }
+ boolean result = cont.suspend(suspentionTime);
+ exchange = exchanges.remove(exchange.getExchangeId());
+ request.removeAttribute(MessageExchange.class.getName());
+ if (!result) {
+ locks.remove(exchange.getExchangeId());
+ throw new Exception("Exchange timed out");
+ }
+ } else {
+ String id = (String) request.getAttribute(MessageExchange.class.getName());
+ locks.remove(id);
+ exchange = exchanges.remove(id);
+ request.removeAttribute(MessageExchange.class.getName());
}
}
} catch (RetryRequest retry) {
Re: svn commit: r760291 - /servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
Posted by Freeman Fang <fr...@gmail.com>.
Sure, will commit soon.
Thanks
Freeman
Guillaume Nodet wrote:
> Freeman, I think this modification should be done on the
> ConsumerEndpoint too in servicemix-http...
>
> On Tue, Mar 31, 2009 at 04:13, <ff...@apache.org> wrote:
>
>> Author: ffang
>> Date: Tue Mar 31 02:13:00 2009
>> New Revision: 760291
>>
>> URL: http://svn.apache.org/viewvc?rev=760291&view=rev
>> Log:
>> [SMXCOMP-493] STFlow doesn't work with servicemix-http/servicemix-cxf-bc
>>
>> Modified:
>> servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
>>
>> Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
>> URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=760291&r1=760290&r2=760291&view=diff
>> ==============================================================================
>> --- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
>> +++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Tue Mar 31 02:13:00 2009
>> @@ -71,6 +71,7 @@
>> protected Map<String, MessageExchange> exchanges;
>> protected int suspentionTime = 60000;
>> protected boolean started = false;
>> + private boolean isSTFlow;
>>
>> public ConsumerProcessor(HttpEndpoint endpoint) {
>> super(endpoint);
>> @@ -96,21 +97,27 @@
>> if (cont == null) {
>> throw new Exception("HTTP request has timed out");
>> }
>> - synchronized (cont) {
>> - if (locks.remove(exchange.getExchangeId()) == null) {
>> - throw new Exception("HTTP request has timed out");
>> - }
>> - if (log.isDebugEnabled()) {
>> - log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
>> - }
>> - exchanges.put(exchange.getExchangeId(), exchange);
>> - cont.resume();
>> - if (!cont.isResumed()) {
>> +
>> + if (!cont.isPending()) {
>> + isSTFlow = true;
>> + } else {
>> + isSTFlow = false;
>> + synchronized (cont) {
>> + if (locks.remove(exchange.getExchangeId()) == null) {
>> + throw new Exception("HTTP request has timed out");
>> + }
>> if (log.isDebugEnabled()) {
>> - log.debug("Could not resume continuation for exchange: " + exchange.getExchangeId());
>> + log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
>> + }
>> + exchanges.put(exchange.getExchangeId(), exchange);
>> + cont.resume();
>> + if (!cont.isResumed()) {
>> + if (log.isDebugEnabled()) {
>> + log.debug("Could not resume continuation for exchange: " + exchange.getExchangeId());
>> + }
>> + exchanges.remove(exchange.getExchangeId());
>> + throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
>> }
>> - exchanges.remove(exchange.getExchangeId());
>> - throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
>> }
>> }
>> }
>> @@ -168,15 +175,22 @@
>> request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId());
>> synchronized (cont) {
>> channel.send(exchange);
>> - if (log.isDebugEnabled()) {
>> - log.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
>> - }
>> - boolean result = cont.suspend(suspentionTime);
>> - exchange = exchanges.remove(exchange.getExchangeId());
>> - request.removeAttribute(MessageExchange.class.getName());
>> - if (!result) {
>> - locks.remove(exchange.getExchangeId());
>> - throw new Exception("Exchange timed out");
>> + if (!isSTFlow) {
>> + if (log.isDebugEnabled()) {
>> + log.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
>> + }
>> + boolean result = cont.suspend(suspentionTime);
>> + exchange = exchanges.remove(exchange.getExchangeId());
>> + request.removeAttribute(MessageExchange.class.getName());
>> + if (!result) {
>> + locks.remove(exchange.getExchangeId());
>> + throw new Exception("Exchange timed out");
>> + }
>> + } else {
>> + String id = (String) request.getAttribute(MessageExchange.class.getName());
>> + locks.remove(id);
>> + exchange = exchanges.remove(id);
>> + request.removeAttribute(MessageExchange.class.getName());
>> }
>> }
>> } catch (RetryRequest retry) {
>>
>>
>>
>>
>
>
>
>
Re: svn commit: r760291 - /servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
Posted by Guillaume Nodet <gn...@gmail.com>.
Freeman, I think this modification should be done on the
ConsumerEndpoint too in servicemix-http...
On Tue, Mar 31, 2009 at 04:13, <ff...@apache.org> wrote:
> Author: ffang
> Date: Tue Mar 31 02:13:00 2009
> New Revision: 760291
>
> URL: http://svn.apache.org/viewvc?rev=760291&view=rev
> Log:
> [SMXCOMP-493] STFlow doesn't work with servicemix-http/servicemix-cxf-bc
>
> Modified:
> servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
>
> Modified: servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
> URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=760291&r1=760290&r2=760291&view=diff
> ==============================================================================
> --- servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
> +++ servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Tue Mar 31 02:13:00 2009
> @@ -71,6 +71,7 @@
> protected Map<String, MessageExchange> exchanges;
> protected int suspentionTime = 60000;
> protected boolean started = false;
> + private boolean isSTFlow;
>
> public ConsumerProcessor(HttpEndpoint endpoint) {
> super(endpoint);
> @@ -96,21 +97,27 @@
> if (cont == null) {
> throw new Exception("HTTP request has timed out");
> }
> - synchronized (cont) {
> - if (locks.remove(exchange.getExchangeId()) == null) {
> - throw new Exception("HTTP request has timed out");
> - }
> - if (log.isDebugEnabled()) {
> - log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
> - }
> - exchanges.put(exchange.getExchangeId(), exchange);
> - cont.resume();
> - if (!cont.isResumed()) {
> +
> + if (!cont.isPending()) {
> + isSTFlow = true;
> + } else {
> + isSTFlow = false;
> + synchronized (cont) {
> + if (locks.remove(exchange.getExchangeId()) == null) {
> + throw new Exception("HTTP request has timed out");
> + }
> if (log.isDebugEnabled()) {
> - log.debug("Could not resume continuation for exchange: " + exchange.getExchangeId());
> + log.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
> + }
> + exchanges.put(exchange.getExchangeId(), exchange);
> + cont.resume();
> + if (!cont.isResumed()) {
> + if (log.isDebugEnabled()) {
> + log.debug("Could not resume continuation for exchange: " + exchange.getExchangeId());
> + }
> + exchanges.remove(exchange.getExchangeId());
> + throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
> }
> - exchanges.remove(exchange.getExchangeId());
> - throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
> }
> }
> }
> @@ -168,15 +175,22 @@
> request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId());
> synchronized (cont) {
> channel.send(exchange);
> - if (log.isDebugEnabled()) {
> - log.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
> - }
> - boolean result = cont.suspend(suspentionTime);
> - exchange = exchanges.remove(exchange.getExchangeId());
> - request.removeAttribute(MessageExchange.class.getName());
> - if (!result) {
> - locks.remove(exchange.getExchangeId());
> - throw new Exception("Exchange timed out");
> + if (!isSTFlow) {
> + if (log.isDebugEnabled()) {
> + log.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
> + }
> + boolean result = cont.suspend(suspentionTime);
> + exchange = exchanges.remove(exchange.getExchangeId());
> + request.removeAttribute(MessageExchange.class.getName());
> + if (!result) {
> + locks.remove(exchange.getExchangeId());
> + throw new Exception("Exchange timed out");
> + }
> + } else {
> + String id = (String) request.getAttribute(MessageExchange.class.getName());
> + locks.remove(id);
> + exchange = exchanges.remove(id);
> + request.removeAttribute(MessageExchange.class.getName());
> }
> }
> } catch (RetryRequest retry) {
>
>
>
--
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
------------------------
Open Source SOA
http://fusesource.com