You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/08/03 16:52:44 UTC

svn commit: r562489 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms: MessageActor.java SessionImpl.java

Author: arnaudsimon
Date: Fri Aug  3 07:52:43 2007
New Revision: 562489

URL: http://svn.apache.org/viewvc?view=rev&rev=562489
Log:
implemented message dispatching thread 

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java?view=diff&rev=562489&r1=562488&r2=562489
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java Fri Aug  3 07:52:43 2007
@@ -37,17 +37,23 @@
     /**
      * Indicates whether this MessageActor is closed.
      */
-    boolean _isClosed = false;
+    private boolean _isClosed = false;
 
     /**
      * This messageActor's session
      */
-    SessionImpl _session;
+    private SessionImpl _session;
 
     /**
      * The JMS destination this actor is set for.
      */
-    DestinationImpl _destination;
+    private DestinationImpl _destination;
+
+
+    /**
+     * The ID of this actor for the session.
+     */
+    private String _messageActorID;
 
     //-- Constructor
 
@@ -140,5 +146,16 @@
     {
         return _session;
     }
+
+    /**
+     * Get the ID of this actor within its session.
+     *  
+     * @return This actor ID.
+     */
+    protected String getMessageActorID()
+    {
+        return _messageActorID;
+    }
+
 
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java?view=diff&rev=562489&r1=562488&r2=562489
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java Fri Aug  3 07:52:43 2007
@@ -28,9 +28,9 @@
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Vector;
 import java.util.LinkedList;
+import java.util.HashMap;
 
 /**
  * Implementation of the JMS Session interface
@@ -54,7 +54,7 @@
     private boolean _hasStopped = false;
 
     /**
-     * lock for the sessionThread to wiat on when the session is stopped
+     * lock for the sessionThread to wait until the session is stopped
      */
     private Object _stoppingLock = new Object();
 
@@ -63,11 +63,16 @@
      */
     private Object _stoppingJoin = new Object();
 
+    /**
+     * thread to dispatch messages to async consumers
+     */
+    private MessageDispatcherThread _messageDispatcherThread = null;
+
 
     /**
      * The messageActors of this session.
      */
-    private ArrayList<MessageActor> _messageActors = new ArrayList<MessageActor>();
+    private HashMap<String, MessageActor> _messageActors = new HashMap<String, MessageActor>();
 
     /**
      * All the not yet acknoledged messages
@@ -151,6 +156,10 @@
         {
             throw ExceptionHelper.convertQpidExceptionToJMSException(e);
         }
+        // Create and start a MessageDispatcherThread
+        // This thread is dispatching messages to the async consumers
+        _messageDispatcherThread = new MessageDispatcherThread();
+        _messageDispatcherThread.start();
     }
 
     //--- javax.jms.Session API
@@ -362,10 +371,43 @@
     {
         if (!_isClosed)
         {
+            _messageDispatcherThread.interrupt();
+            if (!_isClosing)
+            {
+                _isClosing = true;
+                // if the session is stopped then restart it before notifying on the lock
+                // that will stop the sessionThread
+                if (_isStopped)
+                {
+                    start();
+                }
+
+                //stop the sessionThread
+                synchronized (_incomingAsynchronousMessages)
+                {
+                    _incomingAsynchronousMessages.notifyAll();
+                }
+
+                try
+                {
+                    _messageDispatcherThread.join();
+                    _messageDispatcherThread = null;
+                }
+                catch (InterruptedException ie)
+                {
+                    /* ignore */
+                }
+            }
             // from now all the session methods will throw a IllegalStateException
             _isClosed = true;
             // close all the actors
             closeAllActors();
+            _messageActors.clear();
+            synchronized (_incomingAsynchronousMessages)
+            {
+                _incomingAsynchronousMessages.clear();
+                _incomingAsynchronousMessages.notifyAll();
+            }
             // close the underlaying QpidSession
             try
             {
@@ -375,6 +417,7 @@
             {
                 throw ExceptionHelper.convertQpidExceptionToJMSException(e);
             }
+
         }
     }
 
@@ -466,7 +509,7 @@
         checkNotClosed();
         MessageProducerImpl producer = new MessageProducerImpl(this, (DestinationImpl) destination);
         // register this actor with the session
