You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by js...@apache.org on 2002/05/15 16:36:35 UTC

cvs commit: jakarta-commons-sandbox/messenger/src/webapp/src EchoMDO.java

jstrachan    02/05/15 07:36:35

  Modified:    messenger/src/java/org/apache/commons/messenger
                        SessionFactory.java MessengerSupport.java
                        Messenger.java DefaultMessenger.java
               messenger/src/java/org/apache/commons/messagelet/impl
                        Subscription.java SubscriptionDigester.java
               messenger/src/conf MANIFEST.MF MessengerSpiritWave.xml
                        subscribe.xml
               messenger build.xml TODO.txt
               messenger/src/java/org/apache/commons/messagelet
                        ManagerServlet.java MessengerMDO.java
               messenger/src/webapp/conf subscriptions.xml
               messenger/src/webapp/src EchoMDO.java
  Added:       messenger .project
               messenger/src/java/org/apache/commons/messagelet
                        BridgeMDO.java
  Log:
  Added support for bridges in the Messagelet engine. 
  
  This allows messages to be consumed on one JMS connection, subject and selector and then published on another JMS connection and destination.
  For example this mechanism can be used to bridge SpiritWave messages to some other JMS provider like MQSeries.
  
  Also the subscription.xml has been modified a little, adding a new <bridge> element and using 'connection' and 'subject' to refer to the JMS connection name and destination.
  
  Revision  Changes    Path
  1.11      +1 -277    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/SessionFactory.java
  
  Index: SessionFactory.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/SessionFactory.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- SessionFactory.java	26 Feb 2002 04:12:53 -0000	1.10
  +++ SessionFactory.java	15 May 2002 14:36:33 -0000	1.11
  @@ -1,277 +1 @@
  -/*
  - * Copyright (C) The Apache Software Foundation. All rights reserved.
  - *
  - * This software is published under the terms of the Apache Software License
  - * version 1.1, a copy of which has been included with this distribution in
  - * the LICENSE file.
  - * 
  - * $Id: SessionFactory.java,v 1.10 2002/02/26 04:12:53 jstrachan Exp $
  - */
  -package org.apache.commons.messenger;
  -
  -import java.io.Serializable;
  -import java.util.Properties;
  -
  -import javax.jms.Connection;
  -import javax.jms.ConnectionFactory;
  -import javax.jms.JMSException;
  -import javax.jms.MessageListener;
  -import javax.jms.QueueConnection;
  -import javax.jms.QueueConnectionFactory;
  -import javax.jms.ServerSessionPool;
  -import javax.jms.Session;
  -import javax.jms.TopicConnection;
  -import javax.jms.TopicConnectionFactory;
  -
  -import org.apache.commons.logging.Log;
  -import org.apache.commons.logging.LogFactory;
  -
  -
  -/** <p><code>SessionFactory</code> is a Factory of JMS Session objects.
  -  * It can be configured with a JMS Connection object to use or can use 
  -  * a JMS ConnectionFactory instance to create the JMS Connection lazily</p>
  -  *
  -  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  -  * @version $Revision: 1.10 $
  -  */
  -public class SessionFactory {
  -
  -    /** Logger */
  -    private static final Log log = LogFactory.getLog( SessionFactory.class );
  -    
  -    /** The JMS connection used to create JMS sessions */
  -    private Connection connection;
  -    /** The JMS ConnectionFactory used to create JMS Connection instances */
  -    private ConnectionFactory connectionFactory;
  -    /** JMS acknowlege mode used on each session */
  -    private int acknowlegeMode = Session.AUTO_ACKNOWLEDGE;    
  -    /** whether JMS sessions should be transacted */
  -    private boolean transacted;
  -    /** the optional username used when creating a new JMS connection via a JMS ConnectionFactory */
  -    private String username;
  -    /** the optional password used when creating a new JMS connection via a JMS ConnectionFactory */
  -    private String password;
  -    /** the properties used to create the connection */
  -    protected Properties properties;
  -    /** Whether to use a Topic or Queue connection/session */
  -    private boolean topic = true;
  -
  -    
  -    
  -    /** Creates a new Session instance */
  -    public Session createSession(Connection connection) throws JMSException {
  -        if ( topic ) {
  -            TopicConnection topicConnection = (TopicConnection) connection;
  -            return topicConnection.createTopicSession( isTransacted(), getAcknowledgeMode() );
  -        }
  -        else {
  -            QueueConnection queueConnection = (QueueConnection) connection;
  -            return queueConnection.createQueueSession( isTransacted(), getAcknowledgeMode() );
  -        }
  -    }   
  -    
  -    /** Creates a new Session instance */
  -    public Session createSession() throws JMSException {
  -        Connection connection = getConnection();
  -        if ( topic ) {
  -            TopicConnection topicConnection = (TopicConnection) connection;
  -            return topicConnection.createTopicSession( isTransacted(), getAcknowledgeMode() );
  -        }
  -        else {
  -            QueueConnection queueConnection = (QueueConnection) connection;
  -            return queueConnection.createQueueSession( isTransacted(), getAcknowledgeMode() );
  -        }
  -    }   
  -    
  -    public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads) throws JMSException {
  -        return new DefaultServerSessionPool(this, messageListener, maxThreads);
  -    }
  -
  -    /** Factory method used to create a connection */
  -    public Connection createConnection() throws JMSException {
  -        ConnectionFactory factory = getConnectionFactory();
  -        if ( factory == null ) {
  -            throw new JMSException( "No ConnectionFactory configured. Cannot create a JMS Session" );
  -        }
  -        
  -        if ( log.isDebugEnabled() ) {
  -            log.debug( "About to create a connection from: " + factory );
  -        }
  -        
  -        if ( topic ) {
  -            return createTopicConnection((TopicConnectionFactory) factory);
  -        }
  -        else {
  -            return createQueueConnection((QueueConnectionFactory) factory);
  -        }
  -    }
  -
  -    /** Closes the JMS Connection that this object is using, if any */
  -    public void close() throws JMSException {
  -        if ( connection != null ) {
  -            connection.close();
  -            connection = null;
  -        }
  -    }
  -    
  -    // Properties
  -    //-------------------------------------------------------------------------    
  -    
  -    /** Returns the JMS connection used to create new sessions */
  -    public Connection getConnection() throws JMSException {
  -        if ( connection == null ) {
  -            setConnection( createConnection() );
  -            
  -            connection.start();
  -        }
  -        return connection;
  -    }
  -    
  -    public void setConnection(Connection connection) {
  -        this.connection = connection;
  -        
  -        // change the topic flag if the wrong topic/queue type
  -        if ( topic ) {
  -            if ( ! ( connection instanceof TopicConnection ) ) {
  -                setTopic( false );
  -            }
  -        }
  -        else {
  -            if ( ! ( connection instanceof QueueConnection ) ) {
  -                setTopic( true );
  -            }
  -        }
  -    }
  -    
  -    /** Returns the JMS ConnectionFactory used to create a new connection */
  -    public ConnectionFactory getConnectionFactory() throws JMSException {
  -        if ( connectionFactory == null ) {
  -            setConnectionFactory( createConnectionFactory() );
  -        }
  -        return connectionFactory;
  -    }
  -    
  -    public void setConnectionFactory(ConnectionFactory connectionFactory) {
  -        this.connectionFactory = connectionFactory;
  -        
  -        // change the topic flag if the wrong topic/queue type
  -        if ( topic ) {
  -            if ( ! ( connectionFactory instanceof TopicConnectionFactory ) ) {
  -                setTopic( false );
  -            }
  -        }
  -        else {
  -            if ( ! ( connectionFactory instanceof QueueConnectionFactory ) ) {
  -                setTopic( true );
  -            }
  -        }
  -    }
  -    
  -    /** Returns true if sessions created by this factory should be transacted */
  -    public boolean isTransacted() {
  -        return transacted;
  -    }
  -    
  -    public void setTransacted(boolean transacted) {
  -        this.transacted = transacted;
  -    }
  -    
  -    
  -    /** Returns the JMS acknowledge mode used by the JMS sessions created by this session */
  -    public int getAcknowledgeMode() {
  -        return acknowlegeMode;
  -    }
  -    
  -    public void setAcknowledgeMode(int acknowlegeMode) {
  -        this.acknowlegeMode = acknowlegeMode;
  -    }
  -
  -    
  -    /** Returns the optional username used when creating a new JMS connection via a JMS ConnectionFactory */
  -    public String getUsername() {
  -        return username;
  -    }
  -    
  -    public void setUsername(String username) {
  -        this.username = username;
  -    }
  -    
  -    /** Returns the optional password used when creating a new JMS connection via a JMS ConnectionFactory */
  -    public String getPassword() {
  -        return password;
  -    }
  -    
  -    public void setPassword(String password) {
  -        this.password = password;
  -    }
  -
  -    /** Returns the Properties that can be used to configure the connection creation */
  -    public Properties getProperties() {
  -        if ( properties == null ) {
  -            properties = createProperties();
  -        }
  -        return properties;
  -    }
  -    
  -    public void setProperties(Properties properties) {
  -        this.properties = properties;
  -    }
  -
  -    public void addProperty(String name, String value) {
  -        getProperties().setProperty(name, value);
  -    }
  -    
  -    /** @return whether to use a Topic or Queue connection/session */
  -    public boolean isTopic() {
  -        return topic;
  -    }
  -    
  -    /** Sets whether to use a Topic or Queue connection/session */
  -    public void setTopic(boolean topic) {
  -        this.topic = topic;
  -    }
  -    
  -    
  -    // Implementation methods
  -    //-------------------------------------------------------------------------    
  -    
  -    protected QueueConnection createQueueConnection(QueueConnectionFactory queueConnectionFactory) throws JMSException {
  -        if ( username != null || password != null ) {
  -            return queueConnectionFactory.createQueueConnection( username, password );
  -        }
  -        else {
  -            return queueConnectionFactory.createQueueConnection();
  -        }
  -    }
  -    
  -    protected TopicConnection createTopicConnection(TopicConnectionFactory topicConnectionFactory) throws JMSException {
  -        if ( username != null || password != null ) {
  -            return topicConnectionFactory.createTopicConnection( username, password );
  -        }
  -        else {
  -            return topicConnectionFactory.createTopicConnection();
  -        }
  -    }
  -    
  -    
  -    /** Factory method used to create a connection factory. 
  -      * Derived classes may wish to use JNDI to load the ConnectionFactory
  -      */
  -    protected ConnectionFactory createConnectionFactory() throws JMSException {
  -        return null;
  -    }
  -    
  -    /** Factory method used to create the initial JNDI context properties.
  -      * Derived classes may wish to overload this method to provide different properties
  -      */
  -    protected Properties createProperties() {
  -        try {
  -            return new Properties( System.getProperties() );
  -        }
  -        catch (Throwable e) {
  -            // security exceptoin
  -            return new Properties();
  -        }
  -    }
  -}
  -
  +/*
 * Copyright (C) The Apache Software Foundation. All rights reserved.
 *
 * This software is published under the terms of the Apache Software License
 * version 1.1, a copy of which has been included with this distribution in
 * the LICENSE file.
 * 
 * $Id: SessionFactory.java,v 1.11 2002/05/15 14:36:33 jstrachan Exp $
 */
package org.apache.commons.messenger;

import java.io.Serializable;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/** <p><code>SessionFactory</code> is a Factory of JMS Session objects.
  * It can be configured with a JMS Connection object to use or can use 
  * a JMS ConnectionFactory instance to create the JMS Connection lazily.</p>
  *
  * <p>Whether topic or queue based connections and sessions are created is
  * controlled by the <code>topic</code> flag (which defaults to true). 
  * The flag is switched automatically when a non-null Connection or 
  * ConnectionFactory of the other kind is supplied.</p>
  *
  * <p>This class performs no synchronization of its own.</p>
  *
  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  * @version $Revision: 1.11 $
  */
public class SessionFactory {

    /** Logger */
    private static final Log log = LogFactory.getLog(SessionFactory.class);

    /** The JMS connection used to create JMS sessions */
    private Connection connection;

    /** The JMS ConnectionFactory used to create JMS Connection instances */
    private ConnectionFactory connectionFactory;

    /** JMS acknowledge mode used on each session */
    private int acknowlegeMode = Session.AUTO_ACKNOWLEDGE;

    /** whether JMS sessions should be transacted */
    private boolean transacted;

    /** the optional username used when creating a new JMS connection via a JMS ConnectionFactory */
    private String username;

    /** the optional password used when creating a new JMS connection via a JMS ConnectionFactory */
    private String password;

    /** the properties used to create the connection */
    protected Properties properties;

    /** Whether to use a Topic or Queue connection/session */
    private boolean topic = true;

    /** Creates a new Session instance on the given connection.
      * A topic session is created if {@link #isTopic()} is true, 
      * otherwise a queue session is created.
      *
      * @param connection the connection to create the session on; it must 
      *   implement TopicConnection or QueueConnection to match the topic flag
      * @return the newly created session, using the configured transacted 
      *   and acknowledge mode settings
      * @throws JMSException if the JMS provider fails to create the session
      */
    public Session createSession(Connection connection) throws JMSException {
        if (topic) {
            TopicConnection topicConnection = (TopicConnection) connection;
            return topicConnection.createTopicSession(isTransacted(), getAcknowledgeMode());
        }
        else {
            QueueConnection queueConnection = (QueueConnection) connection;
            return queueConnection.createQueueSession(isTransacted(), getAcknowledgeMode());
        }
    }

    /** Creates a new Session instance on the connection returned by 
      * {@link #getConnection()}, which is created lazily if required.
      *
      * @return the newly created session
      * @throws JMSException if the connection or the session could not be created
      */
    public Session createSession() throws JMSException {
        // resolve the connection first as doing so may adjust the topic flag
        return createSession(getConnection());
    }

    /** Creates a new ServerSessionPool which is backed by this factory.
      *
      * @param messageListener the listener handed to the pool
      * @param maxThreads the thread limit handed to the pool
      * @return a new DefaultServerSessionPool instance
      * @throws JMSException if the pool could not be created
      */
    public ServerSessionPool createServerSessionPool(
        MessageListener messageListener,
        int maxThreads)
        throws JMSException {
        return new DefaultServerSessionPool(this, messageListener, maxThreads);
    }

    /** Factory method used to create a connection from the configured
      * ConnectionFactory. The connection is neither started nor remembered
      * by this method; see {@link #getConnection()} for that.
      *
      * @return a new topic or queue connection, depending on the topic flag
      * @throws JMSException if no ConnectionFactory is available or the 
      *   JMS provider fails to create the connection
      */
    public Connection createConnection() throws JMSException {
        ConnectionFactory factory = getConnectionFactory();
        if (factory == null) {
            throw new JMSException("No ConnectionFactory configured. Cannot create a JMS Session");
        }
        if (log.isDebugEnabled()) {
            log.debug("About to create a connection from: " + factory);
        }
        if (topic) {
            return createTopicConnection((TopicConnectionFactory) factory);
        }
        else {
            return createQueueConnection((QueueConnectionFactory) factory);
        }
    }

    /** Closes the JMS Connection that this object is using, if any.
      * The reference to the connection is dropped even if closing it fails, 
      * so that a later call to {@link #getConnection()} creates a fresh one.
      *
      * @throws JMSException if the JMS provider fails to close the connection
      */
    public void close() throws JMSException {
        if (connection != null) {
            try {
                connection.close();
            }
            finally {
                connection = null;
            }
        }
    }


    // Properties
    //-------------------------------------------------------------------------    
    /** Returns the JMS connection used to create new sessions. 
      * If no connection has been set then one is created via 
      * {@link #createConnection()}, registered and started.
      *
      * @return the started connection
      * @throws JMSException if the connection could not be created or started
      */
    public Connection getConnection() throws JMSException {
        if (connection == null) {
            Connection created = createConnection();
            setConnection(created);
            boolean started = false;
            try {
                connection.start();
                started = true;
            }
            finally {
                if (!started) {
                    // never cache a connection which failed to start, otherwise 
                    // later calls would hand out a connection that is not running
                    connection = null;
                    closeQuietly(created);
                }
            }
        }
        return connection;
    }

    /** Sets the JMS connection used to create new sessions.
      * If the given connection is non-null and does not match the current 
      * topic/queue mode then the topic flag is switched. 
      * A null connection leaves the flag untouched.
      *
      * @param connection the connection to use, may be null
      */
    public void setConnection(Connection connection) {
        this.connection = connection;
        if (connection != null) {
            // change the topic flag if the wrong topic/queue type
            updateTopicFlag(
                connection instanceof TopicConnection,
                connection instanceof QueueConnection);
        }
    }

    /** Returns the JMS ConnectionFactory used to create a new connection.
      * If none has been set then {@link #createConnectionFactory()} is 
      * used to try to create one.
      *
      * @return the connection factory or null if none could be created
      * @throws JMSException if the factory could not be created
      */
    public ConnectionFactory getConnectionFactory() throws JMSException {
        if (connectionFactory == null) {
            setConnectionFactory(createConnectionFactory());
        }
        return connectionFactory;
    }

