You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by js...@apache.org on 2003/06/06 12:51:09 UTC

cvs commit: jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger MessengerSupport.java

jstrachan    2003/06/06 03:51:09

  Modified:    messenger/src/java/org/apache/commons/messenger
                        MessengerSupport.java
  Log:
  Added a log message when Destination objects are created.

Also fixed a bug where the noLocal flag wasn't correctly being passed through.
  
  Revision  Changes    Path
  1.35      +172 -131  jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java
  
  Index: MessengerSupport.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v
  retrieving revision 1.34
  retrieving revision 1.35
  diff -u -r1.34 -r1.35
  --- MessengerSupport.java	4 Mar 2003 10:21:00 -0000	1.34
  +++ MessengerSupport.java	6 Jun 2003 10:51:08 -0000	1.35
  @@ -54,6 +54,8 @@
   
       /** Logger */
       private static final Log log = LogFactory.getLog(MessengerSupport.class);
  +    private static final Log destinationLog = LogFactory.getLog("org.apache.commons.messenger.destination");
  +    
       
       private static final boolean CACHE_REQUESTOR = true;
   
  @@ -65,38 +67,39 @@
        * or wether they should be created on the fly 
        */
       private boolean jndiDestinations;
  -    
  +
       /** are topic subscribers durable? */
       private boolean durable;
   
  -	/** the delivery mode used by default when sending messages */
  -	private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
  -	    
  +    /** the delivery mode used by default when sending messages */
  +    private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
  +
       /** the durable name used for durable topic based subscriptions */
       private String durableName;
  -    
  +
       /** 
        * whether local messages are ignored when topic based subscription is used
        * with a message selector 
        */
       private boolean noLocal;
  -    
  +
       /** Should we cache the requestor object per thread? */
       private boolean cacheRequestors;
   
       /** A Map of ListenerKey objects to MessageConsumer objects */
       private Map listeners = new HashMap();
  -    
  +
       /** whether MessageProducer instances should be cached or not */
       private boolean cacheProducers = true;
  -    
  +
       public MessengerSupport() {
       }
  -    
  +
       public String toString() {
           try {
               Session session = borrowSession();
  -            String answer = super.toString() + " session: " + session.toString();
  +            String answer =
  +                super.toString() + " session: " + session.toString();
               returnSession(session);
               return answer;
           }
  @@ -104,14 +107,21 @@
               return super.toString() + " session: " + e.toString();
           }
       }
  -    
  +
       public Destination getDestination(String subject) throws JMSException {
           Session session = borrowSession();
           try {
  +            boolean debug = destinationLog.isInfoEnabled();
               if (isTopic(session)) {
  +                if (debug) {
  +                    destinationLog.info("Using topic: " + subject);
  +                }
                   return getTopic((TopicSession) session, subject);
               }
               else {
  +                if (debug) {
  +                    destinationLog.info("Using queue: " + subject);
  +                }
                   return getQueue((QueueSession) session, subject);
               }
           }
  @@ -119,7 +129,7 @@
               returnSession(session);
           }
       }
  -    
  +
       public Destination createTemporaryDestination() throws JMSException {
           Session session = borrowSession();
           try {
  @@ -136,7 +146,7 @@
               returnSession(session);
           }
       }
  -    
  +
       public void send(Destination destination, Message message)
           throws JMSException {
           Session session = borrowSession();
  @@ -156,14 +166,14 @@
           }
       }
   
  -    public Message call( Destination destination, Message message ) throws JMSException {
  +    public Message call(Destination destination, Message message)
  +        throws JMSException {
           Session session = borrowSession();
           MessageProducer producer = null;
           try {
               Destination replyTo = getReplyToDestination();
               message.setJMSReplyTo(replyTo);
   
  -            
               // NOTE - we could consider adding a correlation ID per request so that we can ignore
               // any cruft or old messages that are sent onto our inbound queue.
               //
  @@ -174,17 +184,17 @@
               //
               // Maybe this should be a configurable strategy
   
  -            producer = borrowMessageProducer( session, destination );
  +            producer = borrowMessageProducer(session, destination);
               MessageConsumer consumer = getReplyToConsumer();
   
               if (isTopic(producer)) {
  -                ((TopicPublisher) producer).publish( message );
  +                ((TopicPublisher) producer).publish(message);
               }
               else {
  -                ((QueueSender) producer).send( message );
  +                ((QueueSender) producer).send(message);
               }
               Message response = consumer.receive();
  -            if ( response == null ) {
  +            if (response == null) {
                   // we could have timed out so lets trash the temporary destination
                   // so that the next call() method will use a new destination to avoid
                   // the response for this call() coming back on later call() invokcations
  @@ -194,10 +204,10 @@
           }
           finally {
               returnMessageProducer(producer);
  -            returnSession( session );
  +            returnSession(session);
           }
       }
  -    
  +
       public Message call(
           Destination destination,
           Message message,
  @@ -208,7 +218,7 @@
           try {
               Destination replyTo = getReplyToDestination();
               message.setJMSReplyTo(replyTo);
  -            
  +
               // NOTE - we could consider adding a correlation ID per request so that we can ignore
               // any cruft or old messages that are sent onto our inbound queue.
               //
  @@ -218,9 +228,9 @@
               // and use correlation IDs to dispatch to the corrent thread
               //
               // Maybe this should be a configurable strategy
  -            
  +
               producer = borrowMessageProducer(session, destination);
  -            
  +
               MessageConsumer consumer = getReplyToConsumer();
               if (isTopic(producer)) {
                   ((TopicPublisher) producer).publish(message);
  @@ -229,7 +239,7 @@
                   ((QueueSender) producer).send(message);
               }
               Message response = consumer.receive(timeoutMillis);
  -            if ( response == null ) {
  +            if (response == null) {
                   // we could have timed out so lets trash the temporary destination
                   // so that the next call() method will use a new destination to avoid
                   // the response for this call() coming back on later call() invokcations
  @@ -242,7 +252,7 @@
               returnSession(session);
           }
       }
  -    
  +
       public Message receive(Destination destination) throws JMSException {
           Session session = borrowSession();
           MessageConsumer consumer = null;
  @@ -339,7 +349,9 @@
           }
       }
   
  -    public MessageConsumer createConsumer(Destination destination, String selector)
  +    public MessageConsumer createConsumer(
  +        Destination destination,
  +        String selector)
           throws JMSException {
           Session session = borrowSession();
           try {
  @@ -367,7 +379,11 @@
           ServerSessionPool sessionPool,
           int maxMessages)
           throws JMSException {
  -        return createConnectionConsumer(destination, null, sessionPool, maxMessages);
  +        return createConnectionConsumer(
  +            destination,
  +            null,
  +            sessionPool,
  +            maxMessages);
       }
   
       public ConnectionConsumer createConnectionConsumer(
  @@ -407,7 +423,6 @@
   
       public abstract Connection getConnection() throws JMSException;
   
  -
       // Listener API
       //-------------------------------------------------------------------------
       public void addListener(Destination destination, MessageListener listener)
  @@ -418,7 +433,8 @@
           }
           Session session = borrowListenerSession();
           try {
  -            MessageConsumer consumer = createMessageConsumer(session, destination);
  +            MessageConsumer consumer =
  +                createMessageConsumer(session, destination);
               consumer.setMessageListener(listener);
               ListenerKey key = new ListenerKey(destination, listener);
               listeners.put(key, consumer);
  @@ -451,7 +467,9 @@
           }
       }
   
  -    public void removeListener(Destination destination, MessageListener listener)
  +    public void removeListener(
  +        Destination destination,
  +        MessageListener listener)
           throws JMSException {
           ListenerKey key = new ListenerKey(destination, listener);
           MessageConsumer consumer = (MessageConsumer) listeners.remove(key);
  @@ -584,18 +602,19 @@
   
       /**
        * Creates a browser on the given Queue
  -     */    
  -    public QueueBrowser createBrowser(Destination destination) throws JMSException {
  +     */
  +    public QueueBrowser createBrowser(Destination destination)
  +        throws JMSException {
           Session session = borrowSession();
           QueueBrowser browser = null;
           try {
               return createBrowser(session, destination);
  -        } 
  +        }
           finally {
               returnSession(session);
           }
       }
  -    
  +
       /** Get the producer's default delivery mode. */
       public int getDeliveryMode(Destination destination) throws JMSException {
           Session session = borrowSession();
  @@ -709,7 +728,9 @@
       }
       /** Set whether message timestamps are disabled. */
   
  -    public void setDisableMessageTimestamp(Destination destination, boolean value)
  +    public void setDisableMessageTimestamp(
  +        Destination destination,
  +        boolean value)
           throws JMSException {
           Session session = borrowSession();
           MessageProducer producer = null;
  @@ -743,7 +764,11 @@
                       timeToLive);
               }
               else {
  -                ((QueueSender) producer).send(message, deliveryMode, priority, timeToLive);
  +                ((QueueSender) producer).send(
  +                    message,
  +                    deliveryMode,
  +                    priority,
  +                    timeToLive);
               }
           }
           finally {
  @@ -784,7 +809,6 @@
           }
       }
   
  -
       // Properties
       //-------------------------------------------------------------------------
       /** Gets the name that this Messenger is called in a MessengerManager */
  @@ -821,13 +845,12 @@
       public boolean isCacheRequestors() {
           return cacheRequestors;
       }
  -    
  +
       /** Sets whether we should cache the requestor object per thread? */
       public void setCacheRequestors(boolean cacheRequestors) {
           this.cacheRequestors = cacheRequestors;
       }
   
  -
       /** @return the durable name used for durable topic based subscriptions */
       public String getDurableName() {
           return durableName;
  @@ -845,7 +868,7 @@
       public boolean isNoLocal() {
           return noLocal;
       }
  -    
  +
       /** 
        * Sets whether local messages are ignored when topic based subscription is used
        * with a message selector 
  @@ -853,7 +876,7 @@
       public void setNoLocal(boolean noLocal) {
           this.noLocal = noLocal;
       }
  -    
  +
       /** Gets whether MessageProducer instances should be cached or not, which defaults to true */
       public boolean isCacheProducers() {
           return cacheProducers;
  @@ -864,96 +887,99 @@
           this.cacheProducers = cacheProducers;
       }
   
  -	/**
  -	 * Returns the delivery mode used on messages sent via this Messenger
  -	 * @return int
  -	 */
  -	public int getDeliveryMode() {
  -		return deliveryMode;
  -	}
  -
  -	/**
  -	 * Sets the delivery mode used on messages sent via this Messenger
  -	 * @param deliveryMode The deliveryMode to set
  -	 */
  -	public void setDeliveryMode(int deliveryMode) {
  -		this.deliveryMode = deliveryMode;
  -	}
  +    /**
  +     * Returns the delivery mode used on messages sent via this Messenger
  +     * @return int
  +     */
  +    public int getDeliveryMode() {
  +        return deliveryMode;
  +    }
  +
  +    /**
  +     * Sets the delivery mode used on messages sent via this Messenger
  +     * @param deliveryMode The deliveryMode to set
  +     */
  +    public void setDeliveryMode(int deliveryMode) {
  +        this.deliveryMode = deliveryMode;
  +    }
   
  -    
       /**
        * Sets whether message delivery should be persistent or not
  -     * 
     * @param persistentDelivery
     */
  +     * 
  +     * @param persistentDelivery
  +     */
       public void setPersistentDelivery(boolean persistentDelivery) {
  -    	if (persistentDelivery) {
  -    		setDeliveryMode(DeliveryMode.PERSISTENT);
  -    	}
  -    	else {
  -    		setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  -    	}
  +        if (persistentDelivery) {
  +            setDeliveryMode(DeliveryMode.PERSISTENT);
  +        }
  +        else {
  +            setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  +        }
       }
  -    
  +
       // Implementation methods
       //-------------------------------------------------------------------------
  -    
  +
       /** Borrows a session instance from the pool */
       protected abstract Session borrowSession() throws JMSException;
  -    
  +
       /** @return a session instance back to the pool */
       protected abstract void returnSession(Session session) throws JMSException;
  -    
  +
       /** Deletes a session instance */
       //protected abstract void deleteSession(Session session) throws JMSException;
  -    
  +
       /** Borrows a session instance from the pool */
       protected abstract Session borrowListenerSession() throws JMSException;
  -    
  +
       /** @return a session instance back to the pool */
       protected abstract void returnListenerSession(Session session)
           throws JMSException;
  -    
  -    protected abstract boolean isTopic(Connection connection) throws JMSException;
  -    
  +
  +    protected abstract boolean isTopic(Connection connection)
  +        throws JMSException;
  +
       protected abstract boolean isTopic(ConnectionFactory factory)
           throws JMSException;
  -    
  +
       protected abstract boolean isTopic(Session session) throws JMSException;
  -    
  +
       protected abstract boolean isTopic(MessageProducer producer)
           throws JMSException;
  -    
  +
       /**
        * @return the current thread's MessengerSession
  -     */    
  -    protected abstract MessengerSession getMessengerSession() throws JMSException;
  -    
  -    
  +     */
  +    protected abstract MessengerSession getMessengerSession()
  +        throws JMSException;
  +
       /** @return a message producer for the given session and destination */
       protected MessageProducer borrowMessageProducer(
           Session session,
           Destination destination)
           throws JMSException {
  -            
  -        if ( isCacheProducers() ) {
  -            return getMessengerSession().getMessageProducer(destination);            
  +
  +        if (isCacheProducers()) {
  +            return getMessengerSession().getMessageProducer(destination);
           }
  -        else {            
  +        else {
               return createMessageProducer(session, destination);
           }
       }
  -    
  -    protected void returnMessageProducer(MessageProducer producer) throws JMSException {
  -        if ( ! isCacheProducers() ) {
  +
  +    protected void returnMessageProducer(MessageProducer producer)
  +        throws JMSException {
  +        if (!isCacheProducers()) {
               producer.close();
           }
       }
  -    
  +
       /** @return a newly created message producer for the given session and destination */
       protected MessageProducer createMessageProducer(
           Session session,
           Destination destination)
           throws JMSException {
  -        	
  +
           MessageProducer answer = null;
           if (isTopic(session)) {
               TopicSession topicSession = (TopicSession) session;
  @@ -963,14 +989,14 @@
               QueueSession queueSession = (QueueSession) session;
               answer = queueSession.createSender((Queue) destination);
           }
  -        
  +
           // configure the MessageProducer
           if (deliveryMode != Message.DEFAULT_DELIVERY_MODE) {
  -        	answer.setDeliveryMode(deliveryMode);
  +            answer.setDeliveryMode(deliveryMode);
           }
           return answer;
       }
  -    
  +
       /**
        * @return the MessageConsumer for this threads temporary destination
        * which is cached for the duration of this process.
  @@ -978,16 +1004,16 @@
       protected MessageConsumer getReplyToConsumer() throws JMSException {
           MessengerSession messengerSession = getMessengerSession();
           MessageConsumer consumer = messengerSession.getReplyToConsumer();
  -        if ( consumer == null ) {
  -            consumer = createMessageConsumer(
  -                messengerSession.getSession(),
  -                messengerSession.getReplyToDestination()
  -            );
  +        if (consumer == null) {
  +            consumer =
  +                createMessageConsumer(
  +                    messengerSession.getSession(),
  +                    messengerSession.getReplyToDestination());
               messengerSession.setReplyToConsumer(consumer);
           }
           return consumer;
       }
  -    
  +
       /**
        * Clears the temporary destination used to receive reply-to messages
        * which will lazily force a new destination and consumer to be created next
  @@ -995,65 +1021,76 @@
        */
       protected void clearReplyToDestination() throws JMSException {
           MessengerSession messengerSession = getMessengerSession();
  -        
  +
           messengerSession.setReplyToDestination(null);
           MessageConsumer consumer = messengerSession.getReplyToConsumer();
  -        if ( consumer != null ) {
  +        if (consumer != null) {
               messengerSession.setReplyToConsumer(null);
  -            
  +
               // ensure that everything is nullified first before we close
               // just in case an exception occurs
               consumer.close();
           }
       }
   
  -    
       /** @return a MessageConsumer for the given session and destination */
       protected MessageConsumer borrowMessageConsumer(
           Session session,
           Destination destination)
           throws JMSException {
  -            
  +
           MessageConsumer consumer = createMessageConsumer(session, destination);
   
           if (log.isDebugEnabled()) {
  -            log.debug( "Created new consumer: " + consumer + " on destination: " + destination );
  +            log.debug(
  +                "Created new consumer: "
  +                    + consumer
  +                    + " on destination: "
  +                    + destination);
           }
  -        
  -        return consumer;        
  +
  +        return consumer;
       }
  -    
  +
       /** @return a MessageConsumer for the given session, destination and selector */
       protected MessageConsumer borrowMessageConsumer(
           Session session,
           Destination destination,
           String selector)
           throws JMSException {
  -            
  -        MessageConsumer consumer = createMessageConsumer(session, destination, selector);
  -        
  +
  +        MessageConsumer consumer =
  +            createMessageConsumer(session, destination, selector);
  +
           if (log.isDebugEnabled()) {
  -            log.debug( "Created new consumer: " + consumer + " on destination: " + destination + " selector: " +  selector );
  +            log.debug(
  +                "Created new consumer: "
  +                    + consumer
  +                    + " on destination: "
  +                    + destination
  +                    + " selector: "
  +                    + selector);
           }
  -        
  -        return consumer;        
  +
  +        return consumer;
       }
  -    
  +
       /** 
        * Returns a message consumer back to the pool. 
        * By default this method will close message consumers though we should
        * be able to cache then
        */
  -    protected void returnMessageConsumer(MessageConsumer messageConsumer) throws JMSException {
  +    protected void returnMessageConsumer(MessageConsumer messageConsumer)
  +        throws JMSException {
           if (log.isDebugEnabled()) {
  -            log.debug( "Closing consumer: " + messageConsumer );
  +            log.debug("Closing consumer: " + messageConsumer);
           }
  -        
  -        if ( messageConsumer != null ) {
  +
  +        if (messageConsumer != null) {
               messageConsumer.close();
           }
       }
  -    
  +
       /** @return a new MessageConsumer for the given session and destination */
       protected MessageConsumer createMessageConsumer(
           Session session,
  @@ -1064,10 +1101,15 @@
               if (isDurable()) {
                   return topicSession.createDurableSubscriber(
                       (Topic) destination,
  -                    getDurableName());
  +                    getDurableName(),
  +                    null,
  +                    isNoLocal());
               }
               else {
  -                return topicSession.createSubscriber((Topic) destination);
  +                return topicSession.createSubscriber(
  +                    (Topic) destination,
  +                    null,
  +                    isNoLocal());
               }
           }
           else {
  @@ -1075,8 +1117,7 @@
               return queueSession.createReceiver((Queue) destination);
           }
       }
  -    
  -    
  +
       /** @return a new MessageConsumer for the given session, destination and selector */
       protected MessageConsumer createMessageConsumer(
           Session session,
  @@ -1104,7 +1145,7 @@
               return queueSession.createReceiver((Queue) destination, selector);
           }
       }
  -    
  +
       /** @return a new QueueBrowser for the given session and destination */
       protected QueueBrowser createBrowser(
           Session session,
  @@ -1112,25 +1153,25 @@
           throws JMSException {
           if (isTopic(session)) {
               return null;
  -        } 
  +        }
           else {
               QueueSession queueSession = (QueueSession) session;
               return queueSession.createBrowser((Queue) destination);
           }
       }
  -    
  +
       protected Queue getQueue(QueueSession session, String subject)
           throws JMSException {
           // XXXX: might want to cache
           return session.createQueue(subject);
       }
  -    
  +
       protected Topic getTopic(TopicSession session, String subject)
           throws JMSException {
           // XXXX: might want to cache
           return session.createTopic(subject);
       }
  -    
  +
       protected Destination getReplyToDestination() throws JMSException {
           return getMessengerSession().getReplyToDestination();
       }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: commons-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: commons-dev-help@jakarta.apache.org