-        _messageActors.add(producer);
+        _messageActors.put(producer.getMessageActorID(), producer);
         return producer;
     }
 
@@ -523,7 +566,7 @@
         checkDestination(destination);
         MessageConsumerImpl consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
         // register this actor with the session
-        _messageActors.add(consumer);
+        _messageActors.put(consumer.getMessageActorID(), consumer);
         return consumer;
     }
 
@@ -610,7 +653,7 @@
         checkNotClosed();
         checkDestination(topic);
         TopicSubscriberImpl subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, _connection.getClientID() + ":" + name);
-        _messageActors.add(subscriber);
+        _messageActors.put(subscriber.getMessageActorID(), subscriber);
         return subscriber;
     }
 
@@ -643,7 +686,7 @@
         checkDestination(queue);
         QueueBrowserImpl browser = new QueueBrowserImpl(this, queue, messageSelector);
         // register this actor with the session
-        _messageActors.add(browser);
+        _messageActors.put(browser.getMessageActorID(), browser);
         return browser;
     }
 
@@ -710,7 +753,18 @@
      */
     protected void start() throws JMSException
     {
-        // TODO: make sure that the correct options are used
+        if (_isStopped)
+        {
+            synchronized (_stoppingLock)
+            {
+                _isStopped = false;
+                _stoppingLock.notify();
+            }
+            synchronized (_stoppingJoin)
+            {
+                _hasStopped = false;
+            }
+        }
     }
 
     /**
@@ -720,7 +774,30 @@
      */
     protected void stop() throws JMSException
     {
-        // TODO: make sure that the correct options are used
+        if (!_isClosing && !_isStopped)
+        {
+            synchronized (_incomingAsynchronousMessages)
+            {
+                _isStopped = true;
+                // unlock the sessionThread that will then wait on _stoppingLock
+                _incomingAsynchronousMessages.notifyAll();
+            }
+            // wait for the sessionThread to stop processing messages
+            synchronized (_stoppingJoin)
+            {
+                while (!_hasStopped)
+                {
+                    try
+                    {
+                        _stoppingJoin.wait();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        /* ignore */
+                    }
+                }
+            }
+        }
     }
 
     /**
@@ -847,7 +924,7 @@
      */
     private void closeAllActors() throws JMSException
     {
-        for (MessageActor messageActor : _messageActors)
+        for (MessageActor messageActor : _messageActors.values())
         {
             messageActor.closeMessageActor();
         }
@@ -861,8 +938,6 @@
      * This thread is responsible for removing messages from m_incomingMessages and
      * dispatching them to the appropriate MessageConsumer.
      * <p> Messages have to be dispatched serially.
-     *
-     * @message runtimeExceptionThrownByOnMessage Warning! Asynchronous message consumer {0} from session {1} has thrown a RunTimeException "{2}".
      */
     private class MessageDispatcherThread extends Thread
     {
@@ -932,27 +1007,27 @@
                     }
                 }
 
-              /*  if (message != null)
+                if (message != null)
                 {
                     MessageConsumerImpl mc;
-                    synchronized (_actors)
+                    synchronized (_messageActors)
                     {
-                        mc = (MessageConsumerImpl) m_actors.get(actorMessage.consumerID);
+                        mc = null; // todo _messageActors.get(message.consumerID);
                     }
                     boolean consumed = false;
                     if (mc != null)
                     {
                         try
                         {
-                            consumed = mc.onMessage(actorMessage.genericMessage);
+                            // todo call onMessage
                         }
                         catch (RuntimeException t)
                         {
                             // the JMS specification tells us to flag that to the client!
-                            log.errorb(SessionThread.class.getName(), "runtimeExceptionThrownByOnMessage", new Object[]{mc, m_sessionID, t}, t);
+                            _logger.error("Warning! Asynchronous message consumer" + mc + " from session " + this + " has thrown a RunTimeException " + t);
                         }
                     }
-                } */
+                }
                 message = null;
             }
             while (!_isClosing);   // repeat as long as this session is not closing



Re: svn commit: r562489 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms: MessageActor.java SessionImpl.java

Posted by Rajith Attapattu <ra...@gmail.com>.
I figured that part out the hard way.

