You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/08/07 14:21:06 UTC
svn commit: r1694667 - /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Author: kwall
Date: Fri Aug 7 12:21:06 2015
New Revision: 1694667
URL: http://svn.apache.org/r1694667
Log:
QPID-6670: [Java Broker] Ensure that if an exception occurs during AbstractQueue#deleteAndReturnCount, the returned future will fail too
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1694667&r1=1694666&r2=1694667&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Aug 7 12:21:06 2015
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -1889,42 +1890,56 @@ public abstract class AbstractQueue<X ex
ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(removeBindingFutures);
- doAfter(combinedFuture, new Runnable()
+ Futures.addCallback(combinedFuture, new FutureCallback<List<Void>>()
{
@Override
- public void run()
+ public void onSuccess(final List<Void> result)
{
- QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
-
- while (consumerNodeIterator.advance())
+ try
{
- QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
- if (s != null)
+ final QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+
+ while (consumerNodeIterator.advance())
{
- s.queueDeleted();
+ final QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
+ if (s != null)
+ {
+ s.queueDeleted();
+ }
}
- }
- List<QueueEntry> entries = getMessagesOnTheQueue(new AcquireAllQueueEntryFilter());
+ final List<QueueEntry> entries = getMessagesOnTheQueue(new AcquireAllQueueEntryFilter());
- routeToAlternate(entries);
+ routeToAlternate(entries);
- preSetAlternateExchange();
+ preSetAlternateExchange();
- performQueueDeleteTasks();
- deleted();
+ performQueueDeleteTasks();
+ deleted();
- //Log Queue Deletion
- getEventLogger().message(_logSubject, QueueMessages.DELETED());
+ //Log Queue Deletion
+ getEventLogger().message(_logSubject, QueueMessages.DELETED());
- _deleteFuture.set(queueDepthMessages);
+ _deleteFuture.set(queueDepthMessages);
+ }
+ catch(Throwable e)
+ {
+ _deleteFuture.setException(e);
+ }
}
- });
+
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ _deleteFuture.setException(t);
+ }
+ }, getTaskExecutor().getExecutor());
+
}
return _deleteFuture;
}
- protected void routeToAlternate(List<QueueEntry> entries)
+ private void routeToAlternate(List<QueueEntry> entries)
{
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
@@ -1942,7 +1957,7 @@ public abstract class AbstractQueue<X ex
txn.commit();
}
- protected void performQueueDeleteTasks()
+ private void performQueueDeleteTasks()
{
for (Action<? super AMQQueue> task : _deleteTaskList)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org