You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/03/12 16:41:48 UTC

svn commit: r1666224 [2/7] - in /qpid/trunk: ./ qpid/ qpid/cpp/src/ qpid/cpp/src/qpid/broker/ qpid/cpp/src/qpid/management/ qpid/cpp/src/tests/ qpid/java/ qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ qpid/java/bdbstore/src/...

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Thu Mar 12 15:41:46 2015
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -36,6 +37,10 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -63,12 +68,12 @@ import org.apache.qpid.server.queue.Base
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
 import org.apache.qpid.server.virtualhost.RequiredExchangeException;
 import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 
 public abstract class AbstractExchange<T extends AbstractExchange<T>>
         extends AbstractConfiguredObject<T>
@@ -479,7 +484,7 @@ public abstract class AbstractExchange<T
     {
         if (_virtualHost.getState() != State.ACTIVE)
         {
-            throw new ConnectionScopedRuntimeException("Virtualhost state " + _virtualHost.getState() + " prevents the message from being sent");
+            throw new VirtualHostUnavailableException(this._virtualHost);
         }
 
         List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
@@ -593,9 +598,18 @@ public abstract class AbstractExchange<T
     }
 
     @Override
-    public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+    public boolean addBinding(final String bindingKey, final AMQQueue queue, final Map<String, Object> arguments)
     {
-        return makeBinding(null, bindingKey, queue, arguments, false);
+        return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>()
+        {
+            @Override
+            public ListenableFuture<Boolean> call() throws Exception
+            {
+                return makeBindingAsync(null, bindingKey, queue, arguments, false);
+            }
+        }));
+
+
     }
 
     @Override
@@ -603,12 +617,20 @@ public abstract class AbstractExchange<T
                                   final AMQQueue queue,
                                   final Map<String, Object> arguments)
     {
-        final BindingImpl existingBinding = getBinding(bindingKey, queue);
-        return makeBinding(existingBinding == null ? null : existingBinding.getId(),
-                           bindingKey,
-                           queue,
-                           arguments,
-                           true);
+        return doSync(doOnConfigThread(new Callable<ListenableFuture<Boolean>>()
+        {
+            @Override
+            public ListenableFuture<Boolean> call() throws Exception
+            {
+
+                final BindingImpl existingBinding = getBinding(bindingKey, queue);
+                return makeBindingAsync(existingBinding == null ? null : existingBinding.getId(),
+                                   bindingKey,
+                                   queue,
+                                   arguments,
+                                   true);
+            }
+        }));
     }
 
 
@@ -634,7 +656,15 @@ public abstract class AbstractExchange<T
             doRemoveBinding(b);
             queue.removeBinding(b);
 
-            b.delete();
+            // TODO - RG - Fix bindings!
+            if(getTaskExecutor().isTaskExecutorThread())
+            {
+                b.deleteAsync();
+            }
+            else
+            {
+                b.delete();
+            }
         }
 
     }
@@ -651,7 +681,7 @@ public abstract class AbstractExchange<T
         return _bindingsMap.get(new BindingIdentifier(bindingKey,queue));
     }
 
-    private boolean makeBinding(UUID id,
+    private ListenableFuture<Boolean> makeBindingAsync(UUID id,
                                 String bindingKey,
                                 AMQQueue queue,
                                 Map<String, Object> arguments,
@@ -685,22 +715,45 @@ public abstract class AbstractExchange<T
                 attributes.put(Binding.ID, id);
                 attributes.put(Binding.ARGUMENTS, arguments);
 
-                BindingImpl b = new BindingImpl(attributes, queue, this);
-                b.create(); // Must be called before addBinding as it resolves automated attributes.
+                final BindingImpl b = new BindingImpl(attributes, queue, this);
 
-                addBinding(b);
-                return true;
+                final SettableFuture<Boolean> returnVal = SettableFuture.create();
+
+                Futures.addCallback(b.createAsync(), new FutureCallback<Void>()
+                {
+                    @Override
+                    public void onSuccess(final Void result)
+                    {
+                        try
+                        {
+                            addBinding(b);
+                            returnVal.set(true);
+                        }
+                        catch(Throwable t)
+                        {
+                            returnVal.setException(t);
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable t)
+                    {
+                        returnVal.setException(t);
+                    }
+                }, getTaskExecutor().getExecutor()); // Must be called before addBinding as it resolves automated attributes.
+
+                return returnVal;
             }
             else if(force)
             {
                 Map<String,Object> oldArguments = existingMapping.getArguments();
                 existingMapping.setArguments(arguments);
                 onBindingUpdated(existingMapping, oldArguments);
-                return true;
+                return Futures.immediateFuture(true);
             }
             else
             {
-                return false;
+                return Futures.immediateFuture(false);
             }
         }
     }
@@ -723,22 +776,24 @@ public abstract class AbstractExchange<T
 
 
     @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE)
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
 
     @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED)
-    private void doDeleteBeforeInitialize()
+    private ListenableFuture<Void>  doDeleteBeforeInitialize()
     {
         preSetAlternateExchange();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
 
     @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         try
         {
@@ -748,8 +803,9 @@ public abstract class AbstractExchange<T
         }
         catch (ExchangeIsAlternateException e)
         {
-            return;
+
         }
+        return Futures.immediateFuture(null);
     }
 
     @Override
@@ -860,4 +916,5 @@ public abstract class AbstractExchange<T
         return binding;
     }
 
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java Thu Mar 12 15:41:46 2015
@@ -107,8 +107,4 @@ public interface ExchangeImpl<T extends
         void bindingRemoved(ExchangeImpl exchange, BindingImpl binding);
     }
 
