You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2009/01/12 11:25:25 UTC
svn commit: r733672 -
/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
Author: gertv
Date: Mon Jan 12 02:25:21 2009
New Revision: 733672
URL: http://svn.apache.org/viewvc?rev=733672&view=rev
Log:
SM-1761: AutoDeploymentService stops working after x deployments
Modified:
servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java?rev=733672&r1=733671&r2=733672&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java Mon Jan 12 02:25:21 2009
@@ -74,7 +74,7 @@
private String password;
private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
private MessageConsumer broadcastConsumer;
- private Map<String, MessageConsumer> consumerMap = new ConcurrentHashMap<String, MessageConsumer>();
+ private Map<String, MessageConsumerSession> consumerMap = new ConcurrentHashMap<String, MessageConsumerSession>();
private EndpointListener endpointListener;
private ComponentListener componentListener;
private Executor executor;
@@ -348,7 +348,7 @@
Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
MessageConsumer consumer = inboundSession.createConsumer(queue);
consumer.setMessageListener(this);
- consumerMap.put(key, consumer);
+ consumerMap.put(key, new MessageConsumerSession(key, this));
}
if (broadcast) {
broadcast(event);
@@ -361,7 +361,7 @@
public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
try {
String key = EndpointSupport.getKey(event.getEndpoint());
- MessageConsumer consumer = consumerMap.remove(key);
+ MessageConsumerSession consumer = consumerMap.remove(key);
if (consumer != null) {
consumer.close();
}
@@ -396,11 +396,7 @@
try {
String key = event.getComponent().getName();
if (!consumerMap.containsKey(key)) {
- Session inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
- MessageConsumer consumer = inboundSession.createConsumer(queue);
- consumer.setMessageListener(this);
- consumerMap.put(key, consumer);
+ consumerMap.put(key, new MessageConsumerSession(key, this));
}
} catch (Exception e) {
log.error("Cannot create consumer for component " + event.getComponent().getName(), e);
@@ -410,7 +406,7 @@
public void onComponentStopped(ComponentEvent event) {
try {
String key = event.getComponent().getName();
- MessageConsumer consumer = consumerMap.remove(key);
+ MessageConsumerSession consumer = consumerMap.remove(key);
if (consumer != null) {
consumer.close();
}
@@ -569,5 +565,30 @@
public void setJmsURL(String jmsURL) {
this.jmsURL = jmsURL;
}
+
+ /*
+ * Creates a message consumer and holds on to both consumer and session
+ * to allow closing both of them together.
+ */
+ private final class MessageConsumerSession {
+
+ private Session session;
+ private MessageConsumer consumer;
+
+ private MessageConsumerSession(String key, MessageListener listener) throws JMSException {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(INBOUND_PREFIX + key);
+ consumer = session.createConsumer(queue);
+ consumer.setMessageListener(listener);
+ }
+ private void close() throws JMSException {
+ if (consumer != null) {
+ consumer.close();
+ }
+ if (session != null) {
+ session.close();
+ }
+ }
+ }
}