    /** Sets the JMS ConnectionFactory used to create new connections.
      * If the given factory is non-null and does not match the current 
      * topic/queue mode then the topic flag is switched. 
      * A null factory leaves the flag untouched.
      *
      * @param connectionFactory the factory to use, may be null
      */
    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
        if (connectionFactory != null) {
            // change the topic flag if the wrong topic/queue type
            updateTopicFlag(
                connectionFactory instanceof TopicConnectionFactory,
                connectionFactory instanceof QueueConnectionFactory);
        }
    }

    /** Returns true if sessions created by this factory should be transacted */
    public boolean isTransacted() {
        return transacted;
    }

    /** Sets whether sessions created by this factory should be transacted */
    public void setTransacted(boolean transacted) {
        this.transacted = transacted;
    }

    /** Returns the JMS acknowledge mode used by the JMS sessions created by this factory */
    public int getAcknowledgeMode() {
        return acknowlegeMode;
    }

    /** Sets the JMS acknowledge mode used by the JMS sessions created by this factory */
    public void setAcknowledgeMode(int acknowlegeMode) {
        this.acknowlegeMode = acknowlegeMode;
    }

    /** Returns the optional username used when creating a new JMS connection via a JMS ConnectionFactory */
    public String getUsername() {
        return username;
    }

    /** Sets the optional username used when creating a new JMS connection via a JMS ConnectionFactory */
    public void setUsername(String username) {
        this.username = username;
    }

    /** Returns the optional password used when creating a new JMS connection via a JMS ConnectionFactory */
    public String getPassword() {
        return password;
    }

    /** Sets the optional password used when creating a new JMS connection via a JMS ConnectionFactory */
    public void setPassword(String password) {
        this.password = password;
    }

    /** Returns the Properties that can be used to configure the connection creation.
      * They are created lazily via {@link #createProperties()}.
      */
    public Properties getProperties() {
        if (properties == null) {
            properties = createProperties();
        }
        return properties;
    }

    /** Sets the Properties that can be used to configure the connection creation */
    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    /** Adds a single property to the Properties returned by {@link #getProperties()}
      *
      * @param name the name of the property
      * @param value the value of the property
      */
    public void addProperty(String name, String value) {
        getProperties().setProperty(name, value);
    }

    /** @return whether to use a Topic or Queue connection/session */
    public boolean isTopic() {
        return topic;
    }

    /** Sets whether to use a Topic or Queue connection/session */
    public void setTopic(boolean topic) {
        this.topic = topic;
    }


    // Implementation methods
    //-------------------------------------------------------------------------    
    /** Creates a new QueueConnection from the given factory, passing the
      * username and password if either of them has been configured.
      */
    protected QueueConnection createQueueConnection(QueueConnectionFactory queueConnectionFactory)
        throws JMSException {
        if (username != null || password != null) {
            return queueConnectionFactory.createQueueConnection(username, password);
        }
        else {
            return queueConnectionFactory.createQueueConnection();
        }
    }

    /** Creates a new TopicConnection from the given factory, passing the
      * username and password if either of them has been configured.
      */
    protected TopicConnection createTopicConnection(TopicConnectionFactory topicConnectionFactory)
        throws JMSException {
        if (username != null || password != null) {
            return topicConnectionFactory.createTopicConnection(username, password);
        }
        else {
            return topicConnectionFactory.createTopicConnection();
        }
    }

    /** Factory method used to create a connection factory. 
      * Derived classes may wish to use JNDI to load the ConnectionFactory.
      * This implementation returns null.
      */
    protected ConnectionFactory createConnectionFactory() throws JMSException {
        return null;
    }

    /** Factory method used to create the initial JNDI context properties.
      * Derived classes may wish to overload this method to provide different properties.
      * This implementation uses the system properties as defaults, falling back 
      * to an empty set of properties if access to them is denied.
      */
    protected Properties createProperties() {
        try {
            return new Properties(System.getProperties());
        }
        catch (SecurityException e) {
            // not allowed to read the system properties
            return new Properties();
        }
    }

    /** Switches the topic flag if the current mode is not supported by an 
      * object which has just been configured.
      *
      * @param supportsTopic whether the object implements the topic interface
      * @param supportsQueue whether the object implements the queue interface
      */
    private void updateTopicFlag(boolean supportsTopic, boolean supportsQueue) {
        if (topic) {
            if (!supportsTopic) {
                setTopic(false);
            }
        }
        else {
            if (!supportsQueue) {
                setTopic(true);
            }
        }
    }

    /** Closes the given connection, logging rather than propagating any 
      * failure so that the exception which triggered the cleanup is not masked.
      */
    private void closeQuietly(Connection failedConnection) {
        if (failedConnection == null) {
            return;
        }
        try {
            failedConnection.close();
        }
        catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("Failed to close connection which could not be started: " + e);
            }
        }
    }
}
  \ No newline at end of file
  
  
  
  1.20      +1 -905    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java
  
  Index: MessengerSupport.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v
  retrieving revision 1.19
  retrieving revision 1.20
  diff -u -r1.19 -r1.20
  --- MessengerSupport.java	26 Feb 2002 04:12:53 -0000	1.19
  +++ MessengerSupport.java	15 May 2002 14:36:34 -0000	1.20
  @@ -1,905 +1 @@
  -/*
  - * Copyright (C) The Apache Software Foundation. All rights reserved.
  - *
  - * This software is published under the terms of the Apache Software License
  - * version 1.1, a copy of which has been included with this distribution in
  - * the LICENSE file.
  - *
  - * $Id: MessengerSupport.java,v 1.19 2002/02/26 04:12:53 jstrachan Exp $
  - */
  -package org.apache.commons.messenger;
  -
  -import java.io.Serializable;
  -import java.util.HashMap;
  -import java.util.Map;
  -
  -import javax.jms.BytesMessage;
  -import javax.jms.Connection;
  -import javax.jms.ConnectionConsumer;
  -import javax.jms.ConnectionFactory;
  -import javax.jms.Destination;
  -import javax.jms.JMSException;
  -import javax.jms.MapMessage;
  -import javax.jms.Message;
  -import javax.jms.MessageConsumer;
  -import javax.jms.MessageListener;
  -import javax.jms.MessageProducer;
  -import javax.jms.ObjectMessage;
  -import javax.jms.Queue;
  -import javax.jms.QueueConnection;
  -import javax.jms.QueueRequestor;
  -import javax.jms.QueueSender;
  -import javax.jms.QueueSession;
  -import javax.jms.Session;
  -import javax.jms.StreamMessage;
  -import javax.jms.ServerSessionPool;
  -import javax.jms.TextMessage;
  -import javax.jms.Topic;
  -import javax.jms.TopicConnection;
  -import javax.jms.TopicPublisher;
  -import javax.jms.TopicRequestor;
  -import javax.jms.TopicSession;
  -
  -
  -/** <p><code>MessengerSupport</code> is an abstract base class which implements
  -  * most of the functionality of Messenger. Derivations need to specify the
  -  * connection and session creation and the pooling strategy.</p>
  -  *
  -  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  -  * @version $Revision: 1.19 $
  -  */
  -public abstract class MessengerSupport implements Messenger {
  -
  -    private static final boolean CACHE_REQUESTOR = true;
  -
  -
  -    /** The name of the Messenger */
  -    private String name;
  -
  -    /** Whether Queue's and Topic's are looked up using JNDI (true)
  -    * or wether they should be created on the fly */
  -    private boolean jndiDestinations;
  -
  -    /** are topic subscribers durable? */
  -    private boolean durable;
  -
  -    /** the durable name used for durable topic based subscriptions */
  -    private String durableName;
  -
  -    /** whether local messages are ignored when topic based subscription is used
  -     * with a message selector */
  -    private boolean noLocal;
  -
  -    /** A Map of ListenerKey objects to MessageConsumer objects */
  -    private Map listeners = new HashMap();
  -
  -    /** A Map of MessageConsumer objects indexed by Destination or Destination and selector */
  -    private Map consumers = new HashMap();
  -
  -    /** A Map of MessageProducer objects indexed by Destination */
  -    private Map producers = new HashMap();
  -
  -    ///** A Map of Queue or Topic Requestors indexed by Destination */
  -    //private Map requestors = new HashMap();
  -    private ThreadLocal requestorsMap = new ThreadLocal() {
  -        protected Object initialValue() {
  -            return new HashMap();
  -        }
  -    };
  -
  -    /** The inbox which is used for the call() methods */
  -    private Destination replyToDestination;
  -
  -    public MessengerSupport() {
  -    }
  -
  -    public String toString() {
  -        try {
  -            Session session = borrowSession();
  -            String answer = super.toString() + " session: " + session.toString();
  -            returnSession( session );
  -            return answer;
  -        }
  -        catch (Exception e) {
  -            return super.toString() + " session: " + e.toString();
  -        }
  -    }
  -
  -    public Destination getDestination(String subject) throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            if ( isTopic( session ) ) {
  -                return getTopic( (TopicSession) session, subject );
  -            }
  -            else {
  -                return getQueue( (QueueSession) session, subject );
  -            }
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public Destination createTemporaryDestination() throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            if ( isTopic( session ) ) {
  -                TopicSession topicSession = (TopicSession) session;
  -                return topicSession.createTemporaryTopic();
  -            }
  -            else {
  -                QueueSession queueSession = (QueueSession) session;
  -                return queueSession.createTemporaryQueue();
  -            }
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public void send( Destination destination, Message message ) throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        try {
  -            producer = getMessageProducer( session, destination );
  -            if ( isTopic( producer ) ) {
  -                ((TopicPublisher) producer).publish( message );
  -            }
  -            else {
  -                ((QueueSender) producer).send( message );
  -            }
  -        }
  -        finally {
  -            producer.close();
  -            returnSession( session );
  -        }
  -    }
  -
  -    public Message call( Destination destination, Message message ) throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            if ( isTopic( session ) ) {
  -                TopicRequestor requestor = getTopicRequestor( (TopicSession) session, (Topic) destination );
  -                return requestor.request( message );
  -            }
  -            else {
  -                QueueRequestor requestor = getQueueRequestor( (QueueSession) session, (Queue) destination );
  -                return requestor.request( message );
  -            }
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -/*
  -    public Message call( Destination destination, Message message ) throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            Destination replyTo = getReplyToDestination();
  -            message.setJMSReplyTo(replyTo);
  -
  -            MessageProducer producer = getMessageProducer( session, destination );
  -            MessageConsumer consumer = getMessageConsumer( session, replyTo );
  -
  -            if ( isTopic( session ) ) {
  -                ((TopicPublisher) producer).publish( message );
  -            }
  -            else {
  -                ((QueueSender) producer).send( message );
  -            }
  -            return consumer.receive();
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -*/
  -
  -    public Message call( Destination destination, Message message, long timeoutMillis ) throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        MessageConsumer consumer = null;
  -        try {
  -            Destination replyTo = getReplyToDestination();
  -            message.setJMSReplyTo(replyTo);
  -
  -            producer = getMessageProducer( session, destination );
  -            consumer = getMessageConsumer( session, replyTo );
  -
  -            if ( isTopic( session ) ) {
  -                ((TopicPublisher) producer).publish( message );
  -            }
  -            else {
  -                ((QueueSender) producer).send( message );
  -            }
  -            return consumer.receive(timeoutMillis);
  -        }
  -        finally {
  -            producer.close();
  -            returnSession( session );
  -        }
  -    }
  -
  -    public Message receive(Destination destination) throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            MessageConsumer consumer = getMessageConsumer( session, destination );
  -            return consumer.receive();
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public Message receive(Destination destination, String selector) throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            MessageConsumer consumer = getMessageConsumer( session, destination, selector );
  -            return consumer.receive();
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public Message receive(Destination destination, long timeoutMillis) throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            MessageConsumer consumer = getMessageConsumer( session, destination );
  -            return consumer.receive(timeoutMillis);
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            MessageConsumer consumer = getMessageConsumer( session, destination, selector );
  -            return consumer.receive(timeoutMillis);
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public Message receiveNoWait(Destination destination) throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            MessageConsumer consumer = getMessageConsumer( session, destination );
  -            return consumer.receiveNoWait();
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public Message receiveNoWait(Destination destination, String selector) throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            MessageConsumer consumer = getMessageConsumer( session, destination, selector );
  -            return consumer.receiveNoWait();
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public MessageConsumer createConsumer(Destination destination) throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            return createMessageConsumer( session, destination );
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            return createMessageConsumer( session, destination, selector );
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public void run() {
  -        // don't return sessions which throw an exception
  -        try {
  -            Session session = borrowSession();
  -            session.run();
  -            returnSession( session );
  -        }
  -        catch (JMSException e) {
  -            // ### ignore
  -        }
  -    }
  -
  -    public ConnectionConsumer createConnectionConsumer(Destination destination, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
  -        return createConnectionConsumer(destination, null, sessionPool, maxMessages);
  -    }
  -
  -    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
  -        Connection connection = getConnection();
  -        if ( isTopic( connection ) ) {
  -            TopicConnection topicConnection = (TopicConnection) connection;
  -            if ( isDurable() ) {
  -                return topicConnection.createDurableConnectionConsumer( (Topic) destination, getDurableName(), selector, sessionPool, maxMessages );
  -            }
  -            else {
  -                return topicConnection.createConnectionConsumer( (Topic) destination, selector, sessionPool, maxMessages );
  -            }
  -        }
  -        else {
  -            QueueConnection queueConnection = (QueueConnection) connection;
  -            return queueConnection.createConnectionConsumer( (Queue) destination, selector, sessionPool, maxMessages );
  -        }
  -    }
  -
  -    public abstract Connection getConnection() throws JMSException;
  -
  -    // Listener API
  -    //-------------------------------------------------------------------------
  -
  -    public void addListener(Destination destination, MessageListener listener) throws JMSException {
  -        if ( listener instanceof MessengerListener ) {
  -            MessengerListener messengerListener = (MessengerListener) listener;
  -            messengerListener.setMessenger( this );
  -        }
  -        Session session = borrowListenerSession();
  -        try {
  -            MessageConsumer consumer = createMessageConsumer( session, destination );
  -            consumer.setMessageListener( listener );
  -
  -            ListenerKey key = new ListenerKey( destination, listener );
  -            listeners.put( key, consumer );
  -        }
  -        finally {
  -            returnListenerSession( session );
  -        }
  -    }
  -
  -    public void addListener(Destination destination, String selector, MessageListener listener) throws JMSException {
  -        if ( listener instanceof MessengerListener ) {
  -            MessengerListener messengerListener = (MessengerListener) listener;
  -            messengerListener.setMessenger( this );
  -        }
  -        Session session = borrowListenerSession();
  -        try {
  -            MessageConsumer consumer = createMessageConsumer( session, destination, selector );
  -            consumer.setMessageListener( listener );
  -
  -            ListenerKey key = new ListenerKey( destination, listener, selector );
  -            listeners.put( key, consumer );
  -        }
  -        finally {
  -            returnListenerSession( session );
  -        }
  -    }
  -
  -
  -    public void removeListener(Destination destination, MessageListener listener ) throws JMSException {
  -        ListenerKey key = new ListenerKey( destination, listener );
  -        MessageConsumer consumer = (MessageConsumer) listeners.remove( key );
  -        if ( consumer == null ) {
  -            throw new JMSException( "The given listener object has not been added for the given destination" );
  -        }
  -        consumer.close();
  -    }
  -
  -    public void removeListener(Destination destination, String selector, MessageListener listener ) throws JMSException {
  -        ListenerKey key = new ListenerKey( destination, listener, selector );
  -        MessageConsumer consumer = (MessageConsumer) listeners.remove( key );
  -        if ( consumer == null ) {
  -            throw new JMSException( "The given listener object has not been added for the given destination and selector" );
  -        }
  -        consumer.close();
  -    }
  -
  -
  -    // Message factory methods
  -    //-------------------------------------------------------------------------
  -
  -    public BytesMessage createBytesMessage() throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            return session.createBytesMessage();
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public MapMessage createMapMessage() throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            return session.createMapMessage();
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public Message createMessage() throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            return session.createMessage();
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public ObjectMessage createObjectMessage() throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            return session.createObjectMessage();
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            return session.createObjectMessage(object);
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public StreamMessage createStreamMessage() throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            return session.createStreamMessage();
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public TextMessage createTextMessage() throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            return session.createTextMessage();
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public TextMessage createTextMessage(String text) throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            return session.createTextMessage(text);
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public void commit() throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            session.commit();
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public void rollback() throws JMSException {
  -        Session session = borrowSession();
  -        try {
  -            session.rollback();
  -        }
  -        finally {
  -            returnSession( session );
  -        }
  -    }
  -
  -    public void close() throws JMSException {
  -        getConnection().close();
  -    }
  -
  -    /** Get the producer's default delivery mode. */
  -    public int getDeliveryMode(Destination destination)  throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        int deliveryMode = 0;
  -        try {
  -            producer = getMessageProducer( session, destination );
  -            deliveryMode = producer.getDeliveryMode( );
  -        }
  -        finally {
  -            producer.close();
  -            returnSession( session );
  -        }
  -        return deliveryMode;
  -    }
  -
  -    /** Set the producer's default delivery mode. */
  -    public void setDeliveryMode(Destination destination, int deliveryMode)  throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        try {
  -            producer = getMessageProducer( session, destination );
  -            producer.setDeliveryMode( deliveryMode );
  -        }
  -        finally {
  -            producer.close();
  -            returnSession( session );
  -        }
  -    }
  -    /**  Get the producer's default priority. */
  -    public int getPriority(Destination destination ) throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        int priority = 0;
  -        try {
  -            producer = getMessageProducer( session, destination );
  -            priority = producer.getPriority( );
  -        }
  -        finally {
  -            producer.close();
  -            returnSession( session );
  -        }
  -        return priority;
  -    }
  -
  -    /**   Set the producer's default priority. */
  -    public void setPriority(Destination destination, int priority ) throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        try {
  -            producer = getMessageProducer( session, destination );
  -            producer.setPriority( priority );
  -        }
  -        finally {
  -            producer.close();
  -            returnSession( session );
  -        }
  -    }
  -
  -    /**  Get the producer's default delivery mode. */
  -    public long getTimeToLive(Destination destination ) throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        long timeToLive = 0;
  -        try {
  -            producer = getMessageProducer( session, destination );
  -            timeToLive = producer.getTimeToLive( );
  -        }
  -        finally {
  -            producer.close();
  -            returnSession( session );
  -        }
  -        return timeToLive;
  -    }
  -
  -    /**  <p>Set the default length of time in milliseconds from its dispatch time that
  -     *   a produced message should be retained by the message system.</p>
  -     */
  -    public void setTimeToLive(Destination destination, long timeToLive) throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -
  -        try {
  -            producer = getMessageProducer( session, destination );
  -            producer.setTimeToLive( timeToLive );
  -        }
  -        finally {
  -            producer.close();
  -            returnSession( session );
  -        }
  -    }
  -
  -    /** Get an indication of whether message timestamps are disabled. */
  -    public boolean getDisableMessageTimestamp(Destination destination)  throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        boolean value = false;
  -        try {
  -            producer = getMessageProducer( session, destination );
  -            value = producer.getDisableMessageTimestamp( );
  -        }
  -        finally {
  -            producer.close();
  -            returnSession( session );
  -        }
  -        return value;
  -    }
  -
  -    /** Set whether message timestamps are disabled. */
  -    public void setDisableMessageTimestamp(Destination destination, boolean value) throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        try {
  -            producer = getMessageProducer( session, destination );
  -            producer.setDisableMessageTimestamp( value );
  -        }
  -        finally {
  -            producer.close();
  -            returnSession( session );
  -        }
  -    }
  -
  -    /** Extends the send capability to send by specifying additional options. */
  -    public void send( Destination destination, Message message , int deliveryMode, int priority, long timeToLive) throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        try {
  -            producer = getMessageProducer( session, destination );
  -            if ( isTopic(producer) ) {
  -                ((TopicPublisher) producer).publish( message, deliveryMode, priority, timeToLive );
  -            }
  -            else {
  -                ((QueueSender) producer).send( message, deliveryMode, priority, timeToLive);
  -            }
  -        }
  -        finally {
  -            producer.close();
  -            returnSession( session );
  -        }
  -    }
  -
  -    /**  Get an indication of whether message IDs are disabled. */
  -    public boolean getDisableMessageID(Destination destination)  throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        boolean value = false;
  -        try {
  -            producer = getMessageProducer( session, destination );
  -            value = producer.getDisableMessageID( );
  -        }
  -        finally {
  -            producer.close();
  -            returnSession( session );
  -        }
  -        return value;
  -    }
  -
  -    /** Set whether message IDs are disabled. */
  -    public void setDisableMessageID(Destination destination, boolean value) throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        try {
  -            producer = getMessageProducer( session, destination );
  -            producer.setDisableMessageID( value );
  -        }
  -        finally {
  -            producer.close();
  -            returnSession( session );
  -        }
  -    }
  -    
  -    // Properties
  -    //-------------------------------------------------------------------------
  -
  -    /** Gets the name that this Messenger is called in a MessengerManager */
  -    public String getName() {
  -        return name;
  -    }
  -
  -    /** Sets the name that this Messenger is called in a MessengerManager */
  -    public void setName(String name) {
  -        this.name = name;
  -    }
  -
  -    /** Setter for jndiDestinations */
  -    public void setJndiDestinations(boolean jndiDestinations){
  -        this.jndiDestinations = jndiDestinations;
  -    }
  -
  -    /** Getter for jndiDestinations */
  -    public boolean isJndiDestinations() {
  -        return jndiDestinations;
  -    }
  -
  -    /** Gets whether topic subscribers are durable or not */
  -    public boolean isDurable() {
  -        return noLocal;
  -    }
  -
  -    /** Sets whether topic subscribers are durable or not */
  -    public void setDurable(boolean durable) {
  -        this.durable = durable;
  -    }
  -
  -    /** Returns the durable name used for durable topic based subscriptions */
  -    public String getDurableName() {
  -        return durableName;
  -    }
  -
  -    /** Sets the durable name used for durable topic based subscriptions */
  -    public void setDurableName(String durableName) {
  -        this.durableName = durableName;
  -    }
  -
  -    /** Gets whether local messages are ignored when topic based subscription is used
  -     * with a message selector */
  -    public boolean isNoLocal() {
  -        return noLocal;
  -    }
  -
  -    /** Sets whether local messages are ignored when topic based subscription is used
  -     * with a message selector */
  -    public void setNoLocal(boolean noLocal) {
  -        this.noLocal = noLocal;
  -    }
  -
  -    // Implementation methods
  -    //-------------------------------------------------------------------------
  -
  -
  -    /** Borrows a session instance from the pool */
  -    protected abstract Session borrowSession() throws JMSException;
  -
  -    /** Returns a session instance back to the pool */
  -    protected abstract void returnSession(Session session) throws JMSException;
  -
  -    /** Deletes a session instance */
  -    protected abstract void deleteSession(Session session) throws JMSException;
  -
  -    /** Borrows a session instance from the pool */
  -    protected abstract Session borrowListenerSession() throws JMSException;
  -
  -    /** Returns a session instance back to the pool */
  -    protected abstract void returnListenerSession(Session session) throws JMSException;
  -
  -    protected abstract boolean isTopic(Connection connection) throws JMSException;
  -    
  -    protected abstract boolean isTopic(ConnectionFactory factory) throws JMSException;
  -    
  -    protected abstract boolean isTopic(Session session) throws JMSException;
  -    
  -    protected abstract boolean isTopic(MessageProducer producer) throws JMSException;
  -    
  -    /** Returns a message producer for the given session and destination */
  -    protected MessageProducer getMessageProducer( Session session, Destination destination ) throws JMSException {
  -        return createMessageProducer( session, destination );
  -/**
  -        MessageProducer producer = (MessageProducer) producers.get( destination );
  -        if ( producer == null ) {
  -            producer = createMessageProducer( session, destination );
  -        }
  -        return producer;
  -*/
  -    }
  -
  -    /** Returns a newly created message producer for the given session and destination */
  -    protected MessageProducer createMessageProducer( Session session, Destination destination ) throws JMSException {
  -        if ( isTopic( session ) ) {
  -            TopicSession topicSession = (TopicSession) session;
  -            return topicSession.createPublisher( (Topic) destination );
  -        }
  -        else {
  -            QueueSession queueSession = (QueueSession) session;
  -            return queueSession.createSender( (Queue) destination );
  -        }
  -    }
  -
  -    /** Returns a MessageConsumer for the given session and destination */
  -    protected MessageConsumer getMessageConsumer( Session session, Destination destination ) throws JMSException {
  -        return createMessageConsumer( session, destination );
  -/*
  -        MessageConsumer consumer = (MessageConsumer) consumers.get( destination );
  -        if ( consumer == null ) {
  -            consumer = createMessageConsumer( session, destination );
  -        }
  -        return consumer;
  -*/
  -    }
  -
  -    /** Returns a MessageConsumer for the given session, destination and selector */
  -    protected MessageConsumer getMessageConsumer( Session session, Destination destination, String selector ) throws JMSException {
  -        // XXXX: could do caching one day
  -        return createMessageConsumer( session, destination, selector );
  -    }
  -
  -    /** Returns a new MessageConsumer for the given session and destination */
  -    protected MessageConsumer createMessageConsumer( Session session, Destination destination ) throws JMSException {
  -        if ( isTopic( session ) ) {
  -            TopicSession topicSession = (TopicSession) session;
  -            if ( isDurable() ) {
  -                return topicSession.createDurableSubscriber(
  -                    (Topic) destination,
  -                    getDurableName()
  -                );
  -            }
  -            else {
  -                return topicSession.createSubscriber(
  -                    (Topic) destination
  -                );
  -            }
  -        }
  -        else {
  -            QueueSession queueSession = (QueueSession) session;
  -            return queueSession.createReceiver( (Queue) destination );
  -        }
  -    }
  -
  -    /** Returns a new MessageConsumer for the given session, destination and selector */
  -    protected MessageConsumer createMessageConsumer( Session session, Destination destination, String selector ) throws JMSException {
  -        if ( isTopic( session ) ) {
  -            TopicSession topicSession = (TopicSession) session;
  -            if ( isDurable() ) {
  -                return topicSession.createDurableSubscriber(
  -                    (Topic) destination,
  -                    getDurableName(),
  -                    selector,
  -                    isNoLocal()
  -                );
  -            }
  -            else {
  -                return topicSession.createSubscriber(
  -                    (Topic) destination,
  -                    selector,
  -                    isNoLocal()
  -                );
  -            }
  -        }
  -        else {
  -            QueueSession queueSession = (QueueSession) session;
  -            return queueSession.createReceiver(
  -                (Queue) destination,
  -                selector
  -            );
  -        }
  -    }
  -
  -    protected Queue getQueue(QueueSession session, String subject) throws JMSException {
  -        // XXXX: might want to cache
  -        return session.createQueue( subject );
  -    }
  -
  -    protected Topic getTopic(TopicSession session, String subject) throws JMSException {
  -        // XXXX: might want to cache
  -        return session.createTopic( subject );
  -    }
  -
  -    protected Destination getReplyToDestination() throws JMSException {
  -        if ( replyToDestination == null ) {
  -            replyToDestination = createTemporaryDestination();
  -        }
  -        return replyToDestination;
  -    }
  -
  -    protected TopicRequestor getTopicRequestor( TopicSession session, Topic destination ) throws JMSException {
  -        if ( CACHE_REQUESTOR ) {
  -            Map requestors = (Map) requestorsMap.get();
  -            TopicRequestor requestor = (TopicRequestor) requestors.get( destination );
  -            if ( requestor == null ) {
  -                requestor = new TopicRequestor( session, destination );
  -                requestors.put( destination, requestor );
  -            }
  -            return requestor;
  -        }
  -        else {
  -            return new TopicRequestor( session, destination );
  -        }
  -    }
  -
  -    protected QueueRequestor getQueueRequestor( QueueSession session, Queue destination ) throws JMSException {
  -        if ( CACHE_REQUESTOR ) {
  -            Map requestors = (Map) requestorsMap.get();
  -            QueueRequestor requestor = (QueueRequestor) requestors.get( destination );
  -            if ( requestor == null ) {
  -                requestor = new QueueRequestor( session, destination );
  -                requestors.put( destination, requestor );
  -            }
  -            return requestor;
  -        }
  -        else {
  -            return new QueueRequestor( session, destination );
  -        }
  -    }
  -
  -}
  -
  +/*
 * Copyright (C) The Apache Software Foundation. All rights reserved.
 *
 * This software is published under the terms of the Apache Software License
 * version 1.1, a copy of which has been included with this distribution in
 * the LICENSE file.
 *
 * $Id: MessengerSupport.java,v 1.20 2002/05/15 14:36:34 jstrachan Exp $
 */
