You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/03/14 12:36:50 UTC
svn commit: r637066 - in
/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue:
AMQQueue.java Subscription.java SubscriptionImpl.java
Author: ritchiem
Date: Fri Mar 14 04:36:42 2008
New Revision: 637066
URL: http://svn.apache.org/viewvc?rev=637066&view=rev
Log:
QPID-852 : Updated broker so that it closes auto-close consumers as soon as they are started if there are no messages on the queue for them.
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=637066&r1=637065&r2=637066&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Mar 14 04:36:42 2008
@@ -700,6 +700,8 @@
{
_subscribers.setExclusive(true);
}
+
+ subscription.start();
}
private boolean isExclusive()
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?rev=637066&r1=637065&r2=637066&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Fri Mar 14 04:36:42 2008
@@ -45,8 +45,6 @@
void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst);
- boolean isAutoClose();
-
void close();
boolean isClosed();
@@ -60,4 +58,6 @@
Object getSendLock();
AMQChannel getChannel();
+
+ void start();
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=637066&r1=637065&r2=637066&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Fri Mar 14 04:36:42 2008
@@ -461,7 +461,7 @@
}
}
- public boolean isAutoClose()
+ private boolean isAutoClose()
{
return _autoClose;
}
@@ -523,19 +523,24 @@
{
_logger.info("Closing autoclose subscription (" + debugIdentity() + "):" + this);
- ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag);
- _sentClose = true;
-
- //fixme JIRA do this better
+ boolean unregisteredOK = false;
try
{
- channel.unsubscribeConsumer(protocolSession, consumerTag);
+ unregisteredOK = channel.unsubscribeConsumer(protocolSession, consumerTag);
}
catch (AMQException e)
{
// Occurs if we cannot find the subscriber in the channel with protocolSession and consumerTag.
+ _logger.info("Unable to UnsubscribeConsumer :" + consumerTag +" so not going to send CancelOK.");
}
+
+ if (unregisteredOK)
+ {
+ ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag);
+ _sentClose = true;
+ }
+
}
}
@@ -664,6 +669,21 @@
public AMQChannel getChannel()
{
return channel;
+ }
+
+ public void start()
+ {
+ //Check to see if we need to autoclose
+ if (filtersMessages())
+ {
+ if (isAutoClose())
+ {
+ if (_messages.isEmpty())
+ {
+ autoclose();
+ }
+ }
+ }
}
}