You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/08/05 17:54:30 UTC

svn commit: r1694253 - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/binding/ broker-core/src/main/java/org/apache/qpid/server/configuration/updater/ broker-c...

Author: kwall
Date: Wed Aug  5 15:54:30 2015
New Revision: 1694253

URL: http://svn.apache.org/r1694253
Log:
QPID-6670: [Java Broker] Delete future for queue/exchange must chain binding deletion too

* Changed Queue/Exchange doDelete so that the returned future completes only once the binding
  children have also been deleted. Uses the doAfter/Async patterns already established elsewhere.
* On the 0-8..0-91 paths, close the session model object explicitly (as the 0-10/1.0 paths do) so
  that the io thread blocks until the session is closed (rather than the close running asynchronously).

Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java Wed Aug  5 15:54:30 2015
@@ -164,6 +164,14 @@ public class BDBHAReplicaVirtualHostImpl
     }
 
     @Override
+    public ListenableFuture<Void> removeExchangeAsync(final ExchangeImpl<?> exchange, final boolean force)
+            throws ExchangeIsAlternateException, RequiredExchangeException
+    {
+        throwUnsupportedForReplica();
+        return null;
+    }
+
+    @Override
     public MessageDestination getAttainedMessageDestination(final String name)
     {
         return null;
@@ -333,6 +341,13 @@ public class BDBHAReplicaVirtualHostImpl
     }
 
     @Override