-    public void addBindingListener(BindingListener listener);
-
-    public void removeBindingListener(BindingListener listener);
-
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java Thu Mar 12 15:41:46 2015
@@ -24,10 +24,6 @@ package org.apache.qpid.server.flow;
 
 public interface FlowCreditManager
 {
-    long getMessageCredit();
-
-    long getBytesCredit();
-
     public static interface FlowCreditManagerListener
     {
         void creditStateChanged(boolean hasCredit);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Thu Mar 12 15:41:46 2015
@@ -41,12 +41,23 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonProcessingException;
@@ -68,6 +79,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.encryption.ConfigurationSecretEncrypter;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.FutureResult;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.util.Strings;
 
@@ -162,7 +174,7 @@ public abstract class AbstractConfigured
 
     private final OwnAttributeResolver _attributeResolver = new OwnAttributeResolver(this);
 
-    @ManagedAttributeField( afterSet = "attainStateIfOpenedOrReopenFailed" )
+    @ManagedAttributeField
     private State _desiredState;
     private boolean _openComplete;
     private boolean _openFailed;
@@ -439,24 +451,84 @@ public abstract class AbstractConfigured
     @Override
     public final void open()
     {
-        if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+        doSync(openAsync());
+    }
+
+
+    public final ListenableFuture<Void> openAsync()
+    {
+        return doOnConfigThread(new Callable<ListenableFuture<Void>>()
+                                {
+                                    @Override
+                                    public ListenableFuture<Void> call() throws Exception
+                                    {
+                                        if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+                                        {
+                                            _openFailed = false;
+                                            OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
+                                            try
+                                            {
+                                                doResolution(true, exceptionHandler);
+                                                doValidation(true, exceptionHandler);
+                                                doOpening(true, exceptionHandler);
+                                                return doAttainState(exceptionHandler);
+                                            }
+                                            catch (RuntimeException e)
+                                            {
+                                                exceptionHandler.handleException(e, AbstractConfiguredObject.this);
+                                                return Futures.immediateFuture(null);
+                                            }
+                                        }
+                                        else
+                                        {
+                                            return Futures.immediateFuture(null);
+                                        }
+
+                                    }
+                                });
+
+    }
+
+    protected final <T> ListenableFuture<T> doOnConfigThread(final Callable<ListenableFuture<T>> action)
+    {
+        final SettableFuture<T> returnVal = SettableFuture.create();
+
+        _taskExecutor.submit(new Task<Void>()
         {
-            _openFailed = false;
-            OpenExceptionHandler exceptionHandler = new OpenExceptionHandler();
-            try
-            {
-                doResolution(true, exceptionHandler);
-                doValidation(true, exceptionHandler);
-                doOpening(true, exceptionHandler);
-                doAttainState(exceptionHandler);
-            }
-            catch(RuntimeException e)
+
+            @Override
+            public Void execute()
             {
-                exceptionHandler.handleException(e, this);
+                try
+                {
+                    Futures.addCallback(action.call(), new FutureCallback<T>()
+                    {
+                        @Override
+                        public void onSuccess(final T result)
+                        {
+                            returnVal.set(result);
+                        }
+
+                        @Override
+                        public void onFailure(final Throwable t)
+                        {
+                            returnVal.setException(t);
+                        }
+                    });
+                }
+                catch (Exception e)
+                {
+                    returnVal.setException(e);
+                }
+                return null;
             }
-        }
+        });
+
+        return returnVal;
     }
 
+
+
     public void registerWithParents()
     {
         for(ConfiguredObject<?> parent : _parents.values())
@@ -468,17 +540,78 @@ public abstract class AbstractConfigured
         }
     }
 
-    protected void closeChildren()
+    private class ChildCounter
     {
+        private final AtomicInteger _count = new AtomicInteger();
+        private final Runnable _task;
+
+        private ChildCounter(final Runnable task)
+        {
+            _task = task;
+        }
+
+        public void incrementCount()
+        {
+            _count.incrementAndGet();
+        }
+
+        public void decrementCount()
+        {
+            if(_count.decrementAndGet() == 0)
+            {
+                _task.run();
+            }
+        }
+    }
+
+    protected final ListenableFuture<Void> closeChildren()
+    {
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        final ChildCounter counter = new ChildCounter(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                returnVal.set(null);
+                LOGGER.debug("All children closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName() );
+
+            }
+        });
+        counter.incrementCount();
+
+
         applyToChildren(new Action<ConfiguredObject<?>>()
         {
             @Override
             public void performAction(final ConfiguredObject<?> child)
             {
-                child.close();
+                counter.incrementCount();
+                ListenableFuture<Void> close = child.closeAsync();
+                Futures.addCallback(close, new FutureCallback<Void>()
+                {
+                    @Override
+                    public void onSuccess(final Void result)
+                    {
+                        counter.decrementCount();
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable t)
+                    {
+                        LOGGER.error("Exception occurred while closing "
+                                     + child.getClass().getSimpleName()
+                                     + " : '"
+                                     + child.getName()
+                                     + "'", t);
+                        // No need to decrement counter as setting the exception will complete the future
+                        returnVal.setException(t);
+                    }
+                }, MoreExecutors.sameThreadExecutor());
             }
         });
 
+        counter.decrementCount();
+
         for(Collection<ConfiguredObject<?>> childList : _children.values())
         {
             childList.clear();
@@ -494,23 +627,60 @@ public abstract class AbstractConfigured
             childNameMap.clear();
         }
 
+        return returnVal;
+    }
+
+    @Override
+    public void close()
+    {
+        doSync(closeAsync());
     }
 
     @Override
-    public final void close()
+    public final ListenableFuture<Void> closeAsync()
     {
-        if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
+        return doOnConfigThread(new Callable<ListenableFuture<Void>>()
         {
-            beforeClose();
-            closeChildren();
-            onClose();
-            unregister(false);
+            @Override
+            public ListenableFuture<Void> call() throws Exception
+            {
+                LOGGER.debug("Closing " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+
+                if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
+                {
+
+                    return doAfter(beforeClose(), new Callable<ListenableFuture<Void>>()
+                    {
+                        @Override
+                        public ListenableFuture<Void> call() throws Exception
+                        {
+                            return closeChildren();
+                        }
+                    }).then(new Runnable()
+                            {
+                                @Override
+                                public void run()
+                                {
+                                    onClose();
+                                    unregister(false);
+                                    LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+                                }
+                            });
+                }
+                else
+                {
+                    LOGGER.debug("Closed " + AbstractConfiguredObject.this.getClass().getSimpleName() + " : " + getName());
+
+                    return Futures.immediateFuture(null);
+                }
+            }
+        });
 
-        }
     }
 
-    protected void beforeClose()
+    protected ListenableFuture<Void> beforeClose()
     {
+        return Futures.immediateFuture(null);
     }
 
     protected void onClose()
