You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/10/28 18:36:42 UTC
svn commit: r1634929 - in
/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server:
exchange/AbstractExchange.java queue/AbstractQueue.java
Author: kwall
Date: Tue Oct 28 17:36:41 2014
New Revision: 1634929
URL: http://svn.apache.org/r1634929
Log:
QPID-6192: [Java Broker] Incorporate some feedbackk from Rob Godfrey.
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1634929&r1=1634928&r2=1634929&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Tue Oct 28 17:36:41 2014
@@ -64,6 +64,7 @@ import org.apache.qpid.server.queue.Base
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
@@ -509,8 +510,7 @@ public abstract class AbstractExchange<T
{
if (_virtualHost.getState() != State.ACTIVE)
{
- _logger.debug("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
- return 0;
+ throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
}
List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1634929&r1=1634928&r2=1634929&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Oct 28 17:36:41 2014
@@ -499,7 +499,11 @@ public abstract class AbstractQueue<X ex
{
try
{
- _asyncDelivery.execute(runnable);
+
+ if (_virtualHost.getState() != State.UNAVAILABLE)
+ {
+ _asyncDelivery.execute(runnable);
+ }
}
catch (RejectedExecutionException ree)
{
@@ -1923,12 +1927,6 @@ public abstract class AbstractQueue<X ex
sub.releaseSendLock();
}
}
-
- if (_virtualHost.getState() != State.ACTIVE)
- {
- _logger.debug("Subscription flush halted owing to virtualhost state " + _virtualHost.getState());
- return true;
- }
}
}
finally
@@ -1979,7 +1977,14 @@ public abstract class AbstractQueue<X ex
QueueEntry node = getNextAvailableEntry(sub);
- if (_virtualHost.getState() == State.ACTIVE && node != null && node.isAvailable())
+
+ if (_virtualHost.getState() != State.ACTIVE)
+ {
+ throw new ConnectionScopedRuntimeException("Delivery halted owing to " +
+ "virtualhost state " + _virtualHost.getState());
+ }
+
+ if (node != null && node.isAvailable())
{
if (sub.hasInterest(node) && mightAssign(sub, node))
{
@@ -2185,12 +2190,6 @@ public abstract class AbstractQueue<X ex
sub.flushBatched();
break;
}
- if (_virtualHost.getState() != State.ACTIVE)
- {
- _logger.debug("Queue process halted owing to virtualhost state " + _virtualHost.getState());
-
- break;
- }
}
}
@@ -2544,6 +2543,11 @@ public abstract class AbstractQueue<X ex
final ServerTransaction txn,
final Action<? super MessageInstance> postEnqueueAction)
{
+ if (_virtualHost.getState() != State.ACTIVE)
+ {
+ throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+ }
+
if(!message.isReferenced(this))
{
txn.enqueue(this, message, new ServerTransaction.Action()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org