You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/04/06 18:34:13 UTC
svn commit: r392010 -
/incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
Author: gnodet
Date: Thu Apr 6 09:34:11 2006
New Revision: 392010
URL: http://svn.apache.org/viewcvs?rev=392010&view=rev
Log:
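Fix the servicemix-http consumer when using the JMS or JCA flow.
The problem is that, when sendSync is not used, the exchange received back may not be the same object as the one that was sent, so the request now stores only the exchange id and the received exchange is looked up by that id.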
Modified:
incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
Modified: incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=392010&r1=392009&r2=392010&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
+++ incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Thu Apr 6 09:34:11 2006
@@ -63,6 +63,7 @@
protected SoapMarshaler soapMarshaler;
protected SoapHelper soapHelper;
protected Map locks;
+ protected Map exchanges;
public ConsumerProcessor(HttpEndpoint endpoint) {
this.endpoint = endpoint;
@@ -73,11 +74,13 @@
this.soapHelper = new SoapHelper(endpoint);
this.soapHelper.addPolicy(new AddressingHandler());
this.locks = new ConcurrentHashMap();
+ this.exchanges = new ConcurrentHashMap();
}
public void process(MessageExchange exchange) throws Exception {
Continuation cont = (Continuation) locks.remove(exchange.getExchangeId());
if (cont != null) {
+ exchanges.put(exchange.getExchangeId(), exchange);
cont.resume();
} else {
throw new IllegalStateException("Exchange not found");
@@ -138,7 +141,7 @@
NormalizedMessage inMessage = exchange.getMessage("in");
inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(request));
locks.put(exchange.getExchangeId(), cont);
- request.setAttribute(MessageExchange.class.getName(), exchange);
+ request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId());
((BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle()).sendConsumerExchange(exchange, this);
// TODO: make this timeout configurable
boolean result = cont.suspend(1000 * 60); // 60 s
@@ -150,7 +153,8 @@
return;
}
} else {
- exchange = (MessageExchange) request.getAttribute(MessageExchange.class.getName());
+ String id = (String) request.getAttribute(MessageExchange.class.getName());
+ exchange = (MessageExchange) exchanges.get(id);
request.removeAttribute(MessageExchange.class.getName());
boolean result = cont.suspend(0);
// Check if this is a timeout