You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by js...@apache.org on 2003/09/03 19:58:14 UTC
cvs commit: jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger TestMessenger.java
jstrachan 2003/09/03 10:58:14
Modified: messenger/src/java/org/apache/commons/messenger
MessengerSession.java Lock.java
DefaultMessenger.java MessengerSupport.java
messenger/src/test/org/apache/commons/messenger
TestMessenger.java
Added: messenger/src/java/org/apache/commons/messenger
SimpleMessenger.java
Log:
Added an implementation of a SimpleMessenger which is useful when using atomic Messenger operations (like send and receive, which are independent). Not useful for transactional stuff but can be handy for simple atomic sends / receives.
This implementation uses minimal Session objects and so can result in reduced thread creation in servlet environments
Revision Changes Path
1.8 +67 -41 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java
Index: MessengerSession.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- MessengerSession.java 27 Aug 2003 16:26:38 -0000 1.7
+++ MessengerSession.java 3 Sep 2003 17:58:14 -0000 1.8
@@ -33,18 +33,21 @@
/** the JMS Session for this thread */
private Session session;
-
+
/** the JMS Listener (async subscription) Session for this thread */
private Session listenerSession;
- /** the MessageConsumer for this threads reply to destination */
+ /** the JMS Session for blocking receive for this thread */
+ private Session receiveSession;
+
+ /** the MessageConsumer for this threads reply to destination */
private MessageConsumer replyToConsumer;
-
+
/** The factory used to create each thread's JMS Session */
private SessionFactory sessionFactory;
/** An optional cache of requestors */
- private Map requestorsMap;
+ private Map requestorsMap;
/** The inbox which is used for the call() methods */
private Destination replyToDestination;
@@ -52,7 +55,7 @@
/** The current messenger to which I'm connected */
private MessengerSupport messenger;
- /** The producer used to send messages using this session */
+ /** The producer used to send messages using this session */
private MessageProducer producer;
public MessengerSession(MessengerSupport messenger, SessionFactory sessionFactory) {
@@ -61,29 +64,60 @@
}
public SessionFactory getSessionFactory() {
- return sessionFactory;
+ return sessionFactory;
+ }
+
+ /**
+ * Closes any sessions or producers open
+ */
+ public void close() throws JMSException {
+ if (producer != null) {
+ producer.close();
+ producer = null;
+ }
+
+ if (session != null) {
+ session.close();
+ session = null;
+ }
+ if (listenerSession != null) {
+ listenerSession.close();
+ listenerSession = null;
+ }
+ if (receiveSession != null) {
+ receiveSession.close();
+ receiveSession = null;
+ }
}
-
/**
* @return the JMS Session for this thread for synchronous mode
*/
public Session getSession() throws JMSException {
- if ( session == null ) {
+ if (session == null) {
session = createSession();
}
return session;
}
-
+
/**
* @return the JMS Session for this thread for asynchronous mode
*/
public Session getListenerSession() throws JMSException {
- if ( listenerSession == null ) {
+ if (listenerSession == null) {
listenerSession = createSession();
}
return listenerSession;
}
+ /**
+ * @return the JMS Session for this thread for blocking receive of messages
+ */
+ public Session getReceiveSession() throws JMSException {
+ if (receiveSession == null) {
+ receiveSession = createSession();
+ }
+ return receiveSession;
+ }
/**
* @return the MessageConsumer for the ReplyTo Destination for this thread
@@ -91,22 +125,21 @@
public MessageConsumer getReplyToConsumer() throws JMSException {
return replyToConsumer;
}
-
+
public void setReplyToConsumer(MessageConsumer replyToConsumer) {
this.replyToConsumer = replyToConsumer;
}
-
/**
* @return the MessageProducer for the given destination.
*/
public MessageProducer getMessageProducer(Destination destination) throws JMSException {
- if (producer == null) {
- producer = messenger.createMessageProducer( session, null );
+ if (producer == null) {
+ producer = messenger.createMessageProducer(session, null);
}
return producer;
}
-
+
/**
* @return the reply to destination (a temporary queue)
* used to reply to this thread and session
@@ -117,21 +150,18 @@
}
return replyToDestination;
}
-
+
/**
* Sets the reply to destination to use
*/
protected void setReplyToDestination(Destination replyToDestination) throws JMSException {
this.replyToDestination = replyToDestination;
}
-
+
/**
* @return either a cached TopicRequestor or creates a new one
*/
- public TopicRequestor getTopicRequestor(
- TopicSession session,
- Topic destination)
- throws JMSException {
+ public TopicRequestor getTopicRequestor(TopicSession session, Topic destination) throws JMSException {
if (messenger.isCacheRequestors()) {
TopicRequestor requestor = (TopicRequestor) getRequestorsMap().get(destination);
if (requestor == null) {
@@ -144,14 +174,11 @@
return new TopicRequestor(session, destination);
}
}
-
+
/**
* @return either a cached QueueRequestor or creates a new one
*/
- public QueueRequestor getQueueRequestor(
- QueueSession session,
- Queue destination)
- throws JMSException {
+ public QueueRequestor getQueueRequestor(QueueSession session, Queue destination) throws JMSException {
if (messenger.isCacheRequestors()) {
QueueRequestor requestor = (QueueRequestor) getRequestorsMap().get(destination);
if (requestor == null) {
@@ -165,34 +192,33 @@
}
}
-
/**
* Factory method to create a new JMS Session
*/
protected Session createSession() throws JMSException {
return getSessionFactory().createSession(messenger.getConnection());
}
-
+
/**
* Factory method to create a new temporary destination
*/
- protected Destination createTemporaryDestination() throws JMSException {
- if (messenger.isTopic(session)) {
- TopicSession topicSession = (TopicSession) session;
- return topicSession.createTemporaryTopic();
- } else {
- QueueSession queueSession = (QueueSession) session;
- return queueSession.createTemporaryQueue();
- }
- }
-
+ protected Destination createTemporaryDestination() throws JMSException {
+ if (messenger.isTopic(session)) {
+ TopicSession topicSession = (TopicSession) session;
+ return topicSession.createTemporaryTopic();
+ }
+ else {
+ QueueSession queueSession = (QueueSession) session;
+ return queueSession.createTemporaryQueue();
+ }
+ }
/**
* @return the map of requestors, indexed by destination.
* The Map will be lazily constructed
*/
protected Map getRequestorsMap() {
- if ( requestorsMap == null ) {
+ if (requestorsMap == null) {
requestorsMap = new HashMap();
}
return requestorsMap;
1.2 +9 -2 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/Lock.java
Index: Lock.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/Lock.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- Lock.java 27 Aug 2003 16:28:31 -0000 1.1
+++ Lock.java 3 Sep 2003 17:58:14 -0000 1.2
@@ -42,6 +42,9 @@
}
owner = caller;
count = 1;
+
+// System.out.println("Lock: " + this + " acquired by + "+ caller );
+// new Exception().printStackTrace();
}
}
}
@@ -55,7 +58,11 @@
}
else {
if (--count == 0) {
+// System.out.println("Lock: " + this + " released by + "+ owner );
+// new Exception().printStackTrace();
+
owner = null;
+
notify();
}
}
1.19 +4 -1 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java
Index: DefaultMessenger.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- DefaultMessenger.java 27 Aug 2003 16:26:38 -0000 1.18
+++ DefaultMessenger.java 3 Sep 2003 17:58:14 -0000 1.19
@@ -125,9 +125,12 @@
}
public synchronized void close() throws JMSException {
+ MessengerSession session = getMessengerSession();
+
// clear all the pools...
messengerSessionPool = new ThreadLocal();
+ session.close();
getSessionFactory().close();
}
1.37 +31 -14 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java
Index: MessengerSupport.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v
retrieving revision 1.36
retrieving revision 1.37
diff -u -r1.36 -r1.37
--- MessengerSupport.java 27 Aug 2003 16:26:38 -0000 1.36
+++ MessengerSupport.java 3 Sep 2003 17:58:14 -0000 1.37
@@ -167,7 +167,7 @@
}
public Message receive(Destination destination) throws JMSException {
- Session session = borrowSession();
+ Session session = borrowReceiveSession();
MessageConsumer consumer = null;
try {
consumer = borrowMessageConsumer(session, destination);
@@ -175,13 +175,13 @@
}
finally {
returnMessageConsumer(consumer);
- returnSession(session);
+ returnReceiveSession(session);
}
}
public Message receive(Destination destination, String selector)
throws JMSException {
- Session session = borrowSession();
+ Session session = borrowReceiveSession();
MessageConsumer consumer = null;
try {
consumer = borrowMessageConsumer(session, destination, selector);
@@ -189,13 +189,13 @@
}
finally {
returnMessageConsumer(consumer);
- returnSession(session);
+ returnReceiveSession(session);
}
}
public Message receive(Destination destination, long timeoutMillis)
throws JMSException {
- Session session = borrowSession();
+ Session session = borrowReceiveSession();
MessageConsumer consumer = null;
try {
consumer = borrowMessageConsumer(session, destination);
@@ -203,7 +203,7 @@
}
finally {
returnMessageConsumer(consumer);
- returnSession(session);
+ returnReceiveSession(session);
}
}
@@ -212,7 +212,7 @@
String selector,
long timeoutMillis)
throws JMSException {
- Session session = borrowSession();
+ Session session = borrowReceiveSession();
MessageConsumer consumer = null;
try {
consumer = borrowMessageConsumer(session, destination, selector);
@@ -220,12 +220,12 @@
}
finally {
returnMessageConsumer(consumer);
- returnSession(session);
+ returnReceiveSession(session);
}
}
public Message receiveNoWait(Destination destination) throws JMSException {
- Session session = borrowSession();
+ Session session = borrowReceiveSession();
MessageConsumer consumer = null;
try {
consumer = borrowMessageConsumer(session, destination);
@@ -233,13 +233,13 @@
}
finally {
returnMessageConsumer(consumer);
- returnSession(session);
+ returnReceiveSession(session);
}
}
public Message receiveNoWait(Destination destination, String selector)
throws JMSException {
- Session session = borrowSession();
+ Session session = borrowReceiveSession();
MessageConsumer consumer = null;
try {
consumer = borrowMessageConsumer(session, destination, selector);
@@ -247,7 +247,7 @@
}
finally {
returnMessageConsumer(consumer);
- returnSession(session);
+ returnReceiveSession(session);
}
}
@@ -850,6 +850,23 @@
/** @return a session instance back to the pool */
protected abstract void returnListenerSession(Session session)
throws JMSException;
+
+ /**
+ * By default use the same session as sending - though may wish to
+ * change this if a session is shared across threads
+ */
+ protected void returnReceiveSession(Session session) throws JMSException {
+ returnSession(session);
+ }
+
+ /**
+ * By default use the same session as sending - though may wish to
+ * change this if a session is shared across threads
+ */
+ protected Session borrowReceiveSession() throws JMSException {
+ return borrowSession();
+ }
+
protected abstract boolean isTopic(Connection connection)
throws JMSException;
1.1 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/SimpleMessenger.java
Index: SimpleMessenger.java
===================================================================
/*
* ====================================================================
*
* The Apache Software License, Version 1.1
*
* Copyright (c) 1999-2003 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution, if
* any, must include the following acknowlegement:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowlegement may appear in the software itself,
* if and wherever such third-party acknowlegements normally appear.
*
* 4. The names "The Jakarta Project", "Commons", and "Apache Software
* Foundation" must not be used to endorse or promote products derived
* from this software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache"
* nor may "Apache" appear in their names without prior written
* permission of the Apache Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.commons.messenger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueSender;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * <p><code>SimpleMessenger</code> is an implementation of Messenger which
 * shares one {@link MessengerSession} between all calling threads instead of
 * keeping a session per thread. Access to each of the shared JMS sessions
 * (send, blocking receive, async listener) is serialised with a {@link Lock},
 * so only one thread uses a given session at a time.</p>
 *
 * <p>This suits independent, atomic operations such as a single send or a
 * single receive.</p>
 *
 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
 * @version $Revision: 1.1 $
 */
public class SimpleMessenger extends MessengerSupport {

    /** Logger */
    private static final Log log = LogFactory.getLog(SimpleMessenger.class);

    // should have ack mode for sending and consuming

    /** the SessionFactory used to create new JMS sessions */
    private SessionFactory sessionFactory;

    /** the single MessengerSession shared by every thread using this messenger */
    private MessengerSession messengerSession;

    // locks to ensure only 1 thread uses a session at once

    /** guards the listener (async) session */
    private Lock asyncSessionLock = new Lock();

    /** guards the send session and the shared message producer */
    private Lock sessionLock = new Lock();

    /** guards the blocking-receive session */
    private Lock receiveSessionLock = new Lock();

    /** per-thread reply-to destination and consumer used by the call() methods */
    private ThreadLocal threadLocalData = new ThreadLocal();

    public SimpleMessenger() {
    }

    /** Returns the SessionFactory used to create new JMS sessions */
    public SessionFactory getSessionFactory() throws JMSException {
        if (sessionFactory == null) {
            sessionFactory = createSessionFactory();
        }
        return sessionFactory;
    }

    /** Sets the SessionFactory used to create new JMS sessions */
    public void setSessionFactory(SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    /** @return the JMS Connection owned by the SessionFactory */
    public Connection getConnection() throws JMSException {
        return getSessionFactory().getConnection();
    }

    /** Delegates creation of the ServerSessionPool to the SessionFactory */
    public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads)
        throws JMSException {
        return getSessionFactory().createServerSessionPool(messageListener, maxThreads);
    }

    /**
     * Closes the shared MessengerSession (if one was created) and then the
     * SessionFactory.
     */
    public synchronized void close() throws JMSException {
        if (messengerSession != null) {
            messengerSession.close();
        }
        getSessionFactory().close();
    }

    /** @return the shared JMS Session used for sending (no locking is performed) */
    public Session getSession() throws JMSException {
        return getMessengerSession().getSession();
    }

    /** @return the shared JMS Session used for async listeners (no locking is performed) */
    public Session getAsyncSession() throws JMSException {
        return getMessengerSession().getListenerSession();
    }

    /**
     * Sends the message with its JMSReplyTo set to the calling thread's
     * temporary destination and blocks until a reply arrives.
     *
     * @return the reply, or null if the consumer returned no message
     */
    public Message call(Destination destination, Message message) throws JMSException {
        return sendAndReceive(destination, message, false, 0L);
    }

    /**
     * Sends the message with its JMSReplyTo set to the calling thread's
     * temporary destination and waits up to <code>timeoutMillis</code> for a reply.
     *
     * @return the reply, or null if none arrived within the timeout
     */
    public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException {
        return sendAndReceive(destination, message, true, timeoutMillis);
    }

    /**
     * Sends the message to the given destination using the shared send session.
     */
    public void send(Destination destination, Message message) throws JMSException {
        Session session = borrowSession();
        try {
            if (log.isDebugEnabled()) {
                log.debug("About to send message...");
            }
            MessageProducer producer = borrowMessageProducer(session, destination);
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Got producer: " + producer);
                }
                sendToDestination(producer, destination, message);
                if (log.isDebugEnabled()) {
                    log.debug("Sent message");
                }
            }
            finally {
                returnMessageProducer(producer);
            }
        }
        finally {
            returnSession(session);
        }
    }

    /**
     * Returns the calling thread's reply-to state, lazily creating the
     * temporary destination and its consumer when either is missing.
     *
     * @param session currently unused
     * @return the local thread data
     */
    protected ThreadLocalData getThreadLocalData(Session session) throws JMSException {
        ThreadLocalData data = (ThreadLocalData) threadLocalData.get();
        if (data == null) {
            data = new ThreadLocalData();
            threadLocalData.set(data);
        }
        if (data.destination == null) {
            data.destination = createTemporaryDestination();
        }
        if (data.consumer == null) {
            data.consumer = this.createConsumer(data.destination);
        }
        return data;
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /** Per-thread reply-to destination together with the consumer reading from it */
    protected static class ThreadLocalData {
        public MessageConsumer consumer;
        public Destination destination;

        /**
         * Discards the destination and the consumer so that both are
         * re-created on the next use. The consumer must be forgotten as well
         * as closed, otherwise the next call() would try to receive on a
         * closed consumer bound to the old destination.
         */
        public void clear() throws JMSException {
            MessageConsumer stale = consumer;
            destination = null;
            consumer = null;
            if (stale != null) {
                stale.close();
            }
        }
    }

    protected boolean isTopic(Connection connection) throws JMSException {
        return getSessionFactory().isTopic();
    }

    protected boolean isTopic(ConnectionFactory factory) throws JMSException {
        return getSessionFactory().isTopic();
    }

    protected boolean isTopic(Session session) throws JMSException {
        return getSessionFactory().isTopic();
    }

    protected boolean isTopic(MessageProducer producer) throws JMSException {
        return getSessionFactory().isTopic();
    }

    /**
     * Acquires the send-session lock and returns the shared send session.
     *
     * Deliberately not synchronized: getMessengerSession() is synchronized on
     * this messenger, so waiting for the lock while holding the messenger's
     * monitor could deadlock against a thread that already owns the lock and
     * needs the monitor (for example inside borrowMessageProducer()).
     */
    protected Session borrowSession() throws JMSException {
        sessionLock.acquire();
        boolean borrowed = false;
        try {
            Session session = getMessengerSession().getSession();
            borrowed = true;
            return session;
        }
        finally {
            if (!borrowed) {
                // nobody will call returnSession() for a failed borrow
                sessionLock.release();
            }
        }
    }

    protected void returnSession(Session session) throws JMSException {
        sessionLock.release();
    }

    /** Acquires the listener-session lock and returns the shared listener session */
    protected Session borrowListenerSession() throws JMSException {
        asyncSessionLock.acquire();
        boolean borrowed = false;
        try {
            Session session = getMessengerSession().getListenerSession();
            borrowed = true;
            return session;
        }
        finally {
            if (!borrowed) {
                asyncSessionLock.release();
            }
        }
    }

    protected void returnListenerSession(Session session) throws JMSException {
        asyncSessionLock.release();
    }

    /** Acquires the receive-session lock and returns the shared blocking-receive session */
    protected Session borrowReceiveSession() throws JMSException {
        receiveSessionLock.acquire();
        boolean borrowed = false;
        try {
            Session session = getMessengerSession().getReceiveSession();
            borrowed = true;
            return session;
        }
        finally {
            if (!borrowed) {
                receiveSessionLock.release();
            }
        }
    }

    protected void returnReceiveSession(Session session) throws JMSException {
        receiveSessionLock.release();
    }

    /**
     * Acquires the send-session lock and returns the shared producer.
     * If no producer can be handed out the lock is released again, so a
     * caller only owes a release when it actually received a producer.
     */
    protected MessageProducer borrowMessageProducer(Session session, Destination destination) throws JMSException {
        sessionLock.acquire();
        boolean borrowed = false;
        try {
            MessageProducer producer = getMessengerSession().getMessageProducer(destination);
            borrowed = (producer != null);
            return producer;
        }
        finally {
            if (!borrowed) {
                sessionLock.release();
            }
        }
    }

    /**
     * Releases the lock taken by borrowMessageProducer(). A null producer
     * means nothing was borrowed (the borrow failed or was never reached),
     * so there is nothing to release.
     */
    protected void returnMessageProducer(MessageProducer producer) throws JMSException {
        if (producer != null) {
            sessionLock.release();
        }
    }

    /**
     * @return the MessengerSession shared by all threads, created on first use
     */
    protected synchronized MessengerSession getMessengerSession() throws JMSException {
        if (messengerSession == null) {
            messengerSession = createMessengerSession();
        }
        return messengerSession;
    }

    /**
     * Factory method to create a new MessengerSession
     */
    protected MessengerSession createMessengerSession() throws JMSException {
        return new MessengerSession(this, getSessionFactory());
    }

    /** Factory method to create a SessionFactory.
     * Derived classes could override this method to create the SessionFactory
     * from a well known place
     */
    protected SessionFactory createSessionFactory() throws JMSException {
        throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session");
    }

    /**
     * Shared implementation of both call() variants.
     *
     * @param timed whether to wait at most <code>timeoutMillis</code> for the
     *  reply (true) or to use the untimed receive() (false)
     */
    private Message sendAndReceive(Destination destination, Message message, boolean timed, long timeoutMillis)
        throws JMSException {
        // every borrow is paired with its own finally, so a failure at any
        // step releases exactly the locks that were actually acquired
        Session sendSession = borrowSession();
        try {
            Session receiveSession = borrowReceiveSession();
            try {
                ThreadLocalData data = getThreadLocalData(receiveSession);
                message.setJMSReplyTo(data.destination);

                // NOTE - we could consider adding a correlation ID per request so that we can ignore
                // any cruft or old messages that are sent onto our inbound queue.
                //
                // Though that does mean that we must then rely on the inbound message having
                // the right correlationID. Though at least this strategy would mean
                // that we could have a single consumer on a temporary queue for all threads
                // and use correlation IDs to dispatch to the current thread
                //
                // Maybe this should be a configurable strategy
                MessageProducer producer = borrowMessageProducer(sendSession, destination);
                try {
                    MessageConsumer consumer = data.consumer;
                    sendToDestination(producer, destination, message);

                    Message response;
                    if (timed) {
                        response = consumer.receive(timeoutMillis);
                    }
                    else {
                        response = consumer.receive();
                    }
                    if (response == null) {
                        // we could have timed out so lets trash the temporary destination
                        // so that the next call() method will use a new destination to avoid
                        // the response for this call() coming back on later call() invocations
                        data.clear();
                    }
                    return response;
                }
                finally {
                    returnMessageProducer(producer);
                }
            }
            finally {
                returnReceiveSession(receiveSession);
            }
        }
        finally {
            returnSession(sendSession);
        }
    }

    /**
     * Publishes or sends the message, always naming the destination
     * explicitly because the shared producer is not created for any
     * particular destination.
     */
    private void sendToDestination(MessageProducer producer, Destination destination, Message message)
        throws JMSException {
        if (isTopic(producer)) {
            ((TopicPublisher) producer).publish((Topic) destination, message);
        }
        else {
            ((QueueSender) producer).send((Queue) destination, message);
        }
    }
}
1.6 +8 -4 jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger/TestMessenger.java
Index: TestMessenger.java
===================================================================
RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger/TestMessenger.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- TestMessenger.java 4 Mar 2003 10:21:09 -0000 1.5
+++ TestMessenger.java 3 Sep 2003 17:58:14 -0000 1.6
@@ -83,10 +83,12 @@
Thread.sleep( waitTime );
- log( "sending topic message" );
+ log( "creating the message" );
TextMessage message = messenger.createTextMessage( topicMessageText );
+ log( "sending topic message" );
+
messenger.send( topic, message );
log( "sleeping" );
@@ -118,9 +120,11 @@
Thread.sleep( waitTime );
- log( "sending queue message" );
+ log( "creating the message" );
TextMessage message = messenger.createTextMessage( queueMessageText );
+
+ log( "sending queue message" );
messenger.send( queue, message );