On 8/3/07, Rafael Schloming <ra...@redhat.com> wrote:
>
> This change broke the build. Changing the _destination field in
> MessageActor to private caused compilation errors in other classes that
> depend on package access to that field.
>
> I've changed it back to package access for the moment, but you may want
> to add a getter or alter the other classes if you intended to make it
> private.
>
> Please remember that Maven doesn't understand Java dependencies; it just
> rebuilds based on timestamps. That means when you change an interface or
> alter the non-private signature of any class or method, you MUST run
> "mvn clean; mvn install" in order to be sure your change really does
> compile.
>
> --Rafael
>
> arnaudsimon@apache.org wrote:
> > Author: arnaudsimon
> > Date: Fri Aug  3 07:52:43 2007
> > New Revision: 562489
> >
> > URL: http://svn.apache.org/viewvc?view=rev&rev=562489
> > Log:
> > implemented message dispatching thread
> >
> > Modified:
> >
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
> >
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
> >
> > Modified:
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
> > URL:
> http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java?view=diff&rev=562489&r1=562488&r2=562489
> >
> ==============================================================================
> > ---
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
> (original)
> > +++
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
> Fri Aug  3 07:52:43 2007
> > @@ -37,17 +37,23 @@
> >      /**
> >       * Indicates whether this MessageActor is closed.
> >       */
> > -    boolean _isClosed = false;
> > +    private boolean _isClosed = false;
> >
> >      /**
> >       * This messageActor's session
> >       */
> > -    SessionImpl _session;
> > +    private SessionImpl _session;
> >
> >      /**
> >       * The JMS destination this actor is set for.
> >       */
> > -    DestinationImpl _destination;
> > +    private DestinationImpl _destination;
> > +
> > +
> > +    /**
> > +     * The ID of this actor for the session.
> > +     */
> > +    private String _messageActorID;
> >
> >      //-- Constructor
> >
> > @@ -140,5 +146,16 @@
> >      {
> >          return _session;
> >      }
> > +
> > +    /**
> > +     * Get the ID of this actor within its session.
> > +     *
> > +     * @return This actor ID.
> > +     */
> > +    protected String getMessageActorID()
> > +    {
> > +        return _messageActorID;
> > +    }
> > +
> >
> >  }
> >
> > Modified:
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
> > URL:
> http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java?view=diff&rev=562489&r1=562488&r2=562489
> >
> ==============================================================================
> > ---
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
> (original)
> > +++
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
> Fri Aug  3 07:52:43 2007
> > @@ -28,9 +28,9 @@
> >  import javax.jms.Message;
> >  import javax.jms.MessageListener;
> >  import java.io.Serializable;
> > -import java.util.ArrayList;
> >  import java.util.Vector;
> >  import java.util.LinkedList;
> > +import java.util.HashMap;
> >
> >  /**
> >   * Implementation of the JMS Session interface
> > @@ -54,7 +54,7 @@
> >      private boolean _hasStopped = false;
> >
> >      /**
> > -     * lock for the sessionThread to wiat on when the session is
> stopped
> > +     * lock for the sessionThread to wait until the session is stopped
> >       */
> >      private Object _stoppingLock = new Object();
> >
> > @@ -63,11 +63,16 @@
> >       */
> >      private Object _stoppingJoin = new Object();
> >
> > +    /**
> > +     * thread to dispatch messages to async consumers
> > +     */
> > +    private MessageDispatcherThread _messageDispatcherThread = null;
> > +
> >
> >      /**
> >       * The messageActors of this session.
> >       */
> > -    private ArrayList<MessageActor> _messageActors = new
> ArrayList<MessageActor>();
> > +    private HashMap<String, MessageActor> _messageActors = new
> HashMap<String, MessageActor>();
> >
> >      /**
> >       * All the not yet acknoledged messages
> > @@ -151,6 +156,10 @@
> >          {
> >              throw ExceptionHelper.convertQpidExceptionToJMSException
> (e);
> >          }
> > +        // Create and start a MessageDispatcherThread
> > +        // This thread is dispatching messages to the async consumers
> > +        _messageDispatcherThread = new MessageDispatcherThread();
> > +        _messageDispatcherThread.start();
> >      }
> >
> >      //--- javax.jms.Session API
> > @@ -362,10 +371,43 @@
> >      {
> >          if (!_isClosed)
> >          {
> > +            _messageDispatcherThread.interrupt();
> > +            if (!_isClosing)
> > +            {
> > +                _isClosing = true;
> > +                // if the session is stopped then restart it before
> notifying on the lock
> > +                // that will stop the sessionThread
> > +                if (_isStopped)
> > +                {
> > +                    start();
> > +                }
> > +
> > +                //stop the sessionThread
> > +                synchronized (_incomingAsynchronousMessages)
> > +                {
> > +                    _incomingAsynchronousMessages.notifyAll();
> > +                }
> > +
> > +                try
> > +                {
> > +                    _messageDispatcherThread.join();
> > +                    _messageDispatcherThread = null;
> > +                }
> > +                catch (InterruptedException ie)
> > +                {
> > +                    /* ignore */
> > +                }
> > +            }
> >              // from now all the session methods will throw a
> IllegalStateException
> >              _isClosed = true;
> >              // close all the actors
> >              closeAllActors();
> > +            _messageActors.clear();
> > +            synchronized (_incomingAsynchronousMessages)
> > +            {
> > +                _incomingAsynchronousMessages.clear();
> > +                _incomingAsynchronousMessages.notifyAll();
> > +            }
> >              // close the underlaying QpidSession
> >              try
> >              {
> > @@ -375,6 +417,7 @@
> >              {
> >                  throw
> ExceptionHelper.convertQpidExceptionToJMSException(e);
> >              }
> > +
> >          }
> >      }
> >
> > @@ -466,7 +509,7 @@
> >          checkNotClosed();
> >          MessageProducerImpl producer = new MessageProducerImpl(this,
> (DestinationImpl) destination);
> >          // register this actor with the session
> > -        _messageActors.add(producer);
> > +        _messageActors.put(producer.getMessageActorID(), producer);
> >          return producer;
> >      }
> >
> > @@ -523,7 +566,7 @@
> >          checkDestination(destination);
> >          MessageConsumerImpl consumer = new MessageConsumerImpl(this,
> (DestinationImpl) destination, messageSelector, noLocal, null);
> >          // register this actor with the session
> > -        _messageActors.add(consumer);
> > +        _messageActors.put(consumer.getMessageActorID(), consumer);
> >          return consumer;
> >      }
> >
> > @@ -610,7 +653,7 @@
> >          checkNotClosed();
> >          checkDestination(topic);
> >          TopicSubscriberImpl subscriber = new TopicSubscriberImpl(this,
> topic, messageSelector, noLocal, _connection.getClientID() + ":" + name);
> > -        _messageActors.add(subscriber);
> > +        _messageActors.put(subscriber.getMessageActorID(), subscriber);
> >          return subscriber;
> >      }
> >
> > @@ -643,7 +686,7 @@
> >          checkDestination(queue);
> >          QueueBrowserImpl browser = new QueueBrowserImpl(this, queue,
> messageSelector);
> >          // register this actor with the session
> > -        _messageActors.add(browser);
> > +        _messageActors.put(browser.getMessageActorID(), browser);
> >          return browser;
> >      }
> >
> > @@ -710,7 +753,18 @@
> >       */
> >      protected void start() throws JMSException
> >      {
> > -        // TODO: make sure that the correct options are used
> > +        if (_isStopped)
> > +        {
> > +            synchronized (_stoppingLock)
> > +            {
> > +                _isStopped = false;
> > +                _stoppingLock.notify();
> > +            }
> > +            synchronized (_stoppingJoin)
> > +            {
> > +                _hasStopped = false;
> > +            }
> > +        }
> >      }
> >
> >      /**
> > @@ -720,7 +774,30 @@
> >       */
> >      protected void stop() throws JMSException
> >      {
> > -        // TODO: make sure that the correct options are used
> > +        if (!_isClosing && !_isStopped)
> > +        {
> > +            synchronized (_incomingAsynchronousMessages)
> > +            {
> > +                _isStopped = true;
> > +                // unlock the sessionThread that will then wait on
> _stoppingLock
> > +                _incomingAsynchronousMessages.notifyAll();
> > +            }
> > +            // wait for the sessionThread to stop processing messages
> > +            synchronized (_stoppingJoin)
> > +            {
> > +                while (!_hasStopped)
> > +                {
> > +                    try
> > +                    {
> > +                        _stoppingJoin.wait();
> > +                    }
> > +                    catch (InterruptedException e)
> > +                    {
> > +                        /* ignore */
> > +                    }
> > +                }
> > +            }
> > +        }
> >      }
> >
> >      /**
> > @@ -847,7 +924,7 @@
> >       */
> >      private void closeAllActors() throws JMSException
> >      {
> > -        for (MessageActor messageActor : _messageActors)
> > +        for (MessageActor messageActor : _messageActors.values())
> >          {
> >              messageActor.closeMessageActor();
> >          }
> > @@ -861,8 +938,6 @@
> >       * This thread is responsible for removing messages from
> m_incomingMessages and
> >       * dispatching them to the appropriate MessageConsumer.
> >       * <p> Messages have to be dispatched serially.
> > -     *
> > -     * @message runtimeExceptionThrownByOnMessage Warning! Asynchronous
> message consumer {0} from session {1} has thrown a RunTimeException "{2}".
> >       */
> >      private class MessageDispatcherThread extends Thread
> >      {
> > @@ -932,27 +1007,27 @@
> >                      }
> >                  }
> >
> > -              /*  if (message != null)
> > +                if (message != null)
> >                  {
> >                      MessageConsumerImpl mc;
> > -                    synchronized (_actors)
> > +                    synchronized (_messageActors)
> >                      {
> > -                        mc = (MessageConsumerImpl) m_actors.get(
> actorMessage.consumerID);
> > +                        mc = null; // todo _messageActors.get(
> message.consumerID);
> >                      }
> >                      boolean consumed = false;
> >                      if (mc != null)
> >                      {
> >                          try
> >                          {
> > -                            consumed = mc.onMessage(
> actorMessage.genericMessage);
> > +                            // todo call onMessage
> >                          }
> >                          catch (RuntimeException t)
> >                          {
> >                              // the JMS specification tells us to flag
> that to the client!
> > -                            log.errorb(SessionThread.class.getName(),
> "runtimeExceptionThrownByOnMessage", new Object[]{mc, m_sessionID, t}, t);
> > +                            _logger.error("Warning! Asynchronous
> message consumer" + mc + " from session " + this + " has thrown a
> RunTimeException " + t);
> >                          }
> >                      }
> > -                } */
> > +                }
> >                  message = null;
> >              }
> >              while (!_isClosing);   // repeat as long as this session is
> not closing
> >
> >
>

