You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2009/01/12 11:25:25 UTC

svn commit: r733672 - /servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java

Author: gertv
Date: Mon Jan 12 02:25:21 2009
New Revision: 733672

URL: http://svn.apache.org/viewvc?rev=733672&view=rev
Log:
SM-1761: AutoDeploymentService stops working after x deployments

Modified:
    servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java

Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java?rev=733672&r1=733671&r2=733672&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java Mon Jan 12 02:25:21 2009
@@ -74,7 +74,7 @@
     private String password;
     private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
     private MessageConsumer broadcastConsumer;
-    private Map<String, MessageConsumer> consumerMap = new ConcurrentHashMap<String, MessageConsumer>();
+    private Map<String, MessageConsumerSession> consumerMap = new ConcurrentHashMap<String, MessageConsumerSession>();
     private EndpointListener endpointListener;
     private ComponentListener componentListener;
     private Executor executor;
@@ -348,7 +348,7 @@
                 Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
                 MessageConsumer consumer = inboundSession.createConsumer(queue);
                 consumer.setMessageListener(this);
-                consumerMap.put(key, consumer);
+                consumerMap.put(key, new MessageConsumerSession(key, this));
             }
             if (broadcast) {
                 broadcast(event);
@@ -361,7 +361,7 @@
     public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
         try {
             String key = EndpointSupport.getKey(event.getEndpoint());
-            MessageConsumer consumer = consumerMap.remove(key);
+            MessageConsumerSession consumer = consumerMap.remove(key);
             if (consumer != null) {
                 consumer.close();
             }
@@ -396,11 +396,7 @@
         try {
             String key = event.getComponent().getName();
             if (!consumerMap.containsKey(key)) {
-                Session inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
-                MessageConsumer consumer = inboundSession.createConsumer(queue);
-                consumer.setMessageListener(this);
-                consumerMap.put(key, consumer);
+                consumerMap.put(key, new MessageConsumerSession(key, this));
             }
         } catch (Exception e) {
             log.error("Cannot create consumer for component " + event.getComponent().getName(), e);
@@ -410,7 +406,7 @@
     public void onComponentStopped(ComponentEvent event) {
         try {
             String key = event.getComponent().getName();
-            MessageConsumer consumer = consumerMap.remove(key);
+            MessageConsumerSession consumer = consumerMap.remove(key);
             if (consumer != null) {
                 consumer.close();
             }
@@ -569,5 +565,30 @@
     public void setJmsURL(String jmsURL) {
         this.jmsURL = jmsURL;
     }
+    
+    /*
+     * Creates a message consumer and holds on to both consumer and session
+     * to allow closing both of them together.
+     */
+    private final class MessageConsumerSession {
+        
+        private Session session;
+        private MessageConsumer consumer;
+        
+        private MessageConsumerSession(String key, MessageListener listener) throws JMSException {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue(INBOUND_PREFIX + key);
+            consumer = session.createConsumer(queue);
+            consumer.setMessageListener(listener);            
+        }
 
+        private void close() throws JMSException {
+            if (consumer != null) {
+                consumer.close();
+            }
+            if (session != null) {
+                session.close();
+            }
+        }
+    }
 }