package org.apache.commons.messenger;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueRequestor;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.ServerSessionPool;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicRequestor;
import javax.jms.TopicSession;
/** <p><code>MessengerSupport</code> is an abstract base class which implements
  * most of the functionality of Messenger. Derivations need to specify the
  * connection and session creation and the pooling strategy.</p>
  *
  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  * @version $Revision: 1.20 $
  */
public abstract class MessengerSupport implements Messenger {

    /**
     * Compile-time switch for requestor caching.
     * NOTE(review): the methods that read this flag are not visible next to
     * these fields; presumably it decides whether the requestors kept in
     * requestorsMap are reused - confirm against getTopicRequestor() and
     * getQueueRequestor().
     */
    private static final boolean CACHE_REQUESTOR = true;

    /** The name of the Messenger */
    private String name;

    /**
     * Whether Queues and Topics are looked up using JNDI (true)
     * or whether they should be created on the fly (false)
     */
    private boolean jndiDestinations;
    
    /** Are topic subscribers durable? */
    private boolean durable;
    
    /** The durable name used for durable topic based subscriptions */
    private String durableName;
    
    /**
     * Whether local messages are ignored when topic based subscription is used
     * with a message selector
     */
    private boolean noLocal;
    
    /** A Map of ListenerKey objects to MessageConsumer objects */
    private Map listeners = new HashMap();
    
    /**
     * A Map of MessageConsumer objects indexed by Destination or Destination and selector.
     * NOTE(review): none of the methods visible alongside these fields touches
     * this map; verify it is still used before relying on it.
     */
    private Map consumers = new HashMap();
    
    /**
     * A Map of MessageProducer objects indexed by Destination.
     * NOTE(review): none of the methods visible alongside these fields touches
     * this map; verify it is still used before relying on it.
     */
    private Map producers = new HashMap();
    
    // Superseded by the thread-local requestorsMap below; kept for reference.
    ///** A Map of Queue or Topic Requestors indexed by Destination */
    //private Map requestors = new HashMap();
    
    /**
     * Holds one Map per thread; each thread's Map starts out as an empty
     * HashMap the first time that thread reads the ThreadLocal.
     * NOTE(review): presumably the per-thread Map contains Queue or Topic
     * requestors indexed by Destination, as the disabled field above
     * describes - confirm.
     */
    private ThreadLocal requestorsMap = new ThreadLocal() {
        protected Object initialValue() {
            return new HashMap();
        }
    };
    
    /** The inbox which is used for the call() methods */
    private Destination replyToDestination;
    
    /** Creates a messenger whose properties all keep their declared defaults. */
    public MessengerSupport() {
        // nothing to set up: every field relies on its own initialiser
    }
    
    /**
     * Returns the default Object description followed by the description of a
     * session borrowed from the pool, or by the description of the exception
     * raised while borrowing, describing or returning that session.
     *
     * @return a description of this messenger; this method never throws
     */
    public String toString() {
        try {
            Session session = borrowSession();
            try {
                return super.toString() + " session: " + session.toString();
            }
            finally {
                // hand the session back even if describing it failed, so a
                // misbehaving Session.toString() cannot leak a pooled session
                returnSession(session);
            }
        }
        catch (Exception e) {
            return super.toString() + " session: " + e.toString();
        }
    }
    
    /**
     * Resolves a subject name to a destination using a pooled session.
     * A topic session yields the result of getTopic(); any other session is
     * cast to a QueueSession and yields the result of getQueue().
     *
     * @param subject the name of the topic or queue
     * @return the Topic or Queue for the given subject
     * @throws JMSException if the session cannot be borrowed or returned, or
     *         if the lookup itself fails
     */
    public Destination getDestination(String subject) throws JMSException {
        final Session pooled = borrowSession();
        try {
            if (!isTopic(pooled)) {
                return getQueue((QueueSession) pooled, subject);
            }
            return getTopic((TopicSession) pooled, subject);
        }
        finally {
            // the session goes back to the pool whether or not the lookup worked
            returnSession(pooled);
        }
    }
    
    /**
     * Creates a temporary destination on a pooled session: a temporary topic
     * when the session is a topic session, otherwise a temporary queue.
     *
     * @return the newly created TemporaryTopic or TemporaryQueue
     * @throws JMSException if the session cannot be borrowed or returned, or
     *         if the provider fails to create the destination
     */
    public Destination createTemporaryDestination() throws JMSException {
        final Session pooled = borrowSession();
        try {
            if (!isTopic(pooled)) {
                return ((QueueSession) pooled).createTemporaryQueue();
            }
            return ((TopicSession) pooled).createTemporaryTopic();
        }
        finally {
            returnSession(pooled);
        }
    }
    
    /**
     * Sends a message to the given destination using a pooled session and a
     * producer obtained for this single call. The message is published when the
     * producer is a topic producer and sent through a QueueSender otherwise.
     *
     * @param destination the topic or queue to deliver to
     * @param message the message to deliver
     * @throws JMSException if the session or producer cannot be obtained, if
     *         delivery fails, or if closing the producer or returning the
     *         session fails
     */
    public void send(Destination destination, Message message)
        throws JMSException {
        Session session = borrowSession();
        try {
            // Obtain the producer outside the inner try: if this throws there
            // is nothing to close, and the caller sees the real JMSException
            // rather than a NullPointerException from the cleanup code.
            MessageProducer producer = getMessageProducer(session, destination);
            try {
                if (isTopic(producer)) {
                    ((TopicPublisher) producer).publish(message);
                }
                else {
                    ((QueueSender) producer).send(message);
                }
            }
            finally {
                producer.close();
            }
        }
        finally {
            // runs even when producer.close() fails, so the session is never leaked
            returnSession(session);
        }
    }
    
    /**
     * Performs a request/reply exchange through a JMS requestor: a
     * TopicRequestor when the pooled session is a topic session, otherwise a
     * QueueRequestor. The call returns whatever the requestor's request()
     * returns, so it waits as long as the requestor does.
     *
     * @param destination the topic or queue the request is sent to
     * @param message the request message
     * @return the reply handed back by the requestor
     * @throws JMSException if the session cannot be borrowed or returned, or
     *         if the request fails
     */
    public Message call(Destination destination, Message message)
        throws JMSException {
        final Session pooled = borrowSession();
        try {
            if (!isTopic(pooled)) {
                return getQueueRequestor((QueueSession) pooled, (Queue) destination)
                    .request(message);
            }
            return getTopicRequestor((TopicSession) pooled, (Topic) destination)
                .request(message);
        }
        finally {
            returnSession(pooled);
        }
    }
    /*
        public Message call( Destination destination, Message message ) throws JMSException {
            Session session = borrowSession();
            try {
                Destination replyTo = getReplyToDestination();
                message.setJMSReplyTo(replyTo);
    
                MessageProducer producer = getMessageProducer( session, destination );
                MessageConsumer consumer = getMessageConsumer( session, replyTo );
    
                if ( isTopic( session ) ) {
                    ((TopicPublisher) producer).publish( message );
                }
                else {
                    ((QueueSender) producer).send( message );
                }
                return consumer.receive();
            }
            finally {
                returnSession( session );
            }
        }
    */
    
    /**
     * Performs a request/reply exchange with a time limit. The message's
     * JMSReplyTo header is set to this messenger's reply destination, the
     * message is delivered to the given destination, and the reply is awaited
     * on a consumer for the reply destination.
     *
     * @param destination the topic or queue the request is sent to
     * @param message the request message; its JMSReplyTo header is overwritten
     * @param timeoutMillis passed unchanged to MessageConsumer.receive(long)
     * @return whatever consumer.receive(timeoutMillis) returns; the JMS API
     *         specifies null when the timeout expires without a reply
     * @throws JMSException if the session, producer or consumer cannot be
     *         obtained, if delivery or receipt fails, or if closing the
     *         producer or returning the session fails
     */
    public Message call(
        Destination destination,
        Message message,
        long timeoutMillis)
        throws JMSException {
        Session session = borrowSession();
        try {
            Destination replyTo = getReplyToDestination();
            message.setJMSReplyTo(replyTo);
            // Obtain the producer outside the inner try: a failure up to this
            // point leaves nothing to close, so the original JMSException
            // propagates instead of a NullPointerException from cleanup.
            MessageProducer producer = getMessageProducer(session, destination);
            try {
                // NOTE(review): this consumer is not closed here, matching the
                // receive() methods; confirm who owns its lifecycle.
                MessageConsumer consumer = getMessageConsumer(session, replyTo);
                if (isTopic(session)) {
                    ((TopicPublisher) producer).publish(message);
                }
                else {
                    ((QueueSender) producer).send(message);
                }
                return consumer.receive(timeoutMillis);
            }
            finally {
                producer.close();
            }
        }
        finally {
            // runs even when producer.close() fails, so the session is never leaked
            returnSession(session);
        }
    }
    
    /**
     * Receives the next message from the given destination using a pooled
     * session, via MessageConsumer.receive() with no time limit.
     * The consumer obtained for the call is not closed by this method.
     *
     * @param destination the topic or queue to read from
     * @return the message handed back by the consumer
     * @throws JMSException if the session or consumer cannot be obtained, if
     *         receiving fails, or if the session cannot be returned
     */
    public Message receive(Destination destination) throws JMSException {
        final Session pooled = borrowSession();
        try {
            return getMessageConsumer(pooled, destination).receive();
        }
        finally {
            returnSession(pooled);
        }
    }

    /**
     * Receives the next message from the given destination that the consumer
     * created with the given selector delivers, via MessageConsumer.receive()
     * with no time limit.
     *
     * @param destination the topic or queue to read from
     * @param selector the message selector handed to getMessageConsumer()
     * @return the message handed back by the consumer
     * @throws JMSException if the session or consumer cannot be obtained, if
     *         receiving fails, or if the session cannot be returned
     */
    public Message receive(Destination destination, String selector)
        throws JMSException {
        final Session pooled = borrowSession();
        try {
            return getMessageConsumer(pooled, destination, selector).receive();
        }
        finally {
            returnSession(pooled);
        }
    }

    /**
     * Receives a message on the given destination, blocking for the specified timeout.
     *
     * @param destination the destination to consume from
     * @param timeoutMillis the timeout handed to the consumer's timed receive
     * @return whatever the consumer's timed receive returns
     * @throws JMSException if the session or consumer operations fail
     */
    public Message receive(Destination destination, long timeoutMillis)
        throws JMSException {
        final Session pooled = borrowSession();
        try {
            return getMessageConsumer(pooled, destination).receive(timeoutMillis);
        }
        finally {
            returnSession(pooled);
        }
    }

    /**
     * Receives a message on the given destination and selector, blocking for
     * the specified timeout.
     *
     * @param destination the destination to consume from
     * @param selector the JMS message selector used when creating the consumer
     * @param timeoutMillis the timeout handed to the consumer's timed receive
     * @return whatever the consumer's timed receive returns
     * @throws JMSException if the session or consumer operations fail
     */
    public Message receive(
        Destination destination,
        String selector,
        long timeoutMillis)
        throws JMSException {
        final Session pooled = borrowSession();
        try {
            return getMessageConsumer(pooled, destination, selector).receive(timeoutMillis);
        }
        finally {
            returnSession(pooled);
        }
    }

    /**
     * Receives a message on the given destination without blocking.
     *
     * @param destination the destination to consume from
     * @return whatever the consumer's receiveNoWait returns
     * @throws JMSException if the session or consumer operations fail
     */
    public Message receiveNoWait(Destination destination) throws JMSException {
        final Session pooled = borrowSession();
        try {
            return getMessageConsumer(pooled, destination).receiveNoWait();
        }
        finally {
            returnSession(pooled);
        }
    }

    /**
     * Receives a message on the given destination and selector without blocking.
     *
     * @param destination the destination to consume from
     * @param selector the JMS message selector used when creating the consumer
     * @return whatever the consumer's receiveNoWait returns
     * @throws JMSException if the session or consumer operations fail
     */
    public Message receiveNoWait(Destination destination, String selector)
        throws JMSException {
        final Session pooled = borrowSession();
        try {
            return getMessageConsumer(pooled, destination, selector).receiveNoWait();
        }
        finally {
            returnSession(pooled);
        }
    }

    /**
     * Creates a MessageConsumer for the given JMS destination.
     * The session used to create it is handed back to the pool before the
     * consumer is returned to the caller.
     *
     * @param destination the destination to consume from
     * @return a newly created consumer
     * @throws JMSException if the consumer cannot be created
     */
    public MessageConsumer createConsumer(Destination destination)
        throws JMSException {
        final Session pooled = borrowSession();
        final MessageConsumer created;
        try {
            created = createMessageConsumer(pooled, destination);
        }
        finally {
            returnSession(pooled);
        }
        return created;
    }

    /**
     * Creates a MessageConsumer for the given JMS destination and JMS selector.
     * The session used to create it is handed back to the pool before the
     * consumer is returned to the caller.
     *
     * @param destination the destination to consume from
     * @param selector the JMS message selector
     * @return a newly created consumer
     * @throws JMSException if the consumer cannot be created
     */
    public MessageConsumer createConsumer(Destination destination, String selector)
        throws JMSException {
        final Session pooled = borrowSession();
        final MessageConsumer created;
        try {
            created = createMessageConsumer(pooled, destination, selector);
        }
        finally {
            returnSession(pooled);
        }
        return created;
    }

