You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/01/23 23:32:52 UTC

svn commit: r499165 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQSession.java BasicMessageConsumer.java

Author: ritchiem
Date: Tue Jan 23 14:32:51 2007
New Revision: 499165

URL: http://svn.apache.org/viewvc?view=rev&rev=499165
Log:
QPID-103 Implemented support for MessageListener in AMQSession.
Required configuring an Asynchronous performance test.

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=499165&r1=499164&r2=499165
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jan 23 14:32:51 2007
@@ -67,6 +67,8 @@
     private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
     private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
 
+    private MessageListener _messageListener = null;
+
     /**
      * Used to reference durable subscribers so that requests for unsubscribe can be handled
      * correctly.  Note this only keeps a record of subscriptions which have been created
@@ -852,13 +854,37 @@
     public MessageListener getMessageListener() throws JMSException
     {
         checkNotClosed();
-        throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
+        return _messageListener;
     }
 
     public void setMessageListener(MessageListener listener) throws JMSException
     {
         checkNotClosed();
-        throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
+
+        if (!isStopped())
+        {
+            throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
+        }
+
+        // We are stopped         
+        for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+        {
+            BasicMessageConsumer consumer = i.next();
+
+            if (consumer.isReceiving())
+            {
+                throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+            }
+        }
+
+        _messageListener = listener;
+        
+        for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+        {
+            i.next().setMessageListener(_messageListener);
+        }
+
+
     }
 
     public void run()
@@ -1067,6 +1093,7 @@
     {
         checkTemporaryDestination(destination);
 
+
         return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
         {
             public Object operation() throws JMSException
@@ -1089,6 +1116,11 @@
                                                                          protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
                                                                          _acknowledgeMode, noConsume, autoClose);
 
+                if (_messageListener != null)
+                {
+                    consumer.setMessageListener(_messageListener);
+                }
+
                 try
                 {
                     registerConsumer(consumer, false);
@@ -1736,19 +1768,21 @@
      */
     void deregisterConsumer(BasicMessageConsumer consumer)
     {
-        _consumers.remove(consumer.getConsumerTag());
-        String subscriptionName = _reverseSubscriptionMap.remove(consumer);
-        if (subscriptionName != null)
+        if (_consumers.remove(consumer.getConsumerTag()) != null)
         {
-            _subscriptions.remove(subscriptionName);
-        }
+            String subscriptionName = _reverseSubscriptionMap.remove(consumer);
+            if (subscriptionName != null)
+            {
+                _subscriptions.remove(subscriptionName);
+            }
 
-        Destination dest = consumer.getDestination();
-        synchronized (dest)
-        {
-            if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+            Destination dest = consumer.getDestination();
+            synchronized (dest)
             {
-                _destinationConsumerCount.remove(dest);
+                if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+                {
+                    _destinationConsumerCount.remove(dest);
+                }
             }
         }
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=499165&r1=499164&r2=499165
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Jan 23 14:32:51 2007
@@ -221,7 +221,10 @@
         if (_session.isStopped())
         {
             _messageListener.set(messageListener);
-            _logger.debug("Session stopped : Message listener set for destination " + _destination);
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
+            }
         }
         else
         {
@@ -258,10 +261,10 @@
 
                             // Set Message Listener
                             _logger.debug("Set Message Listener");
-                            _messageListener.set(messageListener);                            
+                            _messageListener.set(messageListener);
                         }
                     }
-                    );                    
+                    );
                 }
             }
         }
@@ -328,6 +331,11 @@
     public boolean isExclusive()
     {
         return _exclusive;
+    }
+
+    public boolean isReceiving()
+    {
+        return _receiving.get();
     }
 
     public Message receive() throws JMSException