You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/04/06 18:34:13 UTC

svn commit: r392010 - /incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java

Author: gnodet
Date: Thu Apr  6 09:34:11 2006
New Revision: 392010

URL: http://svn.apache.org/viewcvs?rev=392010&view=rev
Log:
Fix the servicemix-http consumer when using the JMS or JCA flow.
The problem is that when sendSync is not used, the received exchange may not be the same as the one that was sent.

Modified:
    incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java

Modified: incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java?rev=392010&r1=392009&r2=392010&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java (original)
+++ incubator/servicemix/trunk/servicemix-http/src/main/java/org/apache/servicemix/http/processors/ConsumerProcessor.java Thu Apr  6 09:34:11 2006
@@ -63,6 +63,7 @@
     protected SoapMarshaler soapMarshaler;
     protected SoapHelper soapHelper;
     protected Map locks;
+    protected Map exchanges;
         
     public ConsumerProcessor(HttpEndpoint endpoint) {
         this.endpoint = endpoint;
@@ -73,11 +74,13 @@
         this.soapHelper = new SoapHelper(endpoint);
         this.soapHelper.addPolicy(new AddressingHandler());
         this.locks = new ConcurrentHashMap();
+        this.exchanges = new ConcurrentHashMap();
     }
     
     public void process(MessageExchange exchange) throws Exception {
         Continuation cont = (Continuation) locks.remove(exchange.getExchangeId());
         if (cont != null) {
+            exchanges.put(exchange.getExchangeId(), exchange);
             cont.resume();
         } else {
             throw new IllegalStateException("Exchange not found");
@@ -138,7 +141,7 @@
                 NormalizedMessage inMessage = exchange.getMessage("in");
                 inMessage.setProperty(JbiConstants.PROTOCOL_HEADERS, getHeaders(request));
                 locks.put(exchange.getExchangeId(), cont);
-                request.setAttribute(MessageExchange.class.getName(), exchange);
+                request.setAttribute(MessageExchange.class.getName(), exchange.getExchangeId());
                 ((BaseLifeCycle) endpoint.getServiceUnit().getComponent().getLifeCycle()).sendConsumerExchange(exchange, this);
                 // TODO: make this timeout configurable
                 boolean result = cont.suspend(1000 * 60); // 60 s
@@ -150,7 +153,8 @@
                 return;
             }
         } else {
-            exchange = (MessageExchange) request.getAttribute(MessageExchange.class.getName());
+            String id = (String) request.getAttribute(MessageExchange.class.getName());
+            exchange = (MessageExchange) exchanges.get(id);
             request.removeAttribute(MessageExchange.class.getName());
             boolean result = cont.suspend(0); 
             // Check if this is a timeout