You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/12/20 00:17:58 UTC

svn commit: r488850 - /incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java

Author: gnodet
Date: Tue Dec 19 15:17:57 2006
New Revision: 488850

URL: http://svn.apache.org/viewvc?view=rev&rev=488850
Log:
SM-782: The JMS component should be stateless when sending InOnly exchanges. Patch provided by Gregoire A.

Modified:
    incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java

Modified: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java?view=diff&rev=488850&r1=488849&r2=488850
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java (original)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java Tue Dec 19 15:17:57 2006
@@ -20,6 +20,7 @@
 
 import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
 import javax.jms.Destination;
 import javax.jms.Message;
@@ -41,7 +42,7 @@
     protected Session session;
     protected Destination destination;
     protected MessageConsumer consumer;
-    protected Map pendingMessages = new ConcurrentHashMap();
+    protected Map pendingMessages;
     protected DeliveryChannel channel;
 
     public MultiplexingConsumerProcessor(JmsEndpoint endpoint) throws Exception {
@@ -66,6 +67,7 @@
         }
         consumer = session.createConsumer(destination);
         consumer.setMessageListener(this);
+        pendingMessages = new ConcurrentHashMap();
         channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel();
     }
 
@@ -74,6 +76,7 @@
         destination = null;
         consumer = null;
         pendingMessages.clear();
+        pendingMessages = null;
     }
 
     public void onMessage(final Message message) {
@@ -102,27 +105,33 @@
 
     public void process(MessageExchange exchange) throws Exception {
         Context context = (Context) pendingMessages.remove(exchange.getExchangeId());
-        Message message = (Message) context.getProperty(Message.class.getName());
-        MessageProducer producer = null;
-        Message response = null;
-        try {
-            response = fromNMSResponse(exchange, context, session);
-            if (response != null) {
-                producer = session.createProducer(message.getJMSReplyTo());
-                if (endpoint.isUseMsgIdInResponse()) {
-                    response.setJMSCorrelationID(message.getJMSMessageID());
-                } else {
-                    response.setJMSCorrelationID(message.getJMSCorrelationID());
+        // if context is null we lost it after a redeploy
+        // SM-782 : If exchange is InOnly and status = done > do nothing
+        if (exchange instanceof InOnly && exchange.getStatus() == ExchangeStatus.DONE) {
+            return;
+        } else {
+            Message message = (Message) context.getProperty(Message.class.getName());
+            MessageProducer producer = null;
+            Message response = null;
+            try {
+                response = fromNMSResponse(exchange, context, session);
+                if (response != null) {
+                    producer = session.createProducer(message.getJMSReplyTo());
+                    if (endpoint.isUseMsgIdInResponse()) {
+                        response.setJMSCorrelationID(message.getJMSMessageID());
+                    } else {
+                        response.setJMSCorrelationID(message.getJMSCorrelationID());
+                    }
+                    producer.send(response);
+                }
+            } finally {
+                if (producer != null) {
+                    producer.close();
+                }
+                if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                    exchange.setStatus(ExchangeStatus.DONE);
+                    channel.send(exchange);
                 }
-                producer.send(response);
-            }
-        } finally {
-            if (producer != null) {
-                producer.close();
-            }
-            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-                exchange.setStatus(ExchangeStatus.DONE);
-                channel.send(exchange);
             }
         }
     }