@@ -519,48 +689,65 @@ public abstract class AbstractConfigured
 
     public final void create()
     {
-        if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+        doSync(createAsync());
+    }
+
+    public final ListenableFuture<Void> createAsync()
+    {
+        return doOnConfigThread(new Callable<ListenableFuture<Void>>()
         {
-            final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
-            if(currentUser != null)
+            @Override
+            public ListenableFuture<Void> call() throws Exception
             {
-                String currentUserName = currentUser.getName();
-                _attributes.put(LAST_UPDATED_BY, currentUserName);
-                _attributes.put(CREATED_BY, currentUserName);
-                _lastUpdatedBy = currentUserName;
-                _createdBy = currentUserName;
-            }
-            final long currentTime = System.currentTimeMillis();
-            _attributes.put(LAST_UPDATED_TIME, currentTime);
-            _attributes.put(CREATED_TIME, currentTime);
-            _lastUpdatedTime = currentTime;
-            _createdTime = currentTime;
+                if (_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
+                {
+                    final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
+                    if (currentUser != null)
+                    {
+                        String currentUserName = currentUser.getName();
+                        _attributes.put(LAST_UPDATED_BY, currentUserName);
+                        _attributes.put(CREATED_BY, currentUserName);
+                        _lastUpdatedBy = currentUserName;
+                        _createdBy = currentUserName;
+                    }
+                    final long currentTime = System.currentTimeMillis();
+                    _attributes.put(LAST_UPDATED_TIME, currentTime);
+                    _attributes.put(CREATED_TIME, currentTime);
+                    _lastUpdatedTime = currentTime;
+                    _createdTime = currentTime;
 
-            CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler();
-            try
-            {
-                doResolution(true, createExceptionHandler);
-                doValidation(true, createExceptionHandler);
-                validateOnCreate();
-                registerWithParents();
-            }
-            catch(RuntimeException e)
-            {
-                createExceptionHandler.handleException(e, this);
-            }
+                    CreateExceptionHandler createExceptionHandler = new CreateExceptionHandler();
+                    try
+                    {
+                        doResolution(true, createExceptionHandler);
+                        doValidation(true, createExceptionHandler);
+                        validateOnCreate();
+                        registerWithParents();
+                    }
+                    catch (RuntimeException e)
+                    {
+                        createExceptionHandler.handleException(e, AbstractConfiguredObject.this);
+                    }
+
+                    final AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler =
+                            new CreateExceptionHandler(true);
+
+                    try
+                    {
+                        doCreation(true, unregisteringExceptionHandler);
+                        doOpening(true, unregisteringExceptionHandler);
+                        return doAttainState(unregisteringExceptionHandler);
+                    }
+                    catch (RuntimeException e)
+                    {
+                        unregisteringExceptionHandler.handleException(e, AbstractConfiguredObject.this);
+                    }
+                }
+                return Futures.immediateFuture(null);
 
-            AbstractConfiguredObjectExceptionHandler unregisteringExceptionHandler = new CreateExceptionHandler(true);
-            try
-            {
-                doCreation(true, unregisteringExceptionHandler);
-                doOpening(true, unregisteringExceptionHandler);
-                doAttainState(unregisteringExceptionHandler);
-            }
-            catch(RuntimeException e)
-            {
-                unregisteringExceptionHandler.handleException(e, this);
             }
-        }
+        });
+
     }
 
     protected void validateOnCreate()
@@ -610,8 +797,40 @@ public abstract class AbstractConfigured
     {
     }
 
-    private void doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler)
+    private ListenableFuture<Void> doAttainState(final AbstractConfiguredObjectExceptionHandler exceptionHandler)
     {
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        final ChildCounter counter = new ChildCounter(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    attainState().addListener(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            returnVal.set(null);
+                        }
+                    }, getTaskExecutor().getExecutor());
+                }
+                catch(RuntimeException e)
+                {
+                    try
+                    {
+                        exceptionHandler.handleException(e, AbstractConfiguredObject.this);
+                        returnVal.set(null);
+                    }
+                    catch(Throwable t)
+                    {
+                        returnVal.setException(t);
+                    }
+                }
+            }
+        });
+        counter.incrementCount();
         applyToChildren(new Action<ConfiguredObject<?>>()
         {
             @Override
@@ -619,22 +838,43 @@ public abstract class AbstractConfigured
             {
                 if (child instanceof AbstractConfiguredObject)
                 {
-                    AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child;
+                    final AbstractConfiguredObject configuredObject = (AbstractConfiguredObject) child;
                     if (configuredObject._dynamicState.get() == DynamicState.OPENED)
                     {
-                        try
-                        {
-                            configuredObject.doAttainState(exceptionHandler);
-                        }
-                        catch (RuntimeException e)
-                        {
-                            exceptionHandler.handleException(e, configuredObject);
-                        }
+                        counter.incrementCount();
+                        Futures.addCallback(configuredObject.doAttainState(exceptionHandler),
+                                            new FutureCallback()
+                                            {
+                                                @Override
+                                                public void onSuccess(final Object result)
+                                                {
+                                                    counter.decrementCount();
+                                                }
+
+                                                @Override
+                                                public void onFailure(final Throwable t)
+                                                {
+                                                    try
+                                                    {
+                                                        if (t instanceof RuntimeException)
+                                                        {
+                                                            exceptionHandler.handleException((RuntimeException) t,
+                                                                                             configuredObject);
+                                                        }
+                                                    }
+                                                    finally
+                                                    {
+                                                        counter.decrementCount();
+                                                    }
+                                                }
+                                            },getTaskExecutor().getExecutor());
+
                     }
                 }
             }
         });
-        attainState();
+        counter.decrementCount();
+        return returnVal;
     }
 
     protected void doOpening(boolean skipCheck, final AbstractConfiguredObjectExceptionHandler exceptionHandler)
@@ -890,16 +1130,17 @@ public abstract class AbstractConfigured
         }
     }
 
-    private void attainStateIfOpenedOrReopenFailed()
+    private ListenableFuture<Void> attainStateIfOpenedOrReopenFailed()
     {
         if (_openComplete || getDesiredState() == State.DELETED)
         {
-            attainState();
+            return attainState();
         }
         else if (_openFailed)
         {
-            open();
+            return openAsync();
         }
+        return Futures.immediateFuture(null);
     }
 
     protected void onOpen()
@@ -907,10 +1148,11 @@ public abstract class AbstractConfigured
 
     }
 