    /**
     * Allows the current thread to be given to the JMS session to process messages.
     * A JMSException raised while borrowing or returning the session is
     * silently discarded; the session is only handed back to the pool when
     * its run() completed without throwing.
     */
    public void run() {
        try {
            final Session worker = borrowSession();
            worker.run();
            // not in a finally block on purpose: don't return sessions which throw an exception
            returnSession(worker);
        }
        catch (JMSException ignored) {
            // ### ignore
        }
    }

    /**
     * Creates a ConnectionConsumer without a message selector by delegating
     * to the selector-taking overload with a null selector.
     *
     * @param destination the destination to consume from
     * @param sessionPool the server session pool passed to the connection
     * @param maxMessages the maximum message count passed to the connection
     * @return the created connection consumer
     * @throws JMSException if the connection consumer cannot be created
     */
    public ConnectionConsumer createConnectionConsumer(
        Destination destination,
        ServerSessionPool sessionPool,
        int maxMessages)
        throws JMSException {
        final String noSelector = null;
        return createConnectionConsumer(destination, noSelector, sessionPool, maxMessages);
    }

    /**
     * Creates a ConnectionConsumer on this messenger's connection.
     * Queue connections get a plain connection consumer; topic connections
     * get a durable one (using {@link #getDurableName()}) when
     * {@link #isDurable()} is true and a non-durable one otherwise.
     *
     * @param destination the queue or topic to consume from
     * @param selector the JMS message selector, may be null
     * @param sessionPool the server session pool passed to the connection
     * @param maxMessages the maximum message count passed to the connection
     * @return the created connection consumer
     * @throws JMSException if the connection consumer cannot be created
     */
    public ConnectionConsumer createConnectionConsumer(
        Destination destination,
        String selector,
        ServerSessionPool sessionPool,
        int maxMessages)
        throws JMSException {
        final Connection jmsConnection = getConnection();

        // queue based connection
        if (!isTopic(jmsConnection)) {
            final QueueConnection queues = (QueueConnection) jmsConnection;
            return queues.createConnectionConsumer(
                (Queue) destination, selector, sessionPool, maxMessages);
        }

        // topic based connection
        final TopicConnection topics = (TopicConnection) jmsConnection;
        if (!isDurable()) {
            return topics.createConnectionConsumer(
                (Topic) destination, selector, sessionPool, maxMessages);
        }
        return topics.createDurableConnectionConsumer(
            (Topic) destination, getDurableName(), selector, sessionPool, maxMessages);
    }

    /**
     * Returns the underlying JMS connection that this Messenger is using.
     *
     * @return the JMS connection supplied by the concrete subclass
     * @throws JMSException declared for implementations that fail to provide the connection
     */
    public abstract Connection getConnection() throws JMSException;


    // Listener API
    //-------------------------------------------------------------------------
    /**
     * Adds a message listener on the given destination.
     * A listener that is a MessengerListener is first given a reference to
     * this messenger; the consumer created for it is stored so that
     * removeListener can close it later.
     *
     * @param destination the destination to listen on
     * @param listener the listener to attach
     * @throws JMSException if the consumer cannot be created or configured
     */
    public void addListener(Destination destination, MessageListener listener)
        throws JMSException {
        if (listener instanceof MessengerListener) {
            ((MessengerListener) listener).setMessenger(this);
        }
        final Session listenerSession = borrowListenerSession();
        try {
            final MessageConsumer subscriber =
                createMessageConsumer(listenerSession, destination);
            subscriber.setMessageListener(listener);
            listeners.put(new ListenerKey(destination, listener), subscriber);
        }
        finally {
            returnListenerSession(listenerSession);
        }
    }

    /**
     * Adds a message listener on the given destination and selector.
     * A listener that is a MessengerListener is first given a reference to
     * this messenger; the consumer created for it is stored so that
     * removeListener can close it later.
     *
     * @param destination the destination to listen on
     * @param selector the JMS message selector
     * @param listener the listener to attach
     * @throws JMSException if the consumer cannot be created or configured
     */
    public void addListener(
        Destination destination,
        String selector,
        MessageListener listener)
        throws JMSException {
        if (listener instanceof MessengerListener) {
            ((MessengerListener) listener).setMessenger(this);
        }
        final Session listenerSession = borrowListenerSession();
        try {
            final MessageConsumer subscriber =
                createMessageConsumer(listenerSession, destination, selector);
            subscriber.setMessageListener(listener);
            listeners.put(new ListenerKey(destination, listener, selector), subscriber);
        }
        finally {
            returnListenerSession(listenerSession);
        }
    }

    /**
     * Removes a listener previously added for the given destination and
     * closes the consumer that was created for it.
     *
     * @param destination the destination the listener was added on
     * @param listener the listener to detach
     * @throws JMSException if no such listener is registered, or closing the consumer fails
     */
    public void removeListener(Destination destination, MessageListener listener)
        throws JMSException {
        final MessageConsumer registered =
            (MessageConsumer) listeners.remove(new ListenerKey(destination, listener));
        if (registered != null) {
            registered.close();
            return;
        }
        throw new JMSException("The given listener object has not been added for the given destination");
    }

    /**
     * Removes a listener previously added for the given destination and
     * selector and closes the consumer that was created for it.
     *
     * @param destination the destination the listener was added on
     * @param selector the selector the listener was added with
     * @param listener the listener to detach
     * @throws JMSException if no such listener is registered, or closing the consumer fails
     */
    public void removeListener(
        Destination destination,
        String selector,
        MessageListener listener)
        throws JMSException {
        final MessageConsumer registered =
            (MessageConsumer) listeners.remove(new ListenerKey(destination, listener, selector));
        if (registered != null) {
            registered.close();
            return;
        }
        throw new JMSException("The given listener object has not been added for the given destination and selector");
    }

    // Message factory methods
    //-------------------------------------------------------------------------
    /**
     * Creates a BytesMessage using a session borrowed from the pool.
     *
     * @return the newly created message
     * @throws JMSException if the session operations fail
     */
    public BytesMessage createBytesMessage() throws JMSException {
        final Session pooled = borrowSession();
        final BytesMessage created;
        try {
            created = pooled.createBytesMessage();
        }
        finally {
            returnSession(pooled);
        }
        return created;
    }

    /**
     * Creates a MapMessage using a session borrowed from the pool.
     *
     * @return the newly created message
     * @throws JMSException if the session operations fail
     */
    public MapMessage createMapMessage() throws JMSException {
        final Session pooled = borrowSession();
        final MapMessage created;
        try {
            created = pooled.createMapMessage();
        }
        finally {
            returnSession(pooled);
        }
        return created;
    }

    /**
     * Creates a plain Message using a session borrowed from the pool.
     *
     * @return the newly created message
     * @throws JMSException if the session operations fail
     */
    public Message createMessage() throws JMSException {
        final Session pooled = borrowSession();
        final Message created;
        try {
            created = pooled.createMessage();
        }
        finally {
            returnSession(pooled);
        }
        return created;
    }

    /**
     * Creates an empty ObjectMessage using a session borrowed from the pool.
     *
     * @return the newly created message
     * @throws JMSException if the session operations fail
     */
    public ObjectMessage createObjectMessage() throws JMSException {
        final Session pooled = borrowSession();
        final ObjectMessage created;
        try {
            created = pooled.createObjectMessage();
        }
        finally {
            returnSession(pooled);
        }
        return created;
    }

    /**
     * Creates an ObjectMessage for the given object using a session
     * borrowed from the pool.
     *
     * @param object the payload passed to the session's factory method
     * @return the newly created message
     * @throws JMSException if the session operations fail
     */
    public ObjectMessage createObjectMessage(Serializable object)
        throws JMSException {
        final Session pooled = borrowSession();
        final ObjectMessage created;
        try {
            created = pooled.createObjectMessage(object);
        }
        finally {
            returnSession(pooled);
        }
        return created;
    }

    /**
     * Creates a StreamMessage using a session borrowed from the pool.
     *
     * @return the newly created message
     * @throws JMSException if the session operations fail
     */
    public StreamMessage createStreamMessage() throws JMSException {
        final Session pooled = borrowSession();
        final StreamMessage created;
        try {
            created = pooled.createStreamMessage();
        }
        finally {
            returnSession(pooled);
        }
        return created;
    }

    /**
     * Creates an empty TextMessage using a session borrowed from the pool.
     *
     * @return the newly created message
     * @throws JMSException if the session operations fail
     */
    public TextMessage createTextMessage() throws JMSException {
        final Session pooled = borrowSession();
        final TextMessage created;
        try {
            created = pooled.createTextMessage();
        }
        finally {
            returnSession(pooled);
        }
        return created;
    }

    /**
     * Creates a TextMessage with the given text using a session borrowed
     * from the pool.
     *
     * @param text the text passed to the session's factory method
     * @return the newly created message
     * @throws JMSException if the session operations fail
     */
    public TextMessage createTextMessage(String text) throws JMSException {
        final Session pooled = borrowSession();
        final TextMessage created;
        try {
            created = pooled.createTextMessage(text);
        }
        finally {
            returnSession(pooled);
        }
        return created;
    }

    /**
     * Calls commit() on a session borrowed from the pool and hands the
     * session back afterwards.
     *
     * @throws JMSException if borrowing, committing or returning the session fails
     */
    public void commit() throws JMSException {
        final Session pooled = borrowSession();
        try {
            pooled.commit();
        }
        finally {
            returnSession(pooled);
        }
    }

    /**
     * Calls rollback() on a session borrowed from the pool and hands the
     * session back afterwards.
     *
     * @throws JMSException if borrowing, rolling back or returning the session fails
     */
    public void rollback() throws JMSException {
        final Session pooled = borrowSession();
        try {
            pooled.rollback();
        }
        finally {
            returnSession(pooled);
        }
    }

    /**
     * Closes the underlying JMS connection.
     *
     * @throws JMSException if the connection cannot be obtained or closed
     */
    public void close() throws JMSException {
        final Connection underlying = getConnection();
        underlying.close();
    }

    /**
     * Get the producer's default delivery mode.
     *
     * @param destination the destination a producer is obtained for
     * @return the delivery mode reported by that producer
     * @throws JMSException if the producer cannot be obtained, queried or closed
     */
    public int getDeliveryMode(Destination destination) throws JMSException {
        Session session = borrowSession();
        try {
            MessageProducer producer = getMessageProducer(session, destination);
            try {
                return producer.getDeliveryMode();
            }
            finally {
                // only reached once the producer exists, so a failure while
                // creating it is not masked by a NullPointerException
                producer.close();
            }
        }
        finally {
            // the session goes back to the pool even if close() fails
            returnSession(session);
        }
    }

    /**
     * Set the producer's default delivery mode.
     * NOTE(review): getMessageProducer currently creates a new producer on
     * every call and that producer is closed here, so the value applies only
     * to this short-lived producer - confirm whether that is intended.
     *
     * @param destination the destination a producer is obtained for
     * @param deliveryMode the delivery mode to set on that producer
     * @throws JMSException if the producer cannot be obtained, updated or closed
     */
    public void setDeliveryMode(Destination destination, int deliveryMode)
        throws JMSException {
        Session session = borrowSession();
        try {
            MessageProducer producer = getMessageProducer(session, destination);
            try {
                producer.setDeliveryMode(deliveryMode);
            }
            finally {
                // only reached once the producer exists, so a failure while
                // creating it is not masked by a NullPointerException
                producer.close();
            }
        }
        finally {
            // the session goes back to the pool even if close() fails
            returnSession(session);
        }
    }

    /**
     * Get the producer's default priority.
     *
     * @param destination the destination a producer is obtained for
     * @return the priority reported by that producer
     * @throws JMSException if the producer cannot be obtained, queried or closed
     */
    public int getPriority(Destination destination) throws JMSException {
        Session session = borrowSession();
        try {
            MessageProducer producer = getMessageProducer(session, destination);
            try {
                return producer.getPriority();
            }
            finally {
                // only reached once the producer exists, so a failure while
                // creating it is not masked by a NullPointerException
                producer.close();
            }
        }
        finally {
            // the session goes back to the pool even if close() fails
            returnSession(session);
        }
    }

    /**
     * Set the producer's default priority.
     * NOTE(review): getMessageProducer currently creates a new producer on
     * every call and that producer is closed here, so the value applies only
     * to this short-lived producer - confirm whether that is intended.
     *
     * @param destination the destination a producer is obtained for
     * @param priority the priority to set on that producer
     * @throws JMSException if the producer cannot be obtained, updated or closed
     */
    public void setPriority(Destination destination, int priority)
        throws JMSException {
        Session session = borrowSession();
        try {
            MessageProducer producer = getMessageProducer(session, destination);
            try {
                producer.setPriority(priority);
            }
            finally {
                // only reached once the producer exists, so a failure while
                // creating it is not masked by a NullPointerException
                producer.close();
            }
        }
        finally {
            // the session goes back to the pool even if close() fails
            returnSession(session);
        }
    }

    /**
     * Get the producer's default time to live.
     *
     * @param destination the destination a producer is obtained for
     * @return the time to live reported by that producer
     * @throws JMSException if the producer cannot be obtained, queried or closed
     */
    public long getTimeToLive(Destination destination) throws JMSException {
        Session session = borrowSession();
        try {
            MessageProducer producer = getMessageProducer(session, destination);
            try {
                return producer.getTimeToLive();
            }
            finally {
                // only reached once the producer exists, so a failure while
                // creating it is not masked by a NullPointerException
                producer.close();
            }
        }
        finally {
            // the session goes back to the pool even if close() fails
            returnSession(session);
        }
    }

    /**
     * <p>Set the default length of time in milliseconds from its dispatch time that
     * a produced message should be retained by the message system.</p>
     * NOTE(review): getMessageProducer currently creates a new producer on
     * every call and that producer is closed here, so the value applies only
     * to this short-lived producer - confirm whether that is intended.
     *
     * @param destination the destination a producer is obtained for
     * @param timeToLive the time to live to set on that producer
     * @throws JMSException if the producer cannot be obtained, updated or closed
     */
    public void setTimeToLive(Destination destination, long timeToLive)
        throws JMSException {
        Session session = borrowSession();
        try {
            MessageProducer producer = getMessageProducer(session, destination);
            try {
                producer.setTimeToLive(timeToLive);
            }
            finally {
                // only reached once the producer exists, so a failure while
                // creating it is not masked by a NullPointerException
                producer.close();
            }
        }
        finally {
            // the session goes back to the pool even if close() fails
            returnSession(session);
        }
    }

    /**
     * Get an indication of whether message timestamps are disabled.
     *
     * @param destination the destination a producer is obtained for
     * @return the flag reported by that producer
     * @throws JMSException if the producer cannot be obtained, queried or closed
     */
    public boolean getDisableMessageTimestamp(Destination destination)
        throws JMSException {
        Session session = borrowSession();
        try {
            MessageProducer producer = getMessageProducer(session, destination);
            try {
                return producer.getDisableMessageTimestamp();
            }
            finally {
                // only reached once the producer exists, so a failure while
                // creating it is not masked by a NullPointerException
                producer.close();
            }
        }
        finally {
            // the session goes back to the pool even if close() fails
            returnSession(session);
        }
    }
    /**
     * Set whether message timestamps are disabled.
     * NOTE(review): getMessageProducer currently creates a new producer on
     * every call and that producer is closed here, so the value applies only
     * to this short-lived producer - confirm whether that is intended.
     *
     * @param destination the destination a producer is obtained for
     * @param value the flag to set on that producer
     * @throws JMSException if the producer cannot be obtained, updated or closed
     */
    public void setDisableMessageTimestamp(Destination destination, boolean value)
        throws JMSException {
        Session session = borrowSession();
        try {
            MessageProducer producer = getMessageProducer(session, destination);
            try {
                producer.setDisableMessageTimestamp(value);
            }
            finally {
                // only reached once the producer exists, so a failure while
                // creating it is not masked by a NullPointerException
                producer.close();
            }
        }
        finally {
            // the session goes back to the pool even if close() fails
            returnSession(session);
        }
    }

    /**
     * Sends a message on the given destination with an explicit delivery
     * mode, priority and time to live.
     * Topic producers publish the message, queue producers send it.
     *
     * @param destination the destination to send to
     * @param message the message to send
     * @param deliveryMode the delivery mode passed to the producer
     * @param priority the priority passed to the producer
     * @param timeToLive the time to live passed to the producer
     * @throws JMSException if the producer cannot be obtained, used or closed
     */
    public void send(
        Destination destination,
        Message message,
        int deliveryMode,
        int priority,
        long timeToLive)
        throws JMSException {
        Session session = borrowSession();
        try {
            MessageProducer producer = getMessageProducer(session, destination);
            try {
                if (isTopic(producer)) {
                    ((TopicPublisher) producer).publish(
                        message,
                        deliveryMode,
                        priority,
                        timeToLive);
                }
                else {
                    ((QueueSender) producer).send(message, deliveryMode, priority, timeToLive);
                }
            }
            finally {
                // only reached once the producer exists, so a failure while
                // creating it is not masked by a NullPointerException
                producer.close();
            }
        }
        finally {
            // the session goes back to the pool even if close() fails
            returnSession(session);
        }
    }

    /**
     * Get an indication of whether message IDs are disabled.
     *
     * @param destination the destination a producer is obtained for
     * @return the flag reported by that producer
     * @throws JMSException if the producer cannot be obtained, queried or closed
     */
    public boolean getDisableMessageID(Destination destination)
        throws JMSException {
        Session session = borrowSession();
        try {
            MessageProducer producer = getMessageProducer(session, destination);
            try {
                return producer.getDisableMessageID();
            }
            finally {
                // only reached once the producer exists, so a failure while
                // creating it is not masked by a NullPointerException
                producer.close();
            }
        }
        finally {
            // the session goes back to the pool even if close() fails
            returnSession(session);
        }
    }

    /**
     * Set whether message IDs are disabled.
     * NOTE(review): getMessageProducer currently creates a new producer on
     * every call and that producer is closed here, so the value applies only
     * to this short-lived producer - confirm whether that is intended.
     *
     * @param destination the destination a producer is obtained for
     * @param value the flag to set on that producer
     * @throws JMSException if the producer cannot be obtained, updated or closed
     */
    public void setDisableMessageID(Destination destination, boolean value)
        throws JMSException {
        Session session = borrowSession();
        try {
            MessageProducer producer = getMessageProducer(session, destination);
            try {
                producer.setDisableMessageID(value);
            }
            finally {
                // only reached once the producer exists, so a failure while
                // creating it is not masked by a NullPointerException
                producer.close();
            }
        }
        finally {
            // the session goes back to the pool even if close() fails
            returnSession(session);
        }
    }


    // Properties
    //-------------------------------------------------------------------------
    /**
     * Gets the name that this Messenger is called in a MessengerManager.
     *
     * @return the current value of the name property
     */
    public String getName() {
        return this.name;
    }

    /**
     * Sets the name that this Messenger is called in a MessengerManager.
     *
     * @param name the new value of the name property
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Setter for the jndiDestinations property.
     *
     * @param jndiDestinations the new value of the flag
     */
    public void setJndiDestinations(boolean jndiDestinations) {
        this.jndiDestinations = jndiDestinations;
    }

    /**
     * Getter for the jndiDestinations property.
     *
     * @return the current value of the flag
     */
    public boolean isJndiDestinations() {
        return this.jndiDestinations;
    }

    /**
     * Gets whether topic subscribers are durable or not.
     *
     * @return the value last stored by {@link #setDurable(boolean)}
     */
    public boolean isDurable() {
        // report the durable flag itself; the noLocal flag has its own accessor
        return durable;
    }

    /**
     * Sets whether topic subscribers are durable or not.
     *
     * @param durable the new value of the durable flag
     */
    public void setDurable(boolean durable) {
        this.durable = durable;
    }

    /**
     * Returns the durable name used for durable topic based subscriptions.
     *
     * @return the current value of the durableName property
     */
    public String getDurableName() {
        return this.durableName;
    }

