You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/03/22 14:45:10 UTC

svn commit: r521263 - in /incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms: AbstractJMSFlow.java JMSFlow.java JMSFlowTibco.java

Author: gnodet
Date: Thu Mar 22 06:45:09 2007
New Revision: 521263

URL: http://svn.apache.org/viewvc?view=rev&rev=521263
Log:
SM-893: Fix synchronization problems in JMS flow

Modified:
    incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
    incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
    incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java?view=diff&rev=521263&r1=521262&r2=521263
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java Thu Mar 22 06:45:09 2007
@@ -76,18 +76,8 @@
 
     private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
 
-    private MessageProducer queueProducer;
-
-    private MessageProducer topicProducer;
-
-    protected Topic broadcastTopic;
-
-    protected Session broadcastSession;
-
     private MessageConsumer broadcastConsumer;
 
-    private Session inboundSession;
-
     protected Set subscriberSet = new CopyOnWriteArraySet();
 
     private Map consumerMap = new ConcurrentHashMap();
@@ -231,15 +221,10 @@
             }
             connection.setClientID(broker.getContainer().getName());
             connection.start();
-            inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Session inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             Queue queue = inboundSession.createQueue(INBOUND_PREFIX + broker.getContainer().getName());
             MessageConsumer inboundQueue = inboundSession.createConsumer(queue);
             inboundQueue.setMessageListener(this);
-            queueProducer = inboundSession.createProducer(null);
-            broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
-            topicProducer = broadcastSession.createProducer(broadcastTopic);
-            topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
         } catch (JMSException e) {
             log.error("Failed to initialize JMSFlow", e);
             throw new JBIException(e);
@@ -271,6 +256,8 @@
             log.debug(broker.getContainer().getName() + ": Starting jms flow");
             super.start();
             try {
+                Session broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Topic broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
                 broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
                 broadcastConsumer.setMessageListener(new MessageListener() {
                     public void onMessage(Message message) {
@@ -373,15 +360,14 @@
         try {
             String key = EndpointSupport.getKey(event.getEndpoint());
             if (!consumerMap.containsKey(key)) {
+                Session inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                 Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
                 MessageConsumer consumer = inboundSession.createConsumer(queue);
                 consumer.setMessageListener(this);
                 consumerMap.put(key, consumer);
             }
             if (broadcast) {
-                log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
-                ObjectMessage msg = broadcastSession.createObjectMessage(event);
-                topicProducer.send(msg);
+                broadcast(event);
             }
         } catch (Exception e) {
             log.error("Cannot create consumer for " + event.getEndpoint(), e);
@@ -396,14 +382,28 @@
                 consumer.close();
             }
             if (broadcast) {
-                ObjectMessage msg = broadcastSession.createObjectMessage(event);
-                log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
-                topicProducer.send(msg);
+                broadcast(event);
             }
         } catch (Exception e) {
             log.error("Cannot destroy consumer for " + event, e);
         }
     }
+    
+    protected void broadcast(EndpointEvent event) throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
+        }
+        Session broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try {
+            ObjectMessage msg = broadcastSession.createObjectMessage(event);
+            Topic broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
+            MessageProducer topicProducer = broadcastSession.createProducer(broadcastTopic);
+            topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            topicProducer.send(msg);
+        } finally {
+            broadcastSession.close();
+        }
+    }
 
     public void onComponentStarted(ComponentEvent event) {
         if (!started.get()) {
@@ -412,6 +412,7 @@
         try {
             String key = event.getComponent().getName();
             if (!consumerMap.containsKey(key)) {
+                Session inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                 Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
                 MessageConsumer consumer = inboundSession.createConsumer(queue);
                 consumer.setMessageListener(this);
@@ -493,15 +494,21 @@
                 }
             }
 
-            Queue queue = inboundSession.createQueue(destination);
-            ObjectMessage msg = inboundSession.createObjectMessage(me);
-            queueProducer.send(queue, msg);
+            Session inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            try {
+                Queue queue = inboundSession.createQueue(destination);
+                ObjectMessage msg = inboundSession.createObjectMessage(me);
+                MessageProducer queueProducer = inboundSession.createProducer(queue);
+                queueProducer.send(msg);
+            } finally {
+                inboundSession.close();
+            }
         } catch (JMSException e) {
             log.error("Failed to send exchange: " + me + " internal JMS Network", e);
             throw new MessagingException(e);
         }
     }
-
+    
     /**
      * MessageListener implementation
      * 

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java?view=diff&rev=521263&r1=521262&r2=521263
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java Thu Mar 22 06:45:09 2007
@@ -20,15 +20,16 @@
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
+import javax.jms.Session;
 import javax.jms.Topic;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.pool.PooledConnectionFactory;
 
 /**
  * Use for message routing among a network of containers. All
@@ -40,7 +41,7 @@
 public class JMSFlow extends AbstractJMSFlow {
 
     protected ConnectionFactory createConnectionFactoryFromUrl(String jmsURL) {
-        return (jmsURL != null) ? new ActiveMQConnectionFactory(jmsURL) : new ActiveMQConnectionFactory();
+        return (jmsURL != null) ? new PooledConnectionFactory(jmsURL) : new PooledConnectionFactory();
     }
 
     /**
@@ -61,6 +62,8 @@
     }
 
     public void startConsumerMonitor() throws JMSException {
+        Session broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic broadcastTopic = broadcastSession.createTopic(getBroadcastDestinationName());
         Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
         monitorMessageConsumer = broadcastSession.createConsumer(advisoryTopic);
         monitorMessageConsumer.setMessageListener(new MessageListener() {

Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java?view=diff&rev=521263&r1=521262&r2=521263
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java Thu Mar 22 06:45:09 2007
@@ -22,6 +22,7 @@
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
+import javax.jms.Session;
 import javax.jms.Topic;
 
 /**
@@ -70,7 +71,7 @@
             String connectionId = "" + message.getLongProperty(PROPERTY_NAME_CONN_CONNID);
             String targetDestName = message.getStringProperty(PROPERTY_NAME_TARGET_DEST_NAME);
             String eventClass = message.getStringProperty(PROPERTY_NAME_EVENT_CLASS);
-            if (broadcastTopic.getTopicName().equals(targetDestName)) {
+            if (getBroadcastDestinationName().equals(targetDestName)) {
                 if (EVENT_CLASS_CONSUMER_CREATE.equals(eventClass)) {
                     addClusterNode(connectionId);
                 } else {
@@ -83,6 +84,7 @@
     }
 
     public void startConsumerMonitor() throws JMSException {
+        Session broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Topic createTopic = broadcastSession.createTopic(TOPIC_NAME_MONITOR_CONSUMER);
         monitorMessageConsumer = broadcastSession.createConsumer(createTopic);
         monitorMessageConsumer.setMessageListener(new MessageListener() {