Re: svn commit: r562489 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms: MessageActor.java SessionImpl.java

Posted by Rafael Schloming <ra...@redhat.com>.
This change broke the build. Changing the _destination field in 
MessageActor to private caused compilation errors in other classes that 
depend on package access to that field.

I've changed it back to package access for the moment, but you may want 
to add a getter or alter the other classes if you intended to make it 
private.

Please remember that Maven doesn't understand Java dependencies; it just
rebuilds based on timestamps. That means when you change an interface or
alter the non-private signature of any class or method, you MUST run
"mvn clean; mvn install" in order to be sure your change really does compile.

--Rafael

arnaudsimon@apache.org wrote:
> Author: arnaudsimon
> Date: Fri Aug  3 07:52:43 2007
> New Revision: 562489
> 
> URL: http://svn.apache.org/viewvc?view=rev&rev=562489
> Log:
> implemented message dispatching thread 
> 
> Modified:
>     incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
>     incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
> 
> Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
> URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java?view=diff&rev=562489&r1=562488&r2=562489
> ==============================================================================
> --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java (original)
> +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java Fri Aug  3 07:52:43 2007
> @@ -37,17 +37,23 @@
>      /**
>       * Indicates whether this MessageActor is closed.
>       */
> -    boolean _isClosed = false;
> +    private boolean _isClosed = false;
>  
>      /**
>       * This messageActor's session
>       */
> -    SessionImpl _session;
> +    private SessionImpl _session;
>  
>      /**
>       * The JMS destination this actor is set for.
>       */
> -    DestinationImpl _destination;
> +    private DestinationImpl _destination;
> +
> +
> +    /**
> +     * The ID of this actor for the session.
> +     */
> +    private String _messageActorID;
>  
>      //-- Constructor
>  
> @@ -140,5 +146,16 @@
>      {
>          return _session;
>      }
> +
> +    /**
> +     * Get the ID of this actor within its session.
> +     *  
> +     * @return This actor ID.
> +     */
> +    protected String getMessageActorID()
> +    {
> +        return _messageActorID;
> +    }
> +
>  
>  }
> 
> Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
> URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java?view=diff&rev=562489&r1=562488&r2=562489
> ==============================================================================
> --- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java (original)
> +++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java Fri Aug  3 07:52:43 2007
> @@ -28,9 +28,9 @@
>  import javax.jms.Message;
>  import javax.jms.MessageListener;
>  import java.io.Serializable;
> -import java.util.ArrayList;
>  import java.util.Vector;
>  import java.util.LinkedList;
> +import java.util.HashMap;
>  
>  /**
>   * Implementation of the JMS Session interface
> @@ -54,7 +54,7 @@
>      private boolean _hasStopped = false;
>  
>      /**
> -     * lock for the sessionThread to wiat on when the session is stopped
> +     * lock for the sessionThread to wait until the session is stopped
>       */
>      private Object _stoppingLock = new Object();
>  
> @@ -63,11 +63,16 @@
>       */
>      private Object _stoppingJoin = new Object();
>  
> +    /**
> +     * thread to dispatch messages to async consumers
> +     */
> +    private MessageDispatcherThread _messageDispatcherThread = null;
> +
>  
>      /**
>       * The messageActors of this session.
>       */
> -    private ArrayList<MessageActor> _messageActors = new ArrayList<MessageActor>();
> +    private HashMap<String, MessageActor> _messageActors = new HashMap<String, MessageActor>();
>  
>      /**
>       * All the not yet acknoledged messages
> @@ -151,6 +156,10 @@
>          {
>              throw ExceptionHelper.convertQpidExceptionToJMSException(e);
>          }
> +        // Create and start a MessageDispatcherThread
> +        // This thread is dispatching messages to the async consumers
> +        _messageDispatcherThread = new MessageDispatcherThread();
> +        _messageDispatcherThread.start();
>      }
>  
>      //--- javax.jms.Session API
> @@ -362,10 +371,43 @@
>      {
>          if (!_isClosed)
>          {
> +            _messageDispatcherThread.interrupt();
> +            if (!_isClosing)
> +            {
> +                _isClosing = true;
> +                // if the session is stopped then restart it before notifying on the lock
> +                // that will stop the sessionThread
> +                if (_isStopped)
> +                {
> +                    start();
> +                }
> +
> +                //stop the sessionThread
> +                synchronized (_incomingAsynchronousMessages)
> +                {
> +                    _incomingAsynchronousMessages.notifyAll();
> +                }
> +
> +                try
> +                {
> +                    _messageDispatcherThread.join();
> +                    _messageDispatcherThread = null;
> +                }
> +                catch (InterruptedException ie)
> +                {
> +                    /* ignore */
> +                }
> +            }
>              // from now all the session methods will throw a IllegalStateException
>              _isClosed = true;
>              // close all the actors
>              closeAllActors();
> +            _messageActors.clear();
> +            synchronized (_incomingAsynchronousMessages)
> +            {
> +                _incomingAsynchronousMessages.clear();
> +                _incomingAsynchronousMessages.notifyAll();
> +            }
>              // close the underlaying QpidSession
>              try
>              {
> @@ -375,6 +417,7 @@
>              {
>                  throw ExceptionHelper.convertQpidExceptionToJMSException(e);
>              }
> +
>          }
>      }
>  
> @@ -466,7 +509,7 @@
>          checkNotClosed();
>          MessageProducerImpl producer = new MessageProducerImpl(this, (DestinationImpl) destination);
>          // register this actor with the session
> -        _messageActors.add(producer);
> +        _messageActors.put(producer.getMessageActorID(), producer);
>          return producer;
>      }
>  
> @@ -523,7 +566,7 @@
>          checkDestination(destination);
>          MessageConsumerImpl consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
>          // register this actor with the session
> -        _messageActors.add(consumer);
> +        _messageActors.put(consumer.getMessageActorID(), consumer);
>          return consumer;
>      }
>  
> @@ -610,7 +653,7 @@
>          checkNotClosed();
>          checkDestination(topic);
>          TopicSubscriberImpl subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, _connection.getClientID() + ":" + name);
> -        _messageActors.add(subscriber);
> +        _messageActors.put(subscriber.getMessageActorID(), subscriber);
>          return subscriber;
>      }
>  
> @@ -643,7 +686,7 @@
>          checkDestination(queue);
>          QueueBrowserImpl browser = new QueueBrowserImpl(this, queue, messageSelector);
>          // register this actor with the session
> -        _messageActors.add(browser);
> +        _messageActors.put(browser.getMessageActorID(), browser);
>          return browser;
>      }
>  
> @@ -710,7 +753,18 @@
>       */
>      protected void start() throws JMSException
>      {
> -        // TODO: make sure that the correct options are used
> +        if (_isStopped)
> +        {
> +            synchronized (_stoppingLock)
> +            {
> +                _isStopped = false;
> +                _stoppingLock.notify();
> +            }
> +            synchronized (_stoppingJoin)
> +            {
> +                _hasStopped = false;
> +            }
> +        }
>      }
>  
>      /**
> @@ -720,7 +774,30 @@
>       */
>      protected void stop() throws JMSException
>      {
> -        // TODO: make sure that the correct options are used
> +        if (!_isClosing && !_isStopped)
> +        {
> +            synchronized (_incomingAsynchronousMessages)
> +            {
> +                _isStopped = true;
> +                // unlock the sessionThread that will then wait on _stoppingLock
> +                _incomingAsynchronousMessages.notifyAll();
> +            }
> +            // wait for the sessionThread to stop processing messages
> +            synchronized (_stoppingJoin)
> +            {
> +                while (!_hasStopped)
> +                {
> +                    try
> +                    {
> +                        _stoppingJoin.wait();
> +                    }
> +                    catch (InterruptedException e)
> +                    {
> +                        /* ignore */
> +                    }
> +                }
> +            }
> +        }
>      }
>  
>      /**
> @@ -847,7 +924,7 @@
>       */
>      private void closeAllActors() throws JMSException
>      {
> -        for (MessageActor messageActor : _messageActors)
> +        for (MessageActor messageActor : _messageActors.values())
>          {
>              messageActor.closeMessageActor();
>          }
> @@ -861,8 +938,6 @@
>       * This thread is responsible for removing messages from m_incomingMessages and
>       * dispatching them to the appropriate MessageConsumer.
>       * <p> Messages have to be dispatched serially.
> -     *
> -     * @message runtimeExceptionThrownByOnMessage Warning! Asynchronous message consumer {0} from session {1} has thrown a RunTimeException "{2}".
>       */
>      private class MessageDispatcherThread extends Thread
>      {
> @@ -932,27 +1007,27 @@
>                      }
>                  }
>  
> -              /*  if (message != null)
> +                if (message != null)
>                  {
>                      MessageConsumerImpl mc;
> -                    synchronized (_actors)
> +                    synchronized (_messageActors)
>                      {
> -                        mc = (MessageConsumerImpl) m_actors.get(actorMessage.consumerID);
> +                        mc = null; // todo _messageActors.get(message.consumerID);
>                      }
>                      boolean consumed = false;
>                      if (mc != null)
>                      {
>                          try
>                          {
> -                            consumed = mc.onMessage(actorMessage.genericMessage);
> +                            // todo call onMessage
>                          }
>                          catch (RuntimeException t)
>                          {
>                              // the JMS specification tells us to flag that to the client!
> -                            log.errorb(SessionThread.class.getName(), "runtimeExceptionThrownByOnMessage", new Object[]{mc, m_sessionID, t}, t);
> +                            _logger.error("Warning! Asynchronous message consumer" + mc + " from session " + this + " has thrown a RunTimeException " + t);
>                          }
>                      }
> -                } */
> +                }
>                  message = null;
>              }
>              while (!_isClosing);   // repeat as long as this session is not closing
> 
>