    /**
     * Sets the durable name used for durable topic based subscriptions.
     *
     * @param durableName the new value of the durableName property
     */
    public void setDurableName(String durableName) {
        this.durableName = durableName;
    }

    /**
     * Gets whether local messages are ignored when topic based subscription
     * is used with a message selector.
     *
     * @return the current value of the noLocal flag
     */
    public boolean isNoLocal() {
        return this.noLocal;
    }
    
    /**
     * Sets whether local messages are ignored when topic based subscription is used
     * with a message selector.
     *
     * @param noLocal the new value of the noLocal flag
     */
    public void setNoLocal(boolean noLocal) {
        this.noLocal = noLocal;
    }
    
    
    // Implementation methods
    //-------------------------------------------------------------------------
    
    /**
     * Borrows a session instance from the pool.
     * Callers in this class pair every successful call with returnSession.
     *
     * @return the borrowed session
     * @throws JMSException declared for implementations that fail to provide a session
     */
    protected abstract Session borrowSession() throws JMSException;
    
    /**
     * Returns a session instance back to the pool.
     *
     * @param session a session previously obtained from borrowSession
     * @throws JMSException declared for implementations that fail to take the session back
     */
    protected abstract void returnSession(Session session) throws JMSException;
    
    /**
     * Deletes a session instance.
     *
     * @param session the session to delete
     * @throws JMSException declared for implementations that fail to delete the session
     */
    protected abstract void deleteSession(Session session) throws JMSException;
    
    /**
     * Borrows a session instance for attaching message listeners; used by
     * the addListener methods instead of borrowSession.
     *
     * @return the borrowed listener session
     * @throws JMSException declared for implementations that fail to provide a session
     */
    protected abstract Session borrowListenerSession() throws JMSException;
    
    /**
     * Returns a session previously obtained from borrowListenerSession.
     *
     * @param session the listener session being handed back
     * @throws JMSException declared for implementations that fail to take the session back
     */
    protected abstract void returnListenerSession(Session session)
        throws JMSException;
    
    /**
     * Reports whether the given connection is topic based; callers in this
     * class cast it to TopicConnection when true and QueueConnection otherwise.
     *
     * @param connection the connection to inspect
     * @return true for a topic based connection
     * @throws JMSException declared for implementations that cannot decide
     */
    protected abstract boolean isTopic(Connection connection) throws JMSException;
    
    /**
     * Reports whether the given connection factory is topic based.
     *
     * @param factory the connection factory to inspect
     * @return true for a topic based factory (presumably false means queue based,
     *         as with the other isTopic overloads - confirm in the subclass)
     * @throws JMSException declared for implementations that cannot decide
     */
    protected abstract boolean isTopic(ConnectionFactory factory)
        throws JMSException;
    
    /**
     * Reports whether the given session is topic based; callers in this
     * class cast it to TopicSession when true and QueueSession otherwise.
     *
     * @param session the session to inspect
     * @return true for a topic based session
     * @throws JMSException declared for implementations that cannot decide
     */
    protected abstract boolean isTopic(Session session) throws JMSException;
    
    /**
     * Reports whether the given producer is topic based; callers in this
     * class cast it to TopicPublisher when true and QueueSender otherwise.
     *
     * @param producer the producer to inspect
     * @return true for a topic based producer
     * @throws JMSException declared for implementations that cannot decide
     */
    protected abstract boolean isTopic(MessageProducer producer)
        throws JMSException;
    
    /**
     * Returns a message producer for the given session and destination.
     * Producers are not cached per destination: every call delegates to
     * createMessageProducer and therefore yields a new producer.
     *
     * @param session the session used to create the producer
     * @param destination the destination the producer sends to
     * @return a newly created producer
     * @throws JMSException if the producer cannot be created
     */
    protected MessageProducer getMessageProducer(
        Session session,
        Destination destination)
        throws JMSException {
        final MessageProducer fresh = createMessageProducer(session, destination);
        return fresh;
    }
    
    /**
     * Returns a newly created message producer for the given session and
     * destination: a publisher for topic sessions, a sender for queue sessions.
     *
     * @param session the session used to create the producer
     * @param destination the topic or queue the producer sends to
     * @return the new producer
     * @throws JMSException if the producer cannot be created
     */
    protected MessageProducer createMessageProducer(
        Session session,
        Destination destination)
        throws JMSException {
        if (!isTopic(session)) {
            return ((QueueSession) session).createSender((Queue) destination);
        }
        return ((TopicSession) session).createPublisher((Topic) destination);
    }
    
    /**
     * Returns a MessageConsumer for the given session and destination.
     * Consumers are not cached per destination: every call delegates to
     * createMessageConsumer and therefore yields a new consumer.
     *
     * @param session the session used to create the consumer
     * @param destination the destination to consume from
     * @return a newly created consumer
     * @throws JMSException if the consumer cannot be created
     */
    protected MessageConsumer getMessageConsumer(
        Session session,
        Destination destination)
        throws JMSException {
        final MessageConsumer fresh = createMessageConsumer(session, destination);
        return fresh;
    }
    
    /**
     * Returns a MessageConsumer for the given session, destination and selector.
     * No caching is done yet, so each call creates a new consumer.
     *
     * @param session the session used to create the consumer
     * @param destination the destination to consume from
     * @param selector the JMS message selector
     * @return a newly created consumer
     * @throws JMSException if the consumer cannot be created
     */
    protected MessageConsumer getMessageConsumer(
        Session session,
        Destination destination,
        String selector)
        throws JMSException {
        // XXXX: could do caching one day
        final MessageConsumer fresh = createMessageConsumer(session, destination, selector);
        return fresh;
    }
    
    /**
     * Returns a new MessageConsumer for the given session and destination.
     * Queue sessions yield a receiver; topic sessions yield a durable
     * subscriber (named by {@link #getDurableName()}) when
     * {@link #isDurable()} is true and a plain subscriber otherwise.
     *
     * @param session the session used to create the consumer
     * @param destination the queue or topic to consume from
     * @return the new consumer
     * @throws JMSException if the consumer cannot be created
     */
    protected MessageConsumer createMessageConsumer(
        Session session,
        Destination destination)
        throws JMSException {
        // queue based session
        if (!isTopic(session)) {
            return ((QueueSession) session).createReceiver((Queue) destination);
        }

        // topic based session
        final TopicSession topics = (TopicSession) session;
        if (!isDurable()) {
            return topics.createSubscriber((Topic) destination);
        }
        return topics.createDurableSubscriber((Topic) destination, getDurableName());
    }
    
    /**
     * Returns a new MessageConsumer for the given session, destination and selector.
     * Queue sessions yield a receiver; topic sessions yield a durable
     * subscriber (named by {@link #getDurableName()}) when
     * {@link #isDurable()} is true and a plain subscriber otherwise, both
     * created with the current {@link #isNoLocal()} setting.
     *
     * @param session the session used to create the consumer
     * @param destination the queue or topic to consume from
     * @param selector the JMS message selector
     * @return the new consumer
     * @throws JMSException if the consumer cannot be created
     */
    protected MessageConsumer createMessageConsumer(
        Session session,
        Destination destination,
        String selector)
        throws JMSException {
        // queue based session
        if (!isTopic(session)) {
            return ((QueueSession) session).createReceiver((Queue) destination, selector);
        }

        // topic based session
        final TopicSession topics = (TopicSession) session;
        if (!isDurable()) {
            return topics.createSubscriber((Topic) destination, selector, isNoLocal());
        }
        return topics.createDurableSubscriber(
            (Topic) destination, getDurableName(), selector, isNoLocal());
    }
    
    /**
     * Returns the queue for the given subject by asking the session to
     * create it; nothing is cached.
     *
     * @param session the queue session used to create the queue
     * @param subject the queue name
     * @return the queue created by the session
     * @throws JMSException if the session cannot create the queue
     */
    protected Queue getQueue(QueueSession session, String subject)
        throws JMSException {
        // XXXX: might want to cache
        final Queue created = session.createQueue(subject);
        return created;
    }
    
    /**
     * Returns the topic for the given subject by asking the session to
     * create it; nothing is cached.
     *
     * @param session the topic session used to create the topic
     * @param subject the topic name
     * @return the topic created by the session
     * @throws JMSException if the session cannot create the topic
     */
    protected Topic getTopic(TopicSession session, String subject)
        throws JMSException {
        // XXXX: might want to cache
        final Topic created = session.createTopic(subject);
        return created;
    }
    
    /**
     * Returns the destination on which replies are awaited, creating a
     * temporary destination the first time it is requested and reusing it
     * afterwards.
     *
     * @return the reply-to destination
     * @throws JMSException if the temporary destination cannot be created
     */
    protected Destination getReplyToDestination() throws JMSException {
        if (replyToDestination != null) {
            return replyToDestination;
        }
        replyToDestination = createTemporaryDestination();
        return replyToDestination;
    }
    
    /**
     * Returns a TopicRequestor for the given session and topic.
     * When CACHE_REQUESTOR is set, requestors are kept per destination in
     * the map obtained from requestorsMap and reused; otherwise a new
     * requestor is created on every call.
     *
     * @param session the topic session a new requestor is created with
     * @param destination the topic the requestor targets
     * @return a cached or newly created requestor
     * @throws JMSException if a new requestor cannot be created
     */
    protected TopicRequestor getTopicRequestor(
        TopicSession session,
        Topic destination)
        throws JMSException {
        if (!CACHE_REQUESTOR) {
            return new TopicRequestor(session, destination);
        }
        final Map cache = (Map) requestorsMap.get();
        TopicRequestor cached = (TopicRequestor) cache.get(destination);
        if (cached == null) {
            cached = new TopicRequestor(session, destination);
            cache.put(destination, cached);
        }
        return cached;
    }
    
    /**
     * Returns a QueueRequestor for the given session and queue.
     * When CACHE_REQUESTOR is set, requestors are kept per destination in
     * the map obtained from requestorsMap and reused; otherwise a new
     * requestor is created on every call.
     *
     * @param session the queue session a new requestor is created with
     * @param destination the queue the requestor targets
     * @return a cached or newly created requestor
     * @throws JMSException if a new requestor cannot be created
     */
    protected QueueRequestor getQueueRequestor(
        QueueSession session,
        Queue destination)
        throws JMSException {
        if (!CACHE_REQUESTOR) {
            return new QueueRequestor(session, destination);
        }
        final Map cache = (Map) requestorsMap.get();
        QueueRequestor cached = (QueueRequestor) cache.get(destination);
        if (cached == null) {
            cached = new QueueRequestor(session, destination);
            cache.put(destination, cached);
        }
        return cached;
    }
}
  \ No newline at end of file
  
  
  
  1.11      +1 -162    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/Messenger.java
  
  Index: Messenger.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/Messenger.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- Messenger.java	26 Feb 2002 04:12:53 -0000	1.10
  +++ Messenger.java	15 May 2002 14:36:34 -0000	1.11
  @@ -1,162 +1 @@
  -/*
  - * Copyright (C) The Apache Software Foundation. All rights reserved.
  - *
  - * This software is published under the terms of the Apache Software License
  - * version 1.1, a copy of which has been included with this distribution in
  - * the LICENSE file.
  - * 
  - * $Id: Messenger.java,v 1.10 2002/02/26 04:12:53 jstrachan Exp $
  - */
  -package org.apache.commons.messenger;
  -
  -import java.io.Serializable;
  -
  -import javax.jms.BytesMessage;
  -import javax.jms.Connection;
  -import javax.jms.ConnectionConsumer;
  -import javax.jms.Destination;
  -import javax.jms.JMSException;
  -import javax.jms.MapMessage;
  -import javax.jms.Message;
  -import javax.jms.MessageConsumer;
  -import javax.jms.MessageListener;
  -import javax.jms.ObjectMessage;
  -import javax.jms.StreamMessage;
  -import javax.jms.ServerSessionPool;
  -import javax.jms.TextMessage;
  -
  -
  -/** <p><code>Messenger</code> a facade over the JMS API making it easy to use JMS
  -  * and hiding much of the complexity of threading and configuration.
  -  * A Messenger will internally associate a JMS Session with the calling thread
  -  * so that all methods called in the same thread (such as inside a Servlet or 
  -  * taglib) will use the same JMS Session.</p>
  -  *
  -  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  -  * @version $Revision: 1.10 $
  -  */
  -public interface Messenger {
  -
  -    /** Temporary hack - this method has been added so that Messenger works better with the digester */
  -    public String getName();
  -
  -    /** Returns the destination for the given subject name */
  -    public Destination getDestination(String subject) throws JMSException;
  -    
  -    /** Returns a new temporary destination */
  -    public Destination createTemporaryDestination() throws JMSException;
  -    
  -    /** Sends a message on the given destination */
  -    public void send(Destination destination, Message message) throws JMSException;
  -
  -    /** Sends a message on the given destination and blocks until a response is returned */
  -    public Message call(Destination destination, Message message) throws JMSException;
  -
  -    /** Sends a message on the given destination and blocks until a response is returned or the given timeout period expires */
  -    public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException;
  -
  -    /** Receives a message on the given destination, blocking until one is returned */    
  -    public Message receive(Destination destination) throws JMSException;
  -    
  -    /** Receives a message on the given destination and message selector, blocking until one is returned */    
  -    public Message receive(Destination destination, String selector) throws JMSException;
  -    
  -    /** Receives a message on the given destination, blocking for the specified timeout */    
  -    public Message receive(Destination destination, long timeoutMillis) throws JMSException;
  -    
  -    /** Receives a message on the given destination and selector, blocking for the specified timeout */    
  -    public Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException;
  -    
  -    /** Receives a message on the given destination without blocking or returns null */    
  -    public Message receiveNoWait(Destination destination) throws JMSException;
  -
  -    /** Receives a message on the given destination and selector without blocking or returns null */    
  -    public Message receiveNoWait(Destination destination, String selector) throws JMSException;
  -
  -
  -    /** Creates a MessageConsumer for the given JMS Desintation
  -     */
  -    public MessageConsumer createConsumer(Destination destination) throws JMSException;
  -    
  -    /** Creates a MessageConsumer for the given JMS Desintation and JMS selector
  -     */
  -    public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException;
  -
  -    /** Allows this current thread to be given to the JMS connection to process messages. This
  -     * method can be useful for creating background processing threads
  -     */
  -    public void run();
  -
  -    /** Returns the underlying JMS connection that this Messenger is using */
  -    public Connection getConnection() throws JMSException;
  -    
  -    /** Creates a ConnectionConsumer which is useful if used inside an application server
  -     * to associate multiple threads with consuming from a JMS destination */
  -    public ConnectionConsumer createConnectionConsumer(Destination destination, ServerSessionPool sessionPool, int maxMessages) throws JMSException;
  -    
  -    /** Creates a ConnectionConsumer which is useful if used inside an application server
  -     * to associate multiple threads with consuming from a JMS destination */
  -    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool sessionPool, int maxMessages) throws JMSException;
  -    
  -    /** Creates a new ServerSessionPool implementation with a given maximum number of threads
  -     * for use by a ConnectionConsumer
  -     */
  -    public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads) throws JMSException;
  -    
  -    
  -    // Listener API
  -    //-------------------------------------------------------------------------    
  -    
  -    /** Adds a message listener on the given destination */
  -    public void addListener(Destination destination, MessageListener listener) throws JMSException;
  -    public void addListener(Destination destination, String selector, MessageListener listener) throws JMSException;
  -
  -    public void removeListener(Destination destination, MessageListener listener) throws JMSException;
  -    public void removeListener(Destination destination, String selector, MessageListener listener) throws JMSException;
  -    
  -    
  -    // Message factory methods
  -    //-------------------------------------------------------------------------    
  -    
  -    public BytesMessage createBytesMessage() throws JMSException;
  -    
  -    public MapMessage createMapMessage() throws JMSException;
  -    
  -    public Message createMessage() throws JMSException;    
  -    
  -    public ObjectMessage createObjectMessage() throws JMSException;
  -    public ObjectMessage createObjectMessage(Serializable object) throws JMSException;
  -    
  -    public StreamMessage createStreamMessage() throws JMSException;
  -    
  -    public TextMessage createTextMessage() throws JMSException;
  -    public TextMessage createTextMessage(String text) throws JMSException;
  -    
  -    
  -    // Transaction related methods
  -    //-------------------------------------------------------------------------    
  -    
  -    /** Commits all messages done in this thread and releases any locks */
  -    public void commit() throws JMSException;
  -    
  -    /** Rolls back any messages done in this thread and releases any locks */
  -    public void rollback() throws JMSException;
  -    
  -    /** Closes the underlying JMS connection */
  -    public void close() throws JMSException;
  -    
  -    
  -    // Extra configuration methods available in JMS
  -    //-------------------------------------------------------------------------        
  -    public int getDeliveryMode(Destination destination) throws JMSException;
  -    public boolean getDisableMessageID(Destination destination) throws JMSException;
  -    public boolean getDisableMessageTimestamp(Destination destination) throws JMSException;
  -    public int getPriority(Destination destination) throws JMSException;
  -    public long getTimeToLive(Destination destination) throws JMSException;
  -    public void setDeliveryMode(Destination destination,int deliveryMode) throws JMSException;
  -    public void setDisableMessageID(Destination destination, boolean value) throws JMSException;
  -    public void setDisableMessageTimestamp(Destination destination,boolean value) throws JMSException;
  -    public void setPriority(Destination destination,int defaultPriority) throws JMSException;
  -    public void setTimeToLive(Destination destination,long timeToLive) throws JMSException;
  -}
  -
  +/*
 * Copyright (C) The Apache Software Foundation. All rights reserved.
 *
 * This software is published under the terms of the Apache Software License
 * version 1.1, a copy of which has been included with this distribution in
 * the LICENSE file.
 * 
 * $Id: Messenger.java,v 1.11 2002/05/15 14:36:34 jstrachan Exp $
 */
package org.apache.commons.messenger;

import java.io.Serializable;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.ServerSessionPool;
import javax.jms.TextMessage;

/** <p><code>Messenger</code> a facade over the JMS API making it easy to use JMS
  * and hiding much of the complexity of threading and configuration.
  * A Messenger will internally associate a JMS Session with the calling thread
  * so that all methods called in the same thread (such as inside a Servlet or 
  * taglib) will use the same JMS Session.</p>
  *
  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  * @version $Revision: 1.11 $
  */
public interface Messenger {

    /**
     * Returns the name of this Messenger.
     * Temporary hack - this accessor exists so that Messenger works better with the digester.
     */
    String getName();


    // Destinations
    //-------------------------------------------------------------------------

    /** Returns the destination for the given subject name */
    Destination getDestination(String subject) throws JMSException;

    /** Returns a new temporary destination */
    Destination createTemporaryDestination() throws JMSException;


    // Sending and request/response
    //-------------------------------------------------------------------------

    /** Sends a message on the given destination */
    void send(Destination destination, Message message) throws JMSException;

    /** Sends a message on the given destination and blocks until a response is returned */
    Message call(Destination destination, Message message) throws JMSException;

    /**
     * Sends a message on the given destination and blocks until a response
     * is returned or the given timeout period expires
     */
    Message call(Destination destination, Message message, long timeoutMillis) throws JMSException;


    // Receiving
    //-------------------------------------------------------------------------

    /** Receives a message on the given destination, blocking until one is returned */
    Message receive(Destination destination) throws JMSException;

    /** Receives a message on the given destination and message selector, blocking until one is returned */
    Message receive(Destination destination, String selector) throws JMSException;

    /** Receives a message on the given destination, blocking for the specified timeout */
    Message receive(Destination destination, long timeoutMillis) throws JMSException;

    /** Receives a message on the given destination and selector, blocking for the specified timeout */
    Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException;

    /** Receives a message on the given destination without blocking or returns null */
    Message receiveNoWait(Destination destination) throws JMSException;