-    protected void attainState()
+    protected ListenableFuture<Void> attainState()
     {
         State currentState = getState();
         State desiredState = getDesiredState();
+        ListenableFuture<Void> returnVal;
         if(currentState != desiredState)
         {
             Method stateChangingMethod = getStateChangeMethod(currentState, desiredState);
@@ -918,7 +1160,7 @@ public abstract class AbstractConfigured
             {
                 try
                 {
-                    stateChangingMethod.invoke(this);
+                    returnVal = (ListenableFuture<Void>) stateChangingMethod.invoke(this);
                 }
                 catch (IllegalAccessException e)
                 {
@@ -938,7 +1180,16 @@ public abstract class AbstractConfigured
                     throw new ServerScopedRuntimeException("Unexpected checked exception when calling state transition", underlying);
                 }
             }
+            else
+            {
+                returnVal = Futures.immediateFuture(null);
+            }
         }
+        else
+        {
+            returnVal = Futures.immediateFuture(null);
+        }
+        return returnVal;
     }
 
     private Method getStateChangeMethod(final State currentState, final State desiredState)
@@ -1013,44 +1264,72 @@ public abstract class AbstractConfigured
     }
 
 
-    private State setDesiredState(final State desiredState)
+    private ListenableFuture<Void> setDesiredState(final State desiredState)
             throws IllegalStateTransitionException, AccessControlException
     {
-
-        return runTask(new Task<State>()
+        return doOnConfigThread(new Callable<ListenableFuture<Void>>()
+        {
+            @Override
+            public ListenableFuture<Void> call() throws Exception
+            {
+                final State state = getState();
+                final State currentDesiredState = getDesiredState();
+                if(desiredState == currentDesiredState && desiredState != state)
+                {
+                    return doAfter(attainStateIfOpenedOrReopenFailed(), new Runnable()
+                    {
+                        @Override
+                        public void run()
                         {
-                            @Override
-                            public State execute()
+                            final State currentState = getState();
+                            if (currentState != state)
                             {
+                                notifyStateChanged(state, currentState);
+                            }
 
-                                State state = getState();
-                                if(desiredState == getDesiredState() && desiredState != state)
-                                {
-                                    attainStateIfOpenedOrReopenFailed();
-                                    final State currentState = getState();
-                                    if (currentState != state)
-                                    {
-                                        notifyStateChanged(state, currentState);
-                                    }
-                                    return currentState;
-                                }
-                                else
-                                {
-                                    setAttributes(Collections.<String, Object>singletonMap(DESIRED_STATE,
-                                                                                           desiredState));
+                        }
+                    });
+                }
+                else
+                {
+                    ConfiguredObject<?> proxyForValidation =
+                            createProxyForValidation(Collections.<String, Object>singletonMap(
+                                    ConfiguredObject.DESIRED_STATE,
+                                    desiredState));
+                    Set<String> desiredStateOnlySet = Collections.unmodifiableSet(
+                            Collections.singleton(ConfiguredObject.DESIRED_STATE));
+                    authoriseSetAttributes(proxyForValidation, desiredStateOnlySet);
+                    validateChange(proxyForValidation, desiredStateOnlySet);
+
+                    if (changeAttribute(ConfiguredObject.DESIRED_STATE, currentDesiredState, desiredState))
+                    {
+                        attributeSet(ConfiguredObject.DESIRED_STATE,
+                                     currentDesiredState,
+                                     desiredState);
+
+                        return doAfter(attainStateIfOpenedOrReopenFailed(),new Runnable()
+                                       {
+                                           @Override
+                                           public void run()
+                                           {
+                                               if (getState() == desiredState)
+                                               {
+                                                   notifyStateChanged(state, desiredState);
+                                               }
+
+                                           }
+                                       }
+                            );
+                    }
+                    else
+                    {
+                        return Futures.immediateFuture(null);
+                    }
+                }
+
+            }
+        });
 
-                                    if (getState() == desiredState)
-                                    {
-                                        notifyStateChanged(state, desiredState);
-                                        return desiredState;
-                                    }
-                                    else
-                                    {
-                                        return getState();
-                                    }
-                                }
-                            }
-                        });
     }
 
     @Override
@@ -1429,20 +1708,62 @@ public abstract class AbstractConfigured
 
     public final void stop()
     {
-        setDesiredState(State.STOPPED);
+        doSync(setDesiredState(State.STOPPED));
     }
 
     public final void delete()
     {
-        if(getState() == State.UNINITIALIZED)
+        doSync(deleteAsync());
+    }
+
+    protected final <R>  R doSync(ListenableFuture<R> async)
+    {
+        try
+        {
+            return async.get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new ServerScopedRuntimeException(e);
+        }
+        catch (ExecutionException e)
         {
-            _desiredState = State.DELETED;
+            Throwable cause = e.getCause();
+            if(cause instanceof RuntimeException)
+            {
+                throw (RuntimeException) cause;
+            }
+            else if(cause instanceof Error)
+            {
+                throw (Error) cause;
+            }
+            else if(cause != null)
+            {
+                throw new ServerScopedRuntimeException(cause);
+            }
+            else
+            {
+                throw new ServerScopedRuntimeException(e);
+            }
+
         }
-        setDesiredState(State.DELETED);
+    }
+
+    public final ListenableFuture<Void> deleteAsync()
+    {
+        return setDesiredState(State.DELETED);
+    }
+
+    public final void start()
+    {
+        doSync(startAsync());
+    }
 
+    public ListenableFuture<Void> startAsync()
+    {
+        return setDesiredState(State.ACTIVE);
     }
 
-    public final void start() { setDesiredState(State.ACTIVE); }
 
     protected void deleted()
     {
@@ -1527,24 +1848,175 @@ public abstract class AbstractConfigured
         _taskExecutor.run(task);
     }
 
