You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/03/22 14:45:10 UTC
svn commit: r521263 - in /incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms: AbstractJMSFlow.java JMSFlow.java JMSFlowTibco.java
Author: gnodet
Date: Thu Mar 22 06:45:09 2007
New Revision: 521263
URL: http://svn.apache.org/viewvc?view=rev&rev=521263
Log:
SM-893: Fix synchronization problems in JMS flow
Modified:
incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java?view=diff&rev=521263&r1=521262&r2=521263
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java Thu Mar 22 06:45:09 2007
@@ -76,18 +76,8 @@
private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
- private MessageProducer queueProducer;
-
- private MessageProducer topicProducer;
-
- protected Topic broadcastTopic;
-
- protected Session broadcastSession;
-
private MessageConsumer broadcastConsumer;
- private Session inboundSession;
-
protected Set subscriberSet = new CopyOnWriteArraySet();
private Map consumerMap = new ConcurrentHashMap();
@@ -231,15 +221,10 @@
}
connection.setClientID(broker.getContainer().getName());
connection.start();
- inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = inboundSession.createQueue(INBOUND_PREFIX + broker.getContainer().getName());
MessageConsumer inboundQueue = inboundSession.createConsumer(queue);
inboundQueue.setMessageListener(this);
- queueProducer = inboundSession.createProducer(null);
- broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
- topicProducer = broadcastSession.createProducer(broadcastTopic);
- topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
} catch (JMSException e) {
log.error("Failed to initialize JMSFlow", e);
throw new JBIException(e);
@@ -271,6 +256,8 @@
log.debug(broker.getContainer().getName() + ": Starting jms flow");
super.start();
try {
+ Session broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
broadcastConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
@@ -373,15 +360,14 @@
try {
String key = EndpointSupport.getKey(event.getEndpoint());
if (!consumerMap.containsKey(key)) {
+ Session inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
MessageConsumer consumer = inboundSession.createConsumer(queue);
consumer.setMessageListener(this);
consumerMap.put(key, consumer);
}
if (broadcast) {
- log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
- ObjectMessage msg = broadcastSession.createObjectMessage(event);
- topicProducer.send(msg);
+ broadcast(event);
}
} catch (Exception e) {
log.error("Cannot create consumer for " + event.getEndpoint(), e);
@@ -396,14 +382,28 @@
consumer.close();
}
if (broadcast) {
- ObjectMessage msg = broadcastSession.createObjectMessage(event);
- log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
- topicProducer.send(msg);
+ broadcast(event);
}
} catch (Exception e) {
log.error("Cannot destroy consumer for " + event, e);
}
}
+
+ protected void broadcast(EndpointEvent event) throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
+ }
+ Session broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try {
+ ObjectMessage msg = broadcastSession.createObjectMessage(event);
+ Topic broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
+ MessageProducer topicProducer = broadcastSession.createProducer(broadcastTopic);
+ topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ topicProducer.send(msg);
+ } finally {
+ broadcastSession.close();
+ }
+ }
public void onComponentStarted(ComponentEvent event) {
if (!started.get()) {
@@ -412,6 +412,7 @@
try {
String key = event.getComponent().getName();
if (!consumerMap.containsKey(key)) {
+ Session inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
MessageConsumer consumer = inboundSession.createConsumer(queue);
consumer.setMessageListener(this);
@@ -493,15 +494,21 @@
}
}
- Queue queue = inboundSession.createQueue(destination);
- ObjectMessage msg = inboundSession.createObjectMessage(me);
- queueProducer.send(queue, msg);
+ Session inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try {
+ Queue queue = inboundSession.createQueue(destination);
+ ObjectMessage msg = inboundSession.createObjectMessage(me);
+ MessageProducer queueProducer = inboundSession.createProducer(queue);
+ queueProducer.send(msg);
+ } finally {
+ inboundSession.close();
+ }
} catch (JMSException e) {
log.error("Failed to send exchange: " + me + " internal JMS Network", e);
throw new MessagingException(e);
}
}
-
+
/**
* MessageListener implementation
*
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java?view=diff&rev=521263&r1=521262&r2=521263
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java Thu Mar 22 06:45:09 2007
@@ -20,15 +20,16 @@
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
+import javax.jms.Session;
import javax.jms.Topic;
-import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.pool.PooledConnectionFactory;
/**
* Use for message routing among a network of containers. All
@@ -40,7 +41,7 @@
public class JMSFlow extends AbstractJMSFlow {
protected ConnectionFactory createConnectionFactoryFromUrl(String jmsURL) {
- return (jmsURL != null) ? new ActiveMQConnectionFactory(jmsURL) : new ActiveMQConnectionFactory();
+ return (jmsURL != null) ? new PooledConnectionFactory(jmsURL) : new PooledConnectionFactory();
}
/**
@@ -61,6 +62,8 @@
}
public void startConsumerMonitor() throws JMSException {
+ Session broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic broadcastTopic = broadcastSession.createTopic(getBroadcastDestinationName());
Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
monitorMessageConsumer = broadcastSession.createConsumer(advisoryTopic);
monitorMessageConsumer.setMessageListener(new MessageListener() {
Modified: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java?view=diff&rev=521263&r1=521262&r2=521263
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java (original)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTibco.java Thu Mar 22 06:45:09 2007
@@ -22,6 +22,7 @@
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
+import javax.jms.Session;
import javax.jms.Topic;
/**
@@ -70,7 +71,7 @@
String connectionId = "" + message.getLongProperty(PROPERTY_NAME_CONN_CONNID);
String targetDestName = message.getStringProperty(PROPERTY_NAME_TARGET_DEST_NAME);
String eventClass = message.getStringProperty(PROPERTY_NAME_EVENT_CLASS);
- if (broadcastTopic.getTopicName().equals(targetDestName)) {
+ if (getBroadcastDestinationName().equals(targetDestName)) {
if (EVENT_CLASS_CONSUMER_CREATE.equals(eventClass)) {
addClusterNode(connectionId);
} else {
@@ -83,6 +84,7 @@
}
public void startConsumerMonitor() throws JMSException {
+ Session broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic createTopic = broadcastSession.createTopic(TOPIC_NAME_MONITOR_CONSUMER);
monitorMessageConsumer = broadcastSession.createConsumer(createTopic);
monitorMessageConsumer.setMessageListener(new MessageListener() {