You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2009/03/31 09:02:09 UTC
svn commit: r760345 -
/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
Author: ffang
Date: Tue Mar 31 07:02:07 2009
New Revision: 760345
URL: http://svn.apache.org/viewvc?rev=760345&view=rev
Log:
[SMXCOMP-493]also apply fix for HttpConsumerEndpoint
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java?rev=760345&r1=760344&r2=760345&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java Tue Mar 31 07:02:07 2009
@@ -77,6 +77,7 @@
private Map<String, Continuation> locks = new ConcurrentHashMap<String, Continuation>();
private Map<String, MessageExchange> exchanges = new ConcurrentHashMap<String, MessageExchange>();
private Object httpContext;
+ private boolean isSTFlow;
public HttpConsumerEndpoint() {
super();
@@ -227,24 +228,36 @@
if (cont == null) {
throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
}
- // synchronized block
- synchronized (cont) {
- if (locks.remove(exchange.getExchangeId()) == null) {
- throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Resuming continuation for exchange: " + exchange.getExchangeId());
- }
- // Put the new exchange
- exchanges.put(exchange.getExchangeId(), exchange);
- // Resume continuation
- cont.resume();
- if (!cont.isResumed()) {
+ if (!cont.isPending()) {
+ isSTFlow = true;
+ } else {
+ isSTFlow = false;
+ // synchronized block
+ synchronized (cont) {
+ if (locks.remove(exchange.getExchangeId()) == null) {
+ throw new Exception(
+ "HTTP request has timed out for exchange: "
+ + exchange.getExchangeId());
+ }
if (logger.isDebugEnabled()) {
- logger.debug("Could not resume continuation for exchange: " + exchange.getExchangeId());
+ logger.debug("Resuming continuation for exchange: "
+ + exchange.getExchangeId());
+ }
+ // Put the new exchange
+ exchanges.put(exchange.getExchangeId(), exchange);
+ // Resume continuation
+ cont.resume();
+ if (!cont.isResumed()) {
+ if (logger.isDebugEnabled()) {
+ logger
+ .debug("Could not resume continuation for exchange: "
+ + exchange.getExchangeId());
+ }
+ exchanges.remove(exchange.getExchangeId());
+ throw new Exception(
+ "HTTP request has timed out for exchange: "
+ + exchange.getExchangeId());
}
- exchanges.remove(exchange.getExchangeId());
- throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
}
}
}
@@ -277,32 +290,59 @@
synchronized (cont) {
// Send the exchange
send(exchange);
- if (logger.isDebugEnabled()) {
- logger.debug("Suspending continuation for exchange: " + exchange.getExchangeId());
- }
- // Suspend the continuation for the configured timeout
- // If a SelectConnector is used, the call to suspend will throw a RetryRequest exception
- // else, the call will block until the continuation is resumed
- long to = this.timeout;
- if (to == 0) {
- to = ((HttpComponent) getServiceUnit().getComponent()).getConfiguration().getConsumerProcessorSuspendTime();
- }
- boolean result = cont.suspend(to);
- // The call has not thrown a RetryRequest, which means we don't use a SelectConnector
- // and we must handle the exchange in this very method call.
- // If result is false, the continuation has timed out.
- // So get the exchange (in case the object has changed) and remove it from the map
- exchange = exchanges.remove(exchange.getExchangeId());
- // remove the exchange id from the request as we don't need it anymore
- request.removeAttribute(MessageExchange.class.getName());
- // If a timeout occurred, throw an exception that will be sent back to the HTTP client
- // Whenever the exchange comes back, the process(MessageExchange) method will thrown an
- // exception and the exchange will be set in an ERROR status
- if (!result) {
- // Remove the continuation from the map.
- // This indicates the continuation has been fully processed
- locks.remove(exchange.getExchangeId());
- throw new Exception("Exchange timed out");
+ if (!isSTFlow) {
+ if (logger.isDebugEnabled()) {
+ logger
+ .debug("Suspending continuation for exchange: "
+ + exchange.getExchangeId());
+ }
+ // Suspend the continuation for the configured timeout
+ // If a SelectConnector is used, the call to suspend
+ // will throw a RetryRequest exception
+ // else, the call will block until the continuation is
+ // resumed
+ long to = this.timeout;
+ if (to == 0) {
+ to = ((HttpComponent) getServiceUnit()
+ .getComponent()).getConfiguration()
+ .getConsumerProcessorSuspendTime();
+ }
+ boolean result = cont.suspend(to);
+ // The call has not thrown a RetryRequest, which means
+ // we don't use a SelectConnector
+ // and we must handle the exchange in this very method
+ // call.
+ // If result is false, the continuation has timed out.
+ // So get the exchange (in case the object has changed)
+ // and remove it from the map
+ exchange = exchanges.remove(exchange.getExchangeId());
+ // remove the exchange id from the request as we don't
+ // need it anymore
+ request
+ .removeAttribute(MessageExchange.class
+ .getName());
+ // If a timeout occurred, throw an exception that will
+ // be sent back to the HTTP client
+ // Whenever the exchange comes back, the
+ // process(MessageExchange) method will thrown an
+ // exception and the exchange will be set in an ERROR
+ // status
+ if (!result) {
+ // Remove the continuation from the map.
+ // This indicates the continuation has been fully
+ // processed
+ locks.remove(exchange.getExchangeId());
+ throw new Exception("Exchange timed out");
+ }
+ } else {
+ String id = (String) request
+ .getAttribute(MessageExchange.class.getName());
+ locks.remove(id);
+ exchange = exchanges.remove(id);
+ request
+ .removeAttribute(MessageExchange.class
+ .getName());
+
}
}
// The continuation is a retry.