+    @Override
+    public void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {
+        doSync(setAttributesAsync(attributes));
+    }
+
+    protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Runnable second)
+    {
+        return doAfter(getTaskExecutor().getExecutor(), first, second);
+    }
+
+    protected static final ChainedListenableFuture doAfter(Executor executor, ListenableFuture<Void> first, final Runnable second)
+    {
+        final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor);
+        Futures.addCallback(first, new FutureCallback<Void>()
+        {
+            @Override
+            public void onSuccess(final Void result)
+            {
+                try
+                {
+                    second.run();
+                    returnVal.set(null);
+                }
+                catch(Throwable e)
+                {
+                    returnVal.setException(e);
+                }
+            }
+
+            @Override
+            public void onFailure(final Throwable t)
+            {
+                returnVal.setException(t);
+            }
+        }, executor);
+
+        return returnVal;
+    }
+
+    public static interface ChainedListenableFuture extends ListenableFuture<Void>
+    {
+        ChainedListenableFuture then(Runnable r);
+        ChainedListenableFuture then(Callable<ListenableFuture<Void>> r);
+    }
+
+    public static class ChainedSettableFuture extends AbstractFuture<Void> implements ChainedListenableFuture
+    {
+        private final Executor _exector;
+
+        public ChainedSettableFuture(final Executor executor)
+        {
+            _exector = executor;
+        }
+
+        @Override
+        public boolean set(Void value)
+        {
+            return super.set(value);
+        }
+
+        @Override
+        public boolean setException(Throwable throwable)
+        {
+            return super.setException(throwable);
+        }
+
+        @Override
+        public ChainedListenableFuture then(final Runnable r)
+        {
+            return doAfter(_exector, this, r);
+        }
+
+        @Override
+        public ChainedListenableFuture then(final Callable<ListenableFuture<Void>> r)
+        {
+            return doAfter(_exector, this,r);
+        }
+    }
+
+    protected final ChainedListenableFuture doAfter(ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second)
+    {
+        return doAfter(getTaskExecutor().getExecutor(), first, second);
+    }
+
+    protected static final ChainedListenableFuture doAfter(final Executor executor, ListenableFuture<Void> first, final Callable<ListenableFuture<Void>> second)
+    {
+        final ChainedSettableFuture returnVal = new ChainedSettableFuture(executor);
+        Futures.addCallback(first, new FutureCallback<Void>()
+        {
+            @Override
+            public void onSuccess(final Void result)
+            {
+                try
+                {
+                    final ListenableFuture<Void> future = second.call();
+                    Futures.addCallback(future, new FutureCallback<Void>()
+                    {
+                        @Override
+                        public void onSuccess(final Void result)
+                        {
+                            returnVal.set(null);
+                        }
+
+                        @Override
+                        public void onFailure(final Throwable t)
+                        {
+                            returnVal.setException(t);
+                        }
+                    }, executor);
+
+                }
+                catch(Throwable e)
+                {
+                    returnVal.setException(e);
+                }
+            }
+
+            @Override
+            public void onFailure(final Throwable t)
+            {
+                returnVal.setException(t);
+            }
+        }, executor);
+
+        return returnVal;
+    }
 
     @Override
-    public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
+    public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
     {
+        final Map<String,Object> updateAttributes = new HashMap<>(attributes);
+        Object desiredState = updateAttributes.remove(ConfiguredObject.DESIRED_STATE);
         runTask(new VoidTask()
         {
             @Override
             public void execute()
             {
                 authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet());
-                changeAttributes(attributes);
+                validateChange(createProxyForValidation(attributes), attributes.keySet());
+
+                changeAttributes(updateAttributes);
             }
         });
+        if(desiredState != null)
+        {
+            State state;
+            if(desiredState instanceof State)
+            {
+                state = (State)desiredState;
+            }
+            else if(desiredState instanceof String)
+            {
+                state = State.valueOf((String)desiredState);
+            }
+            else
+            {
+                throw new IllegalArgumentException("Cannot convert an object of type " + desiredState.getClass().getName() + " to a State");
+            }
+            return setDesiredState(state);
+        }
+        else
+        {
+            return Futures.immediateFuture(null);
+        }
     }
 
     protected void changeAttributes(final Map<String, Object> attributes)
     {
-        validateChange(createProxyForValidation(attributes), attributes.keySet());
         Collection<String> names = getAttributeNames();
         for (String name : names)
         {
@@ -1938,6 +2410,74 @@ public abstract class AbstractConfigured
         }
     }
 
+    private static class CloseResult implements FutureResult
+    {
+        private volatile FutureResult _childFutureResult;
+
+        @Override
+        public boolean isComplete()
+        {
+            return _childFutureResult != null && _childFutureResult.isComplete();
+        }
+
+        @Override
+        public void waitForCompletion()
+        {
+            synchronized (this)
+            {
+                while (_childFutureResult == null)
+                {
+                    try
+                    {
+                        wait();
+                    }
+                    catch (InterruptedException e)
+                    {
+
+                    }
+                }
+            }
+            _childFutureResult.waitForCompletion();
+
+        }
+
+        @Override
+        public void waitForCompletion(final long timeout) throws TimeoutException
+        {
+            long startTime = System.currentTimeMillis();
+            long remaining = timeout;
+
+            synchronized (this)
+            {
+                while (_childFutureResult == null && remaining > 0)
+                {
+                    try
+                    {
+                        wait(remaining);
+                    }
+                    catch (InterruptedException e)
+                    {
+
+                    }
+                    remaining = startTime + timeout - System.currentTimeMillis();
+
+                    if(remaining <= 0)
+                    {
+                        throw new TimeoutException("Completion did not occur within given timeout: " + timeout);
+                    }
+                }
+            }
+            _childFutureResult.waitForCompletion(remaining);
+
+        }
+
+        public synchronized void setChildFutureResult(final FutureResult childFutureResult)
+        {
+            _childFutureResult = childFutureResult;
+            notifyAll();
+        }
+    }
+
 
     private static class AttributeGettingHandler implements InvocationHandler
     {
@@ -2127,7 +2667,8 @@ public abstract class AbstractConfigured
             {
                 if (source.getState() != State.DELETED)
                 {
-                    source.delete();
+                    // TODO - RG - This isn't right :-(
+                    source.deleteAsync();
                 }
             }
             finally

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java Thu Mar 12 15:41:46 2015
@@ -23,6 +23,10 @@ package org.apache.qpid.server.model;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+
 import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
 import org.apache.qpid.server.store.ConfiguredObjectDependency;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -59,6 +63,26 @@ abstract public class AbstractConfigured
         return instance;
     }
 
+
+    @Override
+    public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory,
+                    final Map<String, Object> attributes,
+                    final ConfiguredObject<?>... parents)
+    {
+        final SettableFuture<X> returnVal = SettableFuture.create();
+        final X instance = createInstance(attributes, parents);
+        final ListenableFuture<Void> createFuture = instance.createAsync();
+        createFuture.addListener(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                returnVal.set(instance);
+            }
+        }, MoreExecutors.sameThreadExecutor());
+        return returnVal;
+    }
+
     protected abstract X createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents);
 
     public final <C extends ConfiguredObject<?>> C getParent(Class<C> parentClass, ConfiguredObject<?>... parents)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java Thu Mar 12 15:41:46 2015
