You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/10/28 15:16:18 UTC
svn commit: r1634884 - in
/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server:
exchange/AbstractExchange.java model/AbstractConfiguredObject.java
model/BrokerModel.java queue/AbstractQueue.java
virtualhost/AbstractVirtualHost.java
Author: kwall
Date: Tue Oct 28 14:16:18 2014
New Revision: 1634884
URL: http://svn.apache.org/r1634884
Log:
QPID-6192: [Java Broker] On close, close the connections before exchanges/queues
* Exchanges and queues now check the virtualhost state before routing a message to a queue/consumer.
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Tue Oct 28 14:16:18 2014
@@ -507,8 +507,13 @@ public abstract class AbstractExchange<T
final ServerTransaction txn,
final Action<? super MessageInstance> postEnqueueAction)
{
- List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
+ if (_virtualHost.getState() != State.ACTIVE)
+ {
+ _logger.debug("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+ return 0;
+ }
+ List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
if(queues == null || queues.isEmpty())
{
Exchange altExchange = getAlternateExchange();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Tue Oct 28 14:16:18 2014
@@ -490,6 +490,7 @@ public abstract class AbstractConfigured
{
if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
{
+ beforeClose();
closeChildren();
onClose();
unregister(false);
@@ -497,6 +498,10 @@ public abstract class AbstractConfigured
}
}
+ protected void beforeClose()
+ {
+ }
+
protected void onClose()
{
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java Tue Oct 28 14:16:18 2014
@@ -78,9 +78,9 @@ public final class BrokerModel extends M
addRelationship(VirtualHostNode.class, VirtualHost.class);
addRelationship(VirtualHostNode.class, RemoteReplicationNode.class);
+ addRelationship(VirtualHost.class, Connection.class);
addRelationship(VirtualHost.class, Exchange.class);
addRelationship(VirtualHost.class, Queue.class);
- addRelationship(VirtualHost.class, Connection.class);
addRelationship(Port.class, VirtualHostAlias.class);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Oct 28 14:16:18 2014
@@ -1923,6 +1923,12 @@ public abstract class AbstractQueue<X ex
sub.releaseSendLock();
}
}
+
+ if (_virtualHost.getState() != State.ACTIVE)
+ {
+ _logger.debug("Subscription flush halted owing to virtualhost state " + _virtualHost.getState());
+ return true;
+ }
}
}
finally
@@ -1967,12 +1973,13 @@ public abstract class AbstractQueue<X ex
boolean atTail = false;
boolean subActive = sub.isActive() && !sub.isSuspended();
+
if (subActive)
{
QueueEntry node = getNextAvailableEntry(sub);
- if (node != null && node.isAvailable())
+ if (_virtualHost.getState() == State.ACTIVE && node != null && node.isAvailable())
{
if (sub.hasInterest(node) && mightAssign(sub, node))
{
@@ -2178,6 +2185,12 @@ public abstract class AbstractQueue<X ex
sub.flushBatched();
break;
}
+ if (_virtualHost.getState() != State.ACTIVE)
+ {
+ _logger.debug("Queue process halted owing to virtualhost state " + _virtualHost.getState());
+
+ break;
+ }
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Tue Oct 28 14:16:18 2014
@@ -664,9 +664,15 @@ public abstract class AbstractVirtualHos
return _broker.getSecurityManager();
}
- protected void onClose()
+ @Override
+ protected void beforeClose()
{
setState(State.UNAVAILABLE);
+ }
+
+ @Override
+ protected void onClose()
+ {
//Stop Connections
_connectionRegistry.close();
_dtxRegistry.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org