You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/10/28 15:16:18 UTC

svn commit: r1634884 - in /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server: exchange/AbstractExchange.java model/AbstractConfiguredObject.java model/BrokerModel.java queue/AbstractQueue.java virtualhost/AbstractVirtualHost.java

Author: kwall
Date: Tue Oct 28 14:16:18 2014
New Revision: 1634884

URL: http://svn.apache.org/r1634884
Log:
QPID-6192: [Java Broker] On close, close the connections before exchanges/queues

* Exchanges/Queue now check virtualhost state prior to routing a message to queue/consumer.

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Tue Oct 28 14:16:18 2014
@@ -507,8 +507,13 @@ public abstract class AbstractExchange<T
                                                                                         final ServerTransaction txn,
                                                                                         final Action<? super MessageInstance> postEnqueueAction)
     {
-        List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
+        if (_virtualHost.getState() != State.ACTIVE)
+        {
+            _logger.debug("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+            return 0;
+        }
 
+        List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
         if(queues == null || queues.isEmpty())
         {
             Exchange altExchange = getAlternateExchange();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Tue Oct 28 14:16:18 2014
@@ -490,6 +490,7 @@ public abstract class AbstractConfigured
     {
         if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
         {
+            beforeClose();
             closeChildren();
             onClose();
             unregister(false);
@@ -497,6 +498,10 @@ public abstract class AbstractConfigured
         }
     }
 
+    protected void beforeClose()
+    {
+    }
+
     protected void onClose()
     {
     }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java Tue Oct 28 14:16:18 2014
@@ -78,9 +78,9 @@ public final class BrokerModel extends M
         addRelationship(VirtualHostNode.class, VirtualHost.class);
         addRelationship(VirtualHostNode.class, RemoteReplicationNode.class);
 
+        addRelationship(VirtualHost.class, Connection.class);
         addRelationship(VirtualHost.class, Exchange.class);
         addRelationship(VirtualHost.class, Queue.class);
-        addRelationship(VirtualHost.class, Connection.class);
 
         addRelationship(Port.class, VirtualHostAlias.class);
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Oct 28 14:16:18 2014
@@ -1923,6 +1923,12 @@ public abstract class AbstractQueue<X ex
                         sub.releaseSendLock();
                     }
                 }
+
+                if (_virtualHost.getState() != State.ACTIVE)
+                {
+                    _logger.debug("Subscription flush halted owing to virtualhost state " + _virtualHost.getState());
+                    return true;
+                }
             }
         }
         finally
@@ -1967,12 +1973,13 @@ public abstract class AbstractQueue<X ex
         boolean atTail = false;
 
         boolean subActive = sub.isActive() && !sub.isSuspended();
+
         if (subActive)
         {
 
             QueueEntry node  = getNextAvailableEntry(sub);
 
-            if (node != null && node.isAvailable())
+            if (_virtualHost.getState() == State.ACTIVE && node != null && node.isAvailable())
             {
                 if (sub.hasInterest(node) && mightAssign(sub, node))
                 {
@@ -2178,6 +2185,12 @@ public abstract class AbstractQueue<X ex
                                     sub.flushBatched();
                                     break;
                                 }
+                                if (_virtualHost.getState() != State.ACTIVE)
+                                {
+                                    _logger.debug("Queue process halted owing to virtualhost state " + _virtualHost.getState());
+
+                                    break;
+                                }
                             }
 
                         }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1634884&r1=1634883&r2=1634884&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Tue Oct 28 14:16:18 2014
@@ -664,9 +664,15 @@ public abstract class AbstractVirtualHos
         return _broker.getSecurityManager();
     }
 
-    protected void onClose()
+    @Override
+    protected void beforeClose()
     {
         setState(State.UNAVAILABLE);
+    }
+
+    @Override
+    protected void onClose()
+    {
         //Stop Connections
         _connectionRegistry.close();
         _dtxRegistry.close();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org