@@ -31,6 +31,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
 import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler;
@@ -194,11 +197,11 @@ public abstract class AbstractSystemConf
     }
 
     @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
-    protected void activate()
+    protected ListenableFuture<Void> activate()
     {
         final EventLogger eventLogger = _eventLogger;
 
-        EventLogger startupLogger;
+        final EventLogger startupLogger;
         if (isStartupLoggedToSystemOut())
         {
             //Create the composite (logging+SystemOut MessageLogger to be used during startup
@@ -232,17 +235,34 @@ public abstract class AbstractSystemConf
         BrokerStoreUpgraderAndRecoverer upgrader = new BrokerStoreUpgraderAndRecoverer(this);
         upgrader.perform();
 
-        Broker broker = getBroker();
+        final Broker broker = getBroker();
 
         broker.setEventLogger(startupLogger);
-        broker.open();
-
-        if (broker.getState() == State.ACTIVE)
-        {
-            startupLogger.message(BrokerMessages.READY());
-            broker.setEventLogger(eventLogger);
-        }
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        broker.openAsync().addListener(
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+
+                            if (broker.getState() == State.ACTIVE)
+                            {
+                                startupLogger.message(BrokerMessages.READY());
+                                broker.setEventLogger(eventLogger);
+                            }
+                        }
+                        finally
+                        {
+                            returnVal.set(null);
+                        }
+                    }
+                }, getTaskExecutor().getExecutor()
+                                      );
 
+        return returnVal;
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java Thu Mar 12 15:41:46 2015
@@ -45,5 +45,4 @@ public interface Binding<X extends Bindi
     @ManagedStatistic
     long getMatches();
 
-    void delete();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java Thu Mar 12 15:41:46 2015
@@ -26,6 +26,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 
@@ -236,6 +238,8 @@ public interface ConfiguredObject<X exte
                                                ConfiguredObject... otherParents);
 
     void setAttributes(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException;
+    ListenableFuture<Void> setAttributesAsync(Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException;
+
 
     Class<? extends ConfiguredObject> getCategoryClass();
     Class<? extends ConfiguredObject> getTypeClass();
@@ -248,8 +252,12 @@ public interface ConfiguredObject<X exte
     ConfiguredObjectRecord asObjectRecord();
 
     void open();
+    ListenableFuture<Void> openAsync();
 
     void close();
+    ListenableFuture<Void> closeAsync();
+
+    ListenableFuture<Void> deleteAsync();
 
     TaskExecutor getTaskExecutor();
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java Thu Mar 12 15:41:46 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model;
 import java.util.Collection;
 import java.util.Map;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.UnresolvedConfiguredObject;
@@ -34,6 +36,8 @@ public interface ConfiguredObjectFactory
 
     <X extends ConfiguredObject<X>> X create(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents);
 
+    <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz, Map<String, Object> attributes, ConfiguredObject<?>... parents);
+
 
 
     <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(Class<X> categoryClass,

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java Thu Mar 12 15:41:46 2015
@@ -26,6 +26,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
 import org.apache.qpid.server.plugin.QpidServiceLoader;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -112,6 +114,18 @@ public class ConfiguredObjectFactoryImpl
         return factory.create(this, attributes, parents);
     }
 
+
+    @Override
+    public <X extends ConfiguredObject<X>> ListenableFuture<X> createAsync(Class<X> clazz,
+                                                    final Map<String, Object> attributes,
+                                                    final ConfiguredObject<?>... parents)
+    {
+        ConfiguredObjectTypeFactory<X> factory = getConfiguredObjectTypeFactory(clazz, attributes);
+
+        return factory.createAsync(this, attributes, parents);
+    }
+
+
     @Override
     public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final Class<X> categoryClass,
                                                                                                          Map<String, Object> attributes)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java Thu Mar 12 15:41:46 2015
@@ -40,6 +40,7 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.plugin.ConfiguredObjectRegistration;
@@ -801,20 +802,37 @@ public class ConfiguredObjectTypeRegistr
         {
             if(m.isAnnotationPresent(StateTransition.class))
             {
-                if(m.getParameterTypes().length == 0)
+                if(ListenableFuture.class.isAssignableFrom(m.getReturnType()))
                 {
-                    m.setAccessible(true);
-                    StateTransition annotation = m.getAnnotation(StateTransition.class);
+                    if (m.getParameterTypes().length == 0)
+                    {
+                        m.setAccessible(true);
+                        StateTransition annotation = m.getAnnotation(StateTransition.class);
+
+                        for (State state : annotation.currentState())
+                        {
+                            addStateTransition(state, annotation.desiredState(), m, map);
+                        }
 
-                    for(State state : annotation.currentState())
+                    }
+                    else
                     {
-                        addStateTransition(state, annotation.desiredState(), m, map);
+                        throw new ServerScopedRuntimeException(
+                                "A state transition method must have no arguments. Method "
+                                + m.getName()
+                                + " on "
+                                + clazz.getName()
+                                + " does not meet this criteria.");
                     }
-
                 }
                 else
                 {
-                    throw new ServerScopedRuntimeException("A state transition method must have no arguments. Method " + m.getName() + " on " + clazz.getName() + " does not meet this criteria.");
+                    throw new ServerScopedRuntimeException(
+                            "A state transition method must return a ListenableFuture. Method "
+                            + m.getName()
+                            + " on "
+                            + clazz.getName()
+                            + " does not meet this criteria.");
                 }
             }
         }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java Thu Mar 12 15:41:46 2015
@@ -103,7 +103,6 @@ public interface Connection<X extends Co
     //children
     Collection<Session> getSessions();
 
-    void delete();
 
 
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java Thu Mar 12 15:41:46 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model;
 import java.util.Collection;
 import java.util.Set;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 @ManagedObject
 public interface Port<X extends Port<X>> extends ConfiguredObject<X>
 {
@@ -76,4 +78,6 @@ public interface Port<X extends Port<X>>
 
     void start();
 
+    ListenableFuture<Void> startAsync();
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Thu Mar 12 15:41:46 2015
@@ -147,8 +147,6 @@ public interface VirtualHost<X extends V
 
     void stop();
 
-    void delete();
-
     String getRedirectHost(AmqpPort<?> port);
 
     public static interface Transaction

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java Thu Mar 12 15:41:46 2015
@@ -35,6 +35,9 @@ import java.util.regex.Pattern;
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.common.QpidProperties;
@@ -234,13 +237,40 @@ public class BrokerAdapter extends Abstr
     }
 
     @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         if(_parent.isManagementMode())
         {
-            _managementModeAuthenticationProvider.open();
+            final SettableFuture<Void> returnVal = SettableFuture.create();
+
+            _managementModeAuthenticationProvider.openAsync().addListener(
+                    new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            try
+                            {
+                                activateWithoutManagementMode();
+                            }
+                            finally
+                            {
+                                returnVal.set(null);
+                            }
+                        }
+                    }, getTaskExecutor().getExecutor()
+                                                                         );
+            return returnVal;
+        }
+        else
+        {
+            activateWithoutManagementMode();
+            return Futures.immediateFuture(null);
         }