    /** Receives a message on the given destination and selector without blocking or returns null */
    Message receiveNoWait(Destination destination, String selector) throws JMSException;


    // Consumers, connections and session pools
    //-------------------------------------------------------------------------

    /** Creates a MessageConsumer for the given JMS Destination */
    MessageConsumer createConsumer(Destination destination) throws JMSException;

    /** Creates a MessageConsumer for the given JMS Destination and JMS selector */
    MessageConsumer createConsumer(Destination destination, String selector) throws JMSException;

    /**
     * Allows this current thread to be given to the JMS connection to process messages.
     * This method can be useful for creating background processing threads.
     */
    void run();

    /** Returns the underlying JMS connection that this Messenger is using */
    Connection getConnection() throws JMSException;

    /**
     * Creates a ConnectionConsumer which is useful if used inside an application server
     * to associate multiple threads with consuming from a JMS destination
     */
    ConnectionConsumer createConnectionConsumer(Destination destination, ServerSessionPool sessionPool, int maxMessages) throws JMSException;

    /**
     * Creates a ConnectionConsumer, restricted by the given selector, which is useful
     * if used inside an application server to associate multiple threads with
     * consuming from a JMS destination
     */
    ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool sessionPool, int maxMessages) throws JMSException;

    /**
     * Creates a new ServerSessionPool implementation with a given maximum number of threads
     * for use by a ConnectionConsumer
     */
    ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads) throws JMSException;


    // Listener API
    //-------------------------------------------------------------------------

    /** Adds a message listener on the given destination */
    void addListener(Destination destination, MessageListener listener) throws JMSException;

    /** Adds a message listener on the given destination and selector */
    void addListener(Destination destination, String selector, MessageListener listener) throws JMSException;

    /** Removes a message listener from the given destination */
    void removeListener(Destination destination, MessageListener listener) throws JMSException;

    /** Removes a message listener from the given destination and selector */
    void removeListener(Destination destination, String selector, MessageListener listener) throws JMSException;


    // Message factory methods
    //-------------------------------------------------------------------------

    /** Creates a JMS BytesMessage */
    BytesMessage createBytesMessage() throws JMSException;

    /** Creates a JMS MapMessage */
    MapMessage createMapMessage() throws JMSException;

    /** Creates a plain JMS Message */
    Message createMessage() throws JMSException;

    /** Creates a JMS ObjectMessage */
    ObjectMessage createObjectMessage() throws JMSException;

    /** Creates a JMS ObjectMessage for the given serializable object */
    ObjectMessage createObjectMessage(Serializable object) throws JMSException;

    /** Creates a JMS StreamMessage */
    StreamMessage createStreamMessage() throws JMSException;

    /** Creates a JMS TextMessage */
    TextMessage createTextMessage() throws JMSException;

    /** Creates a JMS TextMessage for the given text */
    TextMessage createTextMessage(String text) throws JMSException;


    // Transaction related methods
    //-------------------------------------------------------------------------

    /** Commits all messages done in this thread and releases any locks */
    void commit() throws JMSException;

    /** Rolls back any messages done in this thread and releases any locks */
    void rollback() throws JMSException;

    /** Closes the underlying JMS connection */
    void close() throws JMSException;


    // Extra configuration methods available in JMS
    //-------------------------------------------------------------------------

    /**
     * Returns the SessionFactory used to create new JMS sessions
     * and Connections. This allows things like transaction mode to be
     * configured before sessions are created.
     */
    SessionFactory getSessionFactory() throws JMSException;

    // NOTE(review): the per-destination accessors below are presumed to mirror the
    // javax.jms.MessageProducer properties of the same names - confirm against
    // the implementing class.

    /** Returns the delivery mode used for the given destination */
    int getDeliveryMode(Destination destination) throws JMSException;

    /** Returns the disable-message-ID flag for the given destination */
    boolean getDisableMessageID(Destination destination) throws JMSException;

    /** Returns the disable-message-timestamp flag for the given destination */
    boolean getDisableMessageTimestamp(Destination destination) throws JMSException;

    /** Returns the priority used for the given destination */
    int getPriority(Destination destination) throws JMSException;

    /** Returns the time-to-live used for the given destination */
    long getTimeToLive(Destination destination) throws JMSException;

    /** Sets the delivery mode used for the given destination */
    void setDeliveryMode(Destination destination, int deliveryMode) throws JMSException;

    /** Sets the disable-message-ID flag for the given destination */
    void setDisableMessageID(Destination destination, boolean value) throws JMSException;

    /** Sets the disable-message-timestamp flag for the given destination */
    void setDisableMessageTimestamp(Destination destination, boolean value) throws JMSException;

    /** Sets the default priority used for the given destination */
    void setPriority(Destination destination, int defaultPriority) throws JMSException;

    /** Sets the time-to-live used for the given destination */
    void setTimeToLive(Destination destination, long timeToLive) throws JMSException;
}
  \ No newline at end of file
  
  
  
  1.9       +1 -224    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java
  
  Index: DefaultMessenger.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- DefaultMessenger.java	26 Feb 2002 04:12:53 -0000	1.8
  +++ DefaultMessenger.java	15 May 2002 14:36:34 -0000	1.9
  @@ -1,224 +1 @@
  -/*
  - * Copyright (C) The Apache Software Foundation. All rights reserved.
  - *
  - * This software is published under the terms of the Apache Software License
  - * version 1.1, a copy of which has been included with this distribution in
  - * the LICENSE file.
  - *
  - * $Id: DefaultMessenger.java,v 1.8 2002/02/26 04:12:53 jstrachan Exp $
  - */
  -package org.apache.commons.messenger;
  -
  -import java.lang.ThreadLocal; // for javadoc
  -import java.io.Serializable;
  -
  -import javax.jms.Connection;
  -import javax.jms.ConnectionFactory;
  -import javax.jms.JMSException;
  -import javax.jms.Message;
  -import javax.jms.MessageConsumer;
  -import javax.jms.MessageListener;
  -import javax.jms.MessageProducer;
  -import javax.jms.ServerSessionPool;
  -import javax.jms.Session;
  -import javax.jms.Queue;
  -import javax.jms.Topic;
  -import javax.jms.QueueSession;
  -import javax.jms.TopicSession;
  -import javax.naming.NamingException;
  -import javax.naming.Context;
  -
  -import org.apache.commons.logging.Log;
  -import org.apache.commons.logging.LogFactory;
  -
  -/** <p><code>DefaultMessenger</code> is the default implementation of
  -  * Messenger which uses a {@link ThreadLocal} variable
  -  * to keep the JMS Session that should be used for a given calling thread.</p>
  -  *
  -  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  -  * @version $Revision: 1.8 $
  -  */
  -public class DefaultMessenger extends MessengerSupport {
  -
  -    private static final boolean SHARE_CONNECTION = true;
  -
  -    /** Logger */
  -    private static final Log log = LogFactory.getLog( DefaultMessenger.class );
  -
  -
  -    /** the session object for each thread */
  -    private ThreadLocal sessionPool = new ThreadLocal();
  -    /** the listener session object for each thread */
  -    private ThreadLocal listenerSessionPool = new ThreadLocal();
  -    /** The factory used to create each thread's JMS Session */
  -    private SessionFactory sessionFactory;
  -    /** A pool of Connections, one per thread */
  -    private ThreadLocal connectionPool = new ThreadLocal();
  -
  -    public DefaultMessenger() {
  -    }
  -
  -
  -    /** Returns the SessionFactory used to create new JMS sessions */
  -    public SessionFactory getSessionFactory() throws JMSException {
  -        if ( sessionFactory == null ) {
  -            sessionFactory = createSessionFactory();
  -        }
  -
  -        return sessionFactory;
  -    }
  -
  -    /** Sets the SessionFactory used to create new JMS sessions */
  -    public void setSessionFactory(SessionFactory sessionFactory) {
  -        this.sessionFactory = sessionFactory;
  -    }
  -
  -
  -    public Connection getConnection() throws JMSException {
  -        if ( SHARE_CONNECTION ) {
  -            return getSessionFactory().getConnection();
  -        }
  -        else {
  -            Connection answer = (Connection) connectionPool.get();
  -            if ( answer == null ) {
  -
  -                answer = getSessionFactory().createConnection();
  -
  -                if ( log.isInfoEnabled() ) {
  -                    log.info( "Created connection: " + answer + " for thread: " + Thread.currentThread() );
  -                }
  -                
  -                connectionPool.set( answer );
  -            }
  -            return answer;
  -        }
  -    }
  -
  -    public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads) throws JMSException {
  -        return getSessionFactory().createServerSessionPool(messageListener, maxThreads);
  -    }
  -
  -    public void close() throws JMSException {
  -        getSessionFactory().close();
  -
  -        // clear all the pools...
  -        sessionPool = new ThreadLocal();
  -        listenerSessionPool = new ThreadLocal();
  -    }
  -
  -    // Implementation methods
  -    //-------------------------------------------------------------------------
  -    protected boolean isTopic(Connection connection) throws JMSException {
  -        return sessionFactory.isTopic();
  -    }
  -    
  -    protected boolean isTopic(ConnectionFactory factory) throws JMSException {
  -        return sessionFactory.isTopic();
  -    }
  -    
  -    protected boolean isTopic(Session session) throws JMSException {
  -        return sessionFactory.isTopic();
  -    }
  -    
  -    protected boolean isTopic(MessageProducer producer) throws JMSException {
  -        return sessionFactory.isTopic();
  -    }
  -    
  -    protected Session borrowSession() throws JMSException {
  -        Session answer = (Session) sessionPool.get();
  -        if ( answer == null ) {
  -            answer = createSession();
  -            sessionPool.set( answer );
  -        }
  -        return answer;
  -    }
  -
  -    protected void returnSession(Session session) {
  -    }
  -
  -    protected void deleteSession(Session session) throws JMSException {
  -        sessionPool.set( null);
  -    }
  -
  -    protected Session borrowListenerSession() throws JMSException {
  -        Session answer = (Session) listenerSessionPool.get();
  -        if ( answer == null ) {
  -            answer = createSession();
  -            listenerSessionPool.set( answer );
  -        }
  -        return answer;
  -    }
  -
  -    protected void returnListenerSession(Session session) throws JMSException {
  -    }
  -
  -    /** Factory method to create a new JMS Session */
  -    protected Session createSession() throws JMSException {
  -        return getSessionFactory().createSession( getConnection() );
  -    }
  -
  -    /** Factory method to create a SessionFactory.
  -      * Derived classes could override this method to create the SessionFactory
  -      * from a well known place
  -      */
  -    protected SessionFactory createSessionFactory() throws JMSException {
  -        throw new JMSException( "No SessionFactory configured for this Messenger. Cannot create a new JMS Session" );
  -    }
  -
  -    public Queue getQueue(QueueSession session, String subject) throws JMSException {
  -        // XXXX: might want to cache
  -        Context ctx = null;
  -        JNDISessionFactory factory = null;;
  -        Queue queue = null;
  -
  -
  -        if ( isJndiDestinations() ) {
  -
  -            try {
  -                factory = (JNDISessionFactory) getSessionFactory();
  -                ctx = factory.getContext();
  -                queue = (Queue) ctx.lookup(subject);
  -            }
  -
  -            catch (Exception e) {
  -                log.error( "Unable to lookup subject: " + subject + ". Exception: " + e, e );
  -            }
  -        }
  -
  -        else {
  -            // XXXX: might want to cache
  -            queue = session.createQueue( subject );
  -        }
  -
  -        return queue;
  -
  -    }
  -
  -    public Topic getTopic(TopicSession session, String subject) throws JMSException {
  -        // XXXX: might want to cache
  -
  -        Context ctx = null;
  -        JNDISessionFactory factory = null;;
  -        Topic topic = null;
  -
  -        if ( isJndiDestinations() ) {
  -
  -            try {
  -                factory = (JNDISessionFactory) getSessionFactory();
  -                ctx = factory.getContext();
  -                topic = (Topic) ctx.lookup(subject);
  -
  -            }
  -
  -            catch (Exception e) {
  -                log.error( "Unable to lookup subject: " + subject + ". Exception: " + e, e );
  -            }
  -        }
  -
  -        else {
  -            topic = session.createTopic( subject );
  -        }
  -        return topic;
  -    }
  -}
  -
  +/*
 * Copyright (C) The Apache Software Foundation. All rights reserved.
 *
 * This software is published under the terms of the Apache Software License
 * version 1.1, a copy of which has been included with this distribution in
 * the LICENSE file.
 *
 * $Id: DefaultMessenger.java,v 1.9 2002/05/15 14:36:34 jstrachan Exp $
 */
package org.apache.commons.messenger;

import java.lang.ThreadLocal; // for javadoc
import java.io.Serializable;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.jms.QueueSession;
import javax.jms.TopicSession;
import javax.naming.NamingException;
import javax.naming.Context;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/** <p><code>DefaultMessenger</code> is the default implementation of
  * Messenger which uses a {@link ThreadLocal} variable
  * to keep the JMS Session that should be used for a given calling thread.</p>
  *
  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  * @version $Revision: 1.9 $
  */
public class DefaultMessenger extends MessengerSupport {

    /**
     * Compile-time switch: when true {@link #getConnection()} hands out the
     * connection owned by the SessionFactory; when false each thread gets
     * its own connection cached in {@link #connectionPool}.
     */
    private static final boolean SHARE_CONNECTION = true;

    /** Logger */
    private static final Log log = LogFactory.getLog(DefaultMessenger.class);

    /** the session object for each thread */
    private ThreadLocal sessionPool = new ThreadLocal();

    /** the listener session object for each thread */
    private ThreadLocal listenerSessionPool = new ThreadLocal();

    /** The factory used to create each thread's JMS Session */
    private SessionFactory sessionFactory;

    /** A pool of Connections, one per thread (only used when SHARE_CONNECTION is false) */
    private ThreadLocal connectionPool = new ThreadLocal();


    public DefaultMessenger() {
    }

    /**
     * Returns the SessionFactory used to create new JMS sessions, lazily
     * obtaining one from {@link #createSessionFactory()} if none has been set.
     *
     * @throws JMSException if no factory is set and none can be created
     */
    public SessionFactory getSessionFactory() throws JMSException {
        if (sessionFactory == null) {
            sessionFactory = createSessionFactory();
        }
        return sessionFactory;
    }

