You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/10/28 18:36:42 UTC

svn commit: r1634929 - in /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server: exchange/AbstractExchange.java queue/AbstractQueue.java

Author: kwall
Date: Tue Oct 28 17:36:41 2014
New Revision: 1634929

URL: http://svn.apache.org/r1634929
Log:
QPID-6192: [Java Broker] Incorporate some feedback from Rob Godfrey.

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1634929&r1=1634928&r2=1634929&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Tue Oct 28 17:36:41 2014
@@ -64,6 +64,7 @@ import org.apache.qpid.server.queue.Base
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
 import org.apache.qpid.server.virtualhost.RequiredExchangeException;
@@ -509,8 +510,7 @@ public abstract class AbstractExchange<T
     {
         if (_virtualHost.getState() != State.ACTIVE)
         {
-            _logger.debug("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
-            return 0;
+            throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
         }
 
         List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1634929&r1=1634928&r2=1634929&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Oct 28 17:36:41 2014
@@ -499,7 +499,11 @@ public abstract class AbstractQueue<X ex
     {
         try
         {
-            _asyncDelivery.execute(runnable);
+
+            if (_virtualHost.getState() != State.UNAVAILABLE)
+            {
+                _asyncDelivery.execute(runnable);
+            }
         }
         catch (RejectedExecutionException ree)
         {
@@ -1923,12 +1927,6 @@ public abstract class AbstractQueue<X ex
                         sub.releaseSendLock();
                     }
                 }
-
-                if (_virtualHost.getState() != State.ACTIVE)
-                {
-                    _logger.debug("Subscription flush halted owing to virtualhost state " + _virtualHost.getState());
-                    return true;
-                }
             }
         }
         finally
@@ -1979,7 +1977,14 @@ public abstract class AbstractQueue<X ex
 
             QueueEntry node  = getNextAvailableEntry(sub);
 
-            if (_virtualHost.getState() == State.ACTIVE && node != null && node.isAvailable())
+
+            if (_virtualHost.getState() != State.ACTIVE)
+            {
+                throw new ConnectionScopedRuntimeException("Delivery halted owing to " +
+                                                           "virtualhost state " + _virtualHost.getState());
+            }
+
+            if (node != null && node.isAvailable())
             {
                 if (sub.hasInterest(node) && mightAssign(sub, node))
                 {
@@ -2185,12 +2190,6 @@ public abstract class AbstractQueue<X ex
                                     sub.flushBatched();
                                     break;
                                 }
-                                if (_virtualHost.getState() != State.ACTIVE)
-                                {
-                                    _logger.debug("Queue process halted owing to virtualhost state " + _virtualHost.getState());
-
-                                    break;
-                                }
                             }
 
                         }
@@ -2544,6 +2543,11 @@ public abstract class AbstractQueue<X ex
                                                                                         final ServerTransaction txn,
                                                                                         final Action<? super MessageInstance> postEnqueueAction)
     {
+        if (_virtualHost.getState() != State.ACTIVE)
+        {
+            throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+        }
+
         if(!message.isReferenced(this))
         {
             txn.enqueue(this, message, new ServerTransaction.Action()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org