+    }
 
+    private void activateWithoutManagementMode()
+    {
         boolean hasBrokerAnyErroredChildren = false;
 
         for (final Class<? extends ConfiguredObject> childClass : getModel().getChildTypes(getCategoryClass()))

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Thu Mar 12 15:41:46 2015
@@ -27,10 +27,13 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.CloseFuture;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Port;
@@ -51,6 +54,7 @@ public final class ConnectionAdapter ext
     private final Action _underlyingConnectionDeleteTask;
     private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false);
     private AMQConnectionModel _underlyingConnection;
+    private final AtomicBoolean _closing = new AtomicBoolean();
 
     public ConnectionAdapter(final AMQConnectionModel conn)
     {
@@ -156,17 +160,59 @@ public final class ConnectionAdapter ext
     }
 
     @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
-        closeUnderlyingConnection();
-        deleted();
-        setState(State.DELETED);
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        asyncClose().addListener(
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            deleted();
+                            setState(State.DELETED);
+                        }
+                        finally
+                        {
+                            returnVal.set(null);
+                        }
+                    }
+                }, getTaskExecutor().getExecutor()
+                                );
+        return returnVal;
+    }
+
+    @Override
+    protected ListenableFuture<Void> beforeClose()
+    {
+        _closing.set(true);
+
+        return asyncClose();
+
+    }
+
+    private ListenableFuture<Void> asyncClose()
+    {
+        final SettableFuture<Void> closeFuture = SettableFuture.create();
+
+        _underlyingConnection.addDeleteTask(new Action()
+        {
+            @Override
+            public void performAction(final Object object)
+            {
+                closeFuture.set(null);
+            }
+        });
+
+        _underlyingConnection.closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+        return closeFuture;
     }
 
     @Override
     protected void onClose()
     {
-        closeUnderlyingConnection();
     }
 
     @Override
@@ -233,23 +279,54 @@ public final class ConnectionAdapter ext
         // SessionAdapter installs delete task to cause session model object to delete
     }
 
-    private void closeUnderlyingConnection()
+
+    private static class ConnectionCloseFuture implements CloseFuture
     {
-        if (_underlyingClosed.compareAndSet(false, true))
+        private boolean _closed;
+
+        public synchronized void connectionClosed()
+        {
+            _closed = true;
+            notifyAll();
+
+        }
+
+        @Override
+        public void runWhenComplete(final Runnable closeRunnable)
         {
-            _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask);
-            try
+            if (_closed )
             {
-                _underlyingConnection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+                closeRunnable.run();
             }
-            catch (Exception e)
+            else
             {
-                LOGGER.warn("Exception closing connection "
-                             + _underlyingConnection.getConnectionId()
-                             + " from "
-                             + _underlyingConnection.getRemoteAddressString(), e);
-            }
+                Thread t = new Thread(new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        synchronized (ConnectionCloseFuture.this)
+                        {
+                            while (!_closed)
+                            {
+                                try
+                                {
+                                    ConnectionCloseFuture.this.wait();
+                                }
+                                catch (InterruptedException e)
+                                {
+                                }
+                            }
+
+                            closeRunnable.run();
+                        }
+                    }
+                });
+
+                t.setDaemon(true);
+                t.start();
 
+            }
         }
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java Thu Mar 12 15:41:46 2015
@@ -31,6 +31,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.BrokerProperties;
@@ -145,7 +148,8 @@ public class FileBasedGroupProviderImpl
             GroupAdapter groupAdapter = new GroupAdapter(attrMap);
             principals.add(groupAdapter);
             groupAdapter.registerWithParents();
-            groupAdapter.open();
+            // TODO - we know this is safe, but the sync method shouldn't really be called from the management thread
+            groupAdapter.openAsync();
         }
 
     }
@@ -261,7 +265,7 @@ public class FileBasedGroupProviderImpl
     }
 
     @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE )
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         if (_groupDatabase != null)
         {
@@ -278,29 +282,48 @@ public class FileBasedGroupProviderImpl
                 throw new IllegalConfigurationException(String.format("Cannot load groups from '%s'", getPath()));
             }
         }
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition( currentState = { State.QUIESCED, State.ACTIVE, State.ERRORED}, desiredState = State.DELETED )
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
-        close();
-        File file = new File(getPath());
-        if (file.exists())
-        {
-            if (!file.delete())
-            {
-                throw new IllegalConfigurationException("Cannot delete group file");
-            }
-        }
-
-        deleted();
-        setState(State.DELETED);
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        closeAsync().addListener(
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            File file = new File(getPath());
+                            if (file.exists())
+                            {
+                                if (!file.delete())
+                                {
+                                    throw new IllegalConfigurationException("Cannot delete group file");
+                                }
+                            }
+
+                            deleted();
+                            setState(State.DELETED);
+                        }
+                        finally
+                        {
+                            returnVal.set(null);
+                        }
+                    }
+                }, getTaskExecutor().getExecutor()
+                           );
+        return returnVal;
     }
 
     @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
-    private void startQuiesced()
+    private ListenableFuture<Void> startQuiesced()
     {
         setState(State.QUIESCED);
+        return Futures.immediateFuture(null);
     }
 
     public Set<Principal> getGroupPrincipalsForUser(String username)
@@ -352,9 +375,10 @@ public class FileBasedGroupProviderImpl
         }
 
         @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
-        private void activate()
+        private ListenableFuture<Void> activate()
         {
             setState(State.ACTIVE);
+            return Futures.immediateFuture(null);
         }
 
         @Override
@@ -371,7 +395,8 @@ public class FileBasedGroupProviderImpl
                 attrMap.put(GroupMember.NAME, principal.getName());
                 GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap);
                 groupMemberAdapter.registerWithParents();
