You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by js...@apache.org on 2003/09/03 19:58:14 UTC

cvs commit: jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger TestMessenger.java

jstrachan    2003/09/03 10:58:14

  Modified:    messenger/src/java/org/apache/commons/messenger
                        MessengerSession.java Lock.java
                        DefaultMessenger.java MessengerSupport.java
               messenger/src/test/org/apache/commons/messenger
                        TestMessenger.java
  Added:       messenger/src/java/org/apache/commons/messenger
                        SimpleMessenger.java
  Log:
  Added an implementation of a SimpleMessenger which is useful when using atomic Messenger operations (like send , receieve which are independent). Not useful for transactional stuff but can be handy for simple atomic sends / receives.

This implementation uses minimal Session objects and so can result in reduced thread creation in servlet environments
  
  Revision  Changes    Path
  1.8       +67 -41    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java
  
  Index: MessengerSession.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- MessengerSession.java	27 Aug 2003 16:26:38 -0000	1.7
  +++ MessengerSession.java	3 Sep 2003 17:58:14 -0000	1.8
  @@ -33,18 +33,21 @@
   
       /** the JMS Session for this thread */
       private Session session;
  -    
  +
       /** the JMS Listener (async subscription) Session for this thread */
       private Session listenerSession;
   
  -    /** the MessageConsumer for this threads reply to destination */    
  +    /** the JMS Session for blocking receive for this thread */
  +    private Session receiveSession;
  +
  +    /** the MessageConsumer for this threads reply to destination */
       private MessageConsumer replyToConsumer;
  -    
  +
       /** The factory used to create each thread's JMS Session */
       private SessionFactory sessionFactory;
   
       /** An optional cache of requestors */
  -    private Map requestorsMap;    
  +    private Map requestorsMap;
   
       /** The inbox which is used for the call() methods */
       private Destination replyToDestination;
  @@ -52,7 +55,7 @@
       /** The current messenger to which I'm connected */
       private MessengerSupport messenger;
   
  -	/** The producer used to send messages using this session */
  +    /** The producer used to send messages using this session */
       private MessageProducer producer;
   
       public MessengerSession(MessengerSupport messenger, SessionFactory sessionFactory) {
  @@ -61,29 +64,60 @@
       }
   
       public SessionFactory getSessionFactory() {
  -        return sessionFactory;        
  +        return sessionFactory;
  +    }
  +
  +    /**
  +     * Closes any sessions or producers open
  +     */
  +    public void close() throws JMSException {
  +        if (producer != null) {
  +            producer.close();
  +            producer = null;
  +        }
  +
  +        if (session != null) {
  +            session.close();
  +            session = null;
  +        }
  +        if (listenerSession != null) {
  +            listenerSession.close();
  +            listenerSession = null;
  +        }
  +        if (receiveSession != null) {
  +            receiveSession.close();
  +            receiveSession = null;
  +        }
       }
  -    
       /** 
        * @return the JMS Session for this thread for synchronous mode 
        */
       public Session getSession() throws JMSException {
  -        if ( session == null ) {
  +        if (session == null) {
               session = createSession();
           }
           return session;
       }
  -    
  +
       /** 
        * @return the JMS Session for this thread for asynchronous mode 
        */
       public Session getListenerSession() throws JMSException {
  -        if ( listenerSession == null ) {
  +        if (listenerSession == null) {
               listenerSession = createSession();
           }
           return listenerSession;
       }
   
  +    /** 
  +     * @return the JMS Session for this thread for blocking receive of messages 
  +     */
  +    public Session getReceiveSession() throws JMSException {
  +        if (receiveSession == null) {
  +            receiveSession = createSession();
  +        }
  +        return receiveSession;
  +    }
   
       /** 
        * @return the MessageConsumer for the ReplyTo Destination for this thread
  @@ -91,22 +125,21 @@
       public MessageConsumer getReplyToConsumer() throws JMSException {
           return replyToConsumer;
       }
  -        
  +
       public void setReplyToConsumer(MessageConsumer replyToConsumer) {
           this.replyToConsumer = replyToConsumer;
       }
   
  -    
       /**
        * @return the MessageProducer for the given destination.
        */
       public MessageProducer getMessageProducer(Destination destination) throws JMSException {
  -    	if (producer == null) {
  -    		producer = messenger.createMessageProducer( session, null );
  +        if (producer == null) {
  +            producer = messenger.createMessageProducer(session, null);
           }
           return producer;
       }
  -    
  +
       /** 
        * @return the reply to destination (a temporary queue) 
        * used to reply to this thread and session
  @@ -117,21 +150,18 @@
           }
           return replyToDestination;
       }
  -    
  +
       /** 
        * Sets the reply to destination to use
        */
       protected void setReplyToDestination(Destination replyToDestination) throws JMSException {
           this.replyToDestination = replyToDestination;
       }
  -    
  +
       /**
        * @return either a cached TopicRequestor or creates a new one
        */
  -    public TopicRequestor getTopicRequestor(
  -        TopicSession session,
  -        Topic destination)
  -        throws JMSException {
  +    public TopicRequestor getTopicRequestor(TopicSession session, Topic destination) throws JMSException {
           if (messenger.isCacheRequestors()) {
               TopicRequestor requestor = (TopicRequestor) getRequestorsMap().get(destination);
               if (requestor == null) {
  @@ -144,14 +174,11 @@
               return new TopicRequestor(session, destination);
           }
       }
  -    
  +
       /**
        * @return either a cached QueueRequestor or creates a new one
        */
  -    public QueueRequestor getQueueRequestor(
  -        QueueSession session,
  -        Queue destination)
  -        throws JMSException {
  +    public QueueRequestor getQueueRequestor(QueueSession session, Queue destination) throws JMSException {
           if (messenger.isCacheRequestors()) {
               QueueRequestor requestor = (QueueRequestor) getRequestorsMap().get(destination);
               if (requestor == null) {
  @@ -165,34 +192,33 @@
           }
       }
   
  -    
       /** 
        * Factory method to create a new JMS Session 
        */
       protected Session createSession() throws JMSException {
           return getSessionFactory().createSession(messenger.getConnection());
       }
  -    
  +
       /**
        * Factory method to create a new temporary destination
        */
  -	protected Destination createTemporaryDestination() throws JMSException {
  -		if (messenger.isTopic(session)) {
  -			TopicSession topicSession = (TopicSession) session;
  -			return topicSession.createTemporaryTopic();
  -		} else {
  -			QueueSession queueSession = (QueueSession) session;
  -			return queueSession.createTemporaryQueue();
  -		}
  -	}
  -    
  +    protected Destination createTemporaryDestination() throws JMSException {
  +        if (messenger.isTopic(session)) {
  +            TopicSession topicSession = (TopicSession) session;
  +            return topicSession.createTemporaryTopic();
  +        }
  +        else {
  +            QueueSession queueSession = (QueueSession) session;
  +            return queueSession.createTemporaryQueue();
  +        }
  +    }
   
       /** 
        * @return the map of requestors, indexed by destination.
        *  The Map will be lazily constructed
        */
       protected Map getRequestorsMap() {
  -        if ( requestorsMap == null ) {
  +        if (requestorsMap == null) {
               requestorsMap = new HashMap();
           }
           return requestorsMap;
  
  
  
  1.2       +9 -2      jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/Lock.java
  
  Index: Lock.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/Lock.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Lock.java	27 Aug 2003 16:28:31 -0000	1.1
  +++ Lock.java	3 Sep 2003 17:58:14 -0000	1.2
  @@ -42,6 +42,9 @@
                   }
                   owner = caller;
                   count = 1;
  +                
  +//                System.out.println("Lock: " + this + " acquired by + "+ caller );
  +//                new Exception().printStackTrace();
               }
           }
       }
  @@ -55,7 +58,11 @@
           }
           else {
               if (--count == 0) {
  +//                System.out.println("Lock: " + this + " released by + "+ owner );
  +//                new Exception().printStackTrace();
  +
                   owner = null;
  +
                   notify();
               }
           }
  
  
  
  1.19      +4 -1      jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java
  
  Index: DefaultMessenger.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java,v
  retrieving revision 1.18
  retrieving revision 1.19
  diff -u -r1.18 -r1.19
  --- DefaultMessenger.java	27 Aug 2003 16:26:38 -0000	1.18
  +++ DefaultMessenger.java	3 Sep 2003 17:58:14 -0000	1.19
  @@ -125,9 +125,12 @@
       }
       
       public synchronized void close() throws JMSException {
  +		MessengerSession session = getMessengerSession();
  +		
           // clear all the pools...
           messengerSessionPool = new ThreadLocal();
           
  +		session.close();
           getSessionFactory().close();
       }
   
  
  
  
  1.37      +31 -14    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java
  
  Index: MessengerSupport.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v
  retrieving revision 1.36
  retrieving revision 1.37
  diff -u -r1.36 -r1.37
  --- MessengerSupport.java	27 Aug 2003 16:26:38 -0000	1.36
  +++ MessengerSupport.java	3 Sep 2003 17:58:14 -0000	1.37
  @@ -167,7 +167,7 @@
       }
   
       public Message receive(Destination destination) throws JMSException {
  -        Session session = borrowSession();
  +        Session session = borrowReceiveSession();
           MessageConsumer consumer = null;
           try {
               consumer = borrowMessageConsumer(session, destination);
  @@ -175,13 +175,13 @@
           }
           finally {
               returnMessageConsumer(consumer);
  -            returnSession(session);
  +            returnReceiveSession(session);
           }
       }
   
       public Message receive(Destination destination, String selector)
           throws JMSException {
  -        Session session = borrowSession();
  +        Session session = borrowReceiveSession();
           MessageConsumer consumer = null;
           try {
               consumer = borrowMessageConsumer(session, destination, selector);
  @@ -189,13 +189,13 @@
           }
           finally {
               returnMessageConsumer(consumer);
  -            returnSession(session);
  +            returnReceiveSession(session);
           }
       }
   
       public Message receive(Destination destination, long timeoutMillis)
           throws JMSException {
  -        Session session = borrowSession();
  +        Session session = borrowReceiveSession();
           MessageConsumer consumer = null;
           try {
               consumer = borrowMessageConsumer(session, destination);
  @@ -203,7 +203,7 @@
           }
           finally {
               returnMessageConsumer(consumer);
  -            returnSession(session);
  +            returnReceiveSession(session);
           }
       }
   
  @@ -212,7 +212,7 @@
           String selector,
           long timeoutMillis)
           throws JMSException {
  -        Session session = borrowSession();
  +        Session session = borrowReceiveSession();
           MessageConsumer consumer = null;
           try {
               consumer = borrowMessageConsumer(session, destination, selector);
  @@ -220,12 +220,12 @@
           }
           finally {
               returnMessageConsumer(consumer);
  -            returnSession(session);
  +            returnReceiveSession(session);
           }
       }
   
       public Message receiveNoWait(Destination destination) throws JMSException {
  -        Session session = borrowSession();
  +        Session session = borrowReceiveSession();
           MessageConsumer consumer = null;
           try {
               consumer = borrowMessageConsumer(session, destination);
  @@ -233,13 +233,13 @@
           }
           finally {
               returnMessageConsumer(consumer);
  -            returnSession(session);
  +            returnReceiveSession(session);
           }
       }
   
       public Message receiveNoWait(Destination destination, String selector)
           throws JMSException {
  -        Session session = borrowSession();
  +        Session session = borrowReceiveSession();
           MessageConsumer consumer = null;
           try {
               consumer = borrowMessageConsumer(session, destination, selector);
  @@ -247,7 +247,7 @@
           }
           finally {
               returnMessageConsumer(consumer);
  -            returnSession(session);
  +            returnReceiveSession(session);
           }
       }
   
  @@ -850,6 +850,23 @@
       /** @return a session instance back to the pool */
       protected abstract void returnListenerSession(Session session)
           throws JMSException;
  +
  +    /**
  +     * By default use teh same session as sending - though may wish to
  +     * change this if a session is shared across threads
  +     */
  +    protected void returnReceiveSession(Session session) throws JMSException {
  +        returnSession(session);
  +    }
  +
  +    /**
  +     * By default use teh same session as sending - though may wish to
  +     * change this if a session is shared across threads
  +     */
  +    protected Session borrowReceiveSession() throws JMSException {
  +        return borrowSession();
  +    }
  +
   
       protected abstract boolean isTopic(Connection connection)
           throws JMSException;
  
  
  
  1.1                  jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/SimpleMessenger.java
  
  Index: SimpleMessenger.java
  ===================================================================
  /*
   * ====================================================================
   *
   * The Apache Software License, Version 1.1
   *
   * Copyright (c) 1999-2003 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution, if
   *    any, must include the following acknowlegement:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowlegement may appear in the software itself,
   *    if and wherever such third-party acknowlegements normally appear.
   *
   * 4. The names "The Jakarta Project", "Commons", and "Apache Software
   *    Foundation" must not be used to endorse or promote products derived
   *    from this software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache"
   *    nor may "Apache" appear in their names without prior written
   *    permission of the Apache Group.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   *
   */
  package org.apache.commons.messenger;
  
  import javax.jms.Connection;
  import javax.jms.ConnectionFactory;
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import javax.jms.Message;
  import javax.jms.MessageConsumer;
  import javax.jms.MessageListener;
  import javax.jms.MessageProducer;
  import javax.jms.Queue;
  import javax.jms.QueueSender;
  import javax.jms.ServerSessionPool;
  import javax.jms.Session;
  import javax.jms.Topic;
  import javax.jms.TopicPublisher;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  
  /** <p><code>SimpleMessenger</code> is an implementation of
    * Messenger which uses a single JMS Session for sending
    * to keep the JMS Session that should be used for a given calling thread.</p>
    *
    * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
    * @version $Revision: 1.1 $
    */
  public class SimpleMessenger extends MessengerSupport {
  
      /** Logger */
      private static final Log log = LogFactory.getLog(SimpleMessenger.class);
  
      // should have ack mode for sending and consuming
  
      /** the SessionFactory used to create new JMS sessions */
      private SessionFactory sessionFactory;
  
      private MessengerSession messengerSession;
  
      // locks to ensure only 1 thread uses a session at once
      private Lock asyncSessionLock = new Lock();
      private Lock sessionLock = new Lock();
      private Lock sendSessionLock = new Lock();
  
      private ThreadLocal threadLocalData = new ThreadLocal();
  
      public SimpleMessenger() {
      }
  
      /** Returns the SessionFactory used to create new JMS sessions */
      public SessionFactory getSessionFactory() throws JMSException {
          if (sessionFactory == null) {
              sessionFactory = createSessionFactory();
          }
          return sessionFactory;
      }
  
      /** Sets the SessionFactory used to create new JMS sessions */
      public void setSessionFactory(SessionFactory sessionFactory) {
          this.sessionFactory = sessionFactory;
      }
  
      public Connection getConnection() throws JMSException {
          return getSessionFactory().getConnection();
      }
  
      public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads)
          throws JMSException {
          return getSessionFactory().createServerSessionPool(messageListener, maxThreads);
      }
  
      public synchronized void close() throws JMSException {
          if (messengerSession != null) {
              messengerSession.close();
          }
  
          getSessionFactory().close();
      }
  
      public Session getSession() throws JMSException {
          return getMessengerSession().getSession();
      }
  
      public Session getAsyncSession() throws JMSException {
          return getMessengerSession().getListenerSession();
      }
  
      public Message call(Destination destination, Message message) throws JMSException {
          Session sendSession = borrowSession();
          Session session = borrowReceiveSession();
          MessageProducer producer = null;
          try {
              ThreadLocalData data = getThreadLocalData(session);
              Destination replyTo = data.destination;
              message.setJMSReplyTo(replyTo);
  
              // NOTE - we could consider adding a correlation ID per request so that we can ignore
              // any cruft or old messages that are sent onto our inbound queue.
              //
              // Though that does mean that we must then rely on the inbound message having
              // the right correlationID. Though at least this strategy would mean
              // that we could have a single consumer on a temporary queue for all threads
              // and use correlation IDs to dispatch to the corrent thread
              //
              // Maybe this should be a configurable strategy
  
              producer = borrowMessageProducer(sendSession, destination);
              MessageConsumer consumer = data.consumer;
  
              if (isTopic(producer)) {
                  ((TopicPublisher) producer).publish(message);
              }
              else {
                  ((QueueSender) producer).send(message);
              }
              Message response = consumer.receive();
              if (response == null) {
                  // we could have timed out so lets trash the temporary destination
                  // so that the next call() method will use a new destination to avoid
                  // the response for this call() coming back on later call() invokcations
                  data.clear();
              }
              return response;
          }
          finally {
              returnMessageProducer(producer);
              returnReceiveSession(session);
              returnSession(sendSession);
          }
      }
  
      public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException {
          Session sendSession = borrowSession();
          Session session = borrowReceiveSession();
          MessageProducer producer = null;
          try {
              ThreadLocalData data = getThreadLocalData(session);
              Destination replyTo = data.destination;
              message.setJMSReplyTo(replyTo);
  
              // NOTE - we could consider adding a correlation ID per request so that we can ignore
              // any cruft or old messages that are sent onto our inbound queue.
              //
              // Though that does mean that we must then rely on the inbound message having
              // the right correlationID. Though at least this strategy would mean
              // that we could have a single consumer on a temporary queue for all threads
              // and use correlation IDs to dispatch to the corrent thread
              //
              // Maybe this should be a configurable strategy
  
              producer = borrowMessageProducer(sendSession, destination);
  
              MessageConsumer consumer = data.consumer;
              if (isTopic(producer)) {
                  ((TopicPublisher) producer).publish(message);
              }
              else {
                  ((QueueSender) producer).send(message);
              }
              Message response = consumer.receive(timeoutMillis);
              if (response == null) {
                  // we could have timed out so lets trash the temporary destination
                  // so that the next call() method will use a new destination to avoid
                  // the response for this call() coming back on later call() invokcations
                  data.clear();
              }
              return response;
          }
          finally {
              returnMessageProducer(producer);
              returnReceiveSession(session);
              returnSession(sendSession);
          }
      }
  
      public void send(Destination destination, Message message) throws JMSException {
          Session session = borrowSession();
          MessageProducer producer = null;
  
          System.out.println("About to send message...");
          try {
              producer = borrowMessageProducer(session, destination);
  
              System.out.println("Got producer: " + producer);
  
              if (isTopic(producer)) {
                  ((TopicPublisher) producer).publish((Topic) destination, message);
              }
              else {
                  ((QueueSender) producer).send((Queue) destination, message);
              }
  
              System.out.println("Sent message");
          }
          finally {
              returnMessageProducer(producer);
              returnSession(session);
          }
      }
  
      /**
       * @return the local thread data
       */
      protected ThreadLocalData getThreadLocalData(Session session) throws JMSException {
          ThreadLocalData data = (ThreadLocalData) threadLocalData.get();
          if (data == null) {
              data = new ThreadLocalData();
              threadLocalData.set(data);
          }
          if (data.destination == null) {
              data.destination = createTemporaryDestination();
          }
          if (data.consumer == null) {
              data.consumer = this.createConsumer(data.destination);
          }
          return data;
      }
  
      // Implementation methods
      //-------------------------------------------------------------------------
      protected static class ThreadLocalData {
          public MessageConsumer consumer;
          public Destination destination;
  
          public void clear() throws JMSException {
              destination = null;
              consumer.close();
          }
      }
  
      protected boolean isTopic(Connection connection) throws JMSException {
          return getSessionFactory().isTopic();
      }
  
      protected boolean isTopic(ConnectionFactory factory) throws JMSException {
          return getSessionFactory().isTopic();
      }
  
      protected boolean isTopic(Session session) throws JMSException {
          return getSessionFactory().isTopic();
      }
  
      protected boolean isTopic(MessageProducer producer) throws JMSException {
          return getSessionFactory().isTopic();
      }
  
      protected synchronized Session borrowSession() throws JMSException {
          sessionLock.acquire();
          return getMessengerSession().getSession();
      }
  
      protected void returnSession(Session session) throws JMSException {
          sessionLock.release();
      }
  
      protected Session borrowListenerSession() throws JMSException {
          asyncSessionLock.acquire();
          return getMessengerSession().getListenerSession();
      }
  
      protected void returnListenerSession(Session session) throws JMSException {
          asyncSessionLock.release();
      }
  
      protected Session borrowReceiveSession() throws JMSException {
          sendSessionLock.acquire();
          return getMessengerSession().getReceiveSession();
      }
  
      protected void returnReceiveSession(Session session) throws JMSException {
          sendSessionLock.release();
      }
  
      protected MessageProducer borrowMessageProducer(Session session, Destination destination) throws JMSException {
          sessionLock.acquire();
          return getMessengerSession().getMessageProducer(destination);
      }
  
      protected void returnMessageProducer(MessageProducer producer) throws JMSException {
          sessionLock.release();
      }
  
      /**
        * @return the current thread's MessengerSession
        */
      protected synchronized MessengerSession getMessengerSession() throws JMSException {
          if (messengerSession == null) {
              messengerSession = createMessengerSession();
          }
          return messengerSession;
      }
  
      /**
       * Factory method to create a new MessengerSession
       */
      protected MessengerSession createMessengerSession() throws JMSException {
          return new MessengerSession(this, getSessionFactory());
      }
  
      /** Factory method to create a SessionFactory.
        * Derived classes could override this method to create the SessionFactory
        * from a well known place
        */
      protected SessionFactory createSessionFactory() throws JMSException {
          throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session");
      }
  }
  
  
  
  1.6       +8 -4      jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger/TestMessenger.java
  
  Index: TestMessenger.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger/TestMessenger.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- TestMessenger.java	4 Mar 2003 10:21:09 -0000	1.5
  +++ TestMessenger.java	3 Sep 2003 17:58:14 -0000	1.6
  @@ -83,10 +83,12 @@
           
           Thread.sleep( waitTime );
           
  -        log( "sending topic message" );
  +        log( "creating the message" );
           
           TextMessage message = messenger.createTextMessage( topicMessageText );
           
  +        log( "sending topic message" );
  +        
           messenger.send( topic, message );
           
           log( "sleeping" );
  @@ -118,9 +120,11 @@
           
           Thread.sleep( waitTime );
           
  -        log( "sending queue message" );
  +        log( "creating the message" );
           
           TextMessage message = messenger.createTextMessage( queueMessageText );
  +        
  +        log( "sending queue message" );
           
           messenger.send( queue, message );