You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@commons.apache.org by js...@apache.org on 2002/02/01 15:32:33 UTC

cvs commit: jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger InitMessengerServlet.java MessengerManager.java MessengerSupport.java

jstrachan    02/02/01 06:32:33

  Modified:    messenger/src/java/org/apache/commons/messenger
                        MessengerManager.java MessengerSupport.java
  Added:       messenger/src/java/org/apache/commons/messenger
                        InitMessengerServlet.java
  Log:
  Applied William's patches to close producers after using them.
  
  Revision  Changes    Path
  1.5       +3 -11     jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerManager.java
  
  Index: MessengerManager.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerManager.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- MessengerManager.java	12 Oct 2001 16:37:09 -0000	1.4
  +++ MessengerManager.java	1 Feb 2002 14:32:33 -0000	1.5
  @@ -5,7 +5,7 @@
    * version 1.1, a copy of which has been included with this distribution in
    * the LICENSE file.
    * 
  - * $Id: MessengerManager.java,v 1.4 2001/10/12 16:37:09 jstrachan Exp $
  + * $Id: MessengerManager.java,v 1.5 2002/02/01 14:32:33 jstrachan Exp $
    */
   package org.apache.commons.messenger;
   
  @@ -19,7 +19,7 @@
   /** <p><code>MessengerManager</code> is a manager of {@link Messenger} instances.</p>
     *
     * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  -  * @version $Revision: 1.4 $
  +  * @version $Revision: 1.5 $
     */
   public class MessengerManager {
       
  @@ -61,15 +61,7 @@
         * from a given XML deployment configuration document
         */
       public static void configure( String xmlURL ) throws JMSException {
  -        try {
  -            MessengerDigester digester = new MessengerDigester();
  -            setInstance( (MessengerManager) digester.parse( xmlURL ) );
  -        }
  -        catch (Exception e) {
  -            JMSException newException = new JMSException( "Could not load the Messenger XML config file from: " + xmlURL );
  -            newException.setLinkedException(e);
  -            throw newException;
  -        }
  +        setInstance( load( xmlURL ) );
       }
       
       /** Returns the messenger for the given name */
  
  
  
  1.16      +98 -93    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java
  
  Index: MessengerSupport.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v
  retrieving revision 1.15
  retrieving revision 1.16
  diff -u -r1.15 -r1.16
  --- MessengerSupport.java	25 Oct 2001 21:07:12 -0000	1.15
  +++ MessengerSupport.java	1 Feb 2002 14:32:33 -0000	1.16
  @@ -4,8 +4,8 @@
    * This software is published under the terms of the Apache Software License
    * version 1.1, a copy of which has been included with this distribution in
    * the LICENSE file.
  - * 
  - * $Id: MessengerSupport.java,v 1.15 2001/10/25 21:07:12 jstrachan Exp $
  + *
  + * $Id: MessengerSupport.java,v 1.16 2002/02/01 14:32:33 jstrachan Exp $
    */
   package org.apache.commons.messenger;
   
  @@ -45,26 +45,26 @@
     * connection and session creation and the pooling strategy.</p>
     *
     * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  -  * @version $Revision: 1.15 $
  +  * @version $Revision: 1.16 $
     */
   public abstract class MessengerSupport implements Messenger {
   
       private static final boolean CACHE_REQUESTOR = true;
  -    
  -    
  +
  +
       /** The name of the Messenger */
       private String name;
  -    
  +
       /** are topic subscribers durable? */
       private boolean durable;
  -    
  +
       /** the durable name used for durable topic based subscriptions */
       private String durableName;
  -    
  +
       /** whether local messages are ignored when topic based subscription is used
        * with a message selector */
       private boolean noLocal;
  -    
  +
       /** A Map of ListenerKey objects to MessageConsumer objects */
       private Map listeners = new HashMap();
   
  @@ -84,11 +84,11 @@
   
       /** The inbox which is used for the call() methods */
       private Destination replyToDestination;
  -    
  -    
  +
  +
       public MessengerSupport() {
       }
  -    
  +
       public String toString() {
           try {
               Session session = borrowSession();
  @@ -100,7 +100,7 @@
               return super.toString() + " session: " + e.toString();
           }
       }
  -    
  +
       public Destination getDestination(String subject) throws JMSException {
           Session session = borrowSession();
           try {
  @@ -132,26 +132,28 @@
               returnSession( session );
           }
       }
  -    
  +
       public void send( Destination destination, Message message ) throws JMSException {
           Session session = borrowSession();
  +        MessageProducer producer = null;
           try {
  -            MessageProducer producer = getMessageProducer( session, destination );
  +            producer = getMessageProducer( session, destination );
               if ( producer instanceof TopicPublisher ) {
                   ((TopicPublisher) producer).publish( message );
  -            } 
  +            }
               else {
                   ((QueueSender) producer).send( message );
               }
           }
           finally {
  +            producer.close();
               returnSession( session );
           }
       }
   
       public Message call( Destination destination, Message message ) throws JMSException {
           Session session = borrowSession();
  -        try {            
  +        try {
               if ( session instanceof TopicSession ) {
                   TopicRequestor requestor = getTopicRequestor( (TopicSession) session, (Topic) destination );
                   return requestor.request( message );
  @@ -166,16 +168,16 @@
           }
       }
   
  -/*    
  +/*
       public Message call( Destination destination, Message message ) throws JMSException {
           Session session = borrowSession();
           try {
               Destination replyTo = getReplyToDestination();
               message.setJMSReplyTo(replyTo);
  -            
  +
               MessageProducer producer = getMessageProducer( session, destination );
               MessageConsumer consumer = getMessageConsumer( session, replyTo );
  -            
  +
               if ( session instanceof TopicSession ) {
                   ((TopicPublisher) producer).publish( message );
               }
  @@ -189,16 +191,18 @@
           }
       }
   */
  -    
  +
       public Message call( Destination destination, Message message, long timeoutMillis ) throws JMSException {
           Session session = borrowSession();
  +        MessageProducer producer = null;
  +        MessageConsumer consumer = null;
           try {
               Destination replyTo = getReplyToDestination();
               message.setJMSReplyTo(replyTo);
  -            
  -            MessageProducer producer = getMessageProducer( session, destination );
  -            MessageConsumer consumer = getMessageConsumer( session, replyTo );
  -            
  +
  +            producer = getMessageProducer( session, destination );
  +            consumer = getMessageConsumer( session, replyTo );
  +
               if ( session instanceof TopicSession ) {
                   ((TopicPublisher) producer).publish( message );
               }
  @@ -208,10 +212,11 @@
               return consumer.receive(timeoutMillis);
           }
           finally {
  +            producer.close();
               returnSession( session );
           }
       }
  -    
  +
       public Message receive(Destination destination) throws JMSException {
           Session session = borrowSession();
           try {
  @@ -222,7 +227,7 @@
               returnSession( session );
           }
       }
  -    
  +
       public Message receive(Destination destination, String selector) throws JMSException {
           Session session = borrowSession();
           try {
  @@ -233,7 +238,7 @@
               returnSession( session );
           }
       }
  -    
  +
       public Message receive(Destination destination, long timeoutMillis) throws JMSException {
           Session session = borrowSession();
           try {
  @@ -244,7 +249,7 @@
               returnSession( session );
           }
       }
  -    
  +
       public Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException {
           Session session = borrowSession();
           try {
  @@ -255,7 +260,7 @@
               returnSession( session );
           }
       }
  -    
  +
       public Message receiveNoWait(Destination destination) throws JMSException {
           Session session = borrowSession();
           try {
  @@ -287,7 +292,7 @@
               returnSession( session );
           }
       }
  -    
  +
       public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
           Session session = borrowSession();
           try {
  @@ -297,7 +302,7 @@
               returnSession( session );
           }
       }
  -    
  +
       public void run() {
           // don't return sessions which throw an exception
           try {
  @@ -309,11 +314,11 @@
               // ### ignore
           }
       }
  -    
  +
       public ConnectionConsumer createConnectionConsumer(Destination destination, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
           return createConnectionConsumer(destination, null, sessionPool, maxMessages);
       }
  -    
  +
       public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
           Connection connection = getConnection();
           if ( connection instanceof TopicConnection ) {
  @@ -332,10 +337,10 @@
       }
   
       public abstract Connection getConnection() throws JMSException;
  -    
  +
       // Listener API
  -    //-------------------------------------------------------------------------    
  -    
  +    //-------------------------------------------------------------------------
  +
       public void addListener(Destination destination, MessageListener listener) throws JMSException {
           if ( listener instanceof MessengerListener ) {
               MessengerListener messengerListener = (MessengerListener) listener;
  @@ -345,7 +350,7 @@
           try {
               MessageConsumer consumer = createMessageConsumer( session, destination );
               consumer.setMessageListener( listener );
  -            
  +
               ListenerKey key = new ListenerKey( destination, listener );
               listeners.put( key, consumer );
           }
  @@ -353,7 +358,7 @@
               returnListenerSession( session );
           }
       }
  -    
  +
       public void addListener(Destination destination, String selector, MessageListener listener) throws JMSException {
           if ( listener instanceof MessengerListener ) {
               MessengerListener messengerListener = (MessengerListener) listener;
  @@ -363,7 +368,7 @@
           try {
               MessageConsumer consumer = createMessageConsumer( session, destination, selector );
               consumer.setMessageListener( listener );
  -            
  +
               ListenerKey key = new ListenerKey( destination, listener, selector );
               listeners.put( key, consumer );
           }
  @@ -371,7 +376,7 @@
               returnListenerSession( session );
           }
       }
  -    
  +
   
       public void removeListener(Destination destination, MessageListener listener ) throws JMSException {
           ListenerKey key = new ListenerKey( destination, listener );
  @@ -391,10 +396,10 @@
           consumer.close();
       }
   
  -    
  +
       // Message factory methods
  -    //-------------------------------------------------------------------------    
  -    
  +    //-------------------------------------------------------------------------
  +
       public BytesMessage createBytesMessage() throws JMSException {
           Session session = borrowSession();
           try {
  @@ -404,7 +409,7 @@
               returnSession( session );
           }
       }
  -    
  +
       public MapMessage createMapMessage() throws JMSException {
           Session session = borrowSession();
           try {
  @@ -414,8 +419,8 @@
               returnSession( session );
           }
       }
  -    
  -    public Message createMessage() throws JMSException {    
  +
  +    public Message createMessage() throws JMSException {
           Session session = borrowSession();
           try {
               return session.createMessage();
  @@ -424,7 +429,7 @@
               returnSession( session );
           }
       }
  -    
  +
       public ObjectMessage createObjectMessage() throws JMSException {
           Session session = borrowSession();
           try {
  @@ -434,7 +439,7 @@
               returnSession( session );
           }
       }
  -    
  +
       public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
           Session session = borrowSession();
           try {
  @@ -444,7 +449,7 @@
               returnSession( session );
           }
       }
  -    
  +
       public StreamMessage createStreamMessage() throws JMSException {
           Session session = borrowSession();
           try {
  @@ -454,7 +459,7 @@
               returnSession( session );
           }
       }
  -    
  +
       public TextMessage createTextMessage() throws JMSException {
           Session session = borrowSession();
           try {
  @@ -464,7 +469,7 @@
               returnSession( session );
           }
       }
  -    
  +
       public TextMessage createTextMessage(String text) throws JMSException {
           Session session = borrowSession();
           try {
  @@ -474,7 +479,7 @@
               returnSession( session );
           }
       }
  -    
  +
       public void commit() throws JMSException {
           Session session = borrowSession();
           try {
  @@ -484,7 +489,7 @@
               returnSession( session );
           }
       }
  -    
  +
       public void rollback() throws JMSException {
           Session session = borrowSession();
           try {
  @@ -494,39 +499,39 @@
               returnSession( session );
           }
       }
  -    
  +
       public void close() throws JMSException {
           getConnection().close();
       }
  -    
  +
       // Properties
  -    //-------------------------------------------------------------------------    
  +    //-------------------------------------------------------------------------
   
       /** Gets the name that this Messenger is called in a MessengerManager */
       public String getName() {
           return name;
       }
  -    
  +
       /** Sets the name that this Messenger is called in a MessengerManager */
       public void setName(String name) {
           this.name = name;
       }
  -    
  +
       /** Gets whether topic subscribers are durable or not */
       public boolean isDurable() {
           return noLocal;
       }
  -    
  +
       /** Sets whether topic subscribers are durable or not */
       public void setDurable(boolean durable) {
           this.durable = durable;
       }
  -    
  +
       /** Returns the durable name used for durable topic based subscriptions */
       public String getDurableName() {
           return durableName;
       }
  -    
  +
       /** Sets the durable name used for durable topic based subscriptions */
       public void setDurableName(String durableName) {
           this.durableName = durableName;
  @@ -537,20 +542,20 @@
       public boolean isNoLocal() {
           return noLocal;
       }
  -    
  +
       /** Sets whether local messages are ignored when topic based subscription is used
        * with a message selector */
       public void setNoLocal(boolean noLocal) {
           this.noLocal = noLocal;
       }
  -    
  +
       // Implementation methods
  -    //-------------------------------------------------------------------------    
  +    //-------------------------------------------------------------------------
  +
   
  -    
       /** Borrows a session instance from the pool */
       protected abstract Session borrowSession() throws JMSException;
  -    
  +
       /** Returns a session instance back to the pool */
       protected abstract void returnSession(Session session) throws JMSException;
   
  @@ -559,14 +564,14 @@
   
       /** Borrows a session instance from the pool */
       protected abstract Session borrowListenerSession() throws JMSException;
  -    
  +
       /** Returns a session instance back to the pool */
       protected abstract void returnListenerSession(Session session) throws JMSException;
   
       /** Returns a message producer for the given session and destination */
       protected MessageProducer getMessageProducer( Session session, Destination destination ) throws JMSException {
           return createMessageProducer( session, destination );
  -/**        
  +/**
           MessageProducer producer = (MessageProducer) producers.get( destination );
           if ( producer == null ) {
               producer = createMessageProducer( session, destination );
  @@ -574,7 +579,7 @@
           return producer;
   */
       }
  -    
  +
       /** Returns a newly created message producer for the given session and destination */
       protected MessageProducer createMessageProducer( Session session, Destination destination ) throws JMSException {
           if ( session instanceof TopicSession ) {
  @@ -590,7 +595,7 @@
       /** Returns a MessageConsumer for the given session and destination */
       protected MessageConsumer getMessageConsumer( Session session, Destination destination ) throws JMSException {
           return createMessageConsumer( session, destination );
  -/*        
  +/*
           MessageConsumer consumer = (MessageConsumer) consumers.get( destination );
           if ( consumer == null ) {
               consumer = createMessageConsumer( session, destination );
  @@ -598,25 +603,25 @@
           return consumer;
   */
       }
  -    
  +
       /** Returns a MessageConsumer for the given session, destination and selector */
       protected MessageConsumer getMessageConsumer( Session session, Destination destination, String selector ) throws JMSException {
           // XXXX: could do caching one day
           return createMessageConsumer( session, destination, selector );
       }
  -    
  +
       /** Returns a new MessageConsumer for the given session and destination */
       protected MessageConsumer createMessageConsumer( Session session, Destination destination ) throws JMSException {
           if ( session instanceof TopicSession ) {
               TopicSession topicSession = (TopicSession) session;
               if ( isDurable() ) {
  -                return topicSession.createDurableSubscriber( 
  -                    (Topic) destination, 
  -                    getDurableName() 
  +                return topicSession.createDurableSubscriber(
  +                    (Topic) destination,
  +                    getDurableName()
                   );
               }
               else {
  -                return topicSession.createSubscriber( 
  +                return topicSession.createSubscriber(
                       (Topic) destination
                   );
               }
  @@ -626,23 +631,23 @@
               return queueSession.createReceiver( (Queue) destination );
           }
       }
  -    
  +
       /** Returns a new MessageConsumer for the given session, destination and selector */
       protected MessageConsumer createMessageConsumer( Session session, Destination destination, String selector ) throws JMSException {
           if ( session instanceof TopicSession ) {
               TopicSession topicSession = (TopicSession) session;
               if ( isDurable() ) {
  -                return topicSession.createDurableSubscriber( 
  -                    (Topic) destination, 
  -                    getDurableName(), 
  -                    selector, 
  -                    isNoLocal() 
  +                return topicSession.createDurableSubscriber(
  +                    (Topic) destination,
  +                    getDurableName(),
  +                    selector,
  +                    isNoLocal()
                   );
               }
               else {
  -                return topicSession.createSubscriber( 
  -                    (Topic) destination, 
  -                    selector, 
  +                return topicSession.createSubscriber(
  +                    (Topic) destination,
  +                    selector,
                       isNoLocal()
                   );
               }
  @@ -650,29 +655,29 @@
           else {
               QueueSession queueSession = (QueueSession) session;
               return queueSession.createReceiver(
  -                (Queue) destination, 
  -                selector 
  +                (Queue) destination,
  +                selector
               );
           }
       }
  -    
  +
       protected Queue getQueue(QueueSession session, String subject) throws JMSException {
           // XXXX: might want to cache
           return session.createQueue( subject );
       }
  -    
  +
       protected Topic getTopic(TopicSession session, String subject) throws JMSException {
           // XXXX: might want to cache
           return session.createTopic( subject );
       }
  -    
  +
       protected Destination getReplyToDestination() throws JMSException {
           if ( replyToDestination == null ) {
               replyToDestination = createTemporaryDestination();
           }
           return replyToDestination;
       }
  -    
  +
       protected TopicRequestor getTopicRequestor( TopicSession session, Topic destination ) throws JMSException {
           if ( CACHE_REQUESTOR ) {
               Map requestors = (Map) requestorsMap.get();
  @@ -687,7 +692,7 @@
               return new TopicRequestor( session, destination );
           }
       }
  -    
  +
       protected QueueRequestor getQueueRequestor( QueueSession session, Queue destination ) throws JMSException {
           if ( CACHE_REQUESTOR ) {
               Map requestors = (Map) requestorsMap.get();
  
  
  
  1.1                  jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/InitMessengerServlet.java
  
  Index: InitMessengerServlet.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE file.
   *
   * $Id: InitMessengerServlet.java,v 1.1 2002/02/01 14:32:33 jstrachan Exp $
   */
  package org.apache.commons.messenger;
  
  import java.net.URL;
  
  import javax.servlet.ServletException;
  import javax.servlet.http.HttpServlet;
  
  
  /** <p><code>InitMessengerServlet</code> is a simple servlet that
   * will initialize the MessengerManager from a URL specified in the
   * web.xml deployment descriptor.</p>
   *
   * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
   * @version $Revision: 1.1 $
   */
  public class InitMessengerServlet extends HttpServlet {
  
      private static boolean initialized = false;
      
      public InitMessengerServlet() {
      }
      
      public void init() throws ServletException {
          if ( ! initialized ) {
              initialized = true;
              //System.out.println( "About to initialize MessengerManager" );
              getServletContext().log( "About to initialize MessengerManager" );
              
              String config = getRequiredInitParmeter( "config", "The URL of the Messenger XML deployment document" );
              try {            
                  URL url = getServletContext().getResource( config );
                  MessengerManager.configure( url.toString() );
              }
              catch (Exception e) {
                  throw new ServletException( "Failed to initialise MessengerManager from config: " + config + ". Reason : " + e, e );
              }
          }
      }
      
      // Implementation methods
      //-------------------------------------------------------------------------        
      protected String getRequiredInitParmeter(String key, String description) throws ServletException {
          String value = getInitParameter( key );
          if ( value == null || value.length() == 0 ) {
              throw new ServletException( 
                  "No initialization parameter for parameter: " + key 
                  + " description: " + description 
              );
          }
          return value;
      }
  }
  
  
  
  

--
To unsubscribe, e-mail:   <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>