-                groupMemberAdapter.open();
+                // todo - this will be safe, but the synchronous open should not be called from the management thread
+                groupMemberAdapter.openAsync();
                 members.add(groupMemberAdapter);
             }
             _groupPrincipal = new GroupPrincipal(getName());
@@ -432,11 +457,12 @@ public class FileBasedGroupProviderImpl
         }
 
         @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED )
-        private void doDelete()
+        private ListenableFuture<Void> doDelete()
         {
             _groupDatabase.removeGroup(getName());
             deleted();
             setState(State.DELETED);
+            return Futures.immediateFuture(null);
         }
 
         @Override
@@ -494,17 +520,19 @@ public class FileBasedGroupProviderImpl
             }
 
             @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
-            private void activate()
+            private ListenableFuture<Void> activate()
             {
                 setState(State.ACTIVE);
+                return Futures.immediateFuture(null);
             }
 
             @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
-            private void doDelete()
+            private ListenableFuture<Void> doDelete()
             {
                 _groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName());
                 deleted();
                 setState(State.DELETED);
+                return Futures.immediateFuture(null);
             }
 
             @Override

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java Thu Mar 12 15:41:46 2015
@@ -37,16 +37,17 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.util.BaseAction;
-import org.apache.qpid.server.util.FileHelper;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.JsonProcessingException;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
 import org.codehaus.jackson.type.TypeReference;
 
+import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.AuthenticationProvider;
@@ -55,6 +56,8 @@ import org.apache.qpid.server.model.Mana
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
 
 
 public class FileSystemPreferencesProviderImpl
@@ -128,7 +131,7 @@ public class FileSystemPreferencesProvid
     }
 
     @StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE )
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         if (_store != null)
         {
@@ -138,6 +141,7 @@ public class FileSystemPreferencesProvid
         {
             throw new IllegalStateException("Cannot open preferences provider " + getName() + " in state " + getState() );
         }
+        return Futures.immediateFuture(null);
     }
 
     @Override
@@ -171,33 +175,52 @@ public class FileSystemPreferencesProvid
     }
 
     @StateTransition(currentState = { State.ACTIVE }, desiredState = State.QUIESCED)
-    private void doQuiesce()
+    private ListenableFuture<Void> doQuiesce()
     {
         if(_store != null)
         {
             _store.close();
         }
         setState(State.QUIESCED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED }, desiredState = State.DELETED )
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
-        close();
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        closeAsync().addListener(
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            if(_store != null)
+                            {
+                                _store.close();
+                                _store.delete();
+                                deleted();
+                                _authenticationProvider.setPreferencesProvider(null);
+
+                            }
+                            setState(State.DELETED);
+                        }
+                        finally
+                        {
+                            returnVal.set(null);
+                        }
+                    }
+                }, getTaskExecutor().getExecutor()
+                           );
 
-        if(_store != null)
-        {
-            _store.close();
-            _store.delete();
-            deleted();
-            _authenticationProvider.setPreferencesProvider(null);
+        return returnVal;
 
-        }
-        setState(State.DELETED);
     }
 
     @StateTransition(currentState = State.QUIESCED, desiredState = State.ACTIVE )
-    private void restart()
+    private ListenableFuture<Void> restart()
     {
         if (_store == null)
         {
@@ -206,6 +229,7 @@ public class FileSystemPreferencesProvid
 
         _store.open();
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Thu Mar 12 15:41:46 2015
@@ -26,6 +26,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Consumer;
@@ -169,10 +172,11 @@ final class SessionAdapter extends Abstr
     }
 
     @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         deleted();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java Thu Mar 12 15:41:46 2015
@@ -27,6 +27,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -226,14 +229,24 @@ abstract public class AbstractPort<X ext
     }
 
     @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED )
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
-        close();
-        setState(State.DELETED);
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        closeAsync().addListener(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                setState(State.DELETED);
+                returnVal.set(null);
+
+            }
+        }, getTaskExecutor().getExecutor());
+        return returnVal;
     }
 
     @StateTransition( currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE )
-    protected void activate()
+    protected ListenableFuture<Void> activate()
     {
         try
         {
@@ -244,12 +257,14 @@ abstract public class AbstractPort<X ext
             setState(State.ERRORED);
             throw new IllegalConfigurationException("Unable to active port '" + getName() + "'of type " + getType() + " on " + getPort(), e);
         }
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
-    private void startQuiesced()
+    private ListenableFuture<Void> startQuiesced()
     {
         setState(State.QUIESCED);
+        return Futures.immediateFuture(null);
     }
 
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java Thu Mar 12 15:41:46 2015
@@ -40,6 +40,8 @@ import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import org.apache.qpid.server.configuration.BrokerProperties;
@@ -118,6 +120,8 @@ public class AmqpPortImpl extends Abstra
 
     private final Broker<?> _broker;
     private AcceptingTransport _transport;
+    private final AtomicBoolean _closing = new AtomicBoolean();
+    private final SettableFuture _noConnectionsRemain = SettableFuture.create();
 
     @ManagedObjectFactoryConstructor
     public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker)
@@ -254,6 +258,19 @@ public class AmqpPortImpl extends Abstra
     }
 
     @Override
+    protected ListenableFuture<Void> beforeClose()
+    {
+        _closing.set(true);
+
+        if (_connectionCount.get() == 0)
+        {
+            _noConnectionsRemain.set(null);
+        }
+
+        return _noConnectionsRemain;
+    }
+
+    @Override
     protected void onClose()
     {
         if (_transport != null)
@@ -262,6 +279,8 @@ public class AmqpPortImpl extends Abstra
             {
                 _broker.getEventLogger().message(BrokerMessages.SHUTTING_DOWN(String.valueOf(transport), getPort()));
             }
+
+
             _transport.close();
         }
     }
@@ -500,6 +519,11 @@ public class AmqpPortImpl extends Abstra
            _connectionCountWarningGiven.compareAndSet(true,false);
         }
 
+        if (_closing.get() && _connectionCount.get() == 0)
+        {
+            _noConnectionsRemain.set(null);
+        }
+
         return openConnections;
     }
 
@@ -511,7 +535,7 @@ public class AmqpPortImpl extends Abstra
     @Override
     public boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress)
     {
-        return _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections;
+        return !_closing.get() && ( _maxOpenConnections < 0 || _connectionCount.get() < _maxOpenConnections );
     }
 
     @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org