You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/12/20 00:17:58 UTC
svn commit: r488850 -
/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
Author: gnodet
Date: Tue Dec 19 15:17:57 2006
New Revision: 488850
URL: http://svn.apache.org/viewvc?view=rev&rev=488850
Log:
SM-782: jms component should be stateless when sending InOnly exchanges. Patch provided by Gregoire A
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
Modified: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java?view=diff&rev=488850&r1=488849&r2=488850
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java (original)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingConsumerProcessor.java Tue Dec 19 15:17:57 2006
@@ -20,6 +20,7 @@
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jms.Destination;
import javax.jms.Message;
@@ -41,7 +42,7 @@
protected Session session;
protected Destination destination;
protected MessageConsumer consumer;
- protected Map pendingMessages = new ConcurrentHashMap();
+ protected Map pendingMessages;
protected DeliveryChannel channel;
public MultiplexingConsumerProcessor(JmsEndpoint endpoint) throws Exception {
@@ -66,6 +67,7 @@
}
consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
+ pendingMessages = new ConcurrentHashMap();
channel = endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel();
}
@@ -74,6 +76,7 @@
destination = null;
consumer = null;
pendingMessages.clear();
+ pendingMessages = null;
}
public void onMessage(final Message message) {
@@ -102,27 +105,33 @@
public void process(MessageExchange exchange) throws Exception {
Context context = (Context) pendingMessages.remove(exchange.getExchangeId());
- Message message = (Message) context.getProperty(Message.class.getName());
- MessageProducer producer = null;
- Message response = null;
- try {
- response = fromNMSResponse(exchange, context, session);
- if (response != null) {
- producer = session.createProducer(message.getJMSReplyTo());
- if (endpoint.isUseMsgIdInResponse()) {
- response.setJMSCorrelationID(message.getJMSMessageID());
- } else {
- response.setJMSCorrelationID(message.getJMSCorrelationID());
+ // If context is null, it was lost after a redeploy.
+ // SM-782: if the exchange is InOnly and its status is DONE, there is nothing to do.
+ if (exchange instanceof InOnly && exchange.getStatus() == ExchangeStatus.DONE) {
+ return;
+ } else {
+ Message message = (Message) context.getProperty(Message.class.getName());
+ MessageProducer producer = null;
+ Message response = null;
+ try {
+ response = fromNMSResponse(exchange, context, session);
+ if (response != null) {
+ producer = session.createProducer(message.getJMSReplyTo());
+ if (endpoint.isUseMsgIdInResponse()) {
+ response.setJMSCorrelationID(message.getJMSMessageID());
+ } else {
+ response.setJMSCorrelationID(message.getJMSCorrelationID());
+ }
+ producer.send(response);
+ }
+ } finally {
+ if (producer != null) {
+ producer.close();
+ }
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ exchange.setStatus(ExchangeStatus.DONE);
+ channel.send(exchange);
}
- producer.send(response);
- }
- } finally {
- if (producer != null) {
- producer.close();
- }
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- exchange.setStatus(ExchangeStatus.DONE);
- channel.send(exchange);
}
}
}