    /** Sets the SessionFactory used to create new JMS sessions */
    public void setSessionFactory(SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    /**
     * Returns the JMS connection for the calling thread: the factory's shared
     * connection when SHARE_CONNECTION is true, otherwise a per-thread
     * connection that is created and cached on first use.
     */
    public Connection getConnection() throws JMSException {
        if (SHARE_CONNECTION) {
            return getSessionFactory().getConnection();
        }
        else {
            Connection answer = (Connection) connectionPool.get();
            if (answer == null) {
                answer = getSessionFactory().createConnection();
                if (log.isInfoEnabled()) {
                    log.info(
                        "Created connection: " + answer + " for thread: " + Thread.currentThread());
                }
                connectionPool.set(answer);
            }
            return answer;
        }
    }

    /** Delegates creation of the ServerSessionPool to the SessionFactory */
    public ServerSessionPool createServerSessionPool(
        MessageListener messageListener,
        int maxThreads)
        throws JMSException {
        return getSessionFactory().createServerSessionPool(messageListener, maxThreads);
    }

    /**
     * Closes the SessionFactory and discards the per-thread session caches so
     * that later calls create fresh sessions.
     */
    public void close() throws JMSException {
        getSessionFactory().close();
        // clear the session pools by replacing them, which drops every thread's entry
        // NOTE(review): connectionPool is not reset here; it is only populated
        // when SHARE_CONNECTION is false - confirm whether it should be cleared too.
        sessionPool = new ThreadLocal();
        listenerSessionPool = new ThreadLocal();
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    // The isTopic() overloads ignore their argument and ask the SessionFactory.
    // They go through getSessionFactory() rather than the raw field so that a
    // lazily created factory is initialised first instead of causing a
    // NullPointerException.

    /** Returns whether the configured SessionFactory is topic based */
    protected boolean isTopic(Connection connection) throws JMSException {
        return getSessionFactory().isTopic();
    }

    /** Returns whether the configured SessionFactory is topic based */
    protected boolean isTopic(ConnectionFactory factory) throws JMSException {
        return getSessionFactory().isTopic();
    }

    /** Returns whether the configured SessionFactory is topic based */
    protected boolean isTopic(Session session) throws JMSException {
        return getSessionFactory().isTopic();
    }

    /** Returns whether the configured SessionFactory is topic based */
    protected boolean isTopic(MessageProducer producer) throws JMSException {
        return getSessionFactory().isTopic();
    }

    /** Returns the calling thread's session, creating and caching it on first use */
    protected Session borrowSession() throws JMSException {
        Session answer = (Session) sessionPool.get();
        if (answer == null) {
            answer = createSession();
            sessionPool.set(answer);
        }
        return answer;
    }

    /** No-op: the session stays cached for the calling thread */
    protected void returnSession(Session session) {
    }

    /** Forgets the calling thread's cached session; the argument itself is not used */
    protected void deleteSession(Session session) throws JMSException {
        sessionPool.set(null);
    }

    /** Returns the calling thread's listener session, creating and caching it on first use */
    protected Session borrowListenerSession() throws JMSException {
        Session answer = (Session) listenerSessionPool.get();
        if (answer == null) {
            answer = createSession();
            listenerSessionPool.set(answer);
        }
        return answer;
    }

    /** No-op: the listener session stays cached for the calling thread */
    protected void returnListenerSession(Session session) throws JMSException {
    }

    /** Factory method to create a new JMS Session */
    protected Session createSession() throws JMSException {
        return getSessionFactory().createSession(getConnection());
    }

    /** Factory method to create a SessionFactory.
      * Derived classes could override this method to create the SessionFactory
      * from a well known place. This default implementation always throws.
      */
    protected SessionFactory createSessionFactory() throws JMSException {
        throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session");
    }

    /**
     * Resolves the queue for the given subject: via a JNDI lookup on the
     * JNDISessionFactory's context when JNDI destinations are enabled,
     * otherwise by asking the session to create it.
     *
     * @return the queue, or null if the JNDI lookup failed (the failure is logged)
     */
    public Queue getQueue(QueueSession session, String subject)
        throws JMSException {
        // XXXX: might want to cache
        if (!isJndiDestinations()) {
            return session.createQueue(subject);
        }
        try {
            JNDISessionFactory factory = (JNDISessionFactory) getSessionFactory();
            Context ctx = factory.getContext();
            return (Queue) ctx.lookup(subject);
        }
        catch (Exception e) {
            // best effort: report the failure and let the caller see a null destination
            log.error("Unable to lookup subject: " + subject + ". Exception: " + e, e);
            return null;
        }
    }

    /**
     * Resolves the topic for the given subject: via a JNDI lookup on the
     * JNDISessionFactory's context when JNDI destinations are enabled,
     * otherwise by asking the session to create it.
     *
     * @return the topic, or null if the JNDI lookup failed (the failure is logged)
     */
    public Topic getTopic(TopicSession session, String subject)
        throws JMSException {
        // XXXX: might want to cache
        if (!isJndiDestinations()) {
            return session.createTopic(subject);
        }
        try {
            JNDISessionFactory factory = (JNDISessionFactory) getSessionFactory();
            Context ctx = factory.getContext();
            return (Topic) ctx.lookup(subject);
        }
        catch (Exception e) {
            // best effort: report the failure and let the caller see a null destination
            log.error("Unable to lookup subject: " + subject + ". Exception: " + e, e);
            return null;
        }
    }
}
  \ No newline at end of file
  
  
  
  1.2       +26 -42    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/impl/Subscription.java
  
  Index: Subscription.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/impl/Subscription.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Subscription.java	26 Oct 2001 11:52:16 -0000	1.1
  +++ Subscription.java	15 May 2002 14:36:34 -0000	1.2
  @@ -5,7 +5,7 @@
    * version 1.1, a copy of which has been included with this distribution in
    * the LICENSE file.
    * 
  - * $Id: Subscription.java,v 1.1 2001/10/26 11:52:16 jstrachan Exp $
  + * $Id: Subscription.java,v 1.2 2002/05/15 14:36:34 jstrachan Exp $
    */
   package org.apache.commons.messagelet.impl;
   
  @@ -20,15 +20,15 @@
     * create a list of JMS subscriptions.</p>
     *
     * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  -  * @version $Revision: 1.1 $
  +  * @version $Revision: 1.2 $
     */
   public class Subscription {
   
  -    /** Holds value of property messenger. */
  -    private String messenger;
  +    /** Holds value of property connection. */
  +    private String connection;
       
  -    /** Holds value of property destination. */
  -    private String destination;
  +    /** Holds value of property subject. */
  +    private String subject;
       
       /** Holds value of property selector. */
       private String selector;
  @@ -41,49 +41,33 @@
       
       public Subscription() {
       }        
  -    
  -/*    
  -    public void subscribe(JmsConnector connector) throws JMSException {
  -        String name = getMessenger();
  -        Messenger messenger = MessengerManager.get( name );
  -        if ( messenger != null ) {
  -            throw new JMSException( "No such Messenger called: " + name + " for subscription to: " + destination );
  -        }
           
  -        // now we usually need to create a MessageListener which will dispatch
  -        // to Catalina...
  -        JmsProcessor processor = new JmsProcessor( connector, messenger, getServlet() );
  -        Destination destination = messenger.getDestination( getDestination() );
  -        messenger.addListener( destination, processor );
  -    }
  -*/
  -    
  -    /** Getter for property messenger.
  -     * @return Value of property messenger.
  +    /** Getter for property connection.
  +     * @return Value of property connection.
        */
  -    public String getMessenger() {
  -        return messenger;
  +    public String getConnection() {
  +        return connection;
       }
       
  -    /** Setter for property messenger.
  -     * @param messenger New value of property messenger.
  +    /** Setter for property connection.
  +     * @param connection New value of property connection.
        */
  -    public void setMessenger(String messenger) {
  -        this.messenger = messenger;
  +    public void setConnection(String connection) {
  +        this.connection = connection;
       }
       
  -    /** Getter for property destination.
  -     * @return Value of property destination.
  +    /** Getter for property subject.
  +     * @return Value of property subject.
        */
  -    public String getDestination() {
  -        return destination;
  +    public String getSubject() {
  +        return subject;
       }
       
  -    /** Setter for property destination.
  -     * @param destination New value of property destination.
  +    /** Setter for property subject.
  +     * @param subject New value of property subject.
        */
  -    public void setDestination(String destination) {
  -        this.destination = destination;
  +    public void setSubject(String subject) {
  +        this.subject = subject;
       }
       
       /** Getter for property selector.
  @@ -133,10 +117,10 @@
       /** Outputs a debugging string */
       public String toString() {
           StringBuffer buffer = new StringBuffer( super.toString() );
  -        buffer.append( "[ messenger: " );
  -        buffer.append( messenger );
  -        buffer.append( " destination: " );
  -        buffer.append( destination );
  +        buffer.append( "[ connection: " );
  +        buffer.append( connection );
  +        buffer.append( " subject: " );
  +        buffer.append( subject );
           if ( selector != null ) {
               buffer.append( " selector: " );
               buffer.append( selector );
  
  
  
  1.4       +10 -2     jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/impl/SubscriptionDigester.java
  
  Index: SubscriptionDigester.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/impl/SubscriptionDigester.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SubscriptionDigester.java	13 Nov 2001 09:42:27 -0000	1.3
  +++ SubscriptionDigester.java	15 May 2002 14:36:34 -0000	1.4
  @@ -5,7 +5,7 @@
    * version 1.1, a copy of which has been included with this distribution in
    * the LICENSE file.
    * 
  - * $Id: SubscriptionDigester.java,v 1.3 2001/11/13 09:42:27 jstrachan Exp $
  + * $Id: SubscriptionDigester.java,v 1.4 2002/05/15 14:36:34 jstrachan Exp $
    */
   package org.apache.commons.messagelet.impl;
   
  @@ -15,7 +15,7 @@
     * that JMS Messengers can be created from an XML config file.</p>
     *
     * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  -  * @version $Revision: 1.3 $
  +  * @version $Revision: 1.4 $
     */
   public class SubscriptionDigester extends Digester {
   
  @@ -23,6 +23,7 @@
       private String subscriptionsClass = "org.apache.commons.messagelet.impl.SubscriptionList";
       private String subscriptionClass = "org.apache.commons.messagelet.impl.Subscription";
       private String listenerClass = "org.apache.commons.messagelet.impl.MessageServletDispatcher";
  +    private String bridgeClass = "org.apache.commons.messagelet.BridgeMDO";
   
       
       public SubscriptionDigester() {
  @@ -58,6 +59,13 @@
           
           path = "subscriptions/subscription/listener";
           addObjectCreate( path, listenerClass, "className" );
  +        addSetProperties( path );
  +        addSetNext( path, "setMessageListener",
  +           "javax.jms.MessageListener"
  +        );
  +        
  +        path = "subscriptions/subscription/bridge";
  +        addObjectCreate( path, bridgeClass, "className" );
           addSetProperties( path );
           addSetNext( path, "setMessageListener",
              "javax.jms.MessageListener"
  
  
  
  1.2       +4 -4      jakarta-commons-sandbox/messenger/src/conf/MANIFEST.MF
  
  Index: MANIFEST.MF
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/conf/MANIFEST.MF,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- MANIFEST.MF	24 Aug 2001 14:20:28 -0000	1.1
  +++ MANIFEST.MF	15 May 2002 14:36:34 -0000	1.2
  @@ -1,6 +1,6 @@
  -Extension-Name: org.apache.commons.messenger
  -Specification-Vendor: Apache Software Foundation
  -Specification-Version: 1.0
  -Implementation-Vendor: Apache Software Foundation
  +Extension-Name: org.apache.commons.messenger
  +Specification-Vendor: Apache Software Foundation
  +Specification-Version: 1.0
  +Implementation-Vendor: Apache Software Foundation
   Implementation-Version: 1.0-dev
   
  
  
  
  1.2       +4 -20     jakarta-commons-sandbox/messenger/src/conf/MessengerSpiritWave.xml
  
  Index: MessengerSpiritWave.xml
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/conf/MessengerSpiritWave.xml,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- MessengerSpiritWave.xml	15 Feb 2002 11:50:18 -0000	1.1
  +++ MessengerSpiritWave.xml	15 May 2002 14:36:34 -0000	1.2
  @@ -1,22 +1,6 @@
  -<?xml version="1.0" encoding="UTF-8"?>
  -<manager>
  -
  -  <messenger name="topic">
  -    <factory className="com.spirit.messenger.WaveTopicSessionFactory">
  -      <property>
  -        <name>driverName</name>
  -        <value>SpiritJMQ</value>
  -      </property>          
  -    </factory>
  +<?xml version="1.0" encoding="UTF-8"?>
<manager>

  <messenger name="topic">
  +    <factory className="com.spirit.messenger.WaveTopicSessionFactory">
    </factory>
     </messenger>
  -
     <messenger name="queue">
  -    <factory className="com.spirit.messenger.WaveQueueSessionFactory">
  -      <property>
  -        <name>driverName</name>
  -        <value>SpiritJMQ</value>
  -      </property>          
  -    </factory>
  -  </messenger>
  -
  -</manager>
  +    <factory className="com.spirit.messenger.WaveQueueSessionFactory">
    </factory>
  +  </messenger>
</manager>
  
  
  
  1.4       +8 -16     jakarta-commons-sandbox/messenger/src/conf/subscribe.xml
  
  Index: subscribe.xml
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/conf/subscribe.xml,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- subscribe.xml	30 Oct 2001 11:14:31 -0000	1.3
  +++ subscribe.xml	15 May 2002 14:36:34 -0000	1.4
  @@ -1,25 +1,17 @@
   <?xml version="1.0" encoding="UTF-8"?>
   <subscriptions>
  -
  -  <!-- An example of how subscriptions can look... -->
  -
  -  <subscription messenger="queue" destination="my.queue" selector="b='12'">
  -
  -    <!-- the output will reply to the original message -->
  -    <servlet url="/jms/bar.jsp"/>
  -
  -
  +  <!-- An example of how subscriptions can look... -->
  <subscription connection="queue" subject="my.queue" selector="b='12'">
  +
    <!-- the output will reply to the original message -->
    <servlet url="/jms/bar.jsp"/>
     </subscription>
   
  -  <subscription messenger="queue" destination="my.queue" selector="b='12'">
  +  <subscription connection="queue" subject="my.queue" selector="b='12'">
       <!-- send a reply on some other Messenger and destination -->
  -    <servlet url="/jms/foo.jsp" replyDstination="my.topic" replyMessenger="topic"/>
  -
  +    <servlet url="/jms/foo.jsp" replyDestination="my.topic" replyMessenger="topic"/>
  +
     </subscription>
  -
  -
  -  <subscription messenger="topic" destination="my.queue" selector="b='12'">
  +
  <subscription connection="inputQueue" subject="my.input" selector="b='12'">

    <!-- bridge to another JMS provider -->

    <!-- outputConnection names the Messenger to send on, outputSubject names the destination on it -->
    <bridge outputConnection="outputQueue" outputSubject="my.output"/>
    
  </subscription>

  +
  +  <subscription connection="topic" subject="my.queue" selector="b='12'">
       <listener className="com.acme.MyMessageListener"/>
     </subscription>
  -  
   </subscriptions>
  
  
  
  1.28      +10 -26    jakarta-commons-sandbox/messenger/build.xml
  
  Index: build.xml
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/build.xml,v
  retrieving revision 1.27
  retrieving revision 1.28
  diff -u -r1.27 -r1.28
  --- build.xml	7 May 2002 11:52:32 -0000	1.27
  +++ build.xml	15 May 2002 14:36:34 -0000	1.28
  @@ -1,12 +1,9 @@
  -<project name="messenger" default="compile" basedir=".">
  -
  -
  +<project name="messenger" default="compile" basedir=".">

   <!--
           "messenger" component of the Jakarta Commons Subproject
  -        $Id: build.xml,v 1.27 2002/05/07 11:52:32 jstrachan Exp $
  +        $Id: build.xml,v 1.28 2002/05/15 14:36:34 jstrachan Exp $
   -->
   
  -
   <!-- ========== Initialize Properties ===================================== -->
   
   
  @@ -33,7 +30,7 @@
   
     <property name="j2sdkee"                 value="/j2sdkee1.3"/>
   
  -  <property name="messenger.xml"           value="src/conf/Messenger.xml"/>
  +  <property name="messenger.xml"           value="src/conf/Messenger.xml"/>
  
  <!-- Messenger configuration passed to forked JVMs through the org.apache.commons.messenger system property -->
  <property name="testMessenger.xml"       value="${conf.home}/test-connections.xml"/>
     <property name="jms.classes.dir"         value="jms/classes"/>
     <property name="jms.lib.dir"             value="jms/lib"/>
   
  @@ -139,10 +136,10 @@
       <pathelement location="${j2ee.jar}"/>
       <pathelement location="${j2sdkee}/lib/local"/>
   
  -    <!-- not sure why this is needed - something to do with JNDI I think -->
  -    <!-- if this is missing then the JNDI initial context fails for some reason -->
  -    <pathelement path="${java.class.path}"/>
  -  </path>
  +    <!-- not sure why this is needed - something to do with JNDI I think -->
  +    <!-- if this is missing then the JNDI initial context fails for some reason -->
  +    <pathelement path="${java.class.path}"/>

  +    <!-- add the JARs required for a pluggable JMS provider -->
    <pathelement path="${jms.classes.dir}"/>
    <fileset dir="${jms.lib.dir}">
      <include name="**/*.jar"/>
    </fileset>
  </path>
   
     <!-- Running sample programs against the configured JMS provider -->
     <path id="jms.classpath">
  @@ -331,7 +328,7 @@
         <sysproperty key="java.security.policy" value="${j2sdkee}/lib/security/client.policy"/>
         <sysproperty key="java.security.auth.login.config" value="${j2sdkee}/lib/security/login.config"/>
         <sysproperty key="com.sun.enterprise.home" value="${j2sdkee}"/>
  -      <sysproperty key="org.apache.commons.messenger" value="${conf.home}/test-connections.xml"/>
  +      <sysproperty key="org.apache.commons.messenger" value="${testMessenger.xml}"/>
       </java>
     </target>
   
  @@ -440,20 +437,7 @@
     </target>
     
   
  -  <target name="demo.jms.call.messagelet" depends="compile.tests"
  -    description="Sends a message to a Messagelet engine for processing by a Messagelet and displays the result">
  -    <echo message="Sending a message to the Messagelet engine using the configured JMS provider..."/>
  -    <java classname="org.apache.commons.messenger.tool.Caller" fork="yes">
  -      <classpath refid="jms.classpath"/>
  -
  -      <arg value="queue"/>
  -      <arg value="echo.messagelet"/>
  -      <arg value="src/conf/sampleMessage.txt"/>
  -
  -      <sysproperty key="org.apache.commons.messenger" value="${messenger.xml}"/>
  -    </java>
  -  </target>
  -  
  +  <target name="demo.jms.call.messagelet" depends="compile.tests"

    description="Sends a message to a Messagelet engine for processing by a Messagelet and displays the result">

    <echo message="Sending a message to the Messagelet engine using the configured JMS provider..."/>

    <java classname="org.apache.commons.messenger.tool.Caller" fork="yes">

      <classpath refid="jms.classpath"/>



      <arg value="queue"/>

      <arg value="echo.messagelet"/>

      <arg value="src/conf/sampleMessage.txt"/>



      <sysproperty key="org.apache.commons.messenger" value="${messenger.xml}"/>

    </java>

  </target>

  
  <target name="demo.jms.call.bridge" depends="compile.tests"

    description="Sends a message to a Messagelet engine for processing by a Bridge MDO">

    <echo message="Sending a message to the Messagelet engine using the configured JMS provider..."/>

    <java classname="org.apache.commons.messenger.tool.Producer" fork="yes">

      <classpath refid="jms.classpath"/>



      <arg value="queue"/>

      <arg value="foo.input"/>

      <arg value="src/conf/sampleMessage.txt"/>



      <sysproperty key="org.apache.commons.messenger" value="${messenger.xml}"/>

    </java>

  </target>

  
   
     <target name="demo.jms.call.jsp" depends="compile.tests"
       description="Sends a message to a Messagelet engine for processing by a JSP page and displays the result">
  @@ -499,7 +483,7 @@
         <sysproperty key="org.apache.commons.messenger" value="${messenger.xml}"/>
       </java>
     </target>
  -  
  +  
  <!-- Runs the Consumer tool against the subject the bridge publishes to -->
  <target name="demo.receive.bridge" depends="compile.tests"
    description="Receives a number of messages from the bridge on a JMS destination">

    <property name="test.subject" value="foo.output"/>

    <echo message="Waiting to receive messages on subject: ${test.subject}"/>
    <java classname="org.apache.commons.messenger.tool.Consumer" fork="yes">
      <classpath refid="jms.classpath"/>

      <arg value="queue"/>
      <arg value="${test.subject}"/>

      <sysproperty key="org.apache.commons.messenger" value="${messenger.xml}"/>
    </java>
  </target>

  
   
   
   </project>
  
  
  
  1.9       +1 -6      jakarta-commons-sandbox/messenger/TODO.txt
  
  Index: TODO.txt
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/TODO.txt,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- TODO.txt	2 May 2002 18:42:08 -0000	1.8
  +++ TODO.txt	15 May 2002 14:36:34 -0000	1.9
  @@ -1,6 +1,6 @@
   Project To Do List
   ==================
  -
  +
* create a receive Ant task

* consider a BeanUtils Converter to allow Strings to be converted into Destinations or Messengers?

   * add an option to the producer task to allow the messenger.xml to be specified
   
   * Allow the replyTo destination and Messenger to be overridden 
  @@ -12,11 +12,6 @@
     <subscribe connection="foo" destination="incoming">
       <send connection="bar" destination="outgoing"/>
     </subscribe>
  -
  -* Maybe Ant tasks to generate messages from <fileSets> or something?
  -	- would require some XML based notation for specifying messages?
  -	- maybe using the Jelly scripting engine might be better, where a Jelly
  -		script creates the message, maybe in a loop from some XML data file?
   
   * file based JMS test harness for testing JMS networks? Firing in batches of
     messages to certain JMS connections, via a test agent, then outputting messages
  
  
  
  1.1                  jakarta-commons-sandbox/messenger/.project
  
  Index: .project
  ===================================================================
  <?xml version="1.0" encoding="UTF-8"?>
  <projectDescription>
  	<name>messenger</name>
  	<comment></comment>
  	<projects>
  	</projects>
  	<buildSpec>
  	</buildSpec>
  	<natures>
  		<nature>org.eclipse.team.cvs.core.cvsnature</nature>
  	</natures>
  </projectDescription>
  
  
  
  1.12      +11 -6     jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ManagerServlet.java
  
  Index: ManagerServlet.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ManagerServlet.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- ManagerServlet.java	19 Apr 2002 09:52:38 -0000	1.11
  +++ ManagerServlet.java	15 May 2002 14:36:34 -0000	1.12
  @@ -5,7 +5,7 @@
    * version 1.1, a copy of which has been included with this distribution in
    * the LICENSE file.
    * 
  - * $Id: ManagerServlet.java,v 1.11 2002/04/19 09:52:38 jstrachan Exp $
  + * $Id: ManagerServlet.java,v 1.12 2002/05/15 14:36:34 jstrachan Exp $
    */
   package org.apache.commons.messagelet;
   
  @@ -36,7 +36,7 @@
     * and use of MessageListener beans for a given ServletContext.</p>
     *
     * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  -  * @version $Revision: 1.11 $
  +  * @version $Revision: 1.12 $
     */
   public class ManagerServlet extends GenericServlet {
   
  @@ -143,7 +143,7 @@
       }
       
       protected void subscribe( Subscription subscription ) throws ServletException {
  -        String name = subscription.getMessenger();
  +        String name = subscription.getConnection();
           Messenger messenger = getMessenger( name );
           if ( messenger == null ) {
               throw new ServletException( "No such Messenger called: " + name + " for subscription: " + subscription );
  @@ -168,10 +168,15 @@
           // if its an MDO the initialise it!
           if ( listener instanceof MessageDrivenObject ) {
               MessageDrivenObject mdo = (MessageDrivenObject) listener;
  +            if ( mdo instanceof MessengerMDO ) {
  +                MessengerMDO messengerMDO = (MessengerMDO) mdo;
  +                messengerMDO.setMessenger( messenger );
  +                messengerMDO.setMessengerManager( getMessengerManager() );
  +            }
               mdo.init( getServletContext() );
           }
           
  -        String subject = subscription.getDestination();
  +        String subject = subscription.getSubject();
           if ( subject == null || subject.length() == 0 ) {
               throw new ServletException( "No destination defined for subscription: " + subscription );
           }
  @@ -221,12 +226,12 @@
       
       protected void destroyMBOs( Subscription subscription ) throws ServletException {
           // lets unsubscribe first
  -        String name = subscription.getMessenger();
  +        String name = subscription.getConnection();
           Messenger messenger = getMessenger( name );
           MessageListener listener = subscription.getMessageListener();
           if ( messenger != null && listener != null ) {
               Destination destination = null;        
  -            String subject = subscription.getDestination();
  +            String subject = subscription.getSubject();
               if ( subject == null || subject.length() == 0 ) {
                   log( "No destination defined for subscription: " + subscription );
               }
  
  
  
  1.3       +1 -47     jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/MessengerMDO.java
  
  Index: MessengerMDO.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/MessengerMDO.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- MessengerMDO.java	26 Oct 2001 11:52:16 -0000	1.2
  +++ MessengerMDO.java	15 May 2002 14:36:34 -0000	1.3
  @@ -1,47 +1 @@
  -/*
  - * Copyright (C) The Apache Software Foundation. All rights reserved.
  - *
  - * This software is published under the terms of the Apache Software License
  - * version 1.1, a copy of which has been included with this distribution in
  - * the LICENSE file.
  - * 
  - * $Id: MessengerMDO.java,v 1.2 2001/10/26 11:52:16 jstrachan Exp $
  - */
  -package org.apache.commons.messagelet;
  -
  -import org.apache.commons.messenger.Messenger;
  -import org.apache.commons.messenger.MessengerListener;
  -
  -/** <p><code>MessengerMDO</code> is an abstract base
  -  * class for Messenger based MDO implementations. 
  -  * It provides access to the Messenger that was used to receive messages
  -  * so that responses can be sent to the originating Messenger object
  -  * and so reuse the same JMS Session and Connection for responses.</p>
  -  *
  -  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  -  * @version $Revision: 1.2 $
  -  */
  -public abstract class MessengerMDO extends MessageDrivenObjectSupport implements MessengerListener {
  -
  -    /**
  -     * The Messenger with which this MDO is associated.
  -     */
  -    private Messenger messenger;
  -
  -
  -    
  -    public MessengerMDO() {
  -    }
  -
  -    /** Provides access to the current Messenger which was used to generate the current
  -     * Message. This allows replies to be sent directly to the originating Messenger
  -     * (and so the same JMS Session and Connection) which received the message
  -     */
  -    public Messenger getMessenger() {
  -        return messenger;
  -    }
  -
  -    public void setMessenger(Messenger messenger) {
  -        this.messenger = messenger;
  -    }
  -}
  +/*
 * Copyright (C) The Apache Software Foundation. All rights reserved.
 *
 * This software is published under the terms of the Apache Software License
 * version 1.1, a copy of which has been included with this distribution in
 * the LICENSE file.
 * 
 * $Id: MessengerMDO.java,v 1.3 2002/05/15 14:36:34 jstrachan Exp $
 */

package org.apache.commons.messagelet;

import javax.jms.JMSException;

import org.apache.commons.messenger.Messenger;
import org.apache.commons.messenger.MessengerListener;
import org.apache.commons.messenger.MessengerManager;

/** <p><code>MessengerMDO</code> is an abstract base
  * class for Messenger based MDO implementations. 
  * It provides access to the Messenger that was used to receive messages
  * so that responses can be sent to the originating Messenger object
  * and so reuse the same JMS Session and Connection for responses.</p>
  *
  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  * @version $Revision: 1.3 $
  */

public abstract class MessengerMDO extends MessageDrivenObjectSupport implements MessengerListener {

    /** the MessengerManager which manages the Messenger; looked up on demand when none was set */
    private MessengerManager messengerManager;

    /** the Messenger this MDO has been associated with */
    private Messenger messenger;

    public MessengerMDO() {
    }

    /**
     * Returns the Messenger associated with this MDO, so that replies can be
     * sent through the same Messenger that delivered the message.
     *
     * @return the Messenger previously passed to {@link #setMessenger}, or null if none was set
     */
    public Messenger getMessenger() {
        return this.messenger;
    }

    /**
     * Associates this MDO with the given Messenger.
     *
     * @param messenger the Messenger to associate with this MDO
     */
    public void setMessenger(Messenger messenger) {
        this.messenger = messenger;
    }

    /**
     * Returns the MessengerManager which manages the Messenger.
     * When no manager has been set explicitly, the result of
     * <code>MessengerManager.getInstance()</code> is stored and returned.
     *
     * @return the explicitly configured manager, or the one obtained from MessengerManager.getInstance()
     * @throws JMSException if the fallback lookup fails
     */
    public MessengerManager getMessengerManager() throws JMSException {
        if ( this.messengerManager != null ) {
            return this.messengerManager;
        }
        this.messengerManager = MessengerManager.getInstance();
        return this.messengerManager;
    }

    /**
     * Sets the MessengerManager to use instead of the one returned by MessengerManager.getInstance().
     *
     * @param messengerManager the manager to use
     */
    public void setMessengerManager(MessengerManager messengerManager) {
        this.messengerManager = messengerManager;
    }

}
  \ No newline at end of file
  
  
  
  1.1                  jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/BridgeMDO.java
  
  Index: BridgeMDO.java
  ===================================================================
  /*
 * Copyright (C) The Apache Software Foundation. All rights reserved.
 *
 * This software is published under the terms of the Apache Software License
 * version 1.1, a copy of which has been included with this distribution in
 * the LICENSE file.
 * 
 * $Id: MessengerMDO.java,v 1.2 2001/10/26 11:52:16 jstrachan Exp $
 */
package org.apache.commons.messagelet;

import java.util.Enumeration;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.servlet.ServletException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.commons.messenger.Messenger;
import org.apache.commons.messenger.MessengerManager;


/** <p><code>BridgeMDO</code> is an MDO which implements a JMS bridge
 * from one JMS destination and connection to another.
 * This allows messages to be consumed on one destination and sent to 
 * another JMS destination, using possibly different JMS providers.
 * For example this can be used to bridge from SpiritWave to MQSeries. 
 * </p>
 * <p>
 * This class is a useful base class to other possible bridge implementations
 * such as 2 phase commit bridges or bridges with some complex transformation etc.
 * This class has a number of Factory and Strategy methods to further customize
 * the acknowledgement and transaction management, the message construction, 
 * transformation and how to handle message headers etc.
 * </p>
 *
 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
 * @version $Revision: 1.2 $
 */
public class BridgeMDO extends MessengerMDO {

    /** Logger */
    private static final Log log = LogFactory.getLog(BridgeMDO.class);
  
    /** the Messenger used to output messages */
    private Messenger outputMessenger;
      
    /** the Destination output messages will be sent to */
    private Destination outputDestination;

    /** the name of the messenger to use for output */      
    private String outputConnection;
    
    /** the name of the destination to use */
    private String outputSubject;
    
    /** the buffer size used for ByteMessage and StreamMessage copying */
    private int bufferSize = 32 * 1024;
        
    public BridgeMDO() {
    }
    
    public void init() throws ServletException {
        try {
            Messenger messenger = getMessenger();
            Messenger outputMessenger = getOutputMessenger();
            
            if ( messenger == null ) {
                throw new ServletException( "No input Messenger is defined for this Bridge" );
            }
            if ( outputMessenger == null ) {
                throw new ServletException( "No output Messenger is defined for this Bridge" );
            }
            if ( getOutputDestination() == null ) {
                throw new ServletException( "No output Destination is defined for this Bridge" );
            }
            
            // enable transacted mode 
            messenger.getSessionFactory().setTransacted( true );     
            outputMessenger.getSessionFactory().setTransacted( true );     
            
            // use client acknowledgement
            messenger.getSessionFactory().setAcknowledgeMode( Session.CLIENT_ACKNOWLEDGE );     
            outputMessenger.getSessionFactory().setAcknowledgeMode( Session.CLIENT_ACKNOWLEDGE );     
        }
        catch (JMSException e) {
            log.error( "Caught exception trying to configure the transacted, client acknowledge modes of the JMS connections" );
            log.error( "Caught: " + e, e);
            throw new ServletException( "Caught exception trying to configure the transacted, client acknowledge modes of the JMS connections" + e, e);
        }
    }
    
    // MessageListener interface
    //-------------------------------------------------------------------------
    public void onMessage(Message message) {
        Messenger messenger = getMessenger();
        
        try {
            Message outputMessage = createOutputMessage(message);
            if ( outputMessage != null ) {
                getOutputMessenger().send( getOutputDestination(), outputMessage );
            }
            acknowledge(message);
            acknowledge(outputMessage);
            commit();
        }
        catch (Exception e) {
            log.error("Could not send message due to exception", e);
            rollback();
        }
    }
    
    
    // Properties
    //-------------------------------------------------------------------------
    public String getOutputConnection() {
        return outputConnection;
    }

    /**
     * Sets the connection name (messenger instance) to use
     * to output messages
     */    
    public void setOutputConnection(String outputConnection) {
        this.outputConnection = outputConnection;
    }
    
    public String getOutputSubject() {
        return outputSubject;
    }
    
    /** 
     * Sets the subject (i.e. destination name) to send messages to
     */
    public void setOutputSubject(String outputSubject) {
        this.outputSubject = outputSubject;
    }
    
    /**
     * Gets the Messenger used to output messages 
     */
    public Messenger getOutputMessenger() throws JMSException {
        if ( outputMessenger == null ) {
            String name = getOutputConnection();
            if ( name != null ) {
                outputMessenger = getMessengerManager().getMessenger( name );
            }
            else {
                // default to the input messenger
                outputMessenger = getMessenger();
            }
        }
        return outputMessenger;
    }
    
    /**
     * Sets the Messenger used to output messages 
     */
    public void setOutputMessenger(Messenger outputMessenger) {
        this.outputMessenger = outputMessenger;
    }
    
    /**
     * Gets the Destination output messages will be sent to
     */
    public Destination getOutputDestination() throws JMSException {
        if ( outputDestination == null ) {
            String subject = getOutputSubject();
            if ( subject == null ) {
                throw new JMSException( "A bridge must have an outputSubject defined!" );
            }
            outputDestination = getOutputMessenger().getDestination( subject );
        }
        return outputDestination;
    }
    
    /**
     * Sets the Destination output messages will be sent to
     */
    public void setOutputDestination(Destination outputDestination) {
        this.outputDestination = outputDestination;
    }
    
    /**
     * Gets the buffer size used for ByteMessage and StreamMessage copying
     */
    public int getBufferSize() {
        return bufferSize;
    }

    /**
     * Sets the buffer size used for ByteMessage and StreamMessage copying
     */
    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }
    
    
    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Strategy method to perform a commit() on both the incoming Messenger and the
     * output Messenger.
     */
    protected void commit() throws JMSException {
        getOutputMessenger().commit();
        getMessenger().commit();
    }

    /**
     * Strategy method to perform a rollback() on both the incoming Messenger and the
     * output Messenger.
     */
    protected void rollback() {
        try {
            getOutputMessenger().rollback();
        }
        catch (Exception e) {
            log.error( "Caught exception rolling back the output messenger: " + e, e );
        }
        try {
            getMessenger().rollback();
        }
        catch (Exception e) {
            log.error( "Caught exception rolling back the input messenger: " + e, e );
        }
    }

    
    /**
     * Factory method to create an output message given an input message.
     * Derived classes could override this method to perform any kind of 
     * Message transformation.
     */
    protected Message createOutputMessage(Message inputMessage) throws JMSException {
        Message outputMessage = null;
        
        if ( inputMessage instanceof TextMessage ) {
            outputMessage = createOutputTextMessage( (TextMessage) inputMessage );
        }
        else if ( inputMessage instanceof ObjectMessage ) {
            outputMessage = createOutputObjectMessage( (ObjectMessage) inputMessage );
        }
        else if ( inputMessage instanceof MapMessage ) {
            outputMessage = createOutputMapMessage( (MapMessage) inputMessage );
        }
        else if ( inputMessage instanceof BytesMessage ) {
            outputMessage = createOutputBytesMessage( (BytesMessage) inputMessage );
        }
        else if ( inputMessage instanceof StreamMessage ) {
            outputMessage = createOutputStreamMessage( (StreamMessage) inputMessage );
        }
        else {
            outputMessage = getOutputMessenger().createMessage();
        }
        
        processMessageHeaders(inputMessage, outputMessage);
        
        return outputMessage;
    }
        
    /**
     * Factory method to create ObjectMessage 
     * Derived classes could override this method to perform any kind of 
     * Message transformation.
     */
    protected ObjectMessage createOutputObjectMessage(ObjectMessage inputMessage) throws JMSException {
        return getOutputMessenger().createObjectMessage( inputMessage.getObject() );
    }
    
    /**
     * Factory method to create TextMessage 
     * Derived classes could override this method to perform any kind of 
     * Message transformation.
     */
    protected TextMessage createOutputTextMessage(TextMessage inputMessage) throws JMSException {
        return getOutputMessenger().createTextMessage( inputMessage.getText() );
    }
    
    /**
     * Factory method to create MapMessage 
     * Derived classes could override this method to perform any kind of 
     * Message transformation.
     */
    protected MapMessage createOutputMapMessage(MapMessage inputMessage) throws JMSException {
        MapMessage answer = getOutputMessenger().createMapMessage();
        
        // copy across all values
        for ( Enumeration enum = inputMessage.getMapNames(); enum.hasMoreElements(); ) {
            String name = (String) enum.nextElement();
            Object value = inputMessage.getObject( name );
            answer.setObject( name, value );
        }
        return answer;
    }
    
    /**
     * Factory method to create BytesMessage 
     * Derived classes could override this method to perform any kind of 
     * Message transformation.
     */
    protected BytesMessage createOutputBytesMessage(BytesMessage inputMessage) throws JMSException {
        BytesMessage answer = getOutputMessenger().createBytesMessage();
        
        // copy across all data
        byte[] buffer = new byte[bufferSize];
        while (true ) {
            int size = inputMessage.readBytes( buffer );
            if ( size <= 0 ) {
                break;
            }
            answer.writeBytes( buffer, 0, size );
            if ( size < bufferSize ) {
                break;
            }
        }
        return answer;
    }
    
    /**
     * Factory method to create StreamMessage 
     * Derived classes could override this method to perform any kind of 
     * Message transformation.
     */
    protected StreamMessage createOutputStreamMessage(StreamMessage inputMessage) throws JMSException {
        StreamMessage answer = getOutputMessenger().createStreamMessage();
        
        // copy across all data
        byte[] buffer = new byte[bufferSize];
        while (true ) {
            int size = inputMessage.readBytes( buffer );
            if ( size <= 0 ) {
                break;
            }
            answer.writeBytes( buffer, 0, size );
            if ( size < bufferSize ) {
                break;
            }
        }
        return answer;
    }
    
    
    
    /**
     * Strategy method to add any headers required on the output message.
     * Derived classes could override this method to perform any kind of 
     * header processing, such as copying the correlation ID, copying all
     * headers or adding some new custom headers etc.
     */
    protected void processMessageHeaders(Message inputMessage, Message outputMessage) throws JMSException {
    }
     
    /**
     * Strategy method to allow different derived classes to acknowledge
     * messages differently, such as to disable acknowledgements
     */
    protected void acknowledge(Message message) throws JMSException {
        message.acknowledge();
    }            
}


  
  
  1.7       +5 -6      jakarta-commons-sandbox/messenger/src/webapp/conf/subscriptions.xml
  
  Index: subscriptions.xml
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/webapp/conf/subscriptions.xml,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- subscriptions.xml	30 Oct 2001 11:14:31 -0000	1.6
  +++ subscriptions.xml	15 May 2002 14:36:34 -0000	1.7
  @@ -1,20 +1,19 @@
   <?xml version="1.0" encoding="UTF-8"?>
   <subscriptions>
  -
  -  <subscription messenger="queue" destination="jms/Queue">
  +  <subscription connection="queue" subject="jms/Queue">
       <listener className="EchoMDO"/>
     </subscription>
     
  -  <subscription messenger="queue" destination="echo.servlet">
  +  <subscription connection="queue" subject="echo.servlet">
       <servlet>/jms/echoServlet</servlet>
     </subscription>
     
  -  <subscription messenger="queue" destination="echo.messagelet">
  +  <subscription connection="queue" subject="echo.messagelet">
       <servlet>/jms/echoMessagelet</servlet>
     </subscription>
     
  -  <subscription messenger="queue" destination="echo.jsp">
  +  <subscription connection="queue" subject="echo.jsp">
       <servlet>/jms/time.jsp?a=1&amp;b=2</servlet>
     </subscription>
  -  
  +  
  <subscription connection="queue" subject="foo.input">

    <bridge outputConnection="queue" outputSubject="foo.output"/>

  </subscription>

  
   </subscriptions>
  
  
  
  1.3       +1 -50     jakarta-commons-sandbox/messenger/src/webapp/src/EchoMDO.java
  
  Index: EchoMDO.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/webapp/src/EchoMDO.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- EchoMDO.java	12 Oct 2001 18:49:30 -0000	1.2
  +++ EchoMDO.java	15 May 2002 14:36:34 -0000	1.3
  @@ -1,50 +1 @@
  -/*
  - * Copyright (C) The Apache Software Foundation. All rights reserved.
  - *
  - * This software is published under the terms of the Apache Software License
  - * version 1.1, a copy of which has been included with this distribution in
  - * the LICENSE file.
  - * 
  - * $Id: EchoMDO.java,v 1.2 2001/10/12 18:49:30 jstrachan Exp $
  - */
  -
  -import javax.jms.Destination;
  -import javax.jms.JMSException;
  -import javax.jms.Message;
  -
  -import org.apache.commons.messagelet.MessengerMDO;
  -import org.apache.commons.messenger.Messenger;
  -
  -/** <p><code>EchoMDO</code> is a simple echo MDO which
  -  * replies with a piece of text.</p>
  -  *
  -  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  -  * @version $Revision: 1.2 $
  -  */
  -public class EchoMDO extends MessengerMDO {
  -
  -    public void onMessage(Message message) {
  -        Destination destination = null;        
  -        try {
  -            destination = message.getJMSReplyTo();
  -        }
  -        catch (JMSException e) {
  -            log( "Could not find replyTo Destination for message: " + message, e );
  -            return;
  -        }
  -        if ( destination == null ) {
  -            log( "No replyTo Destination for message: " + message );
  -            return;
  -        }
  -        
  -        Messenger messenger = getMessenger();
  -        String text = "Received: " + message;
  -        try {
  -            Message reply = messenger.createTextMessage( text );
  -            messenger.send( destination, reply );
  -        }
  -        catch (Exception e) {
  -            log( "Could not send reply", e );
  -        }
  -    }
  -}
  +/*
 * Copyright (C) The Apache Software Foundation. All rights reserved.
 *
 * This software is published under the terms of the Apache Software License
 * version 1.1, a copy of which has been included with this distribution in
 * the LICENSE file.
 * 
 * $Id: EchoMDO.java,v 1.3 2002/05/15 14:36:34 jstrachan Exp $
 */

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.commons.messagelet.MessengerMDO;
import org.apache.commons.messenger.Messenger;

/** <p><code>EchoMDO</code> is a simple echo MDO which
  * replies with a piece of text.</p>
  *
  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  * @version $Revision: 1.3 $
  */
public class EchoMDO extends MessengerMDO {

    /**
     * Replies to the replyTo Destination of the given message with a text
     * message containing "Received: " followed by the incoming message.
     * If the replyTo Destination is missing or cannot be read, or the reply
     * cannot be sent, the problem is logged and the message is dropped.
     */
    public void onMessage(Message message) {
        Destination replyTo;
        try {
            replyTo = message.getJMSReplyTo();
        }
        catch (JMSException e) {
            log("Could not find replyTo Destination for message: " + message, e);
            return;
        }

        // nowhere to send the echo
        if (replyTo == null) {
            log("No replyTo Destination for message: " + message);
            return;
        }

        // reply on the Messenger associated with this MDO
        Messenger replier = getMessenger();
        String body = "Received: " + message;

        try {
            replier.send(replyTo, replier.createTextMessage(body));
        }
        catch (Exception e) {
            log("Could not send reply", e);
        }
    }
}
  \ No newline at end of file
  
  
  

--
To unsubscribe, e-mail:   <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>