You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by js...@apache.org on 2003/08/27 18:26:38 UTC
cvs commit: jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger MessengerSession.java DefaultMessenger.java MessengerSupport.java
jstrachan 2003/08/27 09:26:38
Modified: messenger/src/java/org/apache/commons/messenger
MessengerSession.java DefaultMessenger.java
MessengerSupport.java
Log:
removed the caching of MessageProducer instances.
Also refactored some of the code into the DefaultMessenger implementation to make other Messenger implementations easier
Revision Changes Path
1.7 +7 -15 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java
Index: MessengerSession.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- MessengerSession.java 4 Mar 2003 10:20:59 -0000 1.6
+++ MessengerSession.java 27 Aug 2003 16:26:38 -0000 1.7
@@ -46,15 +46,15 @@
/** An optional cache of requestors */
private Map requestorsMap;
- /** the cache of producers */
- private Map producers;
-
/** The inbox which is used for the call() methods */
private Destination replyToDestination;
/** The current messenger to which I'm connected */
private MessengerSupport messenger;
+ /** The producer used to send messages using this session */
+ private MessageProducer producer;
+
public MessengerSession(MessengerSupport messenger, SessionFactory sessionFactory) {
this.messenger = messenger;
this.sessionFactory = sessionFactory;
@@ -101,16 +101,8 @@
* @return the MessageProducer for the given destination.
*/
public MessageProducer getMessageProducer(Destination destination) throws JMSException {
- MessageProducer producer = null;
- if ( producers == null ) {
- producers = new HashMap();
- }
- else {
- producer = (MessageProducer) producers.get( destination );
- }
- if ( producer == null ) {
- producer = messenger.createMessageProducer( session, destination );
- producers.put( destination, producer );
+ if (producer == null) {
+ producer = messenger.createMessageProducer( session, null );
}
return producer;
}
1.18 +155 -1 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java
Index: DefaultMessenger.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -r1.17 -r1.18
--- DefaultMessenger.java 20 Mar 2003 08:32:54 -0000 1.17
+++ DefaultMessenger.java 27 Aug 2003 16:26:38 -0000 1.18
@@ -58,14 +58,19 @@
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
+import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
+import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
@@ -133,6 +138,94 @@
public Session getAsyncSession() throws JMSException {
return getMessengerSession().getListenerSession();
}
+
+ public Message call(Destination destination, Message message)
+ throws JMSException {
+ Session session = borrowSession();
+ MessageProducer producer = null;
+ try {
+ Destination replyTo = getReplyToDestination();
+ message.setJMSReplyTo(replyTo);
+
+ // NOTE - we could consider adding a correlation ID per request so that we can ignore
+ // any cruft or old messages that are sent onto our inbound queue.
+ //
+ // Though that does mean that we must then rely on the inbound message having
+ // the right correlationID. Though at least this strategy would mean
+ // that we could have a single consumer on a temporary queue for all threads
+ // and use correlation IDs to dispatch to the corrent thread
+ //
+ // Maybe this should be a configurable strategy
+
+ producer = borrowMessageProducer(session, destination);
+ MessageConsumer consumer = getReplyToConsumer();
+
+ if (isTopic(producer)) {
+ ((TopicPublisher) producer).publish((Topic) destination, message);
+ }
+ else {
+ ((QueueSender) producer).send((Queue) destination, message);
+ }
+ Message response = consumer.receive();
+ if (response == null) {
+ // we could have timed out so lets trash the temporary destination
+ // so that the next call() method will use a new destination to avoid
+ // the response for this call() coming back on later call() invokcations
+ clearReplyToDestination();
+ }
+ return response;
+ }
+ finally {
+ returnMessageProducer(producer);
+ returnSession(session);
+ }
+ }
+
+ public Message call(
+ Destination destination,
+ Message message,
+ long timeoutMillis)
+ throws JMSException {
+ Session session = borrowSession();
+ MessageProducer producer = null;
+ try {
+ Destination replyTo = getReplyToDestination();
+ message.setJMSReplyTo(replyTo);
+
+ // NOTE - we could consider adding a correlation ID per request so that we can ignore
+ // any cruft or old messages that are sent onto our inbound queue.
+ //
+ // Though that does mean that we must then rely on the inbound message having
+ // the right correlationID. Though at least this strategy would mean
+ // that we could have a single consumer on a temporary queue for all threads
+ // and use correlation IDs to dispatch to the corrent thread
+ //
+ // Maybe this should be a configurable strategy
+
+ producer = borrowMessageProducer(session, destination);
+
+ MessageConsumer consumer = getReplyToConsumer();
+ if (isTopic(producer)) {
+ ((TopicPublisher) producer).publish((Topic) destination, message);
+ }
+ else {
+ ((QueueSender) producer).send((Queue) destination, message);
+ }
+ Message response = consumer.receive(timeoutMillis);
+ if (response == null) {
+ // we could have timed out so lets trash the temporary destination
+ // so that the next call() method will use a new destination to avoid
+ // the response for this call() coming back on later call() invokcations
+ clearReplyToDestination();
+ }
+ return response;
+ }
+ finally {
+ returnMessageProducer(producer);
+ returnSession(session);
+ }
+ }
+
// Implementation methods
//-------------------------------------------------------------------------
@@ -163,8 +256,69 @@
return getMessengerSession().getListenerSession();
}
+ /** @return a message producer for the given session and destination */
+ protected MessageProducer borrowMessageProducer(
+ Session session,
+ Destination destination)
+ throws JMSException {
+
+ if (isCacheProducers()) {
+ return getMessengerSession().getMessageProducer(destination);
+ }
+ else {
+ return createMessageProducer(session, destination);
+ }
+ }
+
+ protected void returnMessageProducer(MessageProducer producer)
+ throws JMSException {
+ if (!isCacheProducers()) {
+ producer.close();
+ }
+ }
+
protected void returnListenerSession(Session session) throws JMSException {
}
+
+ /**
+ * @return the MessageConsumer for this threads temporary destination
+ * which is cached for the duration of this process.
+ */
+ protected MessageConsumer getReplyToConsumer() throws JMSException {
+ MessengerSession messengerSession = getMessengerSession();
+ MessageConsumer consumer = messengerSession.getReplyToConsumer();
+ if (consumer == null) {
+ consumer =
+ createMessageConsumer(
+ messengerSession.getSession(),
+ messengerSession.getReplyToDestination());
+ messengerSession.setReplyToConsumer(consumer);
+ }
+ return consumer;
+ }
+
+ /**
+ * Clears the temporary destination used to receive reply-to messages
+ * which will lazily force a new destination and consumer to be created next
+ * time a call() method is invoked.
+ */
+ protected void clearReplyToDestination() throws JMSException {
+ MessengerSession messengerSession = getMessengerSession();
+
+ messengerSession.setReplyToDestination(null);
+ MessageConsumer consumer = messengerSession.getReplyToConsumer();
+ if (consumer != null) {
+ messengerSession.setReplyToConsumer(null);
+
+ // ensure that everything is nullified first before we close
+ // just in case an exception occurs
+ consumer.close();
+ }
+ }
+
+ protected Destination getReplyToDestination() throws JMSException {
+ return getMessengerSession().getReplyToDestination();
+ }
/**
* @return the current thread's MessengerSession
1.36 +9 -153 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java
Index: MessengerSupport.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v
retrieving revision 1.35
retrieving revision 1.36
diff -u -r1.35 -r1.36
--- MessengerSupport.java 6 Jun 2003 10:51:08 -0000 1.35
+++ MessengerSupport.java 27 Aug 2003 16:26:38 -0000 1.36
@@ -154,10 +154,10 @@
try {
producer = borrowMessageProducer(session, destination);
if (isTopic(producer)) {
- ((TopicPublisher) producer).publish(message);
+ ((TopicPublisher) producer).publish((Topic) destination, message);
}
else {
- ((QueueSender) producer).send(message);
+ ((QueueSender) producer).send((Queue) destination, message);
}
}
finally {
@@ -166,93 +166,6 @@
}
}
- public Message call(Destination destination, Message message)
- throws JMSException {
- Session session = borrowSession();
- MessageProducer producer = null;
- try {
- Destination replyTo = getReplyToDestination();
- message.setJMSReplyTo(replyTo);
-
- // NOTE - we could consider adding a correlation ID per request so that we can ignore
- // any cruft or old messages that are sent onto our inbound queue.
- //
- // Though that does mean that we must then rely on the inbound message having
- // the right correlationID. Though at least this strategy would mean
- // that we could have a single consumer on a temporary queue for all threads
- // and use correlation IDs to dispatch to the corrent thread
- //
- // Maybe this should be a configurable strategy
-
- producer = borrowMessageProducer(session, destination);
- MessageConsumer consumer = getReplyToConsumer();
-
- if (isTopic(producer)) {
- ((TopicPublisher) producer).publish(message);
- }
- else {
- ((QueueSender) producer).send(message);
- }
- Message response = consumer.receive();
- if (response == null) {
- // we could have timed out so lets trash the temporary destination
- // so that the next call() method will use a new destination to avoid
- // the response for this call() coming back on later call() invokcations
- clearReplyToDestination();
- }
- return response;
- }
- finally {
- returnMessageProducer(producer);
- returnSession(session);
- }
- }
-
- public Message call(
- Destination destination,
- Message message,
- long timeoutMillis)
- throws JMSException {
- Session session = borrowSession();
- MessageProducer producer = null;
- try {
- Destination replyTo = getReplyToDestination();
- message.setJMSReplyTo(replyTo);
-
- // NOTE - we could consider adding a correlation ID per request so that we can ignore
- // any cruft or old messages that are sent onto our inbound queue.
- //
- // Though that does mean that we must then rely on the inbound message having
- // the right correlationID. Though at least this strategy would mean
- // that we could have a single consumer on a temporary queue for all threads
- // and use correlation IDs to dispatch to the corrent thread
- //
- // Maybe this should be a configurable strategy
-
- producer = borrowMessageProducer(session, destination);
-
- MessageConsumer consumer = getReplyToConsumer();
- if (isTopic(producer)) {
- ((TopicPublisher) producer).publish(message);
- }
- else {
- ((QueueSender) producer).send(message);
- }
- Message response = consumer.receive(timeoutMillis);
- if (response == null) {
- // we could have timed out so lets trash the temporary destination
- // so that the next call() method will use a new destination to avoid
- // the response for this call() coming back on later call() invokcations
- clearReplyToDestination();
- }
- return response;
- }
- finally {
- returnMessageProducer(producer);
- returnSession(session);
- }
- }
-
public Message receive(Destination destination) throws JMSException {
Session session = borrowSession();
MessageConsumer consumer = null;
@@ -758,6 +671,7 @@
producer = borrowMessageProducer(session, destination);
if (isTopic(producer)) {
((TopicPublisher) producer).publish(
+ (Topic) destination,
message,
deliveryMode,
priority,
@@ -765,6 +679,7 @@
}
else {
((QueueSender) producer).send(
+ (Queue) destination,
message,
deliveryMode,
priority,
@@ -947,32 +862,12 @@
protected abstract boolean isTopic(MessageProducer producer)
throws JMSException;
- /**
- * @return the current thread's MessengerSession
- */
- protected abstract MessengerSession getMessengerSession()
- throws JMSException;
-
/** @return a message producer for the given session and destination */
- protected MessageProducer borrowMessageProducer(
+ protected abstract MessageProducer borrowMessageProducer(
Session session,
- Destination destination)
- throws JMSException {
+ Destination destination) throws JMSException;
- if (isCacheProducers()) {
- return getMessengerSession().getMessageProducer(destination);
- }
- else {
- return createMessageProducer(session, destination);
- }
- }
-
- protected void returnMessageProducer(MessageProducer producer)
- throws JMSException {
- if (!isCacheProducers()) {
- producer.close();
- }
- }
+ protected abstract void returnMessageProducer(MessageProducer producer) throws JMSException;
/** @return a newly created message producer for the given session and destination */
protected MessageProducer createMessageProducer(
@@ -997,41 +892,6 @@
return answer;
}
- /**
- * @return the MessageConsumer for this threads temporary destination
- * which is cached for the duration of this process.
- */
- protected MessageConsumer getReplyToConsumer() throws JMSException {
- MessengerSession messengerSession = getMessengerSession();
- MessageConsumer consumer = messengerSession.getReplyToConsumer();
- if (consumer == null) {
- consumer =
- createMessageConsumer(
- messengerSession.getSession(),
- messengerSession.getReplyToDestination());
- messengerSession.setReplyToConsumer(consumer);
- }
- return consumer;
- }
-
- /**
- * Clears the temporary destination used to receive reply-to messages
- * which will lazily force a new destination and consumer to be created next
- * time a call() method is invoked.
- */
- protected void clearReplyToDestination() throws JMSException {
- MessengerSession messengerSession = getMessengerSession();
-
- messengerSession.setReplyToDestination(null);
- MessageConsumer consumer = messengerSession.getReplyToConsumer();
- if (consumer != null) {
- messengerSession.setReplyToConsumer(null);
-
- // ensure that everything is nullified first before we close
- // just in case an exception occurs
- consumer.close();
- }
- }
/** @return a MessageConsumer for the given session and destination */
protected MessageConsumer borrowMessageConsumer(
@@ -1170,9 +1030,5 @@
throws JMSException {
// XXXX: might want to cache
return session.createTopic(subject);
- }
-
- protected Destination getReplyToDestination() throws JMSException {
- return getMessengerSession().getReplyToDestination();
}
}