You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by js...@apache.org on 2002/06/14 19:50:13 UTC

cvs commit: jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger MessengerSession.java MessengerSupport.java DefaultMessenger.java

jstrachan    2002/06/14 10:50:13

  Modified:    messenger build.xml gump.xml
               messenger/src/java/org/apache/commons/messenger
                        MessengerSupport.java DefaultMessenger.java
  Added:       messenger/src/java/org/apache/commons/messenger/task
                        ConsumerTask.java
               messenger/src/java/org/apache/commons/messenger
                        MessengerSession.java
  Log:
  Refactored the session pooling code to make things much more simple and clear
  Also only one thread has a temporary queue so that multi-threaded call() functions should not step on each others toes.
  Finally added a ConsumerTask for consuming JMS messages via Ant.
  
  Revision  Changes    Path
  1.1                  jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/task/ConsumerTask.java
  
  Index: ConsumerTask.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE file.
   * 
   * $Id: ConsumerTask.java,v 1.4 2002/05/17 15:05:47 jstrachan Exp $
   */
  package org.apache.commons.messenger.task;
  
  import java.io.File;
  import java.io.FileWriter;
  import java.io.IOException;
  import java.util.Iterator;
  
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import javax.jms.Message;
  import javax.jms.TextMessage;
  
  import org.apache.commons.messenger.Messenger;
  import org.apache.commons.messenger.MessengerManager;
  
  import org.apache.tools.ant.Task;
  import org.apache.tools.ant.BuildException;
  import org.apache.tools.ant.Project;
  
  /** 
   * <p><code>ConsumerTask</code> is an Ant task which will 
   * publish all of the given text files as a JMS Text Message
   * using a given JMS Connection (Messenger) and a Destination
   *
   * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
   * @version $Revision: 1.4 $
   */
  public class ConsumerTask extends Task {
  
      private Messenger messenger;
      private String messengerName;
      private Destination destination;
      private String subject;
      private MessengerManager messengerManager;    
  
      /** the number of messages to receive */
      private int count;
  
      /** the output directory */
      private File dir = new File(".");    
      
      // Properties
      //-------------------------------------------------------------------------
      
  
      /** 
       * Sets the output directory 
       */
      public void setDir(File dir) {
          this.dir = dir;
      }
  
      public Messenger getMessenger() throws JMSException {
          if ( messenger == null ) {
              messenger = getMessengerManager().getMessenger( getMessengerName() );
          }
          return messenger;
      }
      
      /** Sets the Messenger to be used */
      public void setMessenger(Messenger messenger) {
          this.messenger = messenger;
      }
      
      /** Getter for property messengerName.
       * @return Value of property messengerName.
       */
      public String getMessengerName() {
          return messengerName;
      }
  
      /** Setter for property messengerName.
       * @param messengerName New value of property messengerName.
       */
      public void setMessengerName(String messengerName) {
          this.messengerName = messengerName;
      }
  
      /** Getter for property destination.
       * @return Value of property destination.
       */
      public Destination getDestination() throws JMSException {
          if ( destination == null ) {
              destination = getMessenger().getDestination( getSubject() );
          }
          return destination;
      }
  
      /** Setter for property destination.
       * @param destination New value of property destination.
       */
      public void setDestination(Destination destination) {
          this.destination = destination;
      }
  
      /** Getter for property subject.
       * @return Value of property subject.
       */
      public String getSubject() {
          return subject;
      }
  
      /** Setter for property subject.
       * @param subject New value of property subject.
       */
      public void setSubject(String subject) {
          this.subject = subject;
      }
      
      
      /** Getter for property messengerManager.
       * @return Value of property messengerManager.
       */
      public MessengerManager getMessengerManager() {
          return messengerManager;
      }
      
      /** Setter for property messengerManager.
       * @param messengerManager New value of property messengerManager.
       */
      public void setMessengerManager(MessengerManager messengerManager) {
          this.messengerManager = messengerManager;
      }
  
      /** 
       * Sets the URI of the Messenger.xml configuration document to use
       * to configure the messengers to use for this task.
       */
      public void setConfiguration(String uri) throws JMSException {
          setMessengerManager( MessengerManager.load( uri ) );
      }
      
      /** 
       * @return the number of messages to receive.
       * A number less than or equal to 0 will receive messages forever
       */
      public int getCount() {
          return count;
      }
      
      /** 
       * Setter for the number of messages to receive.
       * A number less than or equal to 0 will receive messages forever
       */
      public void setCount(int count) {
          this.count = count;
      }
      
      
      // Task interface
      //-------------------------------------------------------------------------
      
      /**
       * Performs the copy operation.
       */
      public void execute() throws BuildException {
          try {
              Messenger messenger = getMessenger();
              if ( messenger == null ) {
                  throw new BuildException("Must specify a valid Messenger", location );
              }
              Destination destination = getDestination();
              if ( destination == null ) {
                  throw new BuildException("Must specify a valid JMS Destination", location );
              }
  
              if ( count > 0 ) {
                  log( "Will wait until I receive: " + count + " messages and will write to directory: " + dir );
                  
                  for ( int i = 0; i < count; i++ ) {
                      Message message = messenger.receive( destination );
                      processMessage( message );
                  }
                  
                  log( "Finished." );
              }
              else {
                  log( "Infinite loop. Will write to directory: " + dir );
                  
                  while (true) {
                      Message message = messenger.receive( destination );
                      processMessage( message );
                  }
              }
          }
          catch (IOException e) {
              log( "Caught exception: " + e, Project.MSG_ERR );
              throw new BuildException(e, location);
          }
          catch (JMSException e) {
              log( "Caught exception: " + e, Project.MSG_ERR );
              throw new BuildException(e, location);
          }
          finally {        
              try {
                  // close the JMS connection to release any background threads        
                  messenger.close();
              }
              catch (Exception e) {
                  // ignore close exceptions
              }
          }
      }
  
      /**
       * Processes a given message
       */
      protected void processMessage(Message message) throws IOException, JMSException {
          log( "Received message to: " + message );
  
          String text = null;
          if ( message instanceof TextMessage ) {
              TextMessage textMessage = (TextMessage) message;
              text = textMessage.toString();
          }
          else {
              // #### bit of a hack!!!
              // ideally we need an XML format for message persistence
              text = message.toString();
          }       
          processMessageText(text);
      }
      
      /** 
       * Writes the given text to a file
       */
      protected void processMessageText(String text) throws IOException {
          FileWriter writer = new FileWriter( dir );
          writer.write ( text );
          writer.close();
      }
  }
  
  
  
  
  
  1.33      +27 -6     jakarta-commons-sandbox/messenger/build.xml
  
  Index: build.xml
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/build.xml,v
  retrieving revision 1.32
  retrieving revision 1.33
  diff -u -r1.32 -r1.33
  --- build.xml	2 Jun 2002 18:19:57 -0000	1.32
  +++ build.xml	14 Jun 2002 17:50:13 -0000	1.33
  @@ -188,11 +188,18 @@
   	</target>
   	
   
  -	<target name="compile" depends="maven:compile, maven:jar-resources">
  -	  <path id="test.classpath">
  +	<target name="compile" depends="maven:compile">
  +	  <path id="jms.classpath">
   	    <pathelement path="${maven.build.dest}"/>
  -        <path refid="maven.dependency.classpath"/>
  +	    <path refid="maven.dependency.classpath"/>
  +	    <pathelement location="${lib.repo}/xmlParserAPIs-2.0.0.jar"/>
  +	    <pathelement location="${lib.repo}/xercesImpl-2.0.0.jar"/>
  +	    <pathelement path="${jms.classes.dir}"/>
  +	    <fileset dir="${jms.lib.dir}">
  +	      <include name="**/*.jar"/>
  +	    </fileset>
   	  </path>
  +    <!-- add the JARs required for a pluggable JMS provider -->
       </target>	
   
   	<target name="compile.test" depends="compile"/>
  @@ -313,7 +320,7 @@
   
   
     <!-- Construct unit test classpath -->
  -  <path id="test.classpath">
  +  <path id="old.test.classpath">
       <pathelement location="${build.home}/classes"/>
       <pathelement location="${build.home}/tests"/>
       <pathelement location="${servlet.jar}"/>
  @@ -340,7 +347,7 @@
     </path>
   
     <!-- Running sample programs against the configured JMS provider -->
  -  <path id="jms.classpath">
  +  <path id="old.jms.classpath">
       <pathelement location="${build.home}/classes"/>
       <pathelement location="${build.home}/tests"/>
       <pathelement location="${servlet.jar}"/>
  @@ -545,6 +552,20 @@
       <jmsSend messengerName="queue" configuration="${messenger.xml}" subject="jms/Queue">
         <fileset dir="src/conf" excludes="**/*.txt"/>
       </jmsSend> 
  +  </target>
  +  
  +  
  +  <target name="demo.receive.loop" depends="compile.test"
  +    description="Receives a number of messages from a JMS destination using the jmsReceive task">
  +
  +    <taskdef
  +      name="jmsReceive"
  +      classname="org.apache.commons.messenger.task.ConsumerTask">
  +      <classpath refid="jms.classpath"/>
  +    </taskdef>
  +
  +	<mkdir dir="target/output"/>		
  +    <jmsReceive messengerName="queue" configuration="${messenger.xml}" subject="jms/Queue" dir="target/output"/>
     </target>
     
   
  
  
  
  1.2       +6 -6      jakarta-commons-sandbox/messenger/gump.xml
  
  Index: gump.xml
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/gump.xml,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- gump.xml	7 Jun 2002 08:30:32 -0000	1.1
  +++ gump.xml	14 Jun 2002 17:50:13 -0000	1.2
  @@ -23,12 +23,12 @@
       <depend project="jakarta-ant"/>
       <depend project="xml-xerces"/>
   
  -    <depend project="commons-logging"/>
  -    <depend project="commons-beanutils"/>
  -    <depend project="commons-collections"/>
  -    <depend project="commons-digester"/>
  -    <depend project="servlet"/>
  -    <depend project="jms"/>
  +    <depend project="$context.node.id"/>
  +    <depend project="$context.node.id"/>
  +    <depend project="$context.node.id"/>
  +    <depend project="$context.node.id"/>
  +    <depend project="$context.node.id"/>
  +    <depend project="$context.node.id"/>
   
       <work nested="target/classes"/>
       <home nested="target"/>
  
  
  
  1.22      +42 -72    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java
  
  Index: MessengerSupport.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v
  retrieving revision 1.21
  retrieving revision 1.22
  diff -u -r1.21 -r1.22
  --- MessengerSupport.java	17 May 2002 15:05:45 -0000	1.21
  +++ MessengerSupport.java	14 Jun 2002 17:50:13 -0000	1.22
  @@ -72,27 +72,12 @@
        */
       private boolean noLocal;
       
  +    /** Should we cache the requestor object per thread? */
  +    private boolean cacheRequestors;
  +
       /** A Map of ListenerKey objects to MessageConsumer objects */
       private Map listeners = new HashMap();
       
  -    /** A Map of MessageConsumer objects indexed by Destination or Destination and selector */
  -    private Map consumers = new HashMap();
  -    
  -    /** A Map of MessageProducer objects indexed by Destination */
  -    private Map producers = new HashMap();
  -    
  -    ///** A Map of Queue or Topic Requestors indexed by Destination */
  -    //private Map requestors = new HashMap();
  -    
  -    private ThreadLocal requestorsMap = new ThreadLocal() {
  -        protected Object initialValue() {
  -            return new HashMap();
  -        }
  -    };
  -    
  -    /** The inbox which is used for the call() methods */
  -    private Destination replyToDestination;
  -    
       public MessengerSupport() {
       }
       
  @@ -165,12 +150,16 @@
           try {
               if (isTopic(session)) {
                   TopicRequestor requestor =
  -                    getTopicRequestor((TopicSession) session, (Topic) destination);
  +                    getMessengerSession().getTopicRequestor(
  +                        (TopicSession) session, (Topic) destination
  +                    );
                   return requestor.request(message);
               }
               else {
                   QueueRequestor requestor =
  -                    getQueueRequestor((QueueSession) session, (Queue) destination);
  +                    getMessengerSession().getQueueRequestor(
  +                        (QueueSession) session, (Queue) destination
  +                    );
                   return requestor.request(message);
               }
           }
  @@ -178,7 +167,10 @@
               returnSession(session);
           }
       }
  +    
       /*
  +     * It'd be nice to try replacing the above with this...
  +     * 
           public Message call( Destination destination, Message message ) throws JMSException {
               Session session = borrowSession();
               try {
  @@ -777,7 +769,18 @@
           this.durable = durable;
       }
   
  -    /** Returns the durable name used for durable topic based subscriptions */
  +    /** Gets whether we should cache the requestor object per thread? */
  +    public boolean isCacheRequestors() {
  +        return cacheRequestors;
  +    }
  +    
  +    /** Sets whether we should cache the requestor object per thread? */
  +    public void setCacheRequestors(boolean cacheRequestors) {
  +        this.cacheRequestors = cacheRequestors;
  +    }
  +
  +
  +    /** @return the durable name used for durable topic based subscriptions */
       public String getDurableName() {
           return durableName;
       }
  @@ -810,16 +813,16 @@
       /** Borrows a session instance from the pool */
       protected abstract Session borrowSession() throws JMSException;
       
  -    /** Returns a session instance back to the pool */
  +    /** @return a session instance back to the pool */
       protected abstract void returnSession(Session session) throws JMSException;
       
       /** Deletes a session instance */
  -    protected abstract void deleteSession(Session session) throws JMSException;
  +    //protected abstract void deleteSession(Session session) throws JMSException;
       
       /** Borrows a session instance from the pool */
       protected abstract Session borrowListenerSession() throws JMSException;
       
  -    /** Returns a session instance back to the pool */
  +    /** @return a session instance back to the pool */
       protected abstract void returnListenerSession(Session session)
           throws JMSException;
       
  @@ -833,7 +836,13 @@
       protected abstract boolean isTopic(MessageProducer producer)
           throws JMSException;
       
  -    /** Returns a message producer for the given session and destination */
  +    /**
  +     * @return the current thread's MessengerSession
  +     */    
  +    protected abstract MessengerSession getMessengerSession() throws JMSException;
  +    
  +    
  +    /** @return a message producer for the given session and destination */
       protected MessageProducer getMessageProducer(
           Session session,
           Destination destination)
  @@ -848,7 +857,7 @@
           */
       }
       
  -    /** Returns a newly created message producer for the given session and destination */
  +    /** @return a newly created message producer for the given session and destination */
       protected MessageProducer createMessageProducer(
           Session session,
           Destination destination)
  @@ -863,7 +872,7 @@
           }
       }
       
  -    /** Returns a MessageConsumer for the given session and destination */
  +    /** @return a MessageConsumer for the given session and destination */
       protected MessageConsumer getMessageConsumer(
           Session session,
           Destination destination)
  @@ -878,7 +887,7 @@
           */
       }
       
  -    /** Returns a MessageConsumer for the given session, destination and selector */
  +    /** @return a MessageConsumer for the given session, destination and selector */
       protected MessageConsumer getMessageConsumer(
           Session session,
           Destination destination,
  @@ -888,7 +897,7 @@
           return createMessageConsumer(session, destination, selector);
       }
       
  -    /** Returns a new MessageConsumer for the given session and destination */
  +    /** @return a new MessageConsumer for the given session and destination */
       protected MessageConsumer createMessageConsumer(
           Session session,
           Destination destination)
  @@ -910,7 +919,7 @@
           }
       }
       
  -    /** Returns a new MessageConsumer for the given session, destination and selector */
  +    /** @return a new MessageConsumer for the given session, destination and selector */
       protected MessageConsumer createMessageConsumer(
           Session session,
           Destination destination,
  @@ -951,45 +960,6 @@
       }
       
       protected Destination getReplyToDestination() throws JMSException {
  -        if (replyToDestination == null) {
  -            replyToDestination = createTemporaryDestination();
  -        }
  -        return replyToDestination;
  -    }
  -    
  -    protected TopicRequestor getTopicRequestor(
  -        TopicSession session,
  -        Topic destination)
  -        throws JMSException {
  -        if (CACHE_REQUESTOR) {
  -            Map requestors = (Map) requestorsMap.get();
  -            TopicRequestor requestor = (TopicRequestor) requestors.get(destination);
  -            if (requestor == null) {
  -                requestor = new TopicRequestor(session, destination);
  -                requestors.put(destination, requestor);
  -            }
  -            return requestor;
  -        }
  -        else {
  -            return new TopicRequestor(session, destination);
  -        }
  -    }
  -    
  -    protected QueueRequestor getQueueRequestor(
  -        QueueSession session,
  -        Queue destination)
  -        throws JMSException {
  -        if (CACHE_REQUESTOR) {
  -            Map requestors = (Map) requestorsMap.get();
  -            QueueRequestor requestor = (QueueRequestor) requestors.get(destination);
  -            if (requestor == null) {
  -                requestor = new QueueRequestor(session, destination);
  -                requestors.put(destination, requestor);
  -            }
  -            return requestor;
  -        }
  -        else {
  -            return new QueueRequestor(session, destination);
  -        }
  +        return getMessengerSession().getReplyToDestination();
       }
   }
  
  
  
  1.11      +30 -52    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java
  
  Index: DefaultMessenger.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- DefaultMessenger.java	17 May 2002 15:05:45 -0000	1.10
  +++ DefaultMessenger.java	14 Jun 2002 17:50:13 -0000	1.11
  @@ -45,19 +45,12 @@
       /** Logger */
       private static final Log log = LogFactory.getLog(DefaultMessenger.class);
       
  -    /** the session object for each thread */
  -    private ThreadLocal sessionPool = new ThreadLocal();
  +    /** the MessengerSession for each thread */
  +    private ThreadLocal messengerSessionPool = new ThreadLocal();
       
  -    /** the listener session object for each thread */
  -    private ThreadLocal listenerSessionPool = new ThreadLocal();
  -    
  -    /** The factory used to create each thread's JMS Session */
  +    /** the SessionFactory used to create new JMS sessions */
       private SessionFactory sessionFactory;
       
  -    /** A pool of Connections, one per thread */
  -    private ThreadLocal connectionPool = new ThreadLocal();
  -    
  -    
       public DefaultMessenger() {
       }
       
  @@ -75,21 +68,7 @@
       }
       
       public Connection getConnection() throws JMSException {
  -        if (SHARE_CONNECTION) {
  -            return getSessionFactory().getConnection();
  -        }
  -        else {
  -            Connection answer = (Connection) connectionPool.get();
  -            if (answer == null) {
  -                answer = getSessionFactory().createConnection();
  -                if (log.isInfoEnabled()) {
  -                    log.info(
  -                        "Created connection: " + answer + " for thread: " + Thread.currentThread());
  -                }
  -                connectionPool.set(answer);
  -            }
  -            return answer;
  -        }
  +        return getSessionFactory().getConnection();
       }
       
       public ServerSessionPool createServerSessionPool(
  @@ -102,59 +81,58 @@
       public void close() throws JMSException {
           getSessionFactory().close();
           // clear all the pools...
  -        sessionPool = new ThreadLocal();
  -        listenerSessionPool = new ThreadLocal();
  +        messengerSessionPool = new ThreadLocal();
       }
       
       // Implementation methods
       //-------------------------------------------------------------------------
       protected boolean isTopic(Connection connection) throws JMSException {
  -        return sessionFactory.isTopic();
  +        return getSessionFactory().isTopic();
       }
       
       protected boolean isTopic(ConnectionFactory factory) throws JMSException {
  -        return sessionFactory.isTopic();
  +        return getSessionFactory().isTopic();
       }
       
       protected boolean isTopic(Session session) throws JMSException {
  -        return sessionFactory.isTopic();
  +        return getSessionFactory().isTopic();
       }
       
       protected boolean isTopic(MessageProducer producer) throws JMSException {
  -        return sessionFactory.isTopic();
  +        return getSessionFactory().isTopic();
       }
       
       protected Session borrowSession() throws JMSException {
  -        Session answer = (Session) sessionPool.get();
  -        if (answer == null) {
  -            answer = createSession();
  -            sessionPool.set(answer);
  -        }
  -        return answer;
  +        return getMessengerSession().getSession();
       }
       
       protected void returnSession(Session session) {
       }
       
  -    protected void deleteSession(Session session) throws JMSException {
  -        sessionPool.set(null);
  +    protected Session borrowListenerSession() throws JMSException {
  +        return getMessengerSession().getListenerSession();
       }
       
  -    protected Session borrowListenerSession() throws JMSException {
  -        Session answer = (Session) listenerSessionPool.get();
  +    protected void returnListenerSession(Session session) throws JMSException {
  +    }
  +
  +    /**
  +     * @return the current thread's MessengerSession
  +     */    
  +    protected MessengerSession getMessengerSession() throws JMSException {
  +        MessengerSession answer = (MessengerSession) messengerSessionPool.get();
           if (answer == null) {
  -            answer = createSession();
  -            listenerSessionPool.set(answer);
  +            answer = createMessengerSession();
  +            messengerSessionPool.set(answer);
           }
           return answer;
       }
  -    
  -    protected void returnListenerSession(Session session) throws JMSException {
  -    }
  -    
  -    /** Factory method to create a new JMS Session */
  -    protected Session createSession() throws JMSException {
  -        return getSessionFactory().createSession(getConnection());
  +
  +    /**
  +     * Factory method to create a new MessengerSession
  +     */    
  +    protected MessengerSession createMessengerSession() throws JMSException {
  +        return new MessengerSession( this, getSessionFactory() );
       }
       
       /** Factory method to create a SessionFactory.
  
  
  
  1.1                  jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java
  
  Index: MessengerSession.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE file.
   * 
   * $Id: MessengerDigester.java,v 1.4 2001/11/13 12:46:10 jstrachan Exp $
   */
  package org.apache.commons.messenger;
  
  import java.util.HashMap;
  import java.util.Map;
  
  import javax.jms.Destination;
  import javax.jms.JMSException;
  import javax.jms.Message;
  import javax.jms.MessageConsumer;
  import javax.jms.MessageListener;
  import javax.jms.MessageProducer;
  import javax.jms.Queue;
  import javax.jms.QueueConnection;
  import javax.jms.QueueRequestor;
  import javax.jms.QueueSender;
  import javax.jms.QueueSession;
  import javax.jms.Session;
  import javax.jms.Topic;
  import javax.jms.TopicConnection;
  import javax.jms.TopicPublisher;
  import javax.jms.TopicRequestor;
  import javax.jms.TopicSession;
  
  /** <p><code>MessengerSession</code> represents all the local information for a single thread.</p>
    *
    * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
    * @version $Revision: 1.4 $
    */
  public class MessengerSession {
  
      /** the JMS Session for this thread */
      private Session session;
      
      /** the JMS Listener (async subscription) Session for this thread */
      private Session listenerSession;
      
      /** The factory used to create each thread's JMS Session */
      private SessionFactory sessionFactory;
  
      /** An optional cache of requestors */
      private Map requestorsMap;    
  
      /** The inbox which is used for the call() methods */
      private Destination replyToDestination;
  
      /** The current messenger to which I'm connected */
      private MessengerSupport messenger;
  
      public MessengerSession(MessengerSupport messenger, SessionFactory sessionFactory) {
          this.messenger = messenger;
          this.sessionFactory = sessionFactory;
      }
  
      public SessionFactory getSessionFactory() {
          return sessionFactory;        
      }
      
      /** 
       * @return the JMS Session for this thread for synchronous mode 
       */
      public Session getSession() throws JMSException {
          if ( session == null ) {
              session = createSession();
          }
          return session;
      }
      
      /** 
       * @return the JMS Session for this thread for asynchronous mode 
       */
      public Session getListenerSession() throws JMSException {
          if ( listenerSession == null ) {
              listenerSession = createSession();
          }
          return listenerSession;
      }
      
      /** 
       * @return the reply to destination (a temporary queue) 
       * used to reply to this thread and session
       */
      protected Destination getReplyToDestination() throws JMSException {
          if (replyToDestination == null) {
              replyToDestination = createTemporaryDestination();
          }
          return replyToDestination;
      }
      
      /**
       * @return either a cached TopicRequestor or creates a new one
       */
      public TopicRequestor getTopicRequestor(
          TopicSession session,
          Topic destination)
          throws JMSException {
          if (messenger.isCacheRequestors()) {
              TopicRequestor requestor = (TopicRequestor) getRequestorsMap().get(destination);
              if (requestor == null) {
                  requestor = new TopicRequestor(session, destination);
                  getRequestorsMap().put(destination, requestor);
              }
              return requestor;
          }
          else {
              return new TopicRequestor(session, destination);
          }
      }
      
      /**
       * @return either a cached QueueRequestor or creates a new one
       */
      public QueueRequestor getQueueRequestor(
          QueueSession session,
          Queue destination)
          throws JMSException {
          if (messenger.isCacheRequestors()) {
              QueueRequestor requestor = (QueueRequestor) getRequestorsMap().get(destination);
              if (requestor == null) {
                  requestor = new QueueRequestor(session, destination);
                  getRequestorsMap().put(destination, requestor);
              }
              return requestor;
          }
          else {
              return new QueueRequestor(session, destination);
          }
      }
  
      
      /** 
       * Factory method to create a new JMS Session 
       */
      protected Session createSession() throws JMSException {
          return getSessionFactory().createSession(messenger.getConnection());
      }
      
      /**
       * Factory method to create a new temporary destination
       */
  	protected Destination createTemporaryDestination() throws JMSException {
  		if (messenger.isTopic(session)) {
  			TopicSession topicSession = (TopicSession) session;
  			return topicSession.createTemporaryTopic();
  		} else {
  			QueueSession queueSession = (QueueSession) session;
  			return queueSession.createTemporaryQueue();
  		}
  	}
      
      
      /** 
       * @return the map of requestors, indexed by destination.
       *  The Map will be lazily constructed
       */
      protected Map getRequestorsMap() {
          if ( requestorsMap == null ) {
              requestorsMap = new HashMap();
          }
          return requestorsMap;
      }
  }
  
  
  

--
To unsubscribe, e-mail:   <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>