You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/08/07 14:21:06 UTC

svn commit: r1694667 - /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java

Author: kwall
Date: Fri Aug  7 12:21:06 2015
New Revision: 1694667

URL: http://svn.apache.org/r1694667
Log:
QPID-6670: [Java Broker] Ensure that if an exception occurs during AbstractQueue#deleteAndReturnCount, the returned future will fail too

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1694667&r1=1694666&r2=1694667&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Aug  7 12:21:06 2015
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -1889,42 +1890,56 @@ public abstract class AbstractQueue<X ex
 
             ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(removeBindingFutures);
 
-            doAfter(combinedFuture, new Runnable()
+            Futures.addCallback(combinedFuture, new FutureCallback<List<Void>>()
             {
                 @Override
-                public void run()
+                public void onSuccess(final List<Void> result)
                 {
-                    QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
-
-                    while (consumerNodeIterator.advance())
+                    try
                     {
-                        QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
-                        if (s != null)
+                        final QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+
+                        while (consumerNodeIterator.advance())
                         {
-                            s.queueDeleted();
+                            final QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
+                            if (s != null)
+                            {
+                                s.queueDeleted();
+                            }
                         }
-                    }
 
-                    List<QueueEntry> entries = getMessagesOnTheQueue(new AcquireAllQueueEntryFilter());
+                        final List<QueueEntry> entries = getMessagesOnTheQueue(new AcquireAllQueueEntryFilter());
 
-                    routeToAlternate(entries);
+                        routeToAlternate(entries);
 
-                    preSetAlternateExchange();
+                        preSetAlternateExchange();
 
-                    performQueueDeleteTasks();
-                    deleted();
+                        performQueueDeleteTasks();
+                        deleted();
 
-                    //Log Queue Deletion
-                    getEventLogger().message(_logSubject, QueueMessages.DELETED());
+                        //Log Queue Deletion
+                        getEventLogger().message(_logSubject, QueueMessages.DELETED());
 
-                    _deleteFuture.set(queueDepthMessages);
+                        _deleteFuture.set(queueDepthMessages);
+                    }
+                    catch(Throwable e)
+                    {
+                        _deleteFuture.setException(e);
+                    }
                 }
-            });
+
+                @Override
+                public void onFailure(final Throwable t)
+                {
+                    _deleteFuture.setException(t);
+                }
+            }, getTaskExecutor().getExecutor());
+
         }
         return _deleteFuture;
     }
 
-    protected void routeToAlternate(List<QueueEntry> entries)
+    private void routeToAlternate(List<QueueEntry> entries)
     {
         ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
 
@@ -1942,7 +1957,7 @@ public abstract class AbstractQueue<X ex
         txn.commit();
     }
 
-    protected void performQueueDeleteTasks()
+    private void performQueueDeleteTasks()
     {
         for (Action<? super AMQQueue> task : _deleteTaskList)
         {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org