You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by js...@apache.org on 2003/08/27 18:26:38 UTC

cvs commit: jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger MessengerSession.java DefaultMessenger.java MessengerSupport.java

jstrachan    2003/08/27 09:26:38

  Modified:    messenger/src/java/org/apache/commons/messenger
                        MessengerSession.java DefaultMessenger.java
                        MessengerSupport.java
  Log:
  removed the caching of MessageProducer instances.

Also refactored some of the code into the DefaultMessenger implementation to make other Messenger implementations easier
  
  Revision  Changes    Path
  1.7       +7 -15     jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java
  
  Index: MessengerSession.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- MessengerSession.java	4 Mar 2003 10:20:59 -0000	1.6
  +++ MessengerSession.java	27 Aug 2003 16:26:38 -0000	1.7
  @@ -46,15 +46,15 @@
       /** An optional cache of requestors */
       private Map requestorsMap;    
   
  -    /** the cache of producers */
  -    private Map producers;
  -    
       /** The inbox which is used for the call() methods */
       private Destination replyToDestination;
   
       /** The current messenger to which I'm connected */
       private MessengerSupport messenger;
   
  +	/** The producer used to send messages using this session */
  +    private MessageProducer producer;
  +
       public MessengerSession(MessengerSupport messenger, SessionFactory sessionFactory) {
           this.messenger = messenger;
           this.sessionFactory = sessionFactory;
  @@ -101,16 +101,8 @@
        * @return the MessageProducer for the given destination.
        */
       public MessageProducer getMessageProducer(Destination destination) throws JMSException {
  -        MessageProducer producer = null;
  -        if ( producers == null ) {
  -            producers = new HashMap();
  -        }
  -        else {
  -            producer = (MessageProducer) producers.get( destination );
  -        }
  -        if ( producer == null ) {
  -            producer = messenger.createMessageProducer( session, destination );
  -            producers.put( destination, producer );
  +    	if (producer == null) {
  +    		producer = messenger.createMessageProducer( session, null );
           }
           return producer;
       }
  
  
  
  1.18      +155 -1    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java
  
  Index: DefaultMessenger.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -r1.17 -r1.18
  --- DefaultMessenger.java	20 Mar 2003 08:32:54 -0000	1.17
  +++ DefaultMessenger.java	27 Aug 2003 16:26:38 -0000	1.18
  @@ -58,14 +58,19 @@
   
   import javax.jms.Connection;
   import javax.jms.ConnectionFactory;
  +import javax.jms.Destination;
   import javax.jms.JMSException;
  +import javax.jms.Message;
  +import javax.jms.MessageConsumer;
   import javax.jms.MessageListener;
   import javax.jms.MessageProducer;
   import javax.jms.Queue;
  +import javax.jms.QueueSender;
   import javax.jms.QueueSession;
   import javax.jms.ServerSessionPool;
   import javax.jms.Session;
   import javax.jms.Topic;
  +import javax.jms.TopicPublisher;
   import javax.jms.TopicSession;
   import javax.naming.Context;
   
  @@ -133,6 +138,94 @@
       public Session getAsyncSession() throws JMSException {
           return getMessengerSession().getListenerSession();
       }
  +
  +	public Message call(Destination destination, Message message)
  +		throws JMSException {
  +		Session session = borrowSession();
  +		MessageProducer producer = null;
  +		try {
  +			Destination replyTo = getReplyToDestination();
  +			message.setJMSReplyTo(replyTo);
  +
  +			// NOTE - we could consider adding a correlation ID per request so that we can ignore
  +			// any cruft or old messages that are sent onto our inbound queue.
  +			//
  +			// Though that does mean that we must then rely on the inbound message having
  +			// the right correlationID. Though at least this strategy would mean
  +			// that we could have a single consumer on a temporary queue for all threads
  +			// and use correlation IDs to dispatch to the corrent thread
  +			//
  +			// Maybe this should be a configurable strategy
  +
  +			producer = borrowMessageProducer(session, destination);
  +			MessageConsumer consumer = getReplyToConsumer();
  +
  +			if (isTopic(producer)) {
  +				((TopicPublisher) producer).publish((Topic) destination, message);
  +			}
  +			else {
  +				((QueueSender) producer).send((Queue) destination, message);
  +			}
  +			Message response = consumer.receive();
  +			if (response == null) {
  +				// we could have timed out so lets trash the temporary destination
  +				// so that the next call() method will use a new destination to avoid
  +				// the response for this call() coming back on later call() invokcations
  +				clearReplyToDestination();
  +			}
  +			return response;
  +		}
  +		finally {
  +			returnMessageProducer(producer);
  +			returnSession(session);
  +		}
  +	}
  +
  +	public Message call(
  +		Destination destination,
  +		Message message,
  +		long timeoutMillis)
  +		throws JMSException {
  +		Session session = borrowSession();
  +		MessageProducer producer = null;
  +		try {
  +			Destination replyTo = getReplyToDestination();
  +			message.setJMSReplyTo(replyTo);
  +
  +			// NOTE - we could consider adding a correlation ID per request so that we can ignore
  +			// any cruft or old messages that are sent onto our inbound queue.
  +			//
  +			// Though that does mean that we must then rely on the inbound message having
  +			// the right correlationID. Though at least this strategy would mean
  +			// that we could have a single consumer on a temporary queue for all threads
  +			// and use correlation IDs to dispatch to the corrent thread
  +			//
  +			// Maybe this should be a configurable strategy
  +
  +			producer = borrowMessageProducer(session, destination);
  +
  +			MessageConsumer consumer = getReplyToConsumer();
  +			if (isTopic(producer)) {
  +				((TopicPublisher) producer).publish((Topic) destination, message);
  +			}
  +			else {
  +				((QueueSender) producer).send((Queue) destination, message);
  +			}
  +			Message response = consumer.receive(timeoutMillis);
  +			if (response == null) {
  +				// we could have timed out so lets trash the temporary destination
  +				// so that the next call() method will use a new destination to avoid
  +				// the response for this call() coming back on later call() invokcations
  +				clearReplyToDestination();
  +			}
  +			return response;
  +		}
  +		finally {
  +			returnMessageProducer(producer);
  +			returnSession(session);
  +		}
  +	}
  +
       
       // Implementation methods
       //-------------------------------------------------------------------------
  @@ -163,8 +256,69 @@
           return getMessengerSession().getListenerSession();
       }
       
  +	/** @return a message producer for the given session and destination */
  +	protected MessageProducer borrowMessageProducer(
  +		Session session,
  +		Destination destination)
  +		throws JMSException {
  +
  +		if (isCacheProducers()) {
  +			return getMessengerSession().getMessageProducer(destination);
  +		}
  +		else {
  +			return createMessageProducer(session, destination);
  +		}
  +	}
  +
  +	protected void returnMessageProducer(MessageProducer producer)
  +		throws JMSException {
  +		if (!isCacheProducers()) {
  +			producer.close();
  +		}
  +	}
  +    
       protected void returnListenerSession(Session session) throws JMSException {
       }
  +
  +	/**
  +	 * @return the MessageConsumer for this threads temporary destination
  +	 * which is cached for the duration of this process.
  +	 */
  +	protected MessageConsumer getReplyToConsumer() throws JMSException {
  +		MessengerSession messengerSession = getMessengerSession();
  +		MessageConsumer consumer = messengerSession.getReplyToConsumer();
  +		if (consumer == null) {
  +			consumer =
  +				createMessageConsumer(
  +					messengerSession.getSession(),
  +					messengerSession.getReplyToDestination());
  +			messengerSession.setReplyToConsumer(consumer);
  +		}
  +		return consumer;
  +	}
  +
  +	/**
  +	 * Clears the temporary destination used to receive reply-to messages
  +	 * which will lazily force a new destination and consumer to be created next
  +	 * time a call() method is invoked.
  +	 */
  +	protected void clearReplyToDestination() throws JMSException {
  +		MessengerSession messengerSession = getMessengerSession();
  +
  +		messengerSession.setReplyToDestination(null);
  +		MessageConsumer consumer = messengerSession.getReplyToConsumer();
  +		if (consumer != null) {
  +			messengerSession.setReplyToConsumer(null);
  +
  +			// ensure that everything is nullified first before we close
  +			// just in case an exception occurs
  +			consumer.close();
  +		}
  +	}
  +
  +	protected Destination getReplyToDestination() throws JMSException {
  +		return getMessengerSession().getReplyToDestination();
  +	}
   
       /**
        * @return the current thread's MessengerSession
  
  
  
  1.36      +9 -153    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java
  
  Index: MessengerSupport.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v
  retrieving revision 1.35
  retrieving revision 1.36
  diff -u -r1.35 -r1.36
  --- MessengerSupport.java	6 Jun 2003 10:51:08 -0000	1.35
  +++ MessengerSupport.java	27 Aug 2003 16:26:38 -0000	1.36
  @@ -154,10 +154,10 @@
           try {
               producer = borrowMessageProducer(session, destination);
               if (isTopic(producer)) {
  -                ((TopicPublisher) producer).publish(message);
  +                ((TopicPublisher) producer).publish((Topic) destination, message);
               }
               else {
  -                ((QueueSender) producer).send(message);
  +                ((QueueSender) producer).send((Queue) destination, message);
               }
           }
           finally {
  @@ -166,93 +166,6 @@
           }
       }
   
  -    public Message call(Destination destination, Message message)
  -        throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        try {
  -            Destination replyTo = getReplyToDestination();
  -            message.setJMSReplyTo(replyTo);
  -
  -            // NOTE - we could consider adding a correlation ID per request so that we can ignore
  -            // any cruft or old messages that are sent onto our inbound queue.
  -            //
  -            // Though that does mean that we must then rely on the inbound message having
  -            // the right correlationID. Though at least this strategy would mean
  -            // that we could have a single consumer on a temporary queue for all threads
  -            // and use correlation IDs to dispatch to the corrent thread
  -            //
  -            // Maybe this should be a configurable strategy
  -
  -            producer = borrowMessageProducer(session, destination);
  -            MessageConsumer consumer = getReplyToConsumer();
  -
  -            if (isTopic(producer)) {
  -                ((TopicPublisher) producer).publish(message);
  -            }
  -            else {
  -                ((QueueSender) producer).send(message);
  -            }
  -            Message response = consumer.receive();
  -            if (response == null) {
  -                // we could have timed out so lets trash the temporary destination
  -                // so that the next call() method will use a new destination to avoid
  -                // the response for this call() coming back on later call() invokcations
  -                clearReplyToDestination();
  -            }
  -            return response;
  -        }
  -        finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  -        }
  -    }
  -
  -    public Message call(
  -        Destination destination,
  -        Message message,
  -        long timeoutMillis)
  -        throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -        try {
  -            Destination replyTo = getReplyToDestination();
  -            message.setJMSReplyTo(replyTo);
  -
  -            // NOTE - we could consider adding a correlation ID per request so that we can ignore
  -            // any cruft or old messages that are sent onto our inbound queue.
  -            //
  -            // Though that does mean that we must then rely on the inbound message having
  -            // the right correlationID. Though at least this strategy would mean
  -            // that we could have a single consumer on a temporary queue for all threads
  -            // and use correlation IDs to dispatch to the corrent thread
  -            //
  -            // Maybe this should be a configurable strategy
  -
  -            producer = borrowMessageProducer(session, destination);
  -
  -            MessageConsumer consumer = getReplyToConsumer();
  -            if (isTopic(producer)) {
  -                ((TopicPublisher) producer).publish(message);
  -            }
  -            else {
  -                ((QueueSender) producer).send(message);
  -            }
  -            Message response = consumer.receive(timeoutMillis);
  -            if (response == null) {
  -                // we could have timed out so lets trash the temporary destination
  -                // so that the next call() method will use a new destination to avoid
  -                // the response for this call() coming back on later call() invokcations
  -                clearReplyToDestination();
  -            }
  -            return response;
  -        }
  -        finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  -        }
  -    }
  -
       public Message receive(Destination destination) throws JMSException {
           Session session = borrowSession();
           MessageConsumer consumer = null;
  @@ -758,6 +671,7 @@
               producer = borrowMessageProducer(session, destination);
               if (isTopic(producer)) {
                   ((TopicPublisher) producer).publish(
  +                	(Topic) destination,
                       message,
                       deliveryMode,
                       priority,
  @@ -765,6 +679,7 @@
               }
               else {
                   ((QueueSender) producer).send(
  +                	(Queue) destination, 
                       message,
                       deliveryMode,
                       priority,
  @@ -947,32 +862,12 @@
       protected abstract boolean isTopic(MessageProducer producer)
           throws JMSException;
   
  -    /**
  -     * @return the current thread's MessengerSession
  -     */
  -    protected abstract MessengerSession getMessengerSession()
  -        throws JMSException;
  -
       /** @return a message producer for the given session and destination */
  -    protected MessageProducer borrowMessageProducer(
  +    protected abstract MessageProducer borrowMessageProducer(
           Session session,
  -        Destination destination)
  -        throws JMSException {
  +        Destination destination) throws JMSException;
   
  -        if (isCacheProducers()) {
  -            return getMessengerSession().getMessageProducer(destination);
  -        }
  -        else {
  -            return createMessageProducer(session, destination);
  -        }
  -    }
  -
  -    protected void returnMessageProducer(MessageProducer producer)
  -        throws JMSException {
  -        if (!isCacheProducers()) {
  -            producer.close();
  -        }
  -    }
  +    protected abstract void returnMessageProducer(MessageProducer producer) throws JMSException;
   
       /** @return a newly created message producer for the given session and destination */
       protected MessageProducer createMessageProducer(
  @@ -997,41 +892,6 @@
           return answer;
       }
   
  -    /**
  -     * @return the MessageConsumer for this threads temporary destination
  -     * which is cached for the duration of this process.
  -     */
  -    protected MessageConsumer getReplyToConsumer() throws JMSException {
  -        MessengerSession messengerSession = getMessengerSession();
  -        MessageConsumer consumer = messengerSession.getReplyToConsumer();
  -        if (consumer == null) {
  -            consumer =
  -                createMessageConsumer(
  -                    messengerSession.getSession(),
  -                    messengerSession.getReplyToDestination());
  -            messengerSession.setReplyToConsumer(consumer);
  -        }
  -        return consumer;
  -    }
  -
  -    /**
  -     * Clears the temporary destination used to receive reply-to messages
  -     * which will lazily force a new destination and consumer to be created next
  -     * time a call() method is invoked.
  -     */
  -    protected void clearReplyToDestination() throws JMSException {
  -        MessengerSession messengerSession = getMessengerSession();
  -
  -        messengerSession.setReplyToDestination(null);
  -        MessageConsumer consumer = messengerSession.getReplyToConsumer();
  -        if (consumer != null) {
  -            messengerSession.setReplyToConsumer(null);
  -
  -            // ensure that everything is nullified first before we close
  -            // just in case an exception occurs
  -            consumer.close();
  -        }
  -    }
   
       /** @return a MessageConsumer for the given session and destination */
       protected MessageConsumer borrowMessageConsumer(
  @@ -1170,9 +1030,5 @@
           throws JMSException {
           // XXXX: might want to cache
           return session.createTopic(subject);
  -    }
  -
  -    protected Destination getReplyToDestination() throws JMSException {
  -        return getMessengerSession().getReplyToDestination();
       }
   }