+    public ListenableFuture<Integer> removeQueueAsync(final AMQQueue<?> queue)
+    {
+        throwUnsupportedForReplica();
+        return null;
+    }
+
+    @Override
     public int removeQueue(final AMQQueue<?> queue)
     {
         throwUnsupportedForReplica();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java Wed Aug  5 15:54:30 2015
@@ -24,7 +24,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -49,13 +48,12 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.model.StateTransition;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.util.StateChangeListener;
 
 public class BindingImpl
         extends AbstractConfiguredObject<BindingImpl>
         implements org.apache.qpid.server.model.Binding<BindingImpl>
 {
-    private String _bindingKey;
+    private final String _bindingKey;
     private final AMQQueue _queue;
     private final ExchangeImpl _exchange;
     @ManagedAttributeField
@@ -64,8 +62,6 @@ public class BindingImpl
     private BindingLogSubject _logSubject;
 
     final AtomicBoolean _deleted = new AtomicBoolean();
-    final CopyOnWriteArrayList<StateChangeListener<BindingImpl,State>> _stateChangeListeners =
-            new CopyOnWriteArrayList<StateChangeListener<BindingImpl, State>>();
 
     public BindingImpl(Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange)
     {
@@ -115,7 +111,7 @@ public class BindingImpl
     {
         if(!attributes.containsKey(DURABLE))
         {
-            attributes = new HashMap(attributes);
+            attributes = new HashMap<>(attributes);
             attributes.put(DURABLE, queue.isDurable() && exchange.isDurable());
         }
         return attributes;
@@ -193,6 +189,7 @@ public class BindingImpl
         return result;
     }
 
+    @Override
     public String toString()
     {
         return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + getId() + " }";
@@ -201,18 +198,27 @@ public class BindingImpl
     @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
     private ListenableFuture<Void> doDelete()
     {
-        if(_deleted.compareAndSet(false,true))
+        if (_deleted.compareAndSet(false, true))
         {
-            for(StateChangeListener<BindingImpl,State> listener : _stateChangeListeners)
+            ListenableFuture<Void> removeBinding = _exchange.removeBindingAsync(this);
+            return doAfter(removeBinding, new Runnable()
             {
-                listener.stateChanged(this, State.ACTIVE, State.DELETED);
-            }
+                @Override
+                public void run()
+                {
+                    getEventLogger().message(_logSubject, BindingMessages.DELETED());
+                    deleted();
+                    setState(State.DELETED);
+                }
+            });
+        }
+        else
+        {
             getEventLogger().message(_logSubject, BindingMessages.DELETED());
+            deleted();
+            setState(State.DELETED);
+            return Futures.immediateFuture(null);
         }
-
-        deleted();
-        setState(State.DELETED);
-        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
@@ -222,16 +228,6 @@ public class BindingImpl
         return Futures.immediateFuture(null);
     }
 
-    public void addStateChangeListener(StateChangeListener<BindingImpl,State> listener)
-    {
-        _stateChangeListeners.add(listener);
-    }
-
-    public void removeStateChangeListener(StateChangeListener<BindingImpl,State> listener)
-    {
-        _stateChangeListeners.remove(listener);
-    }
-
     private EventLogger getEventLogger()
     {
         return _exchange.getEventLogger();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java Wed Aug  5 15:54:30 2015
@@ -79,7 +79,7 @@ public class TaskExecutorImpl implements
     {
         if (_running.compareAndSet(false, true))
         {
-            LOGGER.debug("Starting task executor");
+            LOGGER.debug("Starting task executor {}", _name);
             _executor = Executors.newFixedThreadPool(1, new ThreadFactory()
             {
                 @Override
@@ -101,7 +101,7 @@ public class TaskExecutorImpl implements
             ExecutorService executor = _executor;
             if (executor != null)
             {
-                LOGGER.debug("Stopping task executor immediately");
+                LOGGER.debug("Stopping task executor {} immediately", _name);
                 List<Runnable> cancelledTasks = executor.shutdownNow();
                 for (Runnable runnable : cancelledTasks)
                 {
@@ -126,7 +126,7 @@ public class TaskExecutorImpl implements
             ExecutorService executor = _executor;
             if (executor != null)
             {
-                LOGGER.debug("Stopping task executor");
+                LOGGER.debug("Stopping task executor {}", _name);
                 executor.shutdown();
                 _executor = null;
                 _taskThread = null;
@@ -138,19 +138,18 @@ public class TaskExecutorImpl implements
     @Override
     public <T> Future<T> submit(Task<T> task)
     {
-        checkState();
-        LOGGER.debug("Submitting task: {}", task);
-        Future<T> future = null;
+        checkState(task);
         if (isTaskExecutorThread())
         {
+            LOGGER.debug("Running task {} immediately", task);
             T result = executeTask(task);
             return new ImmediateFuture(result);
         }
         else
         {
-            future = _executor.submit(new CallableWrapper(task));
+            LOGGER.debug("Submitting task {} to executor {}", task, _name);
+            return _executor.submit(new CallableWrapper(task));
         }
-        return future;
     }
 
     @Override
@@ -303,11 +302,12 @@ public class TaskExecutorImpl implements
         return Thread.currentThread() == _taskThread;
     }
 
-    private void checkState()
+    private <T> void checkState(Task<T> task)
     {
         if (!_running.get())
         {
-            throw new IllegalStateException("Task executor is not in ACTIVE state");
+            LOGGER.error("Task executor {} is not in ACTIVE state, unable to execute : {} ", _name, task);
+            throw new IllegalStateException("Task executor " + _name + " is not in ACTIVE state");
         }
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Wed Aug  5 15:54:30 2015
@@ -70,7 +70,6 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
 import org.apache.qpid.server.virtualhost.RequiredExchangeException;
 import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
@@ -89,7 +88,7 @@ public abstract class AbstractExchange<T
 
     private VirtualHostImpl _virtualHost;
 
-    private final List<Action<ExchangeImpl>> _closeTaskList = new CopyOnWriteArrayList<Action<ExchangeImpl>>();
+    private final List<Action<ExchangeImpl>> _closeTaskList = new CopyOnWriteArrayList<>();
 
     /**
      * Whether the exchange is automatically deleted once all queues have detached from it
@@ -113,8 +112,6 @@ public abstract class AbstractExchange<T
 
     private final ConcurrentMap<BindingIdentifier, BindingImpl> _bindingsMap = new ConcurrentHashMap<BindingIdentifier, BindingImpl>();
 
-    private StateChangeListener<BindingImpl, State> _bindingListener;
-
     public AbstractExchange(Map<String, Object> attributes, VirtualHostImpl vhost)
     {
         super(parentsMap(vhost), attributes);
@@ -127,18 +124,6 @@ public abstract class AbstractExchange<T
         _virtualHost = vhost;
 
         _logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
-
-        _bindingListener = new StateChangeListener<BindingImpl, State>()
-        {
-            @Override
-            public void stateChanged(final BindingImpl binding, final State oldState, final State newState)
-            {
-                if(newState == State.DELETED)
-                {
-                    removeBinding(binding);
-                }
-            }
-        };
     }
 
     @Override
@@ -187,7 +172,7 @@ public abstract class AbstractExchange<T
     }
 
     @Override
-    public void deleteWithChecks()
+    public ListenableFuture<Void> deleteWithChecks()
     {
         if(hasReferrers())
         {
@@ -201,28 +186,44 @@ public abstract class AbstractExchange<T
 
         if(_closed.compareAndSet(false,true))
         {
-            List<BindingImpl> bindings = new ArrayList<BindingImpl>(_bindings);
+            List<ListenableFuture<Void>> removeBindingFutures = new ArrayList<>();
+            ListenableFuture<Void> atLeastOne = Futures.immediateFuture(null);
+            removeBindingFutures.add(atLeastOne);
+
+            List<BindingImpl> bindings = new ArrayList<>(_bindings);
             for(BindingImpl binding : bindings)
             {
-                binding.removeStateChangeListener(_bindingListener);
-                removeBinding(binding);
+                ListenableFuture<Void> deleteFuture = binding.deleteAsync();
+                removeBindingFutures.add(deleteFuture);
             }
 
-            if(_alternateExchange != null)
+            ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(removeBindingFutures);
+            return doAfter(combinedFuture, new Runnable()
             {
-                ((ExchangeImpl)_alternateExchange).removeReference(this);
-            }
+                @Override
+                public void run()
+                {
+                    if (_alternateExchange != null)
+                    {
+                        ((ExchangeImpl) _alternateExchange).removeReference(AbstractExchange.this);
+                    }
 
-            getEventLogger().message(_logSubject, ExchangeMessages.DELETED());
-
-            for(Action<ExchangeImpl> task : _closeTaskList)
-            {
-                task.performAction(this);
-            }
-            _closeTaskList.clear();
+                    getEventLogger().message(_logSubject, ExchangeMessages.DELETED());
 
+                    for (Action<ExchangeImpl> task : _closeTaskList)
+                    {
+                        task.performAction(AbstractExchange.this);
+                    }
+                    _closeTaskList.clear();
+                    deleted();
+                }
+            });
+        }
+        else
+        {
+            deleted();
+            return Futures.immediateFuture(null);
         }
-        deleted();
     }
 
     @Override
@@ -633,8 +634,8 @@ public abstract class AbstractExchange<T
         }));
     }
 
-
-    private void removeBinding(final BindingImpl binding)
+    @Override
+    public ListenableFuture<Void> removeBindingAsync(final BindingImpl binding)
     {
         String bindingKey = binding.getBindingKey();
         AMQQueue queue = binding.getAMQQueue();
@@ -657,39 +658,43 @@ public abstract class AbstractExchange<T
             queue.removeBinding(b);
 
             // TODO - RG - Fix bindings!
-            if(getTaskExecutor().isTaskExecutorThread())
-            {
-                b.deleteAsync();
-            }
-            else
-            {
-                b.delete();
-            }
+            return autoDeleteIfNecessaryAsync();
+        }
+        else
+        {
+            return Futures.immediateFuture(null);
+        }
 
-            autoDeleteIfNeccessary();
+    }
+
+    private ListenableFuture<Void> autoDeleteIfNecessaryAsync()
+    {
+        if (isAutoDeletePending())
+        {
+            _logger.debug("Auto-deleting exchange: {}", this);
 
+            return deleteAsync();
         }
 
+        return Futures.immediateFuture(null);
     }
 
-    private void autoDeleteIfNeccessary()
+    private void autoDeleteIfNecessary()
     {
-        if ((getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS || getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS )
-            && getBindingCount() == 0)
+        if (isAutoDeletePending())
         {
             _logger.debug("Auto-deleting exchange: {}", this);
 
-            if(getTaskExecutor().isTaskExecutorThread())
-            {
-                deleteAsync();
-            }
-            else
-            {
-                delete();
-            }
+            delete();
         }
     }
 
+    private boolean isAutoDeletePending()
+    {
+        return (getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS || getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS )
+            && getBindingCount() == 0;
+    }
+
     public BindingImpl getBinding(String bindingKey, AMQQueue queue)
     {
         assert queue != null;
@@ -782,8 +787,6 @@ public abstract class AbstractExchange<T
     @Override
     public void addBinding(final BindingImpl b)
     {
-        b.addStateChangeListener(_bindingListener);
-
         BindingIdentifier identifier = new BindingIdentifier(b.getName(), b.getAMQQueue());
 
         _bindingsMap.put(identifier, b);
@@ -818,9 +821,16 @@ public abstract class AbstractExchange<T
     {
         try
         {
-            _virtualHost.removeExchange(this,true);
-            preSetAlternateExchange();
-            setState(State.DELETED);
+            ListenableFuture<Void> removeExchangeFuture = _virtualHost.removeExchangeAsync(this, true);
+            return doAfter(removeExchangeFuture, new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    preSetAlternateExchange();
+                    setState(State.DELETED);
+                }
+            });
         }
         catch (ExchangeIsAlternateException e)
         {
@@ -917,7 +927,7 @@ public abstract class AbstractExchange<T
         else
         {
             binding.delete();
-            autoDeleteIfNeccessary();
+            autoDeleteIfNecessary();
             return true;
         }
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java Wed Aug  5 15:54:30 2015
@@ -22,6 +22,8 @@ package org.apache.qpid.server.exchange;
 
 import java.util.Map;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.binding.BindingImpl;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.MessageDestination;
@@ -46,7 +48,7 @@ public interface ExchangeImpl<T extends
                            AMQQueue queue,
                            Map<String, Object> arguments);
 
-    void deleteWithChecks();
+    ListenableFuture<Void> deleteWithChecks();
 
     /**
      * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
@@ -94,6 +96,8 @@ public interface ExchangeImpl<T extends
 
     boolean hasReferrers();
 
+    ListenableFuture<Void> removeBindingAsync(BindingImpl binding);
+
     BindingImpl getBinding(String bindingName, AMQQueue queue);
 
     EventLogger getEventLogger();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Wed Aug  5 15:54:30 2015
@@ -255,7 +255,7 @@ public class TopicExchange extends Abstr
         {
             Map<String,Object> bindingArgs = _bindings.remove(binding);
 
-            _logger.debug("deregisterQueue {}", bindingArgs);
+            _logger.debug("deregisterQueue args: {}", bindingArgs);
 
             String bindingKey = TopicNormalizer.normalize(binding.getBindingKey());
             TopicExchangeResult result = _topicExchangeResults.get(bindingKey);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java Wed Aug  5 15:54:30 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model;
 import java.util.Collection;
 import java.util.Map;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.message.MessageDestination;
 
 @ManagedObject( description = Exchange.CLASS_DESCRIPTION )
@@ -69,5 +71,5 @@ public interface Exchange<X extends Exch
 
 
 
-    void deleteWithChecks();
+    ListenableFuture<Void> deleteWithChecks();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Wed Aug  5 15:54:30 2015
@@ -24,6 +24,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.message.MessageInfo;
 import org.apache.qpid.server.queue.QueueEntryVisitor;
 import org.apache.qpid.server.store.MessageDurability;
@@ -181,7 +183,7 @@ public interface Queue<X extends Queue<X
 
     void visit(QueueEntryVisitor visitor);
 
-    int deleteAndReturnCount();
+    ListenableFuture<Integer> deleteAndReturnCount();
 
     void setNotificationListener(QueueNotificationListener listener);
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Wed Aug  5 15:54:30 2015
@@ -46,6 +46,7 @@ public final class SessionAdapter extend
 {
     // Attributes
     private final AMQSessionModel _session;
+    private final Action _deleteModelTask;
 
     public SessionAdapter(final AbstractAMQPConnection<?> connectionAdapter,
                           final AMQSessionModel session)
@@ -68,7 +69,7 @@ public final class SessionAdapter extend
             }
         });
         session.setModelObject(this);
-        session.addDeleteTask(new Action()
+        _deleteModelTask = new Action()
         {
             @Override
             public void performAction(final Object object)
@@ -76,7 +77,8 @@ public final class SessionAdapter extend
                 session.removeDeleteTask(this);
                 deleteAsync();
             }
-        });
+        };
+        session.addDeleteTask(_deleteModelTask);
         setState(State.ACTIVE);
     }
 
@@ -177,6 +179,7 @@ public final class SessionAdapter extend
     {
         deleted();
         setState(State.DELETED);
+        _session.removeDeleteTask(_deleteModelTask);
         return Futures.immediateFuture(null);
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Aug  5 15:54:30 2015
@@ -38,7 +38,11 @@ import java.util.concurrent.ConcurrentLi
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -52,6 +56,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageInfoFacade;
 import org.apache.qpid.server.model.CustomRestHeaders;
 import org.apache.qpid.server.model.RestContentHeader;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1866,88 +1871,131 @@ public abstract class AbstractQueue<X ex
         _deleteTaskList.remove(task);
     }
 
-    // TODO list all thrown exceptions
-    public int deleteAndReturnCount()
+    public ListenableFuture<Integer> deleteAndReturnCount()
     {
         // Check access
         _virtualHost.getSecurityManager().authoriseDelete(this);
 
+        final ListenableFuture<Integer> returnCountFuture = Futures.immediateFuture(getQueueDepthMessages());
         if (!_deleted.getAndSet(true))
         {
+            final List<ListenableFuture<Void>> removeBindingFutures = new ArrayList<>();
+            final ListenableFuture<Void> atLeastOne = Futures.immediateFuture(null);
+            removeBindingFutures.add(atLeastOne);
+            final ArrayList<BindingImpl> bindingCopy = new ArrayList<>(_bindings);
 
-            final ArrayList<BindingImpl> bindingCopy = new ArrayList<BindingImpl>(_bindings);
-
+            // TODO - RG - Need to sort out bindings!
             for (BindingImpl b : bindingCopy)
             {
-                // TODO - RG - Need to sort out bindings!
-                if(getTaskExecutor().isTaskExecutorThread())
-                {
-                    b.deleteAsync();
-                }
-                else
-                {
-                    b.delete();
-                }
+                final ListenableFuture<Void> removeFuture = b.deleteAsync();
+                removeBindingFutures.add(removeFuture);
             }
 
-            QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
-
-            while (consumerNodeIterator.advance())
+            ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(removeBindingFutures);
+            final ListenableFuture<Void> result = doAfter(combinedFuture, new Runnable()
             {
-                QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
-                if (s != null)
+                @Override
+                public void run()
                 {
-                    s.queueDeleted();
-                }
-            }
+                    QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
 
+                    while (consumerNodeIterator.advance())
+                    {
+                        QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
+                        if (s != null)
+                        {
+                            s.queueDeleted();
+                        }
+                    }
 
-            List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
-            {
+                    List<QueueEntry> entries = getMessagesOnTheQueue(new AcquireAllQueueEntryFilter());
 
-                public boolean accept(QueueEntry entry)
+                    routeToAlternate(entries);
+
+                    preSetAlternateExchange();
+
+                    performQueueDeleteTasks();
+                    deleted();
+
+                    //Log Queue Deletion
+                    getEventLogger().message(_logSubject, QueueMessages.DELETED());
+                }
+            });
+
+            return new ListenableFuture<Integer>()
+            {
+                @Override
+                public void addListener(final Runnable listener, final Executor executor)
                 {
-                    return entry.acquire();
+                    result.addListener(listener, executor);
                 }
 
-                public boolean filterComplete()
+                @Override
+                public boolean cancel(final boolean mayInterruptIfRunning)
                 {
-                    return false;
+                    return result.cancel(mayInterruptIfRunning);
                 }
-            });
 
-            ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
+                @Override
+                public boolean isCancelled()
+                {
+                    return result.isCancelled();
+                }
 
+                @Override
+                public boolean isDone()
+                {
+                    return result.isDone();
+                }
 
-            for(final QueueEntry entry : entries)
-            {
-                // TODO log requeues with a post enqueue action
-                int requeues = entry.routeToAlternate(null, txn);
+                @Override
+                public Integer get() throws InterruptedException, ExecutionException
+                {
+                    result.get();
+                    return returnCountFuture.get();
+                }
 
-                if(requeues == 0)
+                @Override
+                public Integer get(final long timeout, final TimeUnit unit)
+                        throws InterruptedException, ExecutionException, TimeoutException
                 {
-                    // TODO log discard
+                    result.get(timeout, unit);
+                    return returnCountFuture.get();
                 }
-            }
+            };
+        }
+        else
+        {
+           return returnCountFuture;
+        }
+    }
 
-            txn.commit();
+    protected void routeToAlternate(List<QueueEntry> entries)
+    {
+        ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
 
-            preSetAlternateExchange();
+        for(final QueueEntry entry : entries)
+        {
+            // TODO log requeues with a post enqueue action
+            int requeues = entry.routeToAlternate(null, txn);
 
-            for (Action<? super AMQQueue> task : _deleteTaskList)
+            if(requeues == 0)
             {
-                task.performAction(this);
+                // TODO log discard
             }
+        }
 
-            _deleteTaskList.clear();
-            closeAsync();
-            deleted();
-            //Log Queue Deletion
-            getEventLogger().message(_logSubject, QueueMessages.DELETED());
+        txn.commit();
+    }
 
+    protected void performQueueDeleteTasks()
+    {
+        for (Action<? super AMQQueue> task : _deleteTaskList)
+        {
+            task.performAction(this);
         }
-        return getQueueDepthMessages();
 
+        _deleteTaskList.clear();
     }
 
     @Override
@@ -2551,6 +2599,19 @@ public abstract class AbstractQueue<X ex
 
     }
 
+    private static class AcquireAllQueueEntryFilter implements QueueEntryFilter
+    {
+        public boolean accept(QueueEntry entry)
+        {
+            return entry.acquire();
+        }
+
+        public boolean filterComplete()
+        {
+            return false;
+        }
+    }
+
     private final class QueueEntryListener implements StateChangeListener<MessageInstance, QueueEntry.State>
     {
 
@@ -2994,10 +3055,17 @@ public abstract class AbstractQueue<X ex
     @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
     private ListenableFuture<Void> doDelete()
     {
-        _virtualHost.removeQueue(this);
-        preSetAlternateExchange();
-        setState(State.DELETED);
-        return Futures.immediateFuture(null);
+        ListenableFuture<Integer> removeFuture = _virtualHost.removeQueueAsync(this);
+        return doAfter(removeFuture, new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                preSetAlternateExchange();
+                setState(State.DELETED);
+            }
+        });
+
     }
 
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Wed Aug  5 15:54:30 2015
@@ -706,10 +706,14 @@ public abstract class AbstractVirtualHos
     @Override
     public int removeQueue(AMQQueue<?> queue)
     {
-        int purged = queue.deleteAndReturnCount();
+        return doSync(removeQueueAsync(queue));
+    }
 
-        return purged;
-}
+    @Override
+    public ListenableFuture<Integer> removeQueueAsync(final AMQQueue<?> queue)
+    {
+        return queue.deleteAndReturnCount();
+    }
 
     public AMQQueue<?> createQueue(Map<String, Object> attributes) throws QueueExistsException
     {
@@ -856,12 +860,18 @@ public abstract class AbstractVirtualHos
 
     }
 
+    @Override
+    public void removeExchange(final ExchangeImpl<?> exchange, final boolean force)
+            throws ExchangeIsAlternateException, RequiredExchangeException
+    {
+        doSync(removeExchangeAsync(exchange, force));
+    }
 
     @Override
-    public void removeExchange(ExchangeImpl exchange, boolean force)
+    public ListenableFuture<Void> removeExchangeAsync(ExchangeImpl exchange, boolean force)
             throws ExchangeIsAlternateException, RequiredExchangeException
     {
-        exchange.deleteWithChecks();
+        return exchange.deleteWithChecks();
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Wed Aug  5 15:54:30 2015
@@ -25,6 +25,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ScheduledFuture;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.EventLoggerProvider;
@@ -60,6 +62,8 @@ public interface VirtualHostImpl< X exte
 
     int removeQueue(Q queue);
 
+    ListenableFuture<Integer> removeQueueAsync(Q queue);
+
     Q createQueue(Map<String, Object> arguments) throws QueueExistsException;
 
     E createExchange(Map<String,Object> attributes)
@@ -67,6 +71,9 @@ public interface VirtualHostImpl< X exte
                    NoFactoryForTypeException;
 
     void removeExchange(E exchange, boolean force) throws ExchangeIsAlternateException,
+                                                                                 RequiredExchangeException;
+
+    ListenableFuture<Void> removeExchangeAsync(E exchange, boolean force) throws ExchangeIsAlternateException,
                                                                  RequiredExchangeException;
 
     E getAttainedExchange(String name);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java Wed Aug  5 15:54:30 2015
@@ -165,6 +165,14 @@ class RedirectingVirtualHostImpl
     }
 
     @Override
+    public ListenableFuture<Void> removeExchangeAsync(final ExchangeImpl<?> exchange, final boolean force)
+            throws ExchangeIsAlternateException, RequiredExchangeException
+    {
+        throwUnsupportedForRedirector();
+        return null;
+    }
+
+    @Override
     public MessageDestination getAttainedMessageDestination(final String name)
     {
         return null;
@@ -335,6 +343,13 @@ class RedirectingVirtualHostImpl
     }
 
     @Override
+    public ListenableFuture<Integer> removeQueueAsync(final AMQQueue<?> queue)
+    {
+        throwUnsupportedForRedirector();
+        return null;
+    }
+
+    @Override
     public int removeQueue(final AMQQueue<?> queue)
     {
         throwUnsupportedForRedirector();

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1694253&r1=1694252&r2=1694253&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Aug  5 15:54:30 2015
@@ -889,6 +889,10 @@ public class AMQChannel
         {
             unsubscribeAllConsumers();
             setDefaultQueue(null);
+            if(_modelObject != null)
+            {
+                _modelObject.delete();
+            }
             for (Action<? super AMQChannel> task : _taskList)
             {
                 task.performAction(this);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org