You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/03/14 12:36:50 UTC

svn commit: r637066 - in /incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue: AMQQueue.java Subscription.java SubscriptionImpl.java

Author: ritchiem
Date: Fri Mar 14 04:36:42 2008
New Revision: 637066

URL: http://svn.apache.org/viewvc?rev=637066&view=rev
Log:
QPID-852 : Updated broker so that it closes consumers when there are no messages on the queue.

Modified:
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=637066&r1=637065&r2=637066&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Mar 14 04:36:42 2008
@@ -700,6 +700,8 @@
         {
             _subscribers.setExclusive(true);
         }
+
+        subscription.start();
     }
 
     private boolean isExclusive()

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?rev=637066&r1=637065&r2=637066&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Fri Mar 14 04:36:42 2008
@@ -45,8 +45,6 @@
 
     void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst);
 
-    boolean isAutoClose();
-
     void close();
 
     boolean isClosed();
@@ -60,4 +58,6 @@
     Object getSendLock();
 
     AMQChannel getChannel();
+
+    void start();
 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=637066&r1=637065&r2=637066&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Fri Mar 14 04:36:42 2008
@@ -461,7 +461,7 @@
         }
     }
 
-    public boolean isAutoClose()
+    private boolean isAutoClose()
     {
         return _autoClose;
     }
@@ -523,19 +523,24 @@
         {
             _logger.info("Closing autoclose subscription (" + debugIdentity() + "):" + this);
 
-            ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter();
-            converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag);
-            _sentClose = true;
-
-            //fixme JIRA do this better
+            boolean unregisteredOK = false;
             try
             {
-                channel.unsubscribeConsumer(protocolSession, consumerTag);
+                unregisteredOK = channel.unsubscribeConsumer(protocolSession, consumerTag);
             }
             catch (AMQException e)
             {
                 // Occurs if we cannot find the subscriber in the channel with protocolSession and consumerTag.
+                _logger.info("Unable to UnsubscribeConsumer :" + consumerTag +" so not going to send CancelOK.");
             }
+
+            if (unregisteredOK)
+            {
+                ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter();
+                converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag);
+                _sentClose = true;
+            }
+
         }
     }
 
@@ -664,6 +669,21 @@
     public AMQChannel getChannel()
     {
         return channel;
+    }
+
+    public void start()
+    {
+        // On start, autoclose straight away when filtersMessages() and isAutoClose() both hold and _messages is empty.
+        if (filtersMessages())
+        {
+            if (isAutoClose())
+            {
+                if (_messages.isEmpty())
+                {
+                    autoclose();
+                }
+            